Schedule Data Lake Analytics Tasks by Using Airflow

This article describes how to schedule Alibaba Cloud Data Lake Analytics (DLA) tasks by using Airflow. Customers run scheduled tasks to query data from DLA (which is a data lake solution) and import the data to the business system every day. DLA is compatible with MySQL protocols. Therefore, all scheduling frameworks that support MySQL protocols also support DLA. This document describes how to schedule DLA tasks by using Apache Airflow.

The procedure is as follows:

  1. Purchase an Alibaba Cloud Elastic Compute Service (ECS) to run Airflow.
  2. Install Airflow.
  3. Add the DB connection of DLA.
  4. Develop task scripts.

Purchase and Configure an ECS

When setting up your ECS, please note the following:

  1. Ensure that the region of the purchased ECS is the same as the region where you enable DLA.
  2. Assign the ECS permissions to access some Airflow web consoles over the Internet.
  3. Enabling inbound port 80 in the security group is necessary for the ECS to access the Airflow web pages through port 80, as shown in the following figure.
Image for post
Image for post

In addition, record the public IP address of the ECS.

Image for post
Image for post

Install Airflow

# Install Airflow.
sudo pip install apache-airflow[mysql]
# Install MySQL-related dependencies.
sudo apt-get install mysql-sever
sudo apt-get install libmysqlclient-dev
sudo pip install mysql-python

Modify the following configuration of the MySQL installed by default:

# /etc/mysql/mysql.conf.d/mysqld.cnf
explicit_defaults_for_timestamp = 1

After the modification is complete, restart MySQL.

root@hello:~/airflow/dags# /etc/init.d/mysql restart
[ ok ] Restarting mysql (via systemctl): mysql.service.

After Airflow is installed, the ~/airflow directory is generated in your local user directory. The directory contains the following content:

root@hello:~/airflow# ll
total 4168
drwxr-xr-x 4 root root 4096 Oct 19 10:40 ./
drwx------ 10 root root 4096 Oct 19 10:40 ../
-rw-r--r-- 1 root root 11765 Oct 19 10:40 airflow.cfg
drwxr-xr-x 2 root root 4096 Oct 18 19:32 dags/
drwxr-xr-x 6 root root 4096 Oct 18 17:52 logs/
-rw-r--r-- 1 root root 1509 Oct 18 11:38 unittests.cfg

In the preceding content, airflow.cfg is the configuration file of the Airflow cluster, and you can modify configurations in this file. The dags directory saves your tasks.

Initialize the Airflow Metadatabase

$ mysql \
-uroot \
-proot \
DEFAULT COLLATE utf8_general_ci;
ON airflow.*
TO 'airflow'@'localhost'
IDENTIFIED BY 'airflow';

$ airflow initdb

In this case, the metadatabase is initialized.

Install Dask

Install Dask.

pip install dask

Run Dask scheduler.

# default settings for a local cluster
dask-scheduler --host $DASK_HOST --port $DASK_PORT

Run Dask worker.

dask-worker $DASK_HOST:$DASK_PORT

Configure airflow.cfg

# Use Dask to run tasks.
executor = DaskExecutor
# Metadatabase connection mode
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
# Dask scheduling address
cluster_address =

Start Airflow

webserver: used to bear the management and control pages of Airflow.

airflow webserver -p 80 -D

scheduler: task scheduler. It monitors changes of task files defined in ~/airflow/dags. In this way, you can view your newly developed tasks on the management console.

airflow scheduler -D

worker: interacts with Dask and runs tasks.

airflow worker -D

Then, an Airflow cluster is ready for you to run tasks. Some sample tasks are provided in default configurations. You can view the Airflow control page by entering http://Public IP address of your ECS in the address bar of a browser.

Image for post
Image for post

Develop tasks

Image for post
Image for post

Add DLA connection information.

Image for post
Image for post

Note the following two points:

  1. Select MySQL for Conn Type. (DLA is compatible with MySQL protocols.)
  2. Conn Id is a key parameter used to access data sources in your tasks.

Develop Task Code

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.hooks.mysql_hook import MySqlHook
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
dag = DAG(
'dlademo', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
bash_command='echo hello-airflow',
def step2(ds, **kargs):
mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3')
for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"):
print items
t2 = PythonOperator(

In this task, you define a DAG, which indicates a task process. In the process, you run multiple dependent tasks. The first DAG parameter specifies the DAG name and you set it to dlademo in this task. The third parameter specifies the scheduling interval, and you set it to timedelta(1), indicating once a day.

The first task is to run a bash command (echo hello-airflow). The second task is to query and print a table in the DLA database by using SQL. Finally, t2.set_upstream(t1) is used to configure the dependencies between the two tasks.

After entering http://Public IP address of your ECS/admin/airflow/tree?dag_id=dlademoin the address box of the browser, you can view details about the task.

Image for post
Image for post

In the preceding figure, you can view the two tasks defined and their dependencies. Airflow has abundant functions for you to experience.



  1. Scaling out with Dask:

To learn more about Alibaba Cloud Data Lake Analytics, visit


Follow me to keep abreast with the latest technology news, industry insights, and developer trends.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store