In this post we will look at the topic of parallel tasks in Airflow. At the beginning of your journey with Apache Airflow you probably encountered a situation where you created multiple DAGs with several tasks each, and when you ran all the workflows at the same time you noticed that independent tasks from independent DAGs ran sequentially, not in parallel as you assumed they should.
Why?
It’s related to the default Airflow configuration. In the config file you can find the “executor” parameter, whose default value is SequentialExecutor. In the next steps I will show you how to configure your Airflow instance to run tasks in parallel.
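As a quick check (assuming $AIRFLOW_HOME is set and points at your Airflow directory), you can look at the value currently configured in airflow.cfg:
# Show the executor currently configured in airflow.cfg
grep "^executor" $AIRFLOW_HOME/airflow.cfg
# executor = SequentialExecutor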
Airflow Installation Process
Before we install Airflow, please install the missing libraries for the MySQL client:
sudo apt-get install python3.6-dev libmysqlclient-dev
Now we can install Apache Airflow with the additional [mysql] extra. It will allow us to use a MySQL database as the Airflow storage.
export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install apache-airflow[mysql,crypto]
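To quickly confirm that the installation succeeded, you can print the installed version:
airflow version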
After installation you run the command below, which creates an SQLite database in the AIRFLOW_HOME directory and lets you start your journey with Airflow. Of course, that is a perfectly fine way to begin; you need a starting point.
# Default initialization of the Airflow SQLite database
airflow initdb
How To Configure Airflow To Manage Parallel Execution?
First, you need new storage for the Airflow metadata. In our case I will use a MySQL database. In this scenario I have MySQL installed on the same machine as Airflow, but you can of course use another instance, as long as it is reachable from the Airflow host.
If you don’t have a MySQL database installed yet, you can install it using my tutorial: How to install MySQL database on Ubuntu 18.04?
Create “airflow” User and Database in MySQL
Once our MySQL database is up and running, we have to create a user and a database dedicated to the Airflow service. Let’s connect to MySQL. In my case I will use the root user to create the new “airflow” user and database.
mysql -u root -p
mysql> CREATE DATABASE airflow;
Now create the new user and grant all privileges on the airflow database to the airflow user:
-- Create "airflow" user mysql> CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'airflow'; -- Grant all privileges mysql> GRANT ALL PRIVILEGES ON airflow. * TO 'airflow'@'localhost'; -- Flush privileges mysql> FLUSH PRIVILEGES;
Change the Airflow Configuration File
Now we are getting to the main phase of the Airflow configuration. Airflow uses the airflow.cfg file, where all the configuration parameters are specified. You can find the configuration file in the $AIRFLOW_HOME directory. Let’s open it and change the executor, sql_alchemy_conn and fernet_key parameters.
executor:
# executor = SequentialExecutor
executor = LocalExecutor
sql_alchemy_conn:
# sql_alchemy_conn = sqlite:////home/pawel/airflow/airflow.db
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
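Before restarting Airflow, it can help to confirm that this connection string actually works. Below is a minimal sketch using SQLAlchemy (which Airflow already depends on), assuming the user, password and database created above:
# Quick connectivity check for the Airflow metadata database
from sqlalchemy import create_engine, text

engine = create_engine("mysql://airflow:airflow@localhost:3306/airflow")
with engine.connect() as conn:
    # SELECT 1 simply verifies that host and credentials are correct
    print(conn.execute(text("SELECT 1")).fetchone())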
If you don’t know how to create your own Fernet key, please follow this tutorial: Airflow: create Fernet key.
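Alternatively, here is a minimal sketch that generates a key with the cryptography package (already pulled in by the [crypto] extra installed above):
# Generate a new Fernet key for encrypting connection passwords
from cryptography.fernet import Fernet

print(Fernet.generate_key().decode())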
Replace the existing value with the new one you created.
# Secret key to save connection passwords in the db
fernet_key = 30NkeeYthODONuaGqBNb13x_q_DSWuG6IUKpyb3t4Pc=
One More MySQL Configuration
Airflow relies on stricter ANSI SQL settings for MySQL in order to have sane defaults. Therefore we must set explicit_defaults_for_timestamp=1 in my.cnf under the [mysqld] section.
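For example (the exact location of my.cnf may differ on your system, e.g. /etc/mysql/my.cnf on Ubuntu):
[mysqld]
explicit_defaults_for_timestamp = 1
Remember to restart the MySQL service after the change, e.g. sudo service mysql restart.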
Restart Airflow
Check the current PID of the Airflow webserver:
cat $AIRFLOW_HOME/airflow-webserver.pid
5023
Kill the current process:
sudo kill -9 {current Airflow WebServer process id}
# In my case
sudo kill -9 5023
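If you prefer a one-liner, the same can be done by reading the PID file directly (a small convenience, assuming the file exists):
sudo kill -9 $(cat $AIRFLOW_HOME/airflow-webserver.pid)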
Initialize the new database and restart the webserver and scheduler.
The -D parameter brings the Airflow webserver and scheduler up and running as background services (daemons).
airflow initdb
airflow webserver -D
airflow scheduler -D
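To confirm that both services came up, you can simply look for their processes (a generic check, nothing Airflow-specific):
ps aux | grep "airflow webserver"
ps aux | grep "airflow scheduler"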
Controlling Parallelism in Apache Airflow
In Apache Airflow, you can control the level of parallelism of your tasks with the concurrency parameter in your DAG definition and the parallelism parameter in airflow.cfg.
The concurrency parameter specifies the maximum number of task instances that can run concurrently within a single DAG, across all of its active runs. This can be useful for limiting the resources used by your tasks, or for avoiding conflicts between tasks that need to access the same resources.
The parallelism parameter specifies the maximum number of task instances that can run concurrently across the whole Airflow installation. This can be useful for scaling the execution of your tasks to take advantage of multiple workers or resources.
Here’s an example of how you might set the concurrency parameter in your DAG definition:
from airflow import DAG

dag = DAG(
    'my_dag',
    concurrency=16,
    ...
)
In this example, the concurrency parameter is set to 16, which means that at most 16 task instances of this DAG can run at the same time. The parallelism setting, on the other hand, is configured globally in airflow.cfg and caps the number of task instances running across all DAGs.
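Here is a sketch of the related settings in airflow.cfg (the values are just examples; dag_concurrency is the instance-wide default used when a DAG does not set concurrency explicitly):
[core]
# Maximum number of task instances running across the whole installation
parallelism = 32
# Default per-DAG concurrency, used when a DAG does not set it explicitly
dag_concurrency = 16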
Keep in mind that these parameters are upper limits, not guarantees; the actual number of tasks running in parallel also depends on your executor and the resources available to your workers.
That’s all about Airflow parallel tasks! 🙂
Could You Please Share This Post?
I Appreciate It And Thank You! :)
Have A Nice Day!