The Flink Ecosystem: A Quick Start to PyFlink

Why Is PyFlink Necessary?

Flink on Python and Python on Flink

So, what exactly is PyFlink? As its name suggests, PyFlink is simply a combination of Apache Flink with Python, or rather Flink on Python. But what does Flink on Python mean? First, the combination means that you can use all of Flink’s features in Python. More importantly, PyFlink also lets you bring the computing capabilities of Python’s extensive ecosystem to Flink, which in turn helps the Flink ecosystem grow. In other words, it’s a win-win for both sides. If you dive a bit deeper into this topic, you’ll find that the integration of the Flink framework and the Python language is by no means a coincidence.

Python and the Big Data Ecosystem

The Python language is closely connected to big data. To understand why, consider the practical problems people are solving with Python. A user survey shows that most people use Python for data analysis and machine learning, and these are exactly the scenarios that big data technologies are built to address. Apart from expanding the audience of big data products, the integration of Python and big data greatly enhances the capabilities of the Python ecosystem by extending its standalone architecture to a distributed architecture. This also explains the strong demand for Python when analyzing massive amounts of data.

Why Flink and Python?

The integration of Python and big data is in line with several other recent trends. But, again, why does Flink now support Python, as opposed to Go, R, or another language? And why do many users choose PyFlink over PySpark and PyHive?

  • Fresh vitality: Flink was the most active open source project of 2019, according to statistics published by the Apache Software Foundation (ASF).
  • High reliability: As an open source project, Flink has been battle-tested over a long period and is widely used in the production environments of big data companies.

The PyFlink Architecture

To implement PyFlink, we need to know the key objectives to be achieved and the core issues to be resolved. What are PyFlink’s key objectives? In short, they are the following:

  1. Make all of Flink’s existing features available to Python users.
  2. Run Python’s analysis and computing functions on Flink to improve Python’s ability to resolve big data issues.

Making Flink Features Available to Python Users

To implement PyFlink, do we need to develop a Python engine on Flink, like the existing Java engine? The answer is no. Attempts were made in Flink versions 1.8 and earlier, but they didn’t work well. A basic design principle is to achieve the given objectives at minimal cost, so the simplest, and best, approach is to provide a layer of Python APIs and reuse the existing computing engine.
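As a rough illustration of what "one layer of Python APIs" on top of the JVM engine means, the sketch below uses Py4J, the library PyFlink relies on for communication between the PyVM and the JVM (see the summary section), to call an ordinary Java object from Python. It assumes a Py4J gateway server is already running on the Java side; this is a conceptual illustration of the bridging pattern, not PyFlink’s internal code.

from py4j.java_gateway import JavaGateway

# Connect to a JVM in which a Py4J GatewayServer has been started (assumed here).
gateway = JavaGateway()
jvm = gateway.jvm  # entry point for instantiating Java classes from Python

# Call a plain JDK class from the Python side; the PyFlink Table API forwards
# calls to Flink's Java implementation through a conceptually similar bridge.
java_list = jvm.java.util.ArrayList()
java_list.add("hello from the PyVM")
print(java_list.toString())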

Running the Analysis and Computing Functions of Python on Flink

The previous section describes how to make Flink features available to Python users. This section shows you how to run Python functions on Flink. Based on the existing Flink Table APIs and the characteristics of Python class libraries, we can treat all existing Python class library functions as user-defined functions and integrate them into Flink, as the sketch below illustrates. This approach is supported in Flink versions 1.10 and later. What is the key issue of this integration? Again, it lies in the execution of Python user-defined functions.
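For instance, a function from Python’s standard json library can be exposed to Flink by wrapping it in the udf decorator that Flink 1.10 introduced for Python UDFs. The table name and fields in the commented usage lines are hypothetical; only the wrapping pattern itself is the point here.

import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING(), DataTypes.BIGINT()],
     result_type=DataTypes.STRING())
def to_json(name, value):
    # Any library importable in the Python worker can be used inside the UDF body.
    return json.dumps({"name": name, "value": value})

# Hypothetical usage once a table environment and source table exist:
# t_env.register_function("to_json", to_json)
# t_env.from_path("some_table").select("to_json(name, value)")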

How Do We Use PyFlink?

Now that we have covered PyFlink’s architecture and the ideas behind it, let’s look at its specific application scenarios for a better understanding of the hows and whys.

PyFlink’s Application Scenarios

What business scenarios does PyFlink support? We can analyze its application scenarios from two perspectives: Python and Java. Bear in mind that PyFlink is also suitable for all scenarios where Flink’s Java API applies.

  1. Data analysis, such as inventory management and data visualization.
  2. Data pipelines, also known as ETL scenarios, such as log parsing.
  3. Machine learning, such as targeted recommendations.

PyFlink Installation

Before using any API, you need to install PyFlink. Currently, you can install PyFlink from PyPI by running the command: pip install apache-flink.

PyFlink APIs

The PyFlink APIs are fully aligned with the Java Table APIs and support a variety of relational and window operations. Some of PyFlink’s ease-of-use APIs, such as those specific to column operations, are even more powerful than the SQL APIs. In addition to APIs, PyFlink also provides multiple ways to define Python user-defined functions (UDFs).
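To make this concrete, here is a minimal sketch of a Python Table API program in the style used elsewhere in this post (Flink 1.10-era string expressions and the Blink planner). The in-memory data, field names, and the omitted sink are purely illustrative.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Set up the table environment with the Blink planner, as in the CDN example below.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# Build a small in-memory table, then apply relational operations,
# exactly as you would with the Java Table API.
orders = t_env.from_elements(
    [("beer", 3), ("diaper", 4), ("beer", 2)], ["item", "amount"])
result = orders.group_by("item") \
               .select("item, sum(amount) as total_amount")

# Writing `result` to a registered sink table (via insert_into) and calling
# t_env.execute(...) is omitted here; see the complete CDN job later in the post.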

User-Defined Functions Definition in PyFlink

ScalarFunction can be extended (for example, by adding metrics) to provide more auxiliary features. In addition, PyFlink user-defined functions support all of the ways a function can be defined in Python, such as lambda functions, named functions, and callable classes.
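As an illustration of the ScalarFunction route, the sketch below registers a simple counter metric in open() and increments it in eval(). Metric access through the function context became available around Flink 1.11, so treat the exact calls as version-dependent rather than as the canonical API.

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Add(ScalarFunction):
    def open(self, function_context):
        # Register a counter that tracks how many rows this UDF has processed
        # (assumes a PyFlink version with metric support, roughly 1.11+).
        self.counter = function_context.get_metric_group().counter("rows_seen")

    def eval(self, i, j):
        self.counter.inc()
        return i + j

# Wrap the ScalarFunction instance so it can be registered like any other UDF.
add = udf(Add(), input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
          result_type=DataTypes.BIGINT())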

One Case of Defining a Python User-Defined Function

In this example case, we define a user-defined function that adds two numbers: first import the necessary classes, then define the function, as reconstructed in the sketch below. This is pretty straightforward, so let’s then proceed to a practical case.
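The code for this example did not survive in this version of the post, so the sketch below reconstructs it in the spirit of the official PyFlink examples: the same add function written first as a lambda and then as a named, decorated function, either of which can be registered with register_function as in the CDN job later on.

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# 1. The add function defined as a lambda
add_lambda = udf(lambda i, j: i + j,
                 input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
                 result_type=DataTypes.BIGINT())

# 2. The same function defined as a named, decorated Python function
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

# Either form can then be registered and used in Table API or SQL expressions:
# t_env.register_function("add", add)
# t_env.from_path("my_source").select("add(a, b)")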

PyFlink Case: Real-time Log Analysis for Alibaba Cloud CDN

Here I take the real-time log analysis feature of Alibaba Cloud Content Delivery Network (CDN) as an example to show you how to use PyFlink to resolve practical business problems. Alibaba Cloud CDN is used to accelerate resource downloads. CDN logs are generally parsed in a common pattern: first, collect log data from edge nodes and save it to message queues; second, combine the message queues with a Realtime Compute cluster to perform real-time log analysis; third, write the analysis results to a storage system. In this example, the architecture is instantiated as follows: Kafka serves as the message queue, Flink performs the real-time computation, and the final results are stored in a MySQL database.

Requirements

For convenience, we have simplified the actual business statistical requirements. In this example, statistics for page views, downloads, and download speeds are collected by region. In terms of data formats, we have selected only core fields. For example, uuid indicates a unique log ID, client_ip indicates the access source, request_time indicates the resource download duration, and response_size indicates the resource data size. Here, the original logs do not contain a region field despite the requirement to collect statistics by region. Therefore, we need to define a Python UDF to query the region of each data point according to the client_ip. Let's analyze how to define the user-defined function.

# cdn_udf.py: the Python UDF that maps a client IP address to a province name.
import re
import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
    """
    Response format of the IP lookup service:
    {
        'ip': '27.184.139.25',
        'pro': '河北省',
        'proCode': '130000',
        'city': '石家庄市',
        'cityCode': '130100',
        'region': '灵寿县',
        'regionCode': '130126',
        'addr': '河北省石家庄市灵寿县 电信',
        'regionNames': '',
        'err': ''
    }
    """
    try:
        urlobj = urlopen(
            'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
        data = str(urlobj.read(), "gbk")
        pos = re.search(r"{[^{}]+\}", data).span()
        geo_data = json.loads(data[pos[0]:pos[1]])
        if geo_data['pro']:
            return geo_data['pro']
        else:
            return geo_data['err']
    except Exception:
        return "Unknown"
# cdn_connector_ddl.py: DDL for the Kafka source table and the MySQL sink table.
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
    uuid VARCHAR,
    client_ip VARCHAR,
    request_time BIGINT,
    response_size BIGINT,
    uri VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'access_log',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'csv',
    'format.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
    province VARCHAR,
    access_count BIGINT,
    total_download BIGINT,
    download_speed DOUBLE
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
    'connector.table' = 'access_statistic',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.write.flush.interval' = '1s'
)
"""
With the UDF and the table definitions in place, the core statistical logic is as follows:

# Core statistical logic
t_env.from_path("cdn_access_log")\
    .select("uuid, "
            "ip_to_province(client_ip) as province, "  # Convert the IP address to a region name
            "response_size, request_time")\
    .group_by("province")\
    .select(  # Calculate the number of page views
            "province, count(uuid) as access_count, "
            # Calculate the total download volume
            "sum(response_size) as total_download, "
            # Calculate the download speed
            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")
Finally, the complete job ties the UDF, the connector DDL, and the core statistical logic together:

import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

# Create the Table Environment and choose the Blink planner
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# Create the Kafka source table
t_env.sql_update(kafka_source_ddl)
# Create the MySQL sink table
t_env.sql_update(mysql_sink_ddl)

# Register the UDF that converts an IP address to a region name
t_env.register_function("ip_to_province", ip_to_province)

# Add the dependent Python files
t_env.add_python_file(
    os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
t_env.add_python_file(os.path.dirname(
    os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")

# Core statistical logic
t_env.from_path("cdn_access_log")\
    .select("uuid, "
            "ip_to_province(client_ip) as province, "  # Convert the IP address to a region name
            "response_size, request_time")\
    .group_by("province")\
    .select(  # Calculate the number of page views
            "province, count(uuid) as access_count, "
            # Calculate the total download volume
            "sum(response_size) as total_download, "
            # Calculate the download speed
            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")

# Submit the job
t_env.execute("pyFlink_parse_cdn_log")

What Are the Future Prospects for PyFlink?

In general, business development with PyFlink is simple. You can easily describe the business logic through SQL or Table APIs without understanding the underlying implementation. Let’s take a look at the overall prospects for PyFlink.

Objective-Driven Roadmap

The development of PyFlink has always been driven by two goals: making Flink features available to Python users, and integrating Python functions into Flink. According to the PyFlink roadmap, we first established communication between the PyVM and the JVM. Then, in Flink 1.9, we provided the Python Table API, which opened the existing Flink Table API features to Python users. In Flink 1.10, we prepared for integrating Python functions into Flink by integrating Apache Beam, setting up the Python user-defined function execution environment, managing dependencies on other Python class libraries, and defining APIs for user-defined functions, so that Python UDFs are fully supported.

PyFlink 1.11 Preview

Let’s take a quick look at the key points planned for PyFlink in the upcoming Flink 1.11 release.

PyFlink’s Roadmap, Mission and Vision

We have now defined PyFlink and described its significance, its API architecture and user-defined function architecture, the trade-offs behind those architectures, and their benefits. We have gone through the CDN case, the PyFlink roadmap, and the key points of PyFlink in Flink 1.11. But what else do we need to know?

Core Committers for PyFlink

Finally, here are the core committers for PyFlink.

  • Huang Xingbo: a dedicated PyFlink UDF performance optimizer. Huang was once a champion of the Alibaba security algorithm challenge competition, and has achieved many good results in AI and middleware performance competitions.
  • Cheng Hequn: a well-known committer in the Flink community. Cheng has shared very useful information many times. Many users may still remember his Flink Knowledge Map.
  • Zhong Wei: a committer who has focused on the user-defined function dependency management and ease-of-use optimization of PyFlink. Zhong has contributed a lot of code.

Summary

In this post, we have analyzed PyFlink in depth. In the PyFlink API architecture, Py4J is used for communication between the PyVM and the JVM, and the Python and Java APIs are designed to be semantically consistent. In the Python user-defined function architecture, Apache Beam’s Portability Framework is integrated to provide efficient and stable Python user-defined functions. We have also explained the thinking behind the architecture, the technical trade-offs involved, and the advantages of the existing design.

About the Author

The author of this article, Sun Jincheng, joined Alibaba in 2011. Sun has led the development of many internal core systems during his nine years of work at Alibaba, such as Alibaba Group’s behavioral log management system, Alilang, cloud transcoding system, and document conversion system. He got to know the Apache Flink community in early 2016. At first, he participated in community development as a developer. Later, he led the development of specific modules, and then took charge of the construction of the Apache Flink Python API (PyFlink). He is currently a PMC member of Apache Flink and ALC (Beijing) and a committer for Apache Flink, Apache Beam, and Apache IoTDB.

Original Source:
