The Flink Ecosystem: A Quick Start to PyFlink

Why Is PyFlink Necessary?

Flink on Python and Python on Flink

Python and the Big Data Ecosystem

Why Flink and Python?

  • Advantageous architecture: Flink is a pure stream computing engine with unified stream and batch processing capabilities.
  • Fresh vitality: According to ASF statistics, Flink was the most active open-source project in 2019.
  • High reliability: As an open-source project, Flink has been tested over a long period and is widely used in the production environments of big data companies.

The PyFlink Architecture

  1. Make all Flink features available to Python users.
  2. Run Python’s analysis and computing functions on Flink to improve Python’s ability to handle big data problems.

Making Flink Features Available to Python Users

Running the Analysis and Computing Functions of Python on Flink

  1. Select a typical Python library and add its APIs to PyFlink. This method takes a long time because Python has too many libraries to cover. Before incorporating any APIs, we need to streamline Python execution.
  2. Based on the existing Flink Table APIs and the characteristics of Python libraries, treat all existing Python library functions as user-defined functions and integrate them into Flink. This is supported in Flink versions 1.10 and later. What is the key issue of function integration? Again, it lies in the execution of Python user-defined functions.
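The second approach can be sketched as follows. This is a minimal illustration, not the PyFlink implementation: `log1p_or_none` is a hypothetical helper built on the standard `math` library, and the `udf(...)` wrapper call follows the Flink 1.10 style used later in this article. The PyFlink import is guarded so the plain function can still be tried without Flink installed.

```python
import math


def log1p_or_none(x):
    """Plain Python function built on a standard-library call.

    Returns None for missing or out-of-domain input instead of raising,
    so that one bad record does not fail a whole job.
    """
    if x is None or x <= -1:
        return None
    return math.log1p(x)


try:
    # Wrap the existing function as a Flink user-defined function.
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    log1p_udf = udf(log1p_or_none,
                    input_types=[DataTypes.DOUBLE()],
                    result_type=DataTypes.DOUBLE())
except ImportError:
    # PyFlink is not installed; only the plain function is available.
    log1p_udf = None
```

The point of the design is that the library call itself stays untouched. Only a thin wrapper declares the input and result types that Flink needs.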

How Do We Use PyFlink?

PyFlink’s Application Scenarios

  1. Event-driven scenarios, such as click-farming detection and monitoring.
  2. Data analysis, such as inventory management and data visualization.
  3. Data pipelines, also known as ETL scenarios, such as log parsing.
  4. Machine learning, such as targeted recommendations.

PyFlink Installation

PyFlink APIs

Defining User-Defined Functions in PyFlink

One Case of Defining a Python User-Defined Function

PyFlink Case: Real-time Log Analysis for Alibaba Cloud CDN


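# cdn_udf.py: Python UDF that maps a client IP address to a province name
import re
import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
    # Sample of the JSON object embedded in the lookup response:
    #   {
    #     'ip': '',
    #     'pro': '河北省',
    #     'proCode': '130000',
    #     'city': '石家庄市',
    #     'cityCode': '130100',
    #     'region': '灵寿县',
    #     'regionCode': '130126',
    #     'addr': '河北省石家庄市灵寿县 电信',
    #     'regionNames': '',
    #     'err': ''
    #   }
    try:
        # The lookup-service URL is blank in the source and is left blank here.
        # Supply a URL with a %s placeholder for the URL-encoded IP address.
        urlobj = urlopen('' % quote_plus(ip))
        data = str(urlobj.read(), "gbk")
        # Cut the first flat {...} object out of the response text.
        pos = re.search(r"\{[^{}]+\}", data).span()
        geo_data = json.loads(data[pos[0]:pos[1]])
        if geo_data['pro']:
            return geo_data['pro']
        else:
            return geo_data['err']
    except Exception:
        # Any network or parsing failure falls back to a fixed label.
        return "Unknown"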
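The parsing step of the UDF can be tried offline, without calling any lookup service. The sketch below assumes, as the regular expression in the UDF suggests, that the response wraps one flat JSON object in some surrounding text. The `extract_province` helper, the sample payloads, and the "Unknown" fallback are hypothetical and only illustrate the extraction logic.

```python
import json
import re


def extract_province(payload):
    """Return the 'pro' field of the first flat JSON object in payload.

    Falls back to the 'err' field when 'pro' is empty, and to "Unknown"
    when no parsable JSON object is found.
    """
    match = re.search(r"\{[^{}]+\}", payload)
    if match is None:
        return "Unknown"
    try:
        geo_data = json.loads(match.group(0))
    except ValueError:
        return "Unknown"
    if geo_data.get("pro"):
        return geo_data["pro"]
    return geo_data.get("err", "Unknown")


# Hypothetical payloads in the assumed "text around a JSON object" shape.
wrapped = 'callback({"pro": "河北省", "city": "石家庄市", "err": ""});'
failed = 'callback({"pro": "", "err": "noprovince"});'
```

Keeping the parsing in a separate function like this makes the UDF easier to unit test, because the network call and the text handling can be checked independently.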
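# cdn_connector_ddl.py: DDL for the Kafka source table and the MySQL sink table

# Notes on the source table:
# - The keys of the two properties set to 'localhost:2181' and 'localhost:9092'
#   are blank in the source and are left blank here. Fill in the connector's
#   ZooKeeper and Kafka broker address properties before running.
# - The uuid column is restored because the statistics query selects it.
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
  uuid VARCHAR,
  client_ip VARCHAR,
  request_time BIGINT,
  response_size BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'access_log',
  '' = 'localhost:2181',
  '' = 'localhost:9092',
  'format.type' = 'csv',
  'format.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
  province VARCHAR,
  access_count BIGINT,
  total_download BIGINT,
  download_speed DOUBLE
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
  'connector.table' = 'access_statistic',
  'connector.username' = 'root',
  'connector.password' = 'root',
  'connector.write.flush.interval' = '1s'
)
"""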
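To make the source table definition more concrete, the following sketch builds and parses the kind of CSV record the Kafka topic is expected to carry. The column order (uuid, client_ip, request_time, response_size) is an assumption based on the columns the statistics query reads, and both helper functions are hypothetical. Dropping a malformed row entirely is a simplification of what the `'format.ignore-parse-errors'` option does.

```python
import csv
import io

# Assumed column order of one access-log record.
COLUMNS = ("uuid", "client_ip", "request_time", "response_size")


def to_csv_line(record):
    """Serialize one record dict into a single CSV line (no newline)."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="").writerow(
        [record[name] for name in COLUMNS])
    return buffer.getvalue()


def parse_csv_line(line):
    """Parse one CSV line into a typed dict, or return None if malformed."""
    values = next(csv.reader([line]), [])
    if len(values) != len(COLUMNS):
        return None
    uuid, client_ip, request_time, response_size = values
    try:
        return {
            "uuid": uuid,
            "client_ip": client_ip,
            "request_time": int(request_time),
            "response_size": int(response_size),
        }
    except ValueError:
        # A non-numeric time or size makes the whole row unusable here.
        return None


sample = {"uuid": "u1", "client_ip": "192.0.2.1",
          "request_time": 10, "response_size": 1000}
line = to_csv_line(sample)
```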
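# Main job script
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

# Create the Table Environment and choose the planner.
# The arguments to create() are missing in the source; using the Blink planner
# is an assumption.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
    .use_blink_planner().build())

# Create the Kafka source table (call reconstructed, Flink 1.10 API)
t_env.sql_update(kafka_source_ddl)
# Create the MySQL result table (call reconstructed, Flink 1.10 API)
t_env.sql_update(mysql_sink_ddl)

# Register the UDF that converts an IP address to a region name
t_env.register_function("ip_to_province", ip_to_province)

# Add the Python files the job depends on. The file names after the directory
# are missing in the source, so both paths are kept as they appear there.
t_env.add_python_file(
    os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/")
t_env.add_python_file(os.path.dirname(
    os.path.abspath(__file__)) + "/enjoyment/cdn/")

# Core statistics logic. The from_path, group_by, and insert_into calls are
# reconstructed from the table names and the aggregates in the query.
t_env.from_path("cdn_access_log") \
    .select("uuid, "
            "ip_to_province(client_ip) as province, "  # convert IP to region name
            "response_size, request_time") \
    .group_by("province") \
    .select(  # access count
            "province, count(uuid) as access_count, "
            # total download volume
            "sum(response_size) as total_download, "
            # download speed
            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")

# Run the job (the job name is a placeholder)
t_env.execute("cdn_access_log_analysis")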
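What the statistics query computes can be checked on a handful of rows in plain Python. The sketch below mirrors the per-province aggregates (access count, total download, download speed) on an in-memory list. The `aggregate_by_province` function and the sample rows are hypothetical, and it assumes every row has a non-null uuid, so counting rows matches `count(uuid)`.

```python
from collections import defaultdict


def aggregate_by_province(rows):
    """Aggregate (uuid, province, response_size, request_time) tuples.

    Returns a dict mapping each province to its access_count,
    total_download, and download_speed (size divided by time).
    """
    totals = defaultdict(lambda: [0, 0, 0])  # count, total size, total time
    for _uuid, province, response_size, request_time in rows:
        entry = totals[province]
        entry[0] += 1
        entry[1] += response_size
        entry[2] += request_time
    return {
        province: {
            "access_count": count,
            "total_download": size,
            # Guard against a zero total time; this guard belongs to the
            # sketch and is not part of the query in the article.
            "download_speed": size * 1.0 / elapsed if elapsed else None,
        }
        for province, (count, size, elapsed) in totals.items()
    }


rows = [
    ("u1", "Hebei", 1000, 10),
    ("u2", "Hebei", 3000, 30),
    ("u3", "Zhejiang", 500, 5),
]
stats = aggregate_by_province(rows)
```

In the streaming job the same aggregates are updated continuously as new log records arrive, whereas this sketch computes them once over a fixed list.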

What Are the Future Prospects for PyFlink?

Objective-Driven Roadmap

PyFlink 1.11 Preview

PyFlink’s Roadmap, Mission and Vision

Core Committers for PyFlink

  • Fu Dian: a committer for Flink and two other top-level Apache projects. Fu has made great contributions to PyFlink.
  • Huang Xingbo: a dedicated PyFlink UDF performance optimizer. Huang once won the Alibaba security algorithm challenge and has placed well in AI and middleware performance competitions.
  • Cheng Hequn: a well-known committer in the Flink community. Cheng has shared very useful information many times. Many users may still remember his Flink Knowledge Map.
  • Zhong Wei: a committer who has focused on the user-defined function dependency management and ease-of-use optimization of PyFlink. Zhong has contributed a lot of code.




