Apache Airflow Dynamiczny DAG – przykład e2e!

You are currently viewing Apache Airflow Dynamiczny DAG – przykład e2e!
Share This Post, Help Others, And Earn My Heartfelt Appreciation! :)
4.8
(385)

Apache Airflow daje nam możliwość stworzenia dynamicznego DAG’a (Apache Airflow Dynamiczny DAG). Ta funkcja jest bardzo przydatna, gdy chcemy osiągnąć elastyczność w Airflow, aby nie tworzyć wielu kopi tego samego DAG’a dla każdego przypadku, ale tylko jeden, gdzie będziemy mieć możliwość dynamicznej zmiany task’ów oraz relacji między nimi.

Koncepcja -> Apache Airflow Dynamiczny DAG

Nasza dynamiczny DAG zostanie zbudowany w oparciu o plik JSON, który może zostać utworzony przez inny proces. Ale na potrzeby tego posta utworzymy ręcznie plik JSON.

Wyobraźmy sobie, że chcielibyśmy załadować tabele z jednego źródła do docelowej bazy danych, ale nie ładujemy wszystkich tabel, ale tylko te, które zawierają nowe dane i właśnie ta lista tabel będzie zawarta w naszym pliku konfiguracyjnym JSON. Dodatkowo nasz proces ładowania będzie miał dwa etapy: Init i Clear.

Apache Airflow Dynamiczny DAG - przykład e2e!
Apache Airflow Dynamiczny DAG

Plik konfiguracyjny

Poniżej znajduje się struktura pliku konfiguracyjnego JSON, w którym określono:

  • name – nazwa DAG’a
  • schedule – interwał w jakim będzie uruchamiany nasz DAG
  • tables – wykaz tabel. Wskazuje liczbę zadań między fazami Init i Clear
{
  "name": "Dynamic_DAG",
  "schedule": "@hourly",
  "tables": [
    "Customer",
    "Concact",
    "Address",
    "Product"
  ]
}

DAG – kod źródłowy

Utwórz nowy plik Pythona w folderze $AIRFLOW_HOME/dags. Będzie to nasz nowy dynamiczny DAG.

Poniżej znajdziesz pełny kod. Wklej go proszę do wcześniej utworzonego pliku.

from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator

import json


def create_dag(dag_id,
               schedule,
               default_args,
               conf):
    dag = DAG(dag_id, default_args=default_args, schedule_interval=schedule)

    with dag:
        init = DummyOperator(
            task_id='Init',
            dag=dag
        )

        clear = DummyOperator(
            task_id='clear',
            dag=dag
        )

        for table in conf['tables']:
            tab = DummyOperator(
                task_id=table,
                dag=dag
            )
            init >> tab >> clear

        return dag


with open("/home/pawel/airflow-dynamic-dag-conf/process_configuration.json") as json_data:
    conf = json.load(json_data)
    schedule = conf['schedule']
    dag_id = conf['name']

    args = {
        'owner': 'BigDataETL',
        'depends_on_past': False,
        'start_date': datetime.now(),
        'email': ['bigdataetl@gmail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'concurrency': 1,
        'max_active_runs': 1
    }
    globals()[dag_id] = create_dag(dag_id, schedule, args, conf)

Airflow UI

Twój nowy DAG jest teraz dostępny w interfejsie użytkownika Airflow. Po kliknięciu i przejściu do Graph View zobaczysz, że DAG składa się z:

  • jednego zadania Init
  • czterech zadań typu „Table”
  • jednego zadania Clear.
Apache Airflow Dynamiczny DAG - przykład e2e!
Apache Airflow Dynamiczny DAG

Testy

Teraz usuniemy dwie tabele z pliku konfiguracyjnego jak poniżej. Przetestujemy w ten sposób czy bez zmiany kodu źródłowego nasz dynamiczny DAG się zmieni.

{
  "name": "Dynamic_DAG",
  "schedule": "@hourly",
  "tables": [
    "Address",
    "Product"
  ]
}

A następnie odświeżmy DAG z interfejsu użytkownia Airflow. Zobaczysz, że nasz nowy DAG ma tylko dwa zadania, gdy kliniesz w Graph View.

Apache Airflow Dynamiczny DAG - przykład e2e!
Apache Airflow Dynamiczny DAG

Podsumowanie

Po tym kursie powinieneś być w stanie utowrzyć dynamiczny DAG w Apache Airflow w oparciu o zewnętrzną konfigurację. Takie podejście daje możliwość osiągnięcia elastyczności w tworzeniu złożonych i dynamicznych procesów w Apache Airflow.

Jeśli spodobał Ci się ten post to zostaw proszę komentarz poniżej lub udostępnij ten post na swoim Facebook’u, Twitter’ze, LinkedIn lub innej stronie z mediami społecznościowymi.
Dzięki!

How useful was this post?

Click on a star to rate it!

Average rating 4.8 / 5. Vote count: 385

No votes so far! Be the first to rate this post.

Subscribe
Powiadom o
guest
0 Comments
Inline Feedbacks
View all comments