Flink 1.9: Using SQL Statements to Read Data from Kafka and Write to MySQL

By Jark Wu

Last Saturday, I gave a talk titled Flink SQL 1.9.0 Technologies and Best Practices at the Apache Kafka × Apache Flink Meetup in Shenzhen. The code I demonstrated sparked a lot of interest from the audience, so I've included it in this article to share it with you. I hope it's helpful to new Flink SQL users.

The demo code is now available on GitHub: https://github.com/wuchong/flink-sql-submit

This code has two parts:

  • SqlSubmit, a simple tool for submitting SQL files.
  • SQL examples for the demonstration, Kafka startup and stop scripts, a test dataset, and a Kafka data source generator.

After working through this demo, you will understand:

  • How to use the Blink planner.
  • How to implement a simple SqlSubmit.
  • How to use DDL statements to create a Kafka source table and a MySQL result table.
  • How to run a job that reads data from Kafka, calculates the PV and UV, and writes the results to MySQL.
  • How to set performance optimization parameters and observe their impact on jobs.

Implementation of SqlSubmit

At first, I wanted to use SQL Client for the entire demonstration. Unfortunately, the SQL CLI in Flink 1.9 does not support CREATE TABLE statements yet, so I wrote a simple submission program instead. I hope it also shows you how to use Flink SQL programmatically.

SqlSubmit runs and submits SQL statements. Its implementation is simple: it matches each statement block against regular expressions. If a statement starts with CREATE TABLE or INSERT INTO, SqlSubmit calls tEnv.sqlUpdate(...). If a statement starts with SET, SqlSubmit sets the configuration in TableConfig. Here is the core code:

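import java.nio.file.Files;
import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Create a TableEnvironment that uses the Blink planner and works in streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Read the SQL file
List<String> sql = Files.readAllLines(path);
// Tell the statement types apart by matching their prefixes with regular expressions
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// Execute each statement through the TableEnvironment according to its type
for (SqlCommandCall call : calls) {
    switch (call.command) {
        case SET:
            String key = call.operands[0];
            String value = call.operands[1];
            // Set the configuration option
            tEnv.getConfig().getConfiguration().setString(key, value);
            break;
        case CREATE_TABLE:
            String ddl = call.operands[0];
            tEnv.sqlUpdate(ddl);
            break;
        case INSERT_INTO:
            String dml = call.operands[0];
            tEnv.sqlUpdate(dml);
            break;
        default:
            throw new RuntimeException("Unsupported command: " + call.command);
    }
}
// Submit the job
tEnv.execute("SQL Job");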
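SqlCommandParser is part of the demo project, and its code is not reproduced here. To make the regex-matching idea concrete, here is a minimal standalone sketch. The class name SimpleSqlParser, splitting the script on semicolons, skipping only whole-line comments, and the exact patterns are assumptions for illustration; this is not the project's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SimpleSqlParser {

    public enum Command { SET, CREATE_TABLE, INSERT_INTO }

    /** One parsed statement: its command type plus the operands needed to run it. */
    public static final class Call {
        public final Command command;
        public final String[] operands;

        Call(Command command, String... operands) {
            this.command = command;
            this.operands = operands;
        }
    }

    // (?is): case-insensitive, and '.' also matches newlines so statements can span lines.
    private static final Pattern SET_PATTERN =
        Pattern.compile("(?is)SET\\s+([^=\\s]+)\\s*=\\s*(.*)");
    private static final Pattern CREATE_TABLE_PATTERN =
        Pattern.compile("(?is)(CREATE\\s+TABLE\\s+.*)");
    private static final Pattern INSERT_INTO_PATTERN =
        Pattern.compile("(?is)(INSERT\\s+INTO\\s+.*)");

    public static List<Call> parse(List<String> lines) {
        // Skip whole-line "--" comments and join the remaining lines into one script.
        StringBuilder script = new StringBuilder();
        for (String line : lines) {
            if (!line.trim().startsWith("--")) {
                script.append(line).append('\n');
            }
        }
        List<Call> calls = new ArrayList<>();
        // Assumes ';' only terminates statements and never appears inside literals.
        for (String raw : script.toString().split(";")) {
            String stmt = raw.trim();
            if (stmt.isEmpty()) {
                continue;
            }
            Matcher m;
            if ((m = SET_PATTERN.matcher(stmt)).matches()) {
                calls.add(new Call(Command.SET, m.group(1), m.group(2).trim()));
            } else if ((m = CREATE_TABLE_PATTERN.matcher(stmt)).matches()) {
                calls.add(new Call(Command.CREATE_TABLE, m.group(1)));
            } else if ((m = INSERT_INTO_PATTERN.matcher(stmt)).matches()) {
                calls.add(new Call(Command.INSERT_INTO, m.group(1)));
            } else {
                throw new IllegalArgumentException("Unsupported statement: " + stmt);
            }
        }
        return calls;
    }
}
```

Calling SimpleSqlParser.parse(Files.readAllLines(path)) yields SET, CREATE_TABLE, and INSERT_INTO calls that a dispatch loop like SqlSubmit's can execute. A more robust parser would also need to handle semicolons inside string literals and comments at the end of a line.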

Use DDL Statements to Connect to a Kafka Source Table

The project on GitHub includes a test dataset taken from the publicly available Alibaba Cloud Tianchi Data Sets. The data is in JSON format and looks like this:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

To make the data look more like a real Kafka data source, I also wrote a source-generator.sh script (you can view its source code if you are interested). It reads the records from the test dataset and feeds them to the Kafka topic at a rate of one record per millisecond.
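The pacing idea behind such a generator can be sketched in plain Java. This is not the project's source-generator.sh: the class name ThrottledReplayer and the use of a Consumer<String> in place of a real Kafka producer are assumptions made to keep the example self-contained.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ThrottledReplayer {

    /**
     * Hands each record to the sink, pacing output at roughly one record per interval.
     * A real generator would send each record to a Kafka topic instead of a Consumer.
     */
    public static void replay(List<String> records, long intervalNanos, Consumer<String> sink)
            throws InterruptedException {
        long next = System.nanoTime();
        for (String record : records) {
            sink.accept(record);
            next += intervalNanos;
            long wait = next - System.nanoTime();
            if (wait > 0) {
                // Sleep until this record's time slot ends so the average rate stays fixed.
                TimeUnit.NANOSECONDS.sleep(wait);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> records = List.of(
            "{\"user_id\": \"543462\", \"item_id\":\"1715\", \"category_id\": \"1464116\", \"behavior\": \"pv\", \"ts\": \"2017-11-26T01:00:00Z\"}",
            "{\"user_id\": \"662867\", \"item_id\":\"2244074\", \"category_id\": \"1575622\", \"behavior\": \"pv\", \"ts\": \"2017-11-26T01:00:00Z\"}");
        // One record per millisecond, printed to stdout rather than sent to Kafka.
        replay(records, TimeUnit.MILLISECONDS.toNanos(1), System.out::println);
    }
}
```

Scheduling against a running deadline (next) rather than sleeping a fixed amount after each record keeps the average rate stable even when handing a record to the sink takes a noticeable amount of time.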

Once the data source is in place, we can use a data definition language (DDL) statement to create a table that connects to this Kafka topic (for details, see the demo project on GitHub):

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka', -- use the Kafka connector
    'connector.version' = 'universal', -- Kafka version; universal supports versions 0.11 and later
    'connector.topic' = 'user_behavior', -- Kafka topic
    'connector.startup-mode' = 'earliest-offset', -- start reading from the earliest offset
    'connector.properties.0.key' = 'zookeeper.connect', -- connection information
    'connector.properties.0.value' = 'localhost:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json', -- the source data is in JSON format
    'format.derive-schema' = 'true' -- derive the JSON parsing rules from the DDL schema
)

Note: Some users find parameters such as connector.properties.0.key difficult to understand, so we plan to improve and simplify the connector parameter configuration in the next community release.
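The indexed connector.properties.N.key and connector.properties.N.value pairs are just a flattened list of Kafka client properties. The hypothetical helper below shows how such pairs could be folded back into a java.util.Properties object; it illustrates the idea only and is not how Flink itself processes these options. It assumes the indices start at 0 and have no gaps.

```java
import java.util.Map;
import java.util.Properties;

public class IndexedPropertiesDemo {

    private static final String PREFIX = "connector.properties.";

    /** Collects connector.properties.N.key/value pairs into a Properties object. */
    public static Properties toKafkaProperties(Map<String, String> options) {
        Properties props = new Properties();
        for (int i = 0; ; i++) {
            String key = options.get(PREFIX + i + ".key");
            String value = options.get(PREFIX + i + ".value");
            if (key == null || value == null) {
                break; // stop at the first missing index (contiguous indices assumed)
            }
            props.setProperty(key, value);
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
            "connector.properties.0.key", "zookeeper.connect",
            "connector.properties.0.value", "localhost:2181",
            "connector.properties.1.key", "bootstrap.servers",
            "connector.properties.1.value", "localhost:9092");
        // Prints both Kafka properties; the iteration order of Properties is unspecified.
        System.out.println(toKafkaProperties(options));
    }
}
```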

Use DDL Statements to Connect to a MySQL Result Table

Use the JDBC connector provided by Flink to connect to MySQL.


CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc', -- use the JDBC connector
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- JDBC URL
    'connector.table' = 'pvuv_sink', -- table name
    'connector.username' = 'root', -- user name
    'connector.password' = '123456', -- password
    'connector.write.flush.max-rows' = '1' -- default is 5000 rows; set to 1 for the demo
)
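Why lower 'connector.write.flush.max-rows' to 1? A sink that buffers rows only writes them to MySQL once the buffer reaches the threshold, so with a large threshold the latest results can sit in memory for a while. The generic SizeBufferedWriter below is a hypothetical illustration of size-based flushing only; it is not Flink's JDBC sink code and ignores any other flush triggers the real sink may have.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SizeBufferedWriter<T> {

    private final int maxRows;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    public SizeBufferedWriter(int maxRows, Consumer<List<T>> flusher) {
        this.maxRows = maxRows;
        this.flusher = flusher;
    }

    /** Buffers a row and flushes as soon as maxRows rows have accumulated. */
    public void write(T row) {
        buffer.add(row);
        if (buffer.size() >= maxRows) {
            flush();
        }
    }

    /** Hands a copy of the buffered rows to the flusher, then clears the buffer. */
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

With maxRows set to 1, every write is flushed immediately, which is what makes each PV/UV update visible in MySQL right away during the demo. With 5000, rows are written in larger batches, which is usually cheaper but delays visibility.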

PV and UV Calculation

Suppose we want to calculate the page views (PV) and unique visitors (UV) of a website for every hour. Most developers would probably think of a tumbling window first. However, this article introduces another option: Group Aggregation.

INSERT INTO pvuv_sink
SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
    COUNT(*) AS pv,
    COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

This query uses the built-in DATE_FORMAT function to normalize the log time into a "yyyy-MM-dd HH:00" string and groups the data by this string, that is, by hour. COUNT(*) calculates the PV, and COUNT(DISTINCT user_id) calculates the UV. The query updates its result incrementally (for example, +1) every time a new record arrives, so it always produces the latest result with low latency.
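The incremental behavior of this Group Aggregation can be mimicked in plain Java. The sketch below keeps, per hourly key, a PV counter and a set of distinct user IDs, and returns the updated (dt, pv, uv) row after every record, similar to the continuously updated rows sent to pvuv_sink. The class name, formatting ts in UTC, and keeping every user ID in memory are simplifying assumptions; Flink manages this state for you. The first two calls use the sample records shown earlier; the third is a made-up repeat visit.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class HourlyPvUv {

    // Mirrors DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'); UTC is assumed here.
    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH':00'").withZone(ZoneOffset.UTC);

    private final Map<String, Long> pv = new HashMap<>();
    private final Map<String, Set<String>> users = new HashMap<>();

    /** Processes one record and returns the updated result row as "dt,pv,uv". */
    public String onRecord(String userId, String ts) {
        String dt = HOUR.format(Instant.parse(ts));
        long pvCount = pv.merge(dt, 1L, Long::sum);            // COUNT(*): +1 per record
        Set<String> seen = users.computeIfAbsent(dt, k -> new HashSet<>());
        seen.add(userId);                                       // COUNT(DISTINCT user_id)
        return dt + "," + pvCount + "," + seen.size();
    }

    public static void main(String[] args) {
        HourlyPvUv agg = new HourlyPvUv();
        System.out.println(agg.onRecord("543462", "2017-11-26T01:00:00Z")); // 2017-11-26 01:00,1,1
        System.out.println(agg.onRecord("662867", "2017-11-26T01:00:00Z")); // 2017-11-26 01:00,2,2
        System.out.println(agg.onRecord("543462", "2017-11-26T01:30:00Z")); // 2017-11-26 01:00,3,2
    }
}
```

Each record only touches the state of its own hour, which is why the result can be refreshed after every record instead of waiting for a window to close.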

Finally, the INSERT INTO statement writes the query result into the MySQL table that we defined earlier.

At the meetup last Saturday, I also explained in detail how to optimize the performance of this query.

Practical Demonstration

Environment Requirements

Install the following services before the demonstration:

  • Local Flink cluster: runs the Flink SQL job.
  • Local Kafka cluster: serves as the data source.
  • MySQL database: serves as the result table.

Install Flink on a Local Cluster

1 Download and decompress the Flink 1.9.0 installation package: https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz

2 Download the dependency JAR packages required by the connectors and formats used in this demo (Kafka, JSON, JDBC, and the MySQL JDBC driver) and copy them to the flink-1.9.0/lib/ directory. The job depends on these connector implementations at runtime.

3 Change the taskmanager.numberOfTaskSlots value in flink-1.9.0/conf/flink-conf.yaml to 10, because the demo job may need more than one slot.

4 Run bin/start-cluster.sh in the flink-1.9.0 directory to start the cluster.

If the cluster starts successfully, you can access the Flink Web UI at http://localhost:8081.


Also, enter the Flink installation path (the flink-1.9.0 directory you just decompressed) in the flink-sql-submit project's configuration so that the project's scripts can submit SQL tasks.

Install Kafka on a Local Cluster

Download and decompress the Kafka 2.2.0 installation package: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

Enter the Kafka installation path in the flink-sql-submit project's configuration as well, the same way you did for the Flink path.

Run the Kafka startup script in the flink-sql-submit directory to start the Kafka cluster.

Run jps on the command line. If you see a Kafka process and a QuorumPeerMain process, the Kafka cluster has started successfully.
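If you want to script this check, a small Java helper can run jps and look for both process names. jps prints one line per running JVM, consisting of the process ID followed by the main class name. The class name KafkaClusterCheck and the exact-name matching are assumptions for this sketch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class KafkaClusterCheck {

    /** Returns true if the jps output lists both a Kafka and a QuorumPeerMain process. */
    public static boolean looksStarted(String jpsOutput) {
        Set<String> names = new HashSet<>();
        for (String line : jpsOutput.split("\\R")) {
            // Each line looks like "<pid> <MainClass>"; keep the name part.
            String[] parts = line.trim().split("\\s+", 2);
            if (parts.length == 2) {
                names.add(parts[1]);
            }
        }
        return names.contains("Kafka") && names.contains("QuorumPeerMain");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Requires a JDK with jps on the PATH.
        Process p = new ProcessBuilder("jps").redirectErrorStream(true).start();
        String output;
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            output = r.lines().collect(Collectors.joining("\n"));
        }
        p.waitFor();
        System.out.println(looksStarted(output) ? "Kafka cluster is up" : "Kafka cluster not detected");
    }
}
```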

Install MySQL

Download MySQL from the official website and install it: https://dev.mysql.com/downloads/mysql/

If you use Docker, you can also install MySQL through Docker: https://hub.docker.com/_/mysql

$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

Then, create a database named flink-test in MySQL and create the pvuv_sink table based on the preceding schema.
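The database and table can also be created from Java over JDBC. This is a sketch under several assumptions: the MySQL JDBC driver is on the classpath, the credentials match the Docker command and the sink DDL above, and the MySQL column types and the primary key on dt are my own choices (a key on dt lets updates for the same hour overwrite one row, assuming the sink writes in upsert mode). Because flink-test contains a hyphen, MySQL requires backticks around the name.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CreatePvUvTable {

    public static final String CREATE_DB = "CREATE DATABASE IF NOT EXISTS `flink-test`";

    // Column types and the primary key are assumptions matching the Flink sink schema.
    public static final String CREATE_TABLE =
        "CREATE TABLE IF NOT EXISTS `flink-test`.`pvuv_sink` ("
            + " dt VARCHAR(20) NOT NULL,"
            + " pv BIGINT,"
            + " uv BIGINT,"
            + " PRIMARY KEY (dt))";

    public static void main(String[] args) throws SQLException {
        // Requires a running MySQL server on localhost:3306 and the MySQL JDBC driver.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://localhost:3306/", "root", "123456");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(CREATE_DB);
            stmt.executeUpdate(CREATE_TABLE);
        }
    }
}
```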

Submit an SQL Task

1 Run source-generator.sh in the flink-sql-submit directory. It automatically creates the user_behavior topic and fills it with data in real time.


2 Run q1 in the flink-sql-submit directory. After the job is submitted successfully, you can view its topology in the Web UI.


You can also use a MySQL client to watch the PV and UV values change in real time.



This article showed how to set up a basic cluster environment and use SqlSubmit to submit a pure SQL job that connects to external systems. The demo project also contains some performance optimization parameters that are commented out. Enable them and observe their impact on the job.

Original Source:
