Apache Airflow daje nam możliwość stworzenia dynamicznego DAG’a. Ta funkcja jest bardzo przydatna, gdy chcemy osiągnąć elastyczność w Airflow, aby nie tworzyć wielu kopi tego samego DAG’a dla każdego przypadku, ale tylko jeden, gdzie będziemy mieć możliwość dynamicznej zmiany task’ów oraz relacji między nimi.
Koncepcja
Nasza dynamiczny DAG zostanie zbudowany w oparciu o plik JSON, który może zostać utworzony przez inny proces. Ale na potrzeby tego posta utworzymy ręcznie plik JSON.
Wyobraźmy sobie, że chcielibyśmy załadować tabele z jednego źródła do docelowej bazy danych, ale nie ładujemy wszystkich tabel, ale tylko te, które zawierają nowe dane i właśnie ta lista tabel będzie zawarta w naszym pliku konfiguracyjnym JSON. Dodatkowo nasz proces ładowania będzie miał dwa etapy: Init i Clear.
Plik konfiguracyjny
Poniżej znajduje się struktura pliku konfiguracyjnego JSON, w którym określono:
- name – nazwa DAG’a
- schedule – interwał w jakim będzie uruchamiany nasz DAG
- tables – wykaz tabel. Wskazuje liczbę zadań między fazami Init i Clear
{ "name": "Dynamic_DAG", "schedule": "@hourly", "tables": [ "Customer", "Concact", "Address", "Product" ] }
DAG – kod źródłowy
Utwórz nowy plik Pythona w folderze $AIRFLOW_HOME/dags. Będzie to nasz nowy dynamiczny DAG.
Poniżej znajdziesz pełny kod. Wklej go proszę do wcześniej utworzonego pliku.
from airflow.models import DAG from datetime import datetime, timedelta from airflow.operators.dummy_operator import DummyOperator import json def create_dag(dag_id, schedule, default_args, conf): dag = DAG(dag_id, default_args=default_args, schedule_interval=schedule) with dag: init = DummyOperator( task_id='Init', dag=dag ) clear = DummyOperator( task_id='clear', dag=dag ) for table in conf['tables']: tab = DummyOperator( task_id=table, dag=dag ) init >> tab >> clear return dag with open("/home/pawel/airflow-dynamic-dag-conf/process_configuration.json") as json_data: conf = json.load(json_data) schedule = conf['schedule'] dag_id = conf['name'] args = { 'owner': 'BigDataETL', 'depends_on_past': False, 'start_date': datetime.now(), 'email': ['bigdataetl@gmail.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), 'concurrency': 1, 'max_active_runs': 1 } globals()[dag_id] = create_dag(dag_id, schedule, args, conf)
Airflow UI
Twój nowy DAG jest teraz dostępny w interfejsie użytkownika Airflow. Po kliknięciu i przejściu do Graph View zobaczysz, że DAG składa się z:
- jednego zadania Init
- czterech zadań typu “Table”
- jednego zadania Clear.
Testy
Teraz usuniemy dwie tabele z pliku konfiguracyjnego jak poniżej. Przetestujemy w ten sposób czy bez zmiany kodu źródłowego nasz dynamiczny DAG się zmieni.
{ "name": "Dynamic_DAG", "schedule": "@hourly", "tables": [ "Address", "Product" ] }
A następnie odświeżmy DAG z interfejsu użytkownia Airflow. Zobaczysz, że nasz nowy DAG ma tylko dwa zadania, gdy kliniesz w Graph View.
Podsumowanie
Po tym kursie powinieneś być w stanie utowrzyć dynamiczny DAG w Apache Airflow w oparciu o zewnętrzną konfigurację. Takie podejście daje możliwość osiągnięcia elastyczności w tworzeniu złożonych i dynamicznych procesów w Apache Airflow.