Basic Apache Flink Tutorial: SQL Programming Practice

By Jark Wu

This tutorial is based on Ververica's open-source sql-training project and Flink 1.7.2. It walks through five examples of Flink SQL programming practice, mainly covering the following aspects:

  • How to use the SQL CLI client.
  • How to run an SQL query on a stream.
  • How to run window aggregate and non-window aggregate, and the differences between them.
  • How to use SQL to consume Kafka data.
  • How to use SQL to write results to Kafka and ElasticSearch.

The article assumes that you already have basic SQL knowledge.

Prepare the Environment

Note: Docker may not have enough resources configured by default, which can cause the Flink job to crash while running. Therefore, we recommend configuring Docker with 3–4 GB of memory and 3–4 CPUs.

In this tutorial, the environment is installed using Docker Compose, which runs containers for the required services, including:

  • Flink SQL Client: to submit queries and visualize results.
  • Flink JobManager and TaskManager: to run Flink SQL tasks.
  • Apache Kafka: to generate input streams and write result streams.
  • Apache ZooKeeper: a Kafka dependency.
  • ElasticSearch: to write results.

The Docker Compose configuration file is provided. You can download the docker-compose.yml file directly.

Next, open the command line window, enter the directory where the docker-compose.yml file is stored, and then run the following command:

  • Linux & MacOS
docker-compose up -d
  • Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d

The docker-compose command starts every required container. The first time it runs, Docker automatically downloads the images (nearly 2.3 GB) from Docker Hub, which may take a while; after that, startup takes only a few seconds. If the operation is successful, the command line shows that the containers are up, and you can access the Flink Web UI at http://localhost:8081.

Run the Flink SQL CLI Client

docker-compose exec sql-client ./sql-client.sh

The command starts the Flink SQL CLI client in the container, and you will then see its welcome screen.

Data Introduction
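
The environment predefines several source tables. You can list them in the SQL CLI with SHOW TABLES:

Flink SQL> SHOW TABLES;

The Rides table of taxi ride records is the one used throughout this tutorial. To view its schema, run DESCRIBE: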

Flink SQL> DESCRIBE Rides;
root
|-- rideId: Long // Ride ID (each ride has two records, one for the start and one for the end)
|-- taxiId: Long // Taxi ID
|-- isStart: Boolean // Whether the record marks the start or the end of the ride
|-- lon: Float // Longitude
|-- lat: Float // Latitude
|-- rideTime: TimeIndicatorTypeInfo(rowtime) // Event time (row time attribute)
|-- psgCnt: Integer // Number of passengers

For more information about the Rides table, see training-config.yaml.

Example 1: Filter

Note: Some built-in functions are predefined in the Docker environment, such as isInNYC(lon, lat), which determines whether a longitude/latitude coordinate is in New York City, and toAreaId(lon, lat), which converts a coordinate into a specific area ID.

Therefore, you can use isInNYC to quickly filter the ride records that occurred in New York. Run the following query in the SQL CLI:

SELECT * FROM Rides WHERE isInNYC(lon, lat);

The SQL CLI submits an SQL task to the Docker cluster; the task continuously pulls data from the data source (the Rides stream is in Kafka) and filters the required records through isInNYC. The SQL CLI also enters visualization mode and constantly refreshes the filtered results in real time.

You can also go to http://localhost:8081 to see how the Flink job is running.
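
As a small variation (not part of the original tutorial), you can also call the toAreaId function mentioned above directly in the SELECT list to project the area each ride belongs to:

-- toAreaId maps a longitude/latitude pair to an area ID
SELECT rideId, toAreaId(lon, lat) AS area, psgCnt
FROM Rides
WHERE isInNYC(lon, lat);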

Example 2: Group Aggregate

Group the rides by the number of passengers (psgCnt), and use COUNT(*) to compute the number of records in each group. Note that you still need to filter for ride records that occurred in New York (isInNYC) before grouping. Run the following query in the SQL CLI:

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;

The SQL CLI visualizes the results; they refresh every second, and the maximum number of passengers never exceeds six.
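
Note that each ride produces two records, one for the start and one for the end, so both are counted above. If you want to count each ride only once, a variation (not part of the original tutorial) is to additionally filter on the isStart field:

SELECT psgCnt, COUNT(*) AS cnt
FROM Rides
-- isStart keeps only the ride start events, so each ride is counted once
WHERE isInNYC(lon, lat) AND isStart
GROUP BY psgCnt;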

Example 3: Window Aggregate

This example counts, every five minutes, the number of vehicles entering each area, so the Tumbling Window syntax is required.

For each area, group by the area ID returned by toAreaId. Before grouping, keep only the ride start records (isStart), which represent vehicles entering an area, and use COUNT(*) to count them.

Areas with at least five vehicles: this is a filter condition on an aggregated value, so it uses the SQL HAVING clause.

The final query is as follows:

SELECT 
toAreaId(lon, lat) AS area,
TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) AND isStart
GROUP BY
toAreaId(lon, lat),
TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;

After you run the query in the SQL CLI, it visualizes the results. The result for each area and window_end does not change once it has been output, but a new batch of results appears every five minutes as new windows end.

During the demonstration, a new batch of windows appears every 30 seconds rather than every five minutes, because the Docker environment reads the source 10 times faster than the original speed.
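
As an aside (not part of the original tutorial), Flink SQL also supports sliding windows through the HOP function. For example, to compute the same per-area count over the last five minutes, updated every minute, you could write:

SELECT
toAreaId(lon, lat) AS area,
-- HOP_END(time_attr, slide, size): end time of each sliding window
HOP_END(rideTime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) AND isStart
GROUP BY
toAreaId(lon, lat),
-- 5-minute windows that slide forward every 1 minute
HOP(rideTime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE);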

The Differences Between Window Aggregate And Group Aggregate

A Window Aggregate outputs its results only when a window ends, and the output does not change afterwards, so its output stream is an Append stream. The Group Aggregate, however, outputs the latest result every time a piece of data is processed; the results have an updated status, similar to rows in a database, and its output stream is an Update stream.

Another difference is that windows have watermarks. With watermarks, Flink knows exactly which windows have expired and can clear their state in time, keeping the state size stable.

For the Group Aggregate, however, it is impossible to know which data has expired, so the state size grows without bound, which is not stable enough for production. Therefore, configure a State TTL (idle state retention time) for Group Aggregate jobs.

For example, to count the real-time daily page views (PV) of each store, you can set the TTL to slightly more than 24 hours, because state older than one day is generally no longer needed.

SELECT DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) AS pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id;

If the TTL is set too small, useful state and data may be cleared prematurely, causing accuracy issues. This is a trade-off that the user needs to weigh.
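
For comparison (this is not part of the original example, and it assumes ts is declared as an event-time attribute of T), the same daily PV statistic could also be expressed as a window aggregate. The window state is then cleaned up automatically via watermarks, but a result is only emitted when each daily window ends, instead of being updated continuously:

SELECT
-- start of the 1-day tumbling window
TUMBLE_START(ts, INTERVAL '1' DAY) AS day_start,
shop_id,
COUNT(*) AS pv
FROM T
GROUP BY TUMBLE(ts, INTERVAL '1' DAY), shop_id;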

Example 4: Write the Append Stream to Kafka

Append streams can be written to any storage system, including log systems such as Kafka.

Let’s suppose you want to write the “number of passengers for every 10 minutes” stream to Kafka.

A Kafka result table Sink_TenMinPsgCnts has been predefined (for the complete table definition, see training-config.yaml).

Before executing the query, run the following command to monitor the data written to the TenMinPsgCnts topic:

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

A tumbling window can express the number of passengers for every 10 minutes. Use an INSERT INTO Sink_TenMinPsgCnts statement to write the query results directly into the result table:

INSERT INTO Sink_TenMinPsgCnts 
SELECT
TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

In the console consumer started earlier, you can see the data of the TenMinPsgCnts topic being written to Kafka in JSON format.

Example 5: Write the Update Stream to ElasticSearch

An ElasticSearch result table Sink_AreaCnts has been predefined (for the complete table definition, see training-config.yaml). This table contains only two fields: areaId and cnt.

Similarly, use INSERT INTO to write the query results directly into the Sink_AreaCnts table. Because the query is a group aggregate, this is an update stream, and Elasticsearch keeps the latest count for each areaId.

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);

After you run the preceding query in the SQL CLI, Elasticsearch automatically creates the area-cnts index, which you can inspect through Elasticsearch's REST API.

As the query continues to run, you can observe that some statistics (_all.primaries.docs.count and _all.primaries.docs.deleted) are constantly increasing at http://localhost:9200/area-cnts/_stats.

Summary

If you are interested, you can do more in-depth exercises based on this Docker environment, such as running your own UDF, UDTF, and UDAF, or querying other built-in source tables.

Original Source:
