Demo: How to Build Streaming Applications Based on Flink SQL

Preparation

Prepare a Linux or macOS machine with Docker and Java 8 installed.

Use Docker Compose to Start Clusters

The components required in this demo are all orchestrated as Docker containers, so you can start them all at once with docker-compose. Run the wget command to download the docker-compose.yml file automatically, or download the file manually.

mkdir flink-demo; cd flink-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml

The docker-compose.yml file defines the following containers:
  • MySQL: Runs MySQL 5.7 with a pre-created category table (category) that stores the mappings between sub-categories and top-level categories; it is used later as a dimension table.
  • Kafka: Serves as the data source; the DataGen container continuously generates data and writes it into Kafka.
  • Zookeeper: A dependency of the Kafka container.
  • Elasticsearch: Stores the result data produced by Flink SQL.
  • Kibana: Visualizes the data in Elasticsearch.
Run the following command in the directory that contains docker-compose.yml to start all of the containers in detached mode:

docker-compose up -d

When you are finished with the demo, stop and remove all of the containers with:

docker-compose down
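
As a quick sanity check (not part of the original walkthrough), after docker-compose up -d you can list the Compose services and confirm that all five containers are running:

docker-compose ps
# every service should report a State of "Up"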

Download and Install a Local Flink Cluster

We recommend downloading and installing Flink manually instead of starting it automatically through Docker, because this gives a more intuitive understanding of Flink's components, dependencies, and scripts.

Then download the following connector and dependency JARs into the lib/ directory of the Flink installation (the commands assume the current directory is the Flink home directory):

wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.0/flink-sql-connector-elasticsearch7_2.11-1.10.0.jar
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar
wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
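
For context, here is a minimal sketch of the surrounding installation steps, assuming Flink 1.10.0 with the Scala 2.11 binary distribution (chosen here to match the 1.10.0/_2.11 connector JAR versions above):

# download and unpack the Flink 1.10.0 binary distribution (Scala 2.11)
wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
tar xzf flink-1.10.0-bin-scala_2.11.tgz
cd flink-1.10.0

# run the wget commands above here so the connector JARs end up in ./lib/

# start a local standalone cluster (the web UI is served on http://localhost:8081)
./bin/start-cluster.sh

# start the SQL CLI; the DDL and queries below are executed in this client
./bin/sql-client.sh embedded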

Create a Kafka Table by Using DDL

After the DataGen container starts, it continuously writes data to Kafka’s user_behavior topic. The data contains the user behavior for November 27, 2017 (including click, purchase, add-to-cart, and like events). Each row represents one user behavior in JSON format, consisting of the user ID, item ID, category ID, behavior type, and timestamp. Note that this raw dataset comes from the Alibaba Cloud Tianchi public dataset. You can preview the first messages in the topic with kafka-console-consumer:

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}

Next, in the SQL CLI, create a Kafka table that connects to this topic by using DDL:

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),   -- generate a processing-time column via a computed column
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- define a watermark on ts, making ts an event-time column
) WITH (
    'connector.type' = 'kafka',  -- use the Kafka connector
    'connector.version' = 'universal',  -- Kafka version; 'universal' supports 0.11 and later
    'connector.topic' = 'user_behavior',  -- Kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- ZooKeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- Kafka broker address
    'format.type' = 'json'  -- the source data is in JSON format
);
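
At this point it is worth a quick check in the SQL CLI that the table is registered and data is arriving; a small sketch (the exact CLI output layout may differ by version):

-- list the registered tables and inspect the schema of user_behavior
SHOW TABLES;
DESCRIBE user_behavior;

-- preview the incoming stream; press Q to leave the result view
SELECT * FROM user_behavior;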

Statistics on Hourly Transactions

Create an Elasticsearch Table by Using DDL

First, create an Elasticsearch (ES) result table in the SQL CLI. The table stores two fields: the hour of the day and the transaction volume in that hour.

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',  -- use the Elasticsearch connector
    'connector.version' = '6',  -- Elasticsearch version; '6' supports both ES 6+ and 7+
    'connector.hosts' = 'http://localhost:9200',  -- Elasticsearch address
    'connector.index' = 'buy_cnt_per_hour',  -- Elasticsearch index name, similar to a table name in a database
    'connector.document-type' = 'user_behavior',  -- Elasticsearch type, similar to a database name in a traditional database
    'connector.bulk-flush.max-actions' = '1',  -- flush after every record
    'format.type' = 'json',  -- output data in JSON format
    'update-mode' = 'append'
);

Submit a Query

The hourly transaction volume is the number of “buy” user behaviors in each hour. Therefore, we need the TUMBLE window function to define one-hour windows; each window then counts its “buy” events separately. To implement this, we first keep only the rows whose behavior is “buy” and then apply COUNT(*).

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

Use Kibana to Visualize Results

The Kibana container was already started through Docker Compose, so you can access Kibana at http://localhost:5601. First, configure an index pattern: click "Management" in the left-side toolbar and find "Index Patterns". Then click "Create Index Pattern" and enter the full index name "buy_cnt_per_hour" to create the index pattern. Once the index pattern exists, Kibana is aware of the index and lets you explore the data in it.

Collect Statistics on the Cumulative Number of Unique Visitors Every 10 Minutes of a Day

Next, visualize the cumulative number of unique visitors (UV) at each moment of the day, that is, the total UV count from 00:00 up to the current time. The resulting curve is always monotonically increasing.

CREATE TABLE cumulative_uv (
    time_str STRING,  -- time point at 10-minute granularity, e.g. '12:30'
    uv BIGINT  -- cumulative number of unique visitors up to that time point
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'  -- upsert mode, so the value for a time point can be updated in place
);

The idea is to first define a view that, for every incoming row, uses an OVER window to compute two things: the current time truncated to 10 minutes and the cumulative number of distinct users seen so far. Then, group the view by the 10-minute time point and keep the maximum cumulative UV for each point:

CREATE VIEW uv_per_10min AS
SELECT
    MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0') OVER w AS time_str,  -- latest 10-minute time point so far, e.g. 12:34 -> '12:30'
    COUNT(DISTINCT user_id) OVER w AS uv  -- cumulative number of distinct users up to the current row
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)  -- the largest (latest) cumulative UV for each 10-minute point
FROM uv_per_10min
GROUP BY time_str;

Top-level Category Ranking

The last visualization is a category ranking list that shows the most popular top-level categories. However, the roughly 5,000 categories in the source data are too fine-grained to be meaningful in a ranking, so we map them to a smaller number of top-level categories. For this purpose, the MySQL container is pre-loaded with the mapping between sub-categories and top-level categories, and we use it as a dimension table.

CREATE TABLE category_dim (
    sub_category_id BIGINT,  -- sub-category
    parent_category_id BIGINT  -- top-level category
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

CREATE TABLE top_category (
    category_name STRING,  -- category name
    buy_cnt BIGINT  -- sales volume (number of buys)
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);

Use a temporal join (FOR SYSTEM_TIME AS OF) to enrich each user behavior with its top-level category name from the category_dim dimension table:

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior,
    CASE C.parent_category_id
        WHEN 1 THEN '服饰鞋包'   -- clothing, shoes & bags
        WHEN 2 THEN '家装家饰'   -- home improvement & decor
        WHEN 3 THEN '家电'       -- home appliances
        WHEN 4 THEN '美妆'       -- beauty & cosmetics
        WHEN 5 THEN '母婴'       -- mother & baby
        WHEN 6 THEN '3C数码'     -- consumer electronics
        WHEN 7 THEN '运动户外'   -- sports & outdoors
        WHEN 8 THEN '食品'       -- food
        ELSE '其他'              -- others
    END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
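
If you want to confirm, independently of Kibana, that results are being written to Elasticsearch, you can query the index directly through the ES REST API; a minimal example (index name as defined in the DDL above):

# return up to 3 documents from the top_category index as pretty-printed JSON
curl 'http://localhost:9200/top_category/_search?pretty&size=3'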

Summary

This article describes how to use Flink SQL to integrate Kafka, MySQL, Elasticsearch, and Kibana and quickly build a real-time analytics application. The entire process is completed in plain SQL, without a single line of Java or Scala code. We hope this article gives you a better understanding of how convenient and powerful Flink SQL is: it connects easily to a variety of external systems, natively supports event time and out-of-order data processing, provides dimension table joins, and ships with a wide range of built-in functions. We hope the practical exercises were entertaining and inspiring.

Original Source:
