Use Python API in Apache Flink
By Sun Jincheng
History, Status Quo, and Future Development of Apache Flink Python API
Reasons Why Apache Flink Supports Python
Apache Flink is an open-source, big data computing engine with a unified stream and batch data processing capabilities. Apache Flink 1.9.0 provides a machine learning (ML) API and a new Python API. Next, we will describe details about why Apache Flink supports Python.
- Python is one of the most popular development languages
- Python is supported by many open-source projects
Considering the complete Python ecosystem, Apache Flink has invested heavily in version 1.9 to launch a completely new, PyFlink. As big data, artificial intelligence (AI) is closely related to Python.
- Python is favored by machine learning (ML).
According to the statistics, Python is the most frequently required language for ML jobs, matching 0.129% of job postings. Compared to the R language at 0.076%, Python is favored more by the ML industry. As an interpretive language, Python’s design principle is, “There should only be one way to do things.” As one of the most popular languages in the world, based on its simplicity and ease of use, Python is a good ecosystem in the big data computing field. It also has a promising future in ML. Therefore, we recently launched Python API with a completely new architecture in Apache Flink 1.9.
Apache Flink is a computing engine with a unified stream and batch data processing capabilities. The community attaches great importance to Flink users and hopes to provide more access and channels to Flink, like Java and Scala. This will make Flink more convenient to use by more users and allow them to benefit from the value brought about by the big data computing capacity of Flink. Starting from Apache Flink 1.9, the Apache Flink community launches Python API with a completely new technical architecture, which supports the most commonly used operators, such as JOIN, AGG, and WINDOW.
Python API — RoadMap
Although Apache Flink 1.9 allows Python to utilize user-defined Java functions, it does not support defining Python native user-defined functions. Therefore, support is provided for Python user-defined functions and Python data analysis library, Pandas, in Apache Flink 1.10. In addition, we will add support for the DataStream API and ML API in Apache Flink 1.11.
Apache Flink Python API Architecture and Development Environment
Python Table API Architecture
The new Python API architecture is composed of the user API module, communication module between a Python virtual machine (VM) and Java VM, and module that submits tasks to the Flink cluster for operation.
How does a Python VM communicate with a Java VM? The Python VM has a Python gateway that maintains a connection with the Java VM, which has a GateWayServer that receives calls from the Python VM.
Apache Flink versions earlier than 1.9 already support Python API in DataSet and DataStream modules. However, they use two different APIs, respectively: DataSet API and DataStream API. For a stream computing engine with a unified stream and batch data processing capabilities, such as Flink, a unified architecture is vitally important. The existing Python DataSet API and DataStream API use the JPython technical architecture. However, JPython cannot properly support the Python 3.X series. Therefore, the existing Python API architecture is abandoned, and a completely new technical architecture is adopted starting with Flink 1.9. This new Python API is implemented based on the Table API.
Communication between Table API and Python API is implemented through communication between a Python VM and Java VM. The Python API communicates with the Java API while Python API writes and calls. Operating the Python API is similar to operating the Java Table API. The new architecture has the following advantages.
- No need to create a new set of operators, instead easily maintain consistency with functions of the Java Table API.
- Optimize the Python API by using the Java Table API optimization model. This ensures that jobs written by using the Python API provide optimal performance.
When a Python VM initiates a request on a Java object, the Java VM creates an object, saves it in a storage structure, and assigns an ID to the object. Then, it sends the ID to the Python VM, which operates on the object with the corresponding object ID. The Python VM can operate on all objects of the Java VM, which ensures that the Python Table API has identical functions with the Java Table API and can use its existing performance optimization model.
In the new architecture and communication model, a Python VM calls a Java Table API by simply obtaining the corresponding Java object ID and passing the name and parameters of the call method to the Java VM. Therefore, developing a Python Table API follows the same procedure as developing a Java Table API. Next, let’s explore how to develop a simple Python API job.
Python Table API — Job Development
Generally, a Python Table Job is divided into four steps. Considering the current situation, first, decide whether you want the job to run in batch mode or streaming mode. Users of later versions can skip this step, but users of Apache Flink 1.9 must make this decision.
After deciding on the job running mode, know where the data is from, and how to define the data source, schema, and data type. Then, write the computational logic (a computational operation to be performed on the data) and persist the final computation results to a specified system. Next, define the sink. Like defining a data source, define the sink schema and every field type within it.
Next, let’s understand how to code for each of the above steps by using the Python API. First, create an execution environment, which must be eventually a table environment. In this table environment, there must be a Table Config module that has some configuration parameters to be passed to the RunTime layer during the running process. This module must also provide some custom configuration items that can be used during the actual service development stage.
After creating the execution environment, we must define the data source table. As an example, data records in a CSV file are separated by commas (,) and fields are listed in the Field column. The table contains only one field — word, and the type of this field is String.
After defining and describing the data source, as well as converting the data source structure to a table, what data structure and data type will be in the Table API layer? Next, let’s see how to add fields and field types by using
with_SCHEMA. Here, there is only one field, with the String data type. The data source is registered as a table in the catalog for subsequent queries and computations.
Then, create a result table. After the computation, store the computation results into a persistent system. For example, to write a WordCount job; firstly, there is a storage table with two fields: word and count, the data types of which are respectively String and Bigint. Then register this table as a Sink.
After registering the table sink, look at how to write the computational logic. In fact, writing WordCount with the Python API is as simple as writing it with the Table API. Different from DataStream, Python API requires only a one-line statement to write a WordCount job. For example, first, scan the source table and then use the GROUP BY statement to group rows by word. Next, use the SELECT statement to select a word, and use the aggregation function to calculate the count of each word. Finally, insert the calculation result into the result table.
Python Table API — Development Environment
The critical question is how to run the WordCount job, exactly? First, set up the development environment. Different versions of software may be installed on different machines. Here are some requirements for software versions.
Second, build a binary Java release package based on the source code. Therefore, clone the master branch code, and fetch the 1.9 branch. Of course, you can use the master code. However, the master code is not stable enough and we recommend that you use the 1.9 branch code. Now, let’s go through the steps. First, compile the code. For example.
git clone https://github.com/apache/flink.git
cd flink; git fetch origin release-1.9
git checkout -b release-1.9 origin/release-1.9
mvn clean install -DskipTests -Dfast
After compilation, locate the release package in the corresponding directory.
tar -zcvf flink-1.9.0.tar.gz flink-1.9.0
After building the Java API, verify the API and build a Python release package.
All Python users know that to install packages through pip install, we must integrate the dependency libraries with the local Python environment or install these dependency libraries in the local environment.
This applies to Flink. Package PyFlink into a resource package recognized by Pypip for installation. Use the following command to copy the package and install it in your own environment.
cd flink-Python;Python setup.py sdist
This process simply wraps the Java release package together with some Java packages and Python packages of some PyFlink modules. Find a new
apache-flink-1.9.dev0.tar.gz package in the dist directory.
apache-flink-1.9.dev0.tar.gz file in the dist directory is the PyFlink package that can be used for installation by using pip install. The Apache Flink 1.9 installation package contains both Flink Table and Flink Table Blink. Flink simultaneously supports two planners. We can freely switch between the default Flink Planner and Blink Planner. You are encouraged to try each of them by yourselves. After packaging, we can try installing the package in our environment.
Using a very simple command, first, check whether the command is correct. Before running the command, use pip to check the list to see if the package has already been installed. Then, try installing the package that is prepared in the previous step. In real scenarios, to install an upgrade, the new package is installed.
pip install dist/*.tar.gz
pip list|grep flink
After installing the package, use the WordCount job written previously to check whether the environment is correct. To verify whether the environment is correct, run the following command to directly clone the environment code repository.
git clone https://github.com/sunjincheng121/enjoyment.code.git
cd enjoyment.code; Python word_count.py
Next, let’s try it out. Find the WordCount job file created previously in this directory. Let’s directly use python
word_count.py to verify whether the environment is OK. Apache Flink Python API would start a mini-cluster to run the WordCount job. Now, the job is already running on the mini-cluster.
In this process, the code first reads a source file, and then writes the result to a CSV file. In this directory, find the
sink.csv file. For details about the operation steps, refer to the video titled, The Status Quo and Planning of Apache Flink Python API, posted in Apache Flink Community China.
Now, let’s discuss the integrated development environment (IDE) setup. We recommend that you use PyCharm to develop Python-related logic and jobs.
For IDE setup details, scan the QR code or directly visit the blog at https://enjoyment.cool You may have many Python environments, but you must select the one that you used for the pip install. This is very important. For details about the operation steps, refer to the video titled, The Status Quo and Planning of Apache Flink Python API.
Python Table API — Job Submission
What methods can we use to submit jobs? First, use the CLI method, which submits jobs to an existing cluster. To use this method, you must start a cluster. The directory of the build is usually under build-target. Directly run this command to start a cluster. Note that this process involves an external Web port. Configure the port number in
flink-conf.yaml file. Now, use the command in the PPT to start a cluster. To verify that the cluster started successfully, either check the log or visit the site with a browser. If the cluster started successfully, let's look at how to submit a job.
Use Flink run to submit the job by executing the following code.
./bin/flink run -py ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py
Besides using py to specify the Python file, use pym to specify the Python module, pyfs to specify the Python resource files, and j to specify the JAR package.
Apache Flink 1.9 offers a more convenient method: Python Shell allows us to write results obtained by Python API in an interactive manner. Python Shell executes in two modes: local and remote, which are not significantly different. First, let’s try the local mode by running the following command.
This command will start a mini-cluster. After running the code, it returns the Flink logo with text FLINK — PYTHON — SHELL, as well as some sample scripts that demonstrate this feature. Enter these scripts to return the correct output and result. Here you can write either streaming or batch. For details about the operation steps, refer to the video.
Now, you have some basic understanding of the Python Table API architecture of Apache Flink 1.9 and how to set up an environment for the Python Table API. Considered a simple WordCount example to see how to run a job in IDE, and how to submit the job through Flink run and Python Shell. Also, experienced some interactive methods to use Flink Python APIs. After introducing the Flink environment setup and demonstration of a simple example, Let’s move on to key operators of Apache Flink 1.9.
Introduction and Application of Key Operators of Flink Python APIs
Python Table API Operators
We have already covered how to create a job. First, select the execution mode: streaming or batch. Then, define the tables to be used (the source table and result table), schema, and data type. After that, write the computational logic. Finally, use the built-in aggregation functions of the Python API, such as Count, Sum, Max, and Min. For example, when we wrote the WordCount job, we used the Count function.
Apache Flink 1.9 satisfies most of the users’regular needs. Now, let us look at Flink Table API operators supported by Apache Flink 1.9, apart from those that we have already seen. Flink Table API operators (Python Table API operators and Java Table API operators) support the following types of operations.
First, single-stream operations, such as SELECT, FILTER, aggregation operations, window operations, and column operations
(add_columns and drop_columns).
Second, dual-stream operations, such as JOIN, MINUS, and UNION.
All these operators are well supported by Python Table API. In Apache Flink 1.9, Python Table API is almost identical to Java Table API in terms of functionality. Next, let’s understand how to write the above operators and develop Python operators.
Python Table API Operators — Watermark Definition
You may have noticed that this article doesn’t mention a data stream attribute, time series. Data streams may be out of order, which is an objective status of data streams. Apache Flink processes out-of-order data streams by using the Watermark mechanism.
How to define Watermark in Python API?
Assume that you have a JSON format data file, which contains two fields: a and DateTime, the types of which are String and Time, respectively. To define a watermark, add a rowtime column when creating the Schema, and the rowtime data type must be Timestamp.
Define watermark by various means. Use
watermarks_periodic_bounded to periodically send watermarks. The number 60000 refers to 60000 ms, which is equal to 60s or 1 min. This definition allows the program to process out-of-order data streams within a one-minute period. Therefore, a larger value indicates a higher tolerance for out-of-order data and higher latency. For details about how watermark works, visit my blog at http://1t.click/7dM
Python Table API — Java UDF
Finally, let’s introduce the application of Java user-defined functions (UDFs) in Apache Flink 1.9. Although Apache Flink 1.9 does not support Python UDFs, it allows us to use Java UDFs in Python. Apache Flink 1.9 optimizes and restructures the Table module. To develop Java UDFs, develop Python APIs by importing a simple dependency: Flink-table-common.
Next, focus on how to develop a Python API by using Java UDFs. Assume that we need to develop an UDF to calculate the length of a string. We need to use
t_env.register_java_function to register the Java function in Python, by passing the name and full path of the Java function. After that, we can call the UDF by using its registered name. For details, visit my blog http://1t.click/HQF
How to run a Java UDF? Use the Flink run command to run it. As mentioned before, we use -j to include the JAR package of the UDF.
Do Java UDFs only support Scalar Function? No. Java UDFs support not only Scalar functions but also Table functions and Aggregate functions.
Reference Links of the Python Table API
Listed are links of some commonly used documents and my blog for your reference. Hopefully, they are helpful to you.
This article introduced the history and development roadmap of Apache Flink Python API. Then, it described the reasons why we would change the architecture of the Apache Flink Python API and the latest architecture available. It also provided our upcoming plans and new features for Apache Flink Python API. I encourage you to share your suggestions and thoughts with us.