Apache Airflow Dynamiczny DAG – przykład e2e!

You are currently viewing Apache Airflow Dynamiczny DAG – przykład e2e!
Share This Post, Help Others, And Earn My Heartfelt Appreciation! :)
4.8
(385)

Apache Airflow daje nam możliwość stworzenia dynamicznego DAG’a (Apache Airflow Dynamiczny DAG). Ta funkcja jest bardzo przydatna, gdy chcemy osiągnąć elastyczność w Airflow, aby nie tworzyć wielu kopi tego samego DAG’a dla każdego przypadku, ale tylko jeden, gdzie będziemy mieć możliwość dynamicznej zmiany task’ów oraz relacji między nimi.

Koncepcja -> Apache Airflow Dynamiczny DAG

Nasza dynamiczny DAG zostanie zbudowany w oparciu o plik JSON, który może zostać utworzony przez inny proces. Ale na potrzeby tego posta utworzymy ręcznie plik JSON.

Wyobraźmy sobie, że chcielibyśmy załadować tabele z jednego źródła do docelowej bazy danych, ale nie ładujemy wszystkich tabel, ale tylko te, które zawierają nowe dane i właśnie ta lista tabel będzie zawarta w naszym pliku konfiguracyjnym JSON. Dodatkowo nasz proces ładowania będzie miał dwa etapy: Init i Clear.

Apache Airflow Dynamiczny DAG - przykład e2e!
Apache Airflow Dynamiczny DAG

Plik konfiguracyjny

Poniżej znajduje się struktura pliku konfiguracyjnego JSON, w którym określono:

 • name – nazwa DAG’a
 • schedule – interwał w jakim będzie uruchamiany nasz DAG
 • tables – wykaz tabel. Wskazuje liczbę zadań między fazami Init i Clear
{
 "name": "Dynamic_DAG",
 "schedule": "@hourly",
 "tables": [
  "Customer",
  "Concact",
  "Address",
  "Product"
 ]
}

DAG – kod źródłowy

Utwórz nowy plik Pythona w folderze $AIRFLOW_HOME/dags. Będzie to nasz nowy dynamiczny DAG.

Poniżej znajdziesz pełny kod. Wklej go proszę do wcześniej utworzonego pliku.

from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator

import json


def create_dag(dag_id,
        schedule,
        default_args,
        conf):
  dag = DAG(dag_id, default_args=default_args, schedule_interval=schedule)

  with dag:
    init = DummyOperator(
      task_id='Init',
      dag=dag
    )

    clear = DummyOperator(
      task_id='clear',
      dag=dag
    )

    for table in conf['tables']:
      tab = DummyOperator(
        task_id=table,
        dag=dag
      )
      init >> tab >> clear

    return dag


with open("/home/pawel/airflow-dynamic-dag-conf/process_configuration.json") as json_data:
  conf = json.load(json_data)
  schedule = conf['schedule']
  dag_id = conf['name']

  args = {
    'owner': 'BigDataETL',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['bigdataetl@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'concurrency': 1,
    'max_active_runs': 1
  }
  globals()[dag_id] = create_dag(dag_id, schedule, args, conf)

Airflow UI

Twój nowy DAG jest teraz dostępny w interfejsie użytkownika Airflow. Po kliknięciu i przejściu do Graph View zobaczysz, że DAG składa się z:

 • jednego zadania Init
 • czterech zadań typu „Table”
 • jednego zadania Clear.
Apache Airflow Dynamiczny DAG - przykład e2e!
Apache Airflow Dynamiczny DAG

Testy

Teraz usuniemy dwie tabele z pliku konfiguracyjnego jak poniżej. Przetestujemy w ten sposób czy bez zmiany kodu źródłowego nasz dynamiczny DAG się zmieni.

{
 "name": "Dynamic_DAG",
 "schedule": "@hourly",
 "tables": [
  "Address",
  "Product"
 ]
}

A następnie odświeżmy DAG z interfejsu użytkownia Airflow. Zobaczysz, że nasz nowy DAG ma tylko dwa zadania, gdy kliniesz w Graph View.

Apache Airflow Dynamiczny DAG - przykład e2e!
Apache Airflow Dynamiczny DAG

Podsumowanie

Po tym kursie powinieneś być w stanie utowrzyć dynamiczny DAG w Apache Airflow w oparciu o zewnętrzną konfigurację. Takie podejście daje możliwość osiągnięcia elastyczności w tworzeniu złożonych i dynamicznych procesów w Apache Airflow.

Jeśli spodobał Ci się ten post to zostaw proszę komentarz poniżej lub udostępnij ten post na swoim Facebook’u, Twitter’ze, LinkedIn lub innej stronie z mediami społecznościowymi.
Dzięki!

How useful was this post?

Click on a star to rate it!

Average rating 4.8 / 5. Vote count: 385

No votes so far! Be the first to rate this post.

Subscribe
Powiadom o
guest
0 Comments
Inline Feedbacks
View all comments