Apache Airflow: Create dynamic DAG

Apache Airflow gives us the possibility to create dynamic DAGs. This feature is very useful when we would like to achieve flexibility in Airflow: instead of creating many DAGs, one for each case, we can have a single DAG in which we change the tasks and the relationships between them dynamically.

Concept

Our dynamic DAG will be built from a JSON file, which could be created by another process. For the purpose of this tutorial, however, we will create a static JSON file by hand.

Let’s imagine that we would like to load tables from a source database into a target database, but not the whole set of tables, only those that contain new data; this list of tables will be included in our JSON configuration file. Additionally, our loading process will have two steps: Init and Clear.

Configuration file

Below you can find the structure of the JSON configuration file, where I specified:

  • name – of the DAG
  • schedule – of the DAG
  • tables – list of tables. It determines the number of tasks between the Init and Clear phases
{
  "name": "Dynamic_DAG",
  "schedule": "@hourly",
  "tables": [
    "Customer",
    "Concact",
    "Address",
    "Product"
  ]
}
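Since this file may be produced by another process, it is worth validating it before building the DAG. Below is a minimal sketch (the required field names come from the example above; the validate_conf helper is only an illustration, not part of the final DAG code):

import json

REQUIRED_KEYS = {"name", "schedule", "tables"}


def validate_conf(path):
    """Load the JSON configuration and fail fast if a required field is missing."""
    with open(path) as f:
        conf = json.load(f)
    missing = REQUIRED_KEYS - set(conf)
    if missing:
        raise ValueError("Configuration is missing fields: %s" % sorted(missing))
    return conf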


DAG code

Create a new Python file in the $AIRFLOW_HOME/dags folder. It will be our new DAG.

Please find the DAG source code below. 

from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator

import json


def create_dag(dag_id,
               schedule,
               default_args,
               conf):
    """Build a DAG whose table tasks are generated from the JSON configuration."""
    dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval=schedule,
        # concurrency and max_active_runs are DAG-level settings,
        # so they belong here rather than in default_args
        concurrency=1,
        max_active_runs=1
    )

    with dag:
        init = DummyOperator(
            task_id='Init'
        )

        clear = DummyOperator(
            task_id='Clear'
        )

        # One task per table from the configuration file,
        # each placed between the Init and Clear phases
        for table in conf['tables']:
            tab = DummyOperator(
                task_id=table
            )
            init >> tab >> clear

    return dag


with open("/home/pawel/airflow-dynamic-dag-conf/process_configuration.json") as json_data:
    conf = json.load(json_data)
    schedule = conf['schedule']
    dag_id = conf['name']

    args = {
        'owner': 'BigDataETL',
        'depends_on_past': False,
        'start_date': datetime.now(),
        'email': ['bigdataetl@gmail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'concurrency': 1,
        'max_active_runs': 1
    }
    globals()[dag_id] = create_dag(dag_id, schedule, args, conf)
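Before opening the UI, you can sanity-check that the file parses and that the expected tasks were generated. Here is a minimal sketch using Airflow’s DagBag; the dags folder path is an assumption, adjust it to your $AIRFLOW_HOME:

from airflow.models import DagBag

# Parse all DAG files in the given folder and report import errors, if any
dag_bag = DagBag(dag_folder="/home/pawel/airflow/dags")  # example path
if dag_bag.import_errors:
    print("Import errors:", dag_bag.import_errors)

# Fetch our generated DAG by the name taken from the configuration file
dag = dag_bag.get_dag("Dynamic_DAG")
print(sorted(task.task_id for task in dag.tasks))
# e.g. ['Address', 'Clear', 'Contact', 'Customer', 'Init', 'Product']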

Airflow UI

Your new DAG is now available in the Airflow UI. When you click on it and go to the Graph View, you will see that our DAG consists of:

  • one Init task
  • four “Table” tasks
  • one Clear task

Test

Now we will remove two tables from the configuration file, so it looks like below:

{
  "name": "Dynamic_DAG",
  "schedule": "@hourly",
  "tables": [
    "Address",
    "Product"
  ]
}

After that, refresh your DAG in the Airflow UI (the scheduler also re-parses DAG files periodically, so the change will be picked up on the next parse). You will see that our DAG now has only two table tasks in the Graph View.

Summary

After this tutorial you should be able to generate dynamic DAGs in Apache Airflow based on an external configuration. This approach gives you the flexibility to create complex and dynamic DAGs in Airflow without maintaining a separate DAG for each case.

If you enjoyed this post, please leave a comment below or share it on Facebook, Twitter, LinkedIn or another social media site.
Thanks in advance!
