Schedule Data Lake Analytics Tasks by Using Airflow
This article describes how to schedule Alibaba Cloud Data Lake Analytics (DLA) tasks by using Apache Airflow. Customers run scheduled tasks every day to query data from DLA, a data lake solution, and import the results into their business systems. Because DLA is compatible with the MySQL protocol, any scheduling framework that supports the MySQL protocol can also schedule DLA tasks.
The procedure is as follows:
- Purchase an Alibaba Cloud Elastic Compute Service (ECS) instance to run Airflow.
- Install Airflow.
- Add the database connection for DLA.
- Develop task scripts.
Purchase and Configure an ECS
Log in to the Alibaba Cloud console, navigate to the ECS console, and purchase an ECS instance.
When setting up your ECS, please note the following:
- Ensure that the region of the purchased ECS is the same as the region where you enable DLA.
- Make sure that the ECS instance can be accessed over the Internet so that you can open the Airflow web console.
- Enable inbound port 80 in the security group so that you can access the Airflow web pages through port 80 (a command-line alternative is sketched after this list).
In addition, record the public IP address of the ECS.
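If you prefer the command line, you can open port 80 by adding a security group rule through the Alibaba Cloud CLI. The following is a minimal sketch, assuming the aliyun CLI is installed and configured; the region ID and security group ID are placeholders that you must replace with your own values.
# Open inbound port 80; cn-beijing and sg-xxxxxxxx are placeholders.
aliyun ecs AuthorizeSecurityGroup \
  --RegionId cn-beijing \
  --SecurityGroupId sg-xxxxxxxx \
  --IpProtocol tcp \
  --PortRange 80/80 \
  --SourceCidrIp 0.0.0.0/0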
Install Airflow
Airflow is written in Python and installed by using pip, the Python package manager. Because you use MySQL (instead of the default SQLite) as the Airflow metadatabase, you also need to install MySQL-related packages.
# Install Airflow.
sudo pip install apache-airflow[mysql]
# Install MySQL-related dependencies.
sudo apt-get install mysql-server
sudo apt-get install libmysqlclient-dev
sudo pip install mysql-python
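After the installation completes, you can confirm that the airflow command is available; a quick optional check:
airflow version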
Modify the following configuration item of the default MySQL installation:
# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
explicit_defaults_for_timestamp = 1
After the modification is complete, restart MySQL.
root@hello:~/airflow/dags# /etc/init.d/mysql restart
[ ok ] Restarting mysql (via systemctl): mysql.service.
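To confirm that the setting has taken effect, you can query the server variable; this check is optional and reuses the root credentials from the example below.
mysql -uroot -proot -e "SELECT @@explicit_defaults_for_timestamp;"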
After Airflow is installed, the ~/airflow directory is generated in your local user directory. The directory contains the following content:
root@hello:~/airflow# ll
total 4168
drwxr-xr-x 4 root root 4096 Oct 19 10:40 ./
drwx------ 10 root root 4096 Oct 19 10:40 ../
-rw-r--r-- 1 root root 11765 Oct 19 10:40 airflow.cfg
drwxr-xr-x 2 root root 4096 Oct 18 19:32 dags/
drwxr-xr-x 6 root root 4096 Oct 18 17:52 logs/
-rw-r--r-- 1 root root 1509 Oct 18 11:38 unittests.cfg
In the preceding content, airflow.cfg is the configuration file of the Airflow cluster, and you can modify configurations in this file. The dags directory stores your task definitions.
Initialize the Airflow Metadatabase
After installing MySQL, create a database to store the Airflow metadata.
$ mysql \
    -uroot \
    -proot \
    -e "CREATE DATABASE airflow DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
        GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost' IDENTIFIED BY 'airflow';
        FLUSH PRIVILEGES;"
$ airflow initdb
The metadatabase is now initialized.
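To verify the initialization, you can list the tables that airflow initdb created in the new database (the exact table names vary by Airflow version):
mysql -uairflow -pairflow -e "SHOW TABLES;" airflow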
Install Dask
Airflow is a scheduling tool whose tasks are run by an executor. The default executor is SequentialExecutor, which is not suitable for production environments. Distributed executors include Celery and Dask. We tried Celery and found that it has multiple bugs, so we recommend Dask.
Install Dask. The dask-scheduler and dask-worker commands are provided by the distributed package, so install it as well.
pip install dask distributed
Run the Dask scheduler.
# default settings for a local cluster
DASK_HOST=127.0.0.1
DASK_PORT=8786
dask-scheduler --host $DASK_HOST --port $DASK_PORT
Run a Dask worker.
dask-worker $DASK_HOST:$DASK_PORT
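As a quick sanity check, you can connect to the scheduler from Python before wiring it into Airflow; a minimal sketch, assuming the scheduler runs at the default address used above:
# Connect to the running Dask scheduler and print cluster information.
from dask.distributed import Client

client = Client('127.0.0.1:8786')
print(client)  # shows the scheduler address and the number of connected workers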
Configure airflow.cfg
Because you use MySQL as the metadatabase and use Dask to run tasks instead of the default configurations, you need to modify the ~/airflow/airflow.cfg file.
[core]
# Use Dask to run tasks.
executor = DaskExecutor
# Metadatabase connection string
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

[dask]
# Dask scheduler address
cluster_address = 127.0.0.1:8786
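You can verify that Airflow picks up these settings by reading them back through its configuration module; a minimal sketch, assuming Airflow 1.x, where the module lives at airflow.configuration:
# Read back the modified settings; the module path assumes Airflow 1.x.
from airflow.configuration import conf

print(conf.get('core', 'executor'))          # expected: DaskExecutor
print(conf.get('core', 'sql_alchemy_conn'))  # the MySQL connection string
print(conf.get('dask', 'cluster_address'))   # expected: 127.0.0.1:8786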
Start Airflow
After all preparations have been completed, you can start Airflow. You need to start the following three modules of Airflow:
webserver: serves the Airflow web UI (the management and control pages).
airflow webserver -p 80 -D
scheduler: the task scheduler. It monitors changes to the task files in ~/airflow/dags, so that newly developed tasks show up on the web UI.
airflow scheduler -D
worker: interacts with Dask and runs tasks.
airflow worker -D
Then, an Airflow cluster is ready for you to run tasks. Some sample tasks are provided in the default configuration. You can open the Airflow web UI by entering http://<public IP address of your ECS> in the address bar of a browser.
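For convenience, you can wrap the three startup commands in a small shell script; a minimal sketch:
#!/bin/bash
# Start the three Airflow components as daemons (-D).
airflow webserver -p 80 -D   # web UI on port 80
airflow scheduler -D         # watches ~/airflow/dags and schedules tasks
airflow worker -D            # pushes task execution to the Dask cluster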
Develop Tasks
The objective is to schedule DLA tasks by using Airflow. First, you need to add a connection string. In Airflow, the Connections page stores connection string information. After you enter http://<public IP address of your ECS>/admin/connection/ in the address bar of a browser, the Connections page appears.
Add DLA connection information.
Note the following two points:
- Select MySQL for Conn Type. (DLA is compatible with the MySQL protocol.)
- Conn Id is a key parameter used to access data sources in your tasks.
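Alternatively, you can create the same connection from the command line; a minimal sketch, assuming the Airflow 1.10 connections subcommand, with placeholder endpoint and credentials that you must replace with your DLA values:
# dla_bj_slot3 matches the Conn Id used in the task code below; user, password, and endpoint are placeholders.
airflow connections --add \
  --conn_id dla_bj_slot3 \
  --conn_uri "mysql://<user>:<password>@<DLA endpoint>:3306"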
Develop Task Code
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'dlademo', default_args=default_args, schedule_interval=timedelta(1))

# Task 1: run a simple bash command.
t1 = BashOperator(
    task_id='print_date',
    bash_command='echo hello-airflow',
    dag=dag)

# Task 2: query DLA through the connection defined on the Connections page.
def step2(ds, **kwargs):
    mysql_hook = MySqlHook(mysql_conn_id='dla_bj_slot3')
    for items in mysql_hook.get_records(
            "select * from tpch_1x.nation_text_date limit 20"):
        print(items)

t2 = PythonOperator(
    task_id='execute_dla_sql',
    provide_context=True,
    python_callable=step2,
    dag=dag)

# t2 runs after t1.
t2.set_upstream(t1)
In this script, you define a DAG, which represents a workflow of dependent tasks. The first DAG parameter specifies the DAG name, which is set to dlademo here. The schedule_interval parameter specifies the scheduling interval and is set to timedelta(1), which means once a day.
The first task runs a bash command (echo hello-airflow). The second task uses SQL to query a table in the DLA database and print the results. Finally, t2.set_upstream(t1) configures the dependency between the two tasks, so that t2 runs after t1.
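Before waiting for a scheduled run, you can execute each task once from the command line; in Airflow 1.x, the test subcommand runs a task for a given execution date without recording its state:
airflow test dlademo print_date 2015-06-01
airflow test dlademo execute_dla_sql 2015-06-01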
After you enter http://<public IP address of your ECS>/admin/airflow/tree?dag_id=dlademo in the address bar of the browser, you can view details about the task.
On this page, you can view the two defined tasks and their dependencies. Airflow provides many more features for you to explore.
Summary
As a top-level Apache project, Airflow is mature and feature-rich. It is easy to learn and makes it simple to set up your own cluster. In addition, Airflow has its own connection mechanism, so you do not need to expose the database user name and password in your task scripts. You can try scheduling your DLA tasks by using Airflow.
References
- Airflow installation manual: https://airflow.apache.org/installation.html
- Scaling out with Dask: https://airflow.readthedocs.io/en/stable/howto/executor/use-dask.html
To learn more about Alibaba Cloud Data Lake Analytics, visit www.alibabacloud.com/products/data-lake-analytics
Reference: https://www.alibabacloud.com/blog/schedule-data-lake-analytics-tasks-by-using-airflow_594183