At the beginning of your journey with Airflow, I suppose you encountered a situation where you created multiple DAGs with some tasks inside, and when you ran all the workflows at the same time you observed that independent tasks from independent DAGs ran sequentially, NOT in parallel as you assumed they should.
Why?
It’s related to the default Airflow configuration. In the config file you can find the “executor” parameter, whose default value is SequentialExecutor. In the next steps I will show you how to configure your Airflow instance to run tasks in parallel.
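To make it concrete, here is a minimal sketch of a DAG with two independent tasks (the DAG id and commands are hypothetical, and Airflow 1.x imports are assumed). Under SequentialExecutor the two tasks always run one after another; under LocalExecutor, which we configure below, they can run at the same time.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="parallel_demo",            # hypothetical DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
)

# Two tasks with no dependency between them, so they are candidates for parallel execution
task_a = BashOperator(task_id="task_a", bash_command="sleep 30", dag=dag)
task_b = BashOperator(task_id="task_b", bash_command="sleep 30", dag=dag)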
Airflow installation process
Before we install Airflow, please install the missing libraries for the MySQL client:
sudo apt-get install python3.6-dev libmysqlclient-dev
Now we can install Apache Airflow with the additional [mysql] feature. It will allow us to use a MySQL database as the Airflow metadata storage.
export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install apache-airflow[mysql,crypto]
During the installation you ran the command below, which created an SQLite database in the AIRFLOW_HOME directory and lets you start your journey with Airflow. Of course, that is a valid way to begin; you need a starting point.
# Default initialization of the Airflow SQLite database
airflow initdb
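A small side note: the location of that database is controlled by the AIRFLOW_HOME environment variable, which defaults to ~/airflow. If you prefer a different location, export the variable before running initdb, for example:

export AIRFLOW_HOME=~/airflow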
How to configure Airflow to manage parallel execution?
First, you need new storage for the Airflow metadata. In our case I will use a MySQL database. In this scenario I have MySQL installed on the same machine as Airflow, but you can of course use any other instance that the Airflow host can connect to.
If you don’t have a MySQL database installed yet, you can set one up using my tutorial: How to install MySQL database on Ubuntu 18.04?
Create “airflow” user and database in MySQL instance
Once our MySQL database is up and running, we have to create a user and a database dedicated to the Airflow service. Let’s connect to MySQL. In my case I will use the root user to create the new “airflow” user and database.
mysql -u root -p
Create a new database:
mysql> CREATE DATABASE airflow;
Now create a new user and grant all privileges on the airflow database to the airflow user:
-- Create "airflow" user
mysql> CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'airflow';
-- Grant all privileges
mysql> GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost';
-- Flush privileges
mysql> FLUSH PRIVILEGES;
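To make sure the grants work, you can try to connect as the new user (you will be asked for the password set above):

mysql -u airflow -p airflow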
Change Airflow configuration file
Now we move on to the main phase of the Airflow configuration. Airflow uses the airflow.cfg file, where all the configuration parameters are specified. You can find the configuration file in the $AIRFLOW_HOME directory. Let’s open it and change the executor, sql_alchemy_conn and fernet_key parameters.
executor:
# executor = SequentialExecutor
executor = LocalExecutor
sql_alchemy_conn:
# sql_alchemy_conn = sqlite:////home/pawel/airflow/airflow.db
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
fernet_key:
If you don’t know how to create your own Fernet key, please follow this tutorial: Airflow: create Fernet key.
Replace the existing value with the new one you created.
# Secret key to save connection passwords in the db
fernet_key = 30NkeeYthODONuaGqBNb13x_q_DSWuG6IUKpyb3t4Pc=
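If you just need a quick way to generate a key, one option (assuming the cryptography package, which the [crypto] extra installs, is available) is:

python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

Copy the printed value into the fernet_key parameter.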
One more MySQL configuration
Airflow relies on stricter ANSI SQL settings for MySQL in order to have sane defaults. In this case we must set explicit_defaults_for_timestamp=1 in your my.cnf file, under the [mysqld] section.
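For example, add the following to my.cnf (and restart MySQL afterwards, e.g. with sudo service mysql restart, so the setting takes effect):

[mysqld]
explicit_defaults_for_timestamp = 1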
Restart Airflow
Check the current PID of the Airflow webserver.
cat $AIRFLOW_HOME/airflow-webserver.pid
5023
Kill the current process:
sudo kill -9 {current Airflow webserver process id}
# In my case
sudo kill -9 5023
Initialize the new metadata database and restart the webserver and the scheduler.
The -D parameter starts the Airflow webserver and scheduler in the background, as daemons.
airflow initdb
airflow webserver -D
airflow scheduler -D
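If you also want to control how much work runs at once, the [core] section of airflow.cfg has a few related parameters; the values below are the Airflow 1.x defaults and are shown only for illustration:

# Maximum number of task instances running across the whole Airflow installation
parallelism = 32
# Maximum number of task instances allowed to run concurrently within a single DAG
dag_concurrency = 16
# Maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16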
If you enjoyed this post, please add a comment below or share it on Facebook, Twitter, LinkedIn or another social media page.
Thanks in advance!
How to disable parallelism for CeleryExecutor? I want DAGs to run in sequence, not in parallel.
Hi! Have you tried using “max_active_runs (int) – maximum number of active DAG runs; beyond this number of DAG runs in a running state, the scheduler won’t create new active DAG runs”?
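A rough sketch of how that could look (hypothetical DAG id, Airflow 1.x imports assumed): setting max_active_runs=1 on the DAG and depends_on_past=True on its tasks keeps runs strictly one after another, even with CeleryExecutor.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="sequential_demo",                 # hypothetical DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
    max_active_runs=1,                        # only one active DAG run at a time
    default_args={"depends_on_past": True},   # each task waits for its previous run
)

run_job = BashOperator(task_id="run_job", bash_command="echo run", dag=dag)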