Apache Airflow gives us the possibility to create dynamic DAGs. This feature is very useful when we want to achieve flexibility in Airflow: instead of creating a separate DAG for each case, we keep a single DAG and change its tasks and the relationships between them dynamically.
Introduction
Airflow
Apache Airflow is an open-source platform for defining, scheduling, and monitoring workflows. It was originally developed by Airbnb and is now a top-level Apache project.
Airflow is designed to simplify the process of building and managing workflows by providing a set of tools and libraries for defining and scheduling tasks, as well as for monitoring and managing the execution of those tasks.
Some of the key features of Apache Airflow include:
- A web-based user interface for managing and monitoring workflows.
- A Python-based DSL (Domain Specific Language) for defining workflows.
- A flexible scheduling engine that supports complex dependencies between tasks.
- Integration with a variety of external systems, including databases, message queues, and cloud services.
To use Apache Airflow, you will need to install it on a server and set up a database to store the workflow metadata. You can then use the web UI or the Python DSL to define your workflows, and the scheduling engine will take care of executing the tasks according to the defined dependencies.
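To give a flavour of this DSL, here is a minimal sketch of a single-task DAG in the Airflow 1.x style used throughout this tutorial; the DAG id, schedule, and command are illustrative only:

```python
# Minimal sketch of the Python DSL (Airflow 1.x style, matching the
# tutorial code below). The dag_id, schedule and command are illustrative.
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='hello_airflow',
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
)

say_hello = BashOperator(
    task_id='say_hello',
    bash_command='echo "Hello, Airflow!"',
    dag=dag,
)
```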
Airflow Dynamic DAG
In Apache Airflow, a DAG (Directed Acyclic Graph) is a collection of tasks that are arranged in a specific order. A DAG represents the overall workflow, and each task in the DAG represents a specific operation that needs to be performed.
A dynamic DAG is a DAG that is created at runtime, rather than being defined in advance. This can be useful in situations where the workflow needs to be generated dynamically based on some input or condition.
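The tutorial below builds dynamic tasks inside a single DAG, but the same mechanism also lets you generate whole DAGs in a loop. A minimal sketch, with made-up source names:

```python
# Hypothetical sketch: one DAG generated per source system.
# The source names here are made up for illustration.
from datetime import datetime

from airflow.models import DAG

for source in ['crm', 'billing', 'warehouse']:
    dag_id = 'load_{}'.format(source)
    # Airflow discovers DAGs through module-level variables, so each
    # generated DAG is registered under a unique global name.
    globals()[dag_id] = DAG(
        dag_id,
        start_date=datetime(2019, 1, 1),
        schedule_interval='@daily',
    )
```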
How To Create a Dynamic DAG
Concept
Our dynamic DAG will be built based on a JSON file, which could be created by another process. For the purpose of this tutorial, though, we will create a static JSON file by hand.
Let's imagine that we would like to load tables from a source to a target database, but not the whole set of tables, only the ones that contain new data, and this list of tables will be included in our JSON configuration file. Additionally, our loading process will have two steps: Init and Clear.
![Apache Airflow: How To Create a dynamic DAG – concept](https://bigdata-etl.com/wp-content/uploads/2019/01/Airflow-dynamic-dag-concept-1024x472-1.png)
Configuration File
Below you can find the structure of the JSON configuration file, where I specified:
- name – the name of the DAG
- schedule – the schedule of the DAG
- tables – the list of tables. It determines the number of tasks between the Init and Clear phases
```json
{
  "name": "Dynamic_DAG",
  "schedule": "@hourly",
  "tables": [
    "Customer",
    "Contact",
    "Address",
    "Product"
  ]
}
```
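Because a malformed configuration file would make the whole DAG file fail to parse, it can be worth sanity-checking it before the DAG code consumes it. The load_conf helper below is an illustrative addition of mine, not part of the tutorial code:

```python
# Hypothetical helper: load the configuration and verify that the keys
# the DAG code relies on are present before any DAG is built.
import json

def load_conf(path):
    with open(path) as f:
        conf = json.load(f)
    for key in ('name', 'schedule', 'tables'):
        if key not in conf:
            raise ValueError('Missing key in configuration file: {}'.format(key))
    return conf
```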
DAG Code
Create a new Python file in the $AIRFLOW_HOME/dags folder. It will be our new DAG.
Please find the DAG source code below.
```python
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator
import json


def create_dag(dag_id, schedule, default_args, conf):
    dag = DAG(dag_id, default_args=default_args, schedule_interval=schedule)

    with dag:
        init = DummyOperator(task_id='Init', dag=dag)
        clear = DummyOperator(task_id='Clear', dag=dag)

        # One task per table from the configuration file,
        # wired between the Init and Clear tasks.
        for table in conf['tables']:
            tab = DummyOperator(task_id=table, dag=dag)
            init >> tab >> clear

    return dag


with open("/home/pawel/airflow-dynamic-dag-conf/process_configuration.json") as json_data:
    conf = json.load(json_data)
    schedule = conf['schedule']
    dag_id = conf['name']

    args = {
        'owner': 'BigDataETL',
        'depends_on_past': False,
        # Note: a dynamic start_date such as datetime.now() is generally
        # discouraged; a fixed date is the recommended practice.
        'start_date': datetime.now(),
        'email': ['bigdataetl@gmail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'concurrency': 1,
        'max_active_runs': 1
    }

    # Expose the DAG under its configured name so the scheduler can find it.
    globals()[dag_id] = create_dag(dag_id, schedule, args, conf)
```
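Before the scheduler picks the new file up, you can check that it parses exactly the way the scheduler will by loading it through DagBag (run this in a Python shell on the Airflow host; the expected DAG id assumes the configuration above):

```python
# DagBag loads every file from the dags folder exactly as the scheduler does.
from airflow.models import DagBag

bag = DagBag()            # defaults to $AIRFLOW_HOME/dags
print(bag.dags.keys())    # should include 'Dynamic_DAG'
print(bag.import_errors)  # should be an empty dict
```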
Airflow UI
Your new DAG is now available in the Airflow UI. When you click on it and go to Graph View, you will see that our DAG consists of:
- one Init task
- four table tasks
- one Clear task
![Apache Airflow: How To Create a dynamic DAG – Graph View](https://bigdata-etl.com/wp-content/uploads/2019/01/Airflow-dynamic-dag-1024x502.png)
Testing
Now we will remove two tables from the configuration file, as shown below:
```json
{
  "name": "Dynamic_DAG",
  "schedule": "@hourly",
  "tables": [
    "Address",
    "Product"
  ]
}
```
After that, let's refresh the DAG in the Airflow UI. You will see that our DAG now has only two table tasks in Graph View.
![Apache Airflow: How To Create a dynamic DAG – two tables](https://bigdata-etl.com/wp-content/uploads/2019/01/Airflow-dynamic-dag-two-tables.png)
Summary
After this tutorial you should be able to generate a dynamic DAG in Apache Airflow based on an external configuration. This approach gives you the flexibility to build complex, dynamic DAGs in Airflow.