Flink 1.9: Using SQL Statements to Read Data from Kafka and Write to MySQL

By Jark Wu

I gave a talk titled Flink SQL 1.9.0 Technologies and Best Practices last Saturday at the Apache Kafka × Apache Flink Meetup in Shenzhen. The code I demonstrated during the talk sparked a lot of interest from the audience, so I've included it in this article to share it with you. I hope it's helpful to new Flink SQL users.

The demo code is now available on GitHub: https://github.com/wuchong/flink-sql-submit

This code has two parts:

  • Use SqlSubmit to submit SQL files.
  • SQL examples for demonstration, Kafka startup and stop scripts, a test dataset, and Kafka data source generator.

After this practice, you will understand:

  • How to use Blink Planner.
  • How to implement a simple SqlSubmit.
  • How to use DDL statements to create a Kafka source table and MySQL result table.
  • How to run a Flink job that reads data from Kafka, calculates the PV and UV, and writes the results to a MySQL database.
  • How to set performance optimization parameters and observe their impact on jobs.

Implementation of SqlSubmit

At first, I wanted to use the SQL Client to run the entire demonstration. Unfortunately, SQL CLI 1.9 does not yet support processing CREATE TABLE statements, so I had to write a simple submission tool instead. This also illustrates how to use Flink SQL by combining SQL statements with a little programming.

SqlSubmit is mainly used to submit and run SQL files. It is easy to implement: match every statement block with a regular expression. If a statement starts with CREATE TABLE or INSERT INTO, SqlSubmit calls tEnv.sqlUpdate(...). If a statement starts with SET, SqlSubmit sets the configuration in TableConfig. Here is the core code:
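The following is a minimal, self-contained sketch of that dispatch logic, not the repository's actual code (class and method names are illustrative). It shows the statement splitting and the prefix-based routing; in the real tool, the SQL_UPDATE branch calls tEnv.sqlUpdate(stmt) and the SET branch writes the key/value pair into the TableEnvironment's TableConfig.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch of SqlSubmit's parsing core (not the project's actual code).
public class SqlSubmitSketch {

    // SET statements look like: SET key=value
    private static final Pattern SET_STMT =
        Pattern.compile("SET\\s+(\\S+)\\s*=\\s*(\\S+)", Pattern.CASE_INSENSITIVE);

    // Split a script into individual statements terminated by ';'.
    public static List<String> splitStatements(String script) {
        List<String> stmts = new ArrayList<>();
        for (String raw : script.split(";")) {
            String stmt = raw.trim();
            if (!stmt.isEmpty()) {
                stmts.add(stmt);
            }
        }
        return stmts;
    }

    // Classify a statement the way SqlSubmit dispatches it.
    public static String classify(String stmt) {
        String upper = stmt.trim().toUpperCase();
        if (upper.startsWith("CREATE TABLE") || upper.startsWith("INSERT INTO")) {
            return "SQL_UPDATE";   // real tool: tEnv.sqlUpdate(stmt)
        }
        Matcher m = SET_STMT.matcher(stmt.trim());
        if (m.matches()) {
            return "SET";          // real tool: put group(1)=group(2) into TableConfig
        }
        return "UNSUPPORTED";
    }

    public static void main(String[] args) {
        String script = "SET table.exec.mini-batch.enabled=true;\n"
                      + "CREATE TABLE user_log (user_id VARCHAR);\n"
                      + "INSERT INTO pvuv_sink SELECT user_id FROM user_log;";
        for (String stmt : splitStatements(script)) {
            System.out.println(classify(stmt));
        }
    }
}
```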

Use DDL Statements to Connect to a Kafka Source Table

You can find a test dataset (taken from the publicly available Alibaba Cloud Tianchi Data Sets) at src/main/resources/user_behavior.log in the flink-sql-submit project on GitHub. The data is in JSON format. Here is what it looks like:
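Each line of the file is one JSON object. The record below is illustrative (the field values are made up), but the field names match the schema used throughout this article:

```json
{"user_id": "543462", "item_id": "1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
```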

To make the data look more like a real Kafka data source, I also wrote a source-generator.sh (you can view the source code if you are interested). It automatically reads data from user_behavior.log and feeds the data to the user_behavior Kafka topic at a rate of 1 data record/ms.

After creating the data source, we can use a data definition language (DDL) statement to create a table that connects to the user_behavior Kafka topic (for details, see src/main/resources/q1.sql).
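A sketch of that DDL, written in the Flink 1.9 connector property style (the exact properties in the repository's q1.sql may differ slightly):

```sql
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',                   -- use the Kafka connector
    'connector.version' = 'universal',            -- works with Kafka 0.11+
    'connector.topic' = 'user_behavior',          -- topic to read from
    'connector.startup-mode' = 'earliest-offset', -- read from the beginning
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'localhost:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json',                       -- payload is JSON
    'format.derive-schema' = 'true'               -- derive JSON schema from the table schema
);
```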

Note: Some users find some parameters (such as connector.properties.0.key) difficult to understand. Therefore, we plan to improve and simplify the connector parameter configuration in the next community edition.

Use DDL Statements to Connect to a MySQL Result Table

Use the JDBC connector provided by Flink to connect to MySQL.

Example:
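A sketch of the sink DDL, again in the Flink 1.9 property style (credentials are placeholders; adapt them to your environment):

```sql
CREATE TABLE pvuv_sink (
    dt VARCHAR,   -- hour bucket, e.g. '2017-11-26 01:00'
    pv BIGINT,    -- page views
    uv BIGINT     -- unique visitors
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
    'connector.table' = 'pvuv_sink',
    'connector.username' = 'root',
    'connector.password' = '123456',           -- placeholder credentials
    'connector.write.flush.max-rows' = '1'     -- flush every row so results show up immediately
);
```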

PV and UV Calculation

For instance, to calculate the page views (PVs) and unique visitors (UVs) of a website for every hour, most developers would first think of tumbling windows. However, this article introduces another option: the Group Aggregation method.

This method uses the built-in DATE_FORMAT function to normalize the log time into a "yyyy-MM-dd HH:00" string, and groups data by this string (that is, by hour). Next, it uses COUNT(*) to calculate the PVs and COUNT(DISTINCT user_id) to calculate the UVs. The aggregation is incremental (for example, +1) every time a new data record arrives, so it always produces the latest result with very low latency.

Then use the INSERT INTO statement to write the query result into the pvuv_sink MySQL table that we have defined earlier.
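Putting the two paragraphs above together, the query looks roughly like this (a sketch of q1.sql's query; it assumes the user_log source table and pvuv_sink result table defined earlier):

```sql
INSERT INTO pvuv_sink
SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') AS dt,   -- normalize log time to the hour
    COUNT(*) AS pv,                              -- page views
    COUNT(DISTINCT user_id) AS uv                -- unique visitors
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
```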

During the meetup last Saturday, I introduced the ways to optimize the performance of this method in detail.

Practical Demonstration

Environment Requirements

Install the following services before the demonstration:

  • Local Flink cluster: to run the Flink SQL task.
  • Local Kafka cluster: used as the data source.
  • MySQL database: used as a result table.

Install Flink on a Local Cluster

1 Download and decompress the Flink 1.9.0 installation package: https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz

2 Download the following dependency JAR packages and copy them to the flink-1.9.0/lib/ directory. The job depends on them for the implementation of each connector.

3 Change the taskmanager.numberOfTaskSlots value in flink-1.9.0/conf/flink-conf.yaml to 10 as the demo task may consume more than one slot.
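The change is a single line in the configuration file:

```yaml
# flink-1.9.0/conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 10
```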

4 Run the ./bin/start-cluster.sh file in the flink-1.9.0 directory to start the cluster.

If the cluster starts successfully, you should be able to access the Flink Web UI at http://localhost:8081.


Also, we need to fill in the Flink installation path in the env.sh file of the flink-sql-submit project so that it can submit SQL tasks. For example, on my PC it is:
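In env.sh this is a single variable assignment; the path below is a placeholder, so substitute your own installation directory:

```bash
# env.sh — point FLINK_DIR at your local Flink installation (placeholder path)
FLINK_DIR=/path/to/flink-1.9.0
```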

Install Kafka on a Local Cluster

Download and decompress the Kafka 2.2.0 installation package: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

Fill the installation path in the env.sh file of the flink-sql-submit project. For example, the installation path on my PC is KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

Run ./start-kafka.sh under the flink-sql-submit directory to start the Kafka cluster.

Run jps on the command line. If you see the Kafka process and the QuorumPeerMain process, the Kafka cluster has started successfully.

Install MySQL

Download MySQL from the official website and install it: https://dev.mysql.com/downloads/mysql/

If you run a Docker environment, you can also install MySQL through Docker: https://hub.docker.com/_/mysql

Then, create a database named flink-test in MySQL and create the pvuv_sink table based on the preceding schema.
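A minimal version of that setup in the MySQL client, assuming the sink schema used in this article (a string hour bucket plus two BIGINT counters):

```sql
CREATE DATABASE IF NOT EXISTS `flink-test`;
USE `flink-test`;

CREATE TABLE pvuv_sink (
    dt VARCHAR(20),  -- hour bucket, e.g. '2017-11-26 01:00'
    pv BIGINT,       -- page views
    uv BIGINT        -- unique visitors
);
```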

Submit an SQL Task

1 Run the ./source-generator.sh file in the flink-sql-submit directory to automatically create the user_behavior topic and fill it with data in real-time.


2 Run ./run.sh q1 in the flink-sql-submit directory. After successful submission, you can view the topology in the Flink Web UI.


Also, use the MySQL client to view the PV and UV variation in real-time.


Conclusion

This article showed you how to set up a basic cluster environment, use SqlSubmit to submit an SQL task, and connect Flink to external systems. It also touched on parameters for performance optimization; enable them to observe their impact on the job.
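For reference, the Blink planner options commonly used to optimize this kind of COUNT(DISTINCT) aggregation are the micro-batch and split-distinct settings shown below; the exact SET statements in the repository's q1.sql may differ:

```sql
-- micro-batch aggregation: trade a little latency for much higher throughput
SET table.exec.mini-batch.enabled=true;
SET table.exec.mini-batch.allow-latency=1s;
SET table.exec.mini-batch.size=5000;

-- split-distinct optimization to relieve COUNT(DISTINCT ...) hot keys
SET table.optimizer.distinct-agg.split.enabled=true;
```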
