Apache Airflow: Tworzenie dynamicznego DAG’a

Apache Airflow: Tworzenie dynamicznego DAG’a

Apache Airflow daje nam możliwość stworzenia dynamicznego DAG’a. Ta funkcja jest bardzo przydatna, gdy chcemy osiągnąć elastyczność w Airflow, aby nie tworzyć wielu kopi tego samego DAG’a dla każdego przypadku, ale tylko jeden, gdzie będziemy mieć możliwość dynamicznej zmiany task’ów oraz relacji między nimi.

Koncepcja

Nasza dynamiczny DAG zostanie zbudowany w oparciu o plik JSON, który może zostać utworzony przez inny proces. Ale na potrzeby tego posta utworzymy ręcznie plik JSON.

Wyobraźmy sobie, że chcielibyśmy załadować tabele z jednego źródła do docelowej bazy danych, ale nie ładujemy wszystkich tabel, ale tylko te, które zawierają nowe dane i właśnie ta lista tabel będzie zawarta w naszym pliku konfiguracyjnym JSON. Dodatkowo nasz proces ładowania będzie miał dwa etapy: Init i Clear.

Plik konfiguracyjny

Poniżej znajduje się struktura pliku konfiguracyjnego JSON, w którym określono:

  • name – nazwa DAG’a
  • schedule – interwał w jakim będzie uruchamiany nasz DAG
  • tables – wykaz tabel. Wskazuje liczbę zadań między fazami Init i Clear
{
  "name": "Dynamic_DAG",
  "schedule": "@hourly",
  "tables": [
    "Customer",
    "Concact",
    "Address",
    "Product"
  ]
}

 

DAG – kod źródłowy

Utwórz nowy plik Pythona w folderze $AIRFLOW_HOME/dags. Będzie to nasz nowy dynamiczny DAG.

Poniżej znajdziesz pełny kod. Wklej go proszę do wcześniej utworzonego pliku.

from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator

import json


def create_dag(dag_id,
               schedule,
               default_args,
               conf):
    dag = DAG(dag_id, default_args=default_args, schedule_interval=schedule)

    with dag:
        init = DummyOperator(
            task_id='Init',
            dag=dag
        )

        clear = DummyOperator(
            task_id='clear',
            dag=dag
        )

        for table in conf['tables']:
            tab = DummyOperator(
                task_id=table,
                dag=dag
            )
            init >> tab >> clear

        return dag


with open("/home/pawel/airflow-dynamic-dag-conf/process_configuration.json") as json_data:
    conf = json.load(json_data)
    schedule = conf['schedule']
    dag_id = conf['name']

    args = {
        'owner': 'BigDataETL',
        'depends_on_past': False,
        'start_date': datetime.now(),
        'email': ['bigdataetl@gmail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'concurrency': 1,
        'max_active_runs': 1
    }
    globals()[dag_id] = create_dag(dag_id, schedule, args, conf)

Airflow UI

Twój nowy DAG jest teraz dostępny w interfejsie użytkownika Airflow. Po kliknięciu i przejściu do Graph View zobaczysz, że DAG składa się z:

  • jednego zadania Init
  • czterech zadań typu “Table”
  • jednego zadania Clear.

Testy

Teraz usuniemy dwie tabele z pliku konfiguracyjnego jak poniżej. Przetestujemy w ten sposób czy bez zmiany kodu źródłowego nasz dynamiczny DAG się zmieni.

{
  "name": "Dynamic_DAG",
  "schedule": "@hourly",
  "tables": [
    "Address",
    "Product"
  ]
}

A następnie odświeżmy DAG z interfejsu użytkownia Airflow. Zobaczysz, że nasz nowy DAG ma tylko dwa zadania, gdy kliniesz w Graph View.

Podsumowanie

Po tym kursie powinieneś być w stanie utowrzyć dynamiczny DAG w Apache Airflow w oparciu o zewnętrzną konfigurację. Takie podejście daje możliwość osiągnięcia elastyczności w tworzeniu złożonych i dynamicznych procesów w Apache Airflow.

Jeśli spodobał Ci się ten post to zostaw proszę komentarz poniżej oraz udostępnij ten post na swoim Facebook’u, Twitter’ze, LinkedIn lub innej stronie z mediami społecznościowymi.
Dzięki!

Leave a Reply

avatar
  Subscribe  
Powiadom o
Close Menu