Basic Apache Flink Tutorial: SQL Programming Practice

This tutorial shows you:

  • How to use the SQL CLI client.
  • How to run SQL queries on a stream.
  • How window aggregates and non-window (group) aggregates differ, by running both.
  • How to use SQL to consume data from Kafka.
  • How to use SQL to write results to Kafka and Elasticsearch.

The article assumes that you already have basic SQL knowledge.

Prepare the Environment

The tutorial environment runs in Docker containers managed by Docker Compose and includes the following components:

  • Flink SQL Client: submits queries and visualizes results.
  • Flink JobManager and TaskManager: run Flink SQL jobs.
  • Apache Kafka: provides the input streams and stores the result streams.
  • Apache ZooKeeper: a dependency of Kafka.
  • Elasticsearch: stores results.

Start the environment with the command for your operating system:

  • Linux & macOS
docker-compose up -d
  • Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d

Run the Flink SQL CLI Client

After the containers are up, start the SQL CLI client in the sql-client container:

docker-compose exec sql-client ./sql-client.sh

Data Introduction

The Rides table contains taxi ride events. Run DESCRIBE in the SQL CLI client to view its schema:

Flink SQL> DESCRIBE Rides;
root
|-- rideId: Long // Ride ID (each ride produces two records: a start event and an end event)
|-- taxiId: Long // Taxi ID
|-- isStart: Boolean // true for a ride-start event, false for a ride-end event
|-- lon: Float // Longitude
|-- lat: Float // Latitude
|-- rideTime: TimeIndicatorTypeInfo(rowtime) // Event time (rowtime attribute)
|-- psgCnt: Integer // Number of passengers

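Example 1: Filter

The following query uses the user-defined function (UDF) isInNYC to keep only the ride events located in New York City: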

SELECT * FROM Rides WHERE isInNYC(lon, lat);
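The tutorial does not show how isInNYC is implemented. The following Python sketch models the filter as a simple bounding-box test, assuming that is roughly what the UDF does. The bounds, the Ride class, and the function names are hypothetical and chosen for illustration only. The point is that a filter is stateless: each row is checked on its own.

```python
from dataclasses import dataclass

# Hypothetical bounding box for New York City (assumed values, for illustration only).
LON_WEST, LON_EAST = -74.05, -73.7
LAT_SOUTH, LAT_NORTH = 40.5, 41.0


@dataclass
class Ride:
    ride_id: int
    lon: float
    lat: float


def is_in_nyc(lon: float, lat: float) -> bool:
    """Hypothetical stand-in for the isInNYC UDF: a bounding-box test."""
    return LON_WEST <= lon <= LON_EAST and LAT_SOUTH <= lat <= LAT_NORTH


def filter_rides(rides):
    """Model of: SELECT * FROM Rides WHERE isInNYC(lon, lat).

    A filter keeps no state: each row is either forwarded unchanged or
    dropped, so an append-only input yields an append-only output.
    """
    for ride in rides:
        if is_in_nyc(ride.lon, ride.lat):
            yield ride


rides = [
    Ride(1, -73.98, 40.75),   # Midtown Manhattan: kept
    Ride(2, -118.24, 34.05),  # Los Angeles: dropped
]
print([r.ride_id for r in filter_rides(rides)])  # [1]
```

Because no row ever depends on an earlier one, Flink can evaluate such a filter without keeping any state.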

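Example 2: Group Aggregate

The following query counts the ride events in New York City for each number of passengers (psgCnt). Every new row changes the count of its group, so the result is continuously updated: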

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;
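To see why the result of a group aggregate keeps changing, the following Python sketch models a streaming COUNT(*) ... GROUP BY as a changelog. For every incoming row, it retracts the previous count of the row's group (if there was one) and emits the new count. This is similar to the "+" and "-" rows the SQL CLI client shows in changelog result mode, but it is a simplified model, not Flink's implementation. The function name group_count is hypothetical.

```python
from collections import defaultdict


def group_count(keys):
    """Model of a streaming SELECT key, COUNT(*) ... GROUP BY key.

    Yields changelog messages: '+' adds a result row, and '-' retracts
    a row that was emitted earlier and is now outdated.
    """
    counts = defaultdict(int)  # state: one counter per distinct key, kept indefinitely
    for key in keys:
        old = counts[key]
        if old > 0:
            yield ("-", key, old)  # retract the previous result for this key
        counts[key] = old + 1
        yield ("+", key, old + 1)  # emit the updated result


changes = list(group_count([1, 2, 1]))  # psgCnt values of three incoming rides
for change in changes:
    print(change)
# ('+', 1, 1)
# ('+', 2, 1)
# ('-', 1, 1)
# ('+', 1, 2)
```

Note that the counters dictionary is never cleaned up: a group aggregate must remember every key it has seen in order to update it later.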

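Example 3: Window Aggregate

The following query counts, for each area, the rides that start there in every 5-minute tumbling window, and returns only the results with at least 5 ride starts. toAreaId is a UDF that maps a coordinate to an area ID: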

SELECT 
toAreaId(lon, lat) AS area,
TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) AND isStart
GROUP BY
toAreaId(lon, lat),
TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
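The following Python sketch models what this query computes over a finite list of ride-start events. Each timestamp is assigned to an epoch-aligned 5-minute tumbling window whose exclusive end corresponds to TUMBLE_END. Events are counted per (area, window), and the HAVING condition is applied. The sketch makes two simplifying assumptions: the area IDs are already computed (standing in for toAreaId), and the isInNYC and isStart filters have already been applied. The function names are hypothetical.

```python
from collections import Counter

WINDOW_MS = 5 * 60 * 1000  # INTERVAL '5' MINUTE, in milliseconds


def tumble_end(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Exclusive end of the epoch-aligned tumbling window containing ts_ms."""
    return ts_ms - ts_ms % size_ms + size_ms


def window_counts(start_events, min_count=5):
    """Model of the Example 3 query over a finite list of ride-start events.

    start_events: (area_id, ts_ms) pairs; area_id stands for toAreaId(lon, lat).
    Returns (area, window_end, cnt) rows with cnt >= min_count (the HAVING clause).
    """
    counts = Counter((area, tumble_end(ts)) for area, ts in start_events)
    return sorted(
        (area, end, cnt) for (area, end), cnt in counts.items() if cnt >= min_count
    )


events = [("A", 60_000)] * 5 + [("A", 360_000)] + [("B", 120_000)] * 2
print(window_counts(events))  # [('A', 300000, 5)]
```

Unlike the group aggregate, each (area, window) result here is final once its window has closed, so nothing needs to be retracted later.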

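The Differences Between Window Aggregates and Group Aggregates

Window aggregates and group aggregates differ in when they emit results and in how long they keep state:

  • Output: a window aggregate emits the result of a window once, after the window closes, so its output is an append stream. A group aggregate emits an updated result for every incoming row, so its output is an update stream.
  • State: a window aggregate discards the state of a window after emitting its result. A group aggregate keeps state for every group it has seen, so its state grows with the number of distinct keys unless idle state retention is configured.

For example, the following group aggregate counts the daily page views (pv) of each shop in a table T. It groups by the formatted date to imitate a one-day window:

SELECT DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) AS pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id;

It is still a group aggregate, though. It updates the count of the current day with every new row, and it keeps the counts of past days in state.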

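Example 4: Write the Append Stream to Kafka

The following window aggregate sums the passengers (psgCnt) of all ride events in each 10-minute tumbling window. Each window's result is emitted once, after the window closes, and never changes afterward. The output is therefore an append stream, which can be written to the Kafka sink table Sink_TenMinPsgCnts.

To watch the results, first start a Kafka console consumer on the TenMinPsgCnts topic in a separate terminal:

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

Then submit the following query in the SQL CLI client:
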
INSERT INTO Sink_TenMinPsgCnts 
SELECT
TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
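The following Python sketch models why this query suits an append-only sink. Events add to the running sum of their 10-minute window. When a watermark reaches a window's last millisecond, the window's final (cntStart, cntEnd, cnt) row is appended to a log and its state is dropped. This mirrors how Flink fires event-time windows, but the explicit watermark entries are a simplification. The list log stands in for the TenMinPsgCnts topic, and the function name tumble_sum is hypothetical.

```python
from collections import defaultdict

WINDOW_MS = 10 * 60 * 1000  # INTERVAL '10' MINUTE, in milliseconds


def tumble_sum(stream, size_ms=WINDOW_MS):
    """Model of a 10-minute TUMBLE SUM(psgCnt) feeding an append-only sink.

    stream: ("event", ts_ms, psg_cnt) and ("watermark", ts_ms) tuples in arrival order.
    Returns the rows (cntStart, cntEnd, cnt) in the order they were appended.
    """
    open_windows = defaultdict(int)  # window start -> running SUM(psgCnt)
    log = []  # stands in for the Kafka topic: rows are only ever appended
    for kind, ts, *rest in stream:
        if kind == "event":
            open_windows[ts - ts % size_ms] += rest[0]
        else:
            # A window fires once the watermark reaches its last millisecond (end - 1).
            for start in sorted(open_windows):
                end = start + size_ms
                if end - 1 <= ts:
                    # Emit the final result once and drop the window's state.
                    log.append((start, end, open_windows.pop(start)))
    return log


stream = [
    ("event", 60_000, 2),
    ("event", 300_000, 1),
    ("event", 660_000, 3),
    ("watermark", 599_999),
    ("event", 700_000, 1),
    ("watermark", 1_199_999),
]
log = tumble_sum(stream)
print(log)  # [(0, 600000, 3), (600000, 1200000, 4)]
```

Because no row in the log is ever modified after it is appended, the sink never has to update a record it has already written.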

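Example 5: Write the Update Stream to Elasticsearch

The following group aggregate counts the rides that start in each area. Its output is an update stream: the count for an area changes every time another ride starts there. Kafka's append-only sink cannot accept such a stream, but Elasticsearch can, because a new count for an area can overwrite the document that holds the previous one. The query writes the results to the sink table Sink_AreaCnts: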

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);
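The following Python sketch models how such an update stream can be applied to Elasticsearch. It assumes the sink runs in upsert mode, where the query's unique key (areaId) determines the document ID. An upsert message writes or overwrites that document, and a delete message removes it. The dict index stands in for the Elasticsearch index. The message format (is_upsert, area_id, cnt) and the function name are simplifications for illustration, not the connector's API.

```python
def apply_upsert_stream(messages):
    """Apply (is_upsert, area_id, cnt) messages to a dict keyed by document ID."""
    index = {}  # stands in for the Elasticsearch index; key = document ID
    for is_upsert, area_id, cnt in messages:
        doc_id = str(area_id)  # the unique key (areaId) becomes the document ID
        if is_upsert:
            index[doc_id] = {"areaId": area_id, "cnt": cnt}  # insert or overwrite
        else:
            index.pop(doc_id, None)  # delete message: remove the document
    return index


messages = [(True, 42, 1), (True, 7, 1), (True, 42, 2)]
index = apply_upsert_stream(messages)
print(index)  # {'42': {'areaId': 42, 'cnt': 2}, '7': {'areaId': 7, 'cnt': 1}}
```

Because every area maps to exactly one document, the index always holds only the latest count per area, however many updates the query emits.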

Summary

Original Source:


Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com
