Basic Apache Flink Tutorial: SQL Programming Practice

Prepare the Environment

On Linux or macOS:

docker-compose up -d

On Windows (cmd), enable path conversion first:

set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d

Run the Flink SQL CLI Client

docker-compose exec sql-client ./sql-client.sh

Data Introduction

Flink SQL> DESCRIBE Rides;
root
|-- rideId: Long // Ride ID (each ride produces two records: a start event and an end event)
|-- taxiId: Long // Taxi ID
|-- isStart: Boolean // Start or end
|-- lon: Float // Longitude
|-- lat: Float // Latitude
|-- rideTime: TimeIndicatorTypeInfo(rowtime) // Time
|-- psgCnt: Integer // Number of passengers

Example 1: Filter

SELECT * FROM Rides WHERE isInNYC(lon, lat);

Example 2: Group Aggregate

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;

Example 3: Window Aggregate

SELECT
  toAreaId(lon, lat) AS area,
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) AND isStart
GROUP BY
  toAreaId(lon, lat),
  TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
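
To make the window arithmetic concrete, here is a minimal Python sketch of what the query above computes. It assumes windows are aligned to the Unix epoch and that the input has already been filtered by isInNYC and isStart. The (area, ride_time) tuples and the function names are illustrative stand-ins, not part of Flink or the training environment.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Start of the fixed-size window that contains ts (epoch-aligned)."""
    return ts - (ts - EPOCH) % size


def tumble_end(ts: datetime, size: timedelta) -> datetime:
    """Exclusive upper bound of the window that contains ts."""
    return tumble_start(ts, size) + size


def busy_areas(rides, size=timedelta(minutes=5), min_rides=5):
    """Count rides per (area, window_end) and keep groups with at least min_rides.

    rides: iterable of (area_id, ride_time) pairs; area IDs are hypothetical.
    """
    counts = Counter()
    for area, ride_time in rides:
        counts[(area, tumble_end(ride_time, size))] += 1
    # Equivalent of HAVING COUNT(*) >= 5
    return {key: cnt for key, cnt in counts.items() if cnt >= min_rides}
```

Because the upper bound is exclusive, a ride at exactly 00:05:00 is counted in the window that ends at 00:10:00, not in the one that ends at 00:05:00.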

The Differences Between Window Aggregate And Group Aggregate

SELECT DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) AS pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id;
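
The query above groups by calendar day, yet it is an ordinary group aggregate rather than a window aggregate, because the day is just another grouping key. The following Python sketch is a simplified model of the difference in output behavior, not Flink's implementation. It assumes integer timestamps in seconds, events arriving in timestamp order, and a watermark equal to the latest timestamp seen; both function names are hypothetical.

```python
from collections import defaultdict

DAY = 86_400  # seconds per day


def group_aggregate(events):
    """Emit an updated count on every input record (update stream).

    events: iterable of (timestamp_seconds, shop_id) pairs.
    The state dictionary never shrinks, because any key may be updated again.
    """
    counts = defaultdict(int)
    out = []
    for ts, shop_id in events:
        key = (ts // DAY, shop_id)  # the day acts as a plain grouping key
        counts[key] += 1
        out.append((key, counts[key]))
    return out


def window_aggregate(events, size=DAY):
    """Emit one final count per (window, shop) after the window ends (append stream).

    Assumes in-order events, so the current timestamp serves as the watermark.
    Windows still open when the input ends are not emitted.
    """
    open_windows = defaultdict(int)
    out = []
    for ts, shop_id in events:
        # Close every window whose end is at or before the current event time.
        closed = sorted(k for k in open_windows if (k[0] + 1) * size <= ts)
        for key in closed:
            out.append((key, open_windows.pop(key)))  # state is released here
        open_windows[(ts // size, shop_id)] += 1
    return out
```

For the events (10, 'a'), (20, 'a'), (30, 'b'), (86405, 'a'), the group aggregate produces four records, one per input, while the window aggregate produces two final records for day 0 and keeps day 1 open.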

Example 4: Write the Append Stream to Kafka

Monitor the TenMinPsgCnts topic from a separate terminal:

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

Then submit the query from the SQL CLI:

INSERT INTO Sink_TenMinPsgCnts
SELECT
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

Example 5: Write the Update Stream to ElasticSearch

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);
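
Since this query has no window, every incoming ride changes a previously emitted count, so the sink must accept updates rather than appends. The Python sketch below is a toy model of that behavior. It assumes the sink keys documents by areaId, which the source does not show, and uses a plain dictionary as a stand-in for the Elasticsearch index; the function name is hypothetical.

```python
def run_area_counts(area_ids):
    """Feed start-ride area IDs through a running COUNT(*) and upsert each result.

    Returns the final index contents and the number of writes issued.
    """
    counts = {}  # aggregation state: areaId -> running count
    index = {}   # stand-in for the Elasticsearch index, keyed by areaId
    writes = 0
    for area_id in area_ids:
        counts[area_id] = counts.get(area_id, 0) + 1
        # Upsert: the newest count overwrites the document with the same key.
        index[area_id] = {"areaId": area_id, "cnt": counts[area_id]}
        writes += 1
    return index, writes
```

Calling run_area_counts([7, 7, 3, 7]) issues four writes but leaves only two documents in the index, one per area, each holding the latest count.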

Summary
