By Wang Zhijiang (Taojiang)
Apache Flink 1.11.0 was officially released on July 7, 2020. Over the past four months, Flink has been enhanced in terms of its ecosystem, ease-of-use, production availability, and stability. Wang Zhijiang is a member of the Apache Flink Project Management Committee (PMC), a Senior Technical Expert at Alibaba, and one of the release managers of Flink 1.11.0. In this article, he will share his deep insights into the long-awaited features of Flink 1.11.0 and explain some of the representative features from different perspectives.
The Development of Flink 1.11.0
First, let’s review the general release process of the Flink community, so you can better understand how the community works and get involved.
When the Flink community plans a version, it selects one or two volunteers as release managers. For Flink 1.11.0, the two release managers were Wang Zhijiang (me) from China and Piotr Nowojski from Ververica, headquartered in Berlin. This shows that Chinese developers and their contributions constitute an important part of the Flink community.
The feature kickoff starts after release managers are chosen. In the Flink community, the version planning cycle can be a long process that aims to address some major issues. To ensure the release quality, this process may consist of multiple phases and cover more than one version. Each version is designed to meet different needs. For example, the two Flink versions released before 1.11.0 were designed to enhance batch processing capabilities, whereas Flink 1.11.0 was designed to make stream processing capabilities easier to use. The Flink community uses a mailing list to discuss features to be added. This aims to collect comments and feedback from more users and developers.
The development cycle is typically two to three months. An approximate feature freeze time is determined in advance. Then, the Flink community publishes and tests release candidates and fixes bugs. After several rounds of iteration, the Flink community votes on a relatively stable candidate version. This serves as the basis for the official release.
The community started feature planning for Flink 1.11.0 in early March 2020 and officially released this version in early July. This whole process took nearly four months. Compared with earlier versions, Flink 1.11.0 was improved in terms of its ecosystem, ease-of-use, production availability, and stability. These enhancements will be explained in the following sections.
Overview of Flink 1.11.0
Flink 1.11.0 was officially released after four release candidates were published following the feature freeze. A total of 236 contributors participated in the development of the 1.11.0 version, and they submitted 2,325 commits. This version fixed 1,474 JIRA issues and involved more than 30 Flink Improvement Proposals (FLIPs).
Five Flink versions have been released so far. Since version 1.9.0, Flink has seen rapid development, with its performance metrics nearly doubling compared with earlier versions. The proprietary Blink project of Alibaba was fully integrated with Apache Flink starting from 1.9.0 until 1.10.0. This significantly improved the ecosystem construction, functionality, performance, and production availability of Flink.
Flink 1.11.0 was initially designed to make Flink easier to use and improve user experience in business production. It does not involve major architecture adjustments or feature development. This version was a minor update that was developed through fast iteration. However, according to the preceding statistics, the development metrics of Flink 1.11.0, such as commits, resolved issues, and contributors, are comparable to those of 1.9.0 and 1.10.0. The numbers of fixed issues and participating contributors have been increasing since the first Flink version was released. The proportion of Chinese contributors reached 62% for Flink 1.11.0.
The following sections explain the long-awaited features of Flink 1.11.0 from different perspectives, especially the features related to out-of-the-box APIs and the execution engine. For a complete list of features, see the Flink release blog.
Ecosystem and Ease-of-Use Enhancements
Ease-of-use is closely tied to the ecosystem. Ecosystem incompatibilities generally harm ease-of-use, while ease-of-use enhancements can contribute to the development of more sophisticated ecosystems. This close relationship is especially visible in the Table and SQL APIs.
(1) CDC Supported by Tables and SQL
Change data capture (CDC) is widely used in scenarios such as data replication, cache updates, data synchronization between microservices, and audit logging. Open-source CDC tools, such as MySQL CDC, are used by many companies. In many scenarios, Flink needs to consume the output of CDC tools and interpret it through the Table and SQL APIs, a requirement that has been discussed many times. Supporting CDC lets Flink process changelog streams in real time and opens up new scenarios. For example, CDC can be used with Flink to synchronize data from MySQL to PostgreSQL or Elasticsearch or to perform a temporal join on a changelog with low latency.
Flink provides the append and update modes used to materialize changelog streams into dynamic tables. Flink versions earlier than 1.11.0 can materialize changelog streams into dynamic tables in append mode. The update mode is supported in Flink 1.11.0.
It is necessary to encode and decode update operations between external systems and Flink so changelogs can be parsed and output. Considering that sources and sinks serve as a type of bridge to external systems, FLIP-95 defines new interfaces for table sources and table sinks. These interfaces can be used to encode and decode update operations.
According to a public CDC research report, Debezium and Canal are the most popular CDC tools. They are used to synchronize changelogs to other systems, such as message queues. FLIP-105 supports the Debezium and Canal formats. Kafka sources can parse the two formats and output update events. Avro (Debezium) and Protobuf (Canal) will be supported in later versions.
CREATE TABLE my_table (
  ...
) WITH (
  'connector'='...', -- e.g. 'kafka'
  'format'='debezium-json',
  'debezium-json.schema-include'='true', -- default: false (Debezium can be configured to include or exclude the message schema)
  'debezium-json.ignore-parse-errors'='true' -- default: false
);
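To make the update mode more concrete, the following is a minimal plain-Python sketch of how a Debezium-style changelog could be materialized into a table keyed by primary key. It is a conceptual model, not Flink code: the function name `apply_changelog` and the sample `id` and `name` columns are hypothetical, while the `op`, `before`, and `after` fields follow the Debezium event envelope.

```python
def apply_changelog(events, key="id"):
    """Materialize Debezium-style change events into a dict keyed by primary key."""
    table = {}
    for event in events:
        op = event["op"]
        if op in ("c", "r", "u"):
            # create, snapshot read, update: upsert the new row image
            row = event["after"]
            table[row[key]] = row
        elif op == "d":
            # delete: remove the row identified by the old row image
            table.pop(event["before"][key], None)
        else:
            raise ValueError(f"unknown op: {op!r}")
    return table


# Hypothetical sample events: two inserts, one update, one delete.
events = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "scooter"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "hammer"}},
    {"op": "u", "before": {"id": 1, "name": "scooter"},
     "after": {"id": 1, "name": "e-scooter"}},
    {"op": "d", "before": {"id": 2, "name": "hammer"}, "after": None},
]
result = apply_changelog(events)
```

An append-only view of the same stream would keep all four events as rows, whereas the update-mode view above ends with a single row for `id` 1.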
(2) JDBC Catalog Supported by Tables and SQL
In Flink versions earlier than 1.11.0, if you read and write relational databases or read changelogs by using sources and sinks, you must manually create the corresponding schema. When the database schema is changed, you must manually update the corresponding Flink job to maintain schema consistency. Otherwise, a runtime error occurs and causes the job to fail. These two manual operations are cumbersome and result in poor user experience.
The two operations are required when you connect any external systems to Flink. Flink 1.11.0 was primarily designed to streamline the connection to relational databases. FLIP-93 provides the Java Database Connectivity (JDBC) catalog API and PostgreSQL catalog implementation. This makes it easier to connect Flink to other types of relational databases.
Flink 1.11.0 allows you to automatically retrieve table schema when you use Flink SQL without having to enter data definition language (DDL) statements. Flink 1.11.0 checks for inconsistent schemas during the compilation process. This avoids job failures due to the runtime errors arising from schema inconsistency. This is a typical enhancement made to improve ease-of-use and user experience.
(3) Hive Real-Time Data Warehouses
Since version 1.9.0, ecosystem enhancements have been made to integrate Flink with Hive, with the ultimate goal of building Hive data warehouses capable of both batch and stream processing. After the upgrades in 1.9.0 and 1.10.0, Flink’s batch processing on Hive became compatible enough to be used in production environments. In the TPC-DS 10 TB benchmark, Flink 1.11.0 performs more than seven times better than Hive 3.0.
In the Hive ecosystem, Flink 1.11.0 implements a Hive real-time data warehouse solution that supports batch and stream processing. This solution delivers an improved user experience for end-to-end extract, transform, and load (ETL) in streaming mode. This is another enhancement Flink 1.11.0 made to improve compatibility, performance, and ease-of-use.
The real-time data warehouse solution supports real-time read and write operations on Hive based on the stream processing capabilities of Flink.
- Write Operations on Hive: FLIP-115 fully extends the basic capabilities and implementation of the Filesystem connector. Table sinks and SQL sinks support all types of formats, such as CSV, JSON, Avro, Parquet, and ORC. They also support all Hive table formats.
- Partition Support: The visibility of data imported to Hive is controlled through partition commit. sink.partition-commit.trigger controls when a partition is committed, and sink.partition-commit.policy.kind selects the commit policy. A partition can be committed by writing a success file, by updating the metastore, or both.
- Read Operations on Hive: Data can be read from Hive in real-time in streaming mode. The system incrementally reads new partitions by monitoring the partition creation process. Alternatively, it can incrementally read new files by monitoring the file creation process in folders.
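The partition commit idea in the list above can be sketched in plain Python. This is a simplified model of a success-file policy under stated assumptions, not the Filesystem connector itself: the helper names, the `dt=...` partition layout, and the `_SUCCESS` marker name are choices made for this illustration.

```python
import tempfile
from pathlib import Path

SUCCESS_FILE = "_SUCCESS"  # marker file name assumed for this sketch


def write_records(partition_dir: Path, file_name: str, lines):
    """Write a data file into a partition; the partition is not yet visible."""
    partition_dir.mkdir(parents=True, exist_ok=True)
    (partition_dir / file_name).write_text("\n".join(lines))


def commit_partition(partition_dir: Path):
    """Commit the partition by creating an empty marker file inside it."""
    (partition_dir / SUCCESS_FILE).touch()


def visible_partitions(table_dir: Path):
    """Downstream readers only consider partitions that carry the marker."""
    return sorted(
        p.name for p in table_dir.iterdir()
        if p.is_dir() and (p / SUCCESS_FILE).exists()
    )


with tempfile.TemporaryDirectory() as tmp:
    table = Path(tmp)
    write_records(table / "dt=2020-07-07", "part-0", ["a", "b"])
    write_records(table / "dt=2020-07-08", "part-0", ["c"])
    commit_partition(table / "dt=2020-07-07")  # only this partition is committed
    visible = visible_partitions(table)
```

The second partition already contains data, but readers that honor the marker ignore it until it is committed.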
The following enhancements are made to improve Hive availability:
- FLIP-123 provides a Hive dialect designed for syntax compatibility. It allows you to directly transfer Hive scripts to Flink for execution without having to switch between the command-line interfaces (CLIs) of Flink and Hive.
- Hive dependencies are internally supported. You do not have to download these dependencies by yourself. To run these dependencies, you only need to download a package and configure
In terms of Hive performance, Flink 1.10.0 supports vectorized read of ORC in Hive 2 and later. Flink 1.11.0 supports vectorized read of Parquet and ORC in all Hive versions. This further improves performance.
(4) A New Source Interface
As mentioned above, Flink is connected to external systems through sources and sinks. These two components are essential for improving ecosystems, availability, and the end-to-end user experience. The Flink community started to plan source refactoring one year ago. This can be traced back to FLIP-27. The refactoring design must be fully considered because it involves many complex internal mechanisms and the implementation of various source connectors. Flink 1.10.0 implemented source refactoring in a proof of concept (POC) program. Flink 1.11.0 completes source refactoring.
First, let’s review the source-related problems.
- In Flink, it is difficult to refactor an existing source or re-implement a production-ready source connector. Besides the lack of reusable public code, you need to delve into the myriad of internal details of Flink and implement event time allocation, watermark output, idleness monitoring, and a threading model.
- Different sources are required by batch and stream processing.
- Partitions, splits, and shards are not explicitly expressed by interfaces. For example, the split discovery logic and data consumption are coupled with source function implementation. This makes it more complicated to implement sources of Kafka and Kinesis.
- At the runtime execution layer, if checkpoint locks are preempted by source functions, a series of problems occur and it is difficult to optimize the framework.
These pain points were solved in FLIP-27.
Split enumerators are introduced to job managers, and source readers are introduced to task managers. This decouples split discovery from consumption request processing and facilitates the combination of different policies. For example, existing Kafka connectors include various partition discovery policies that are coupled with implementation. The new architecture provides a source reader that is adapted to the implementation of multiple split enumerators. This allows you to use different partition discovery policies in a single implementation mode.
The new architecture also provides a source connector that unifies batch and stream processing. For bounded input in batch processing, a split enumerator generates a fixed set of splits, and each split is a finite dataset. For unbounded input in stream processing, a split enumerator generates an infinite number of splits, or a split is an infinite dataset.
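The separation between split discovery and split consumption can be illustrated with a small plain-Python model. It is a sketch of the idea only and does not mirror the FLIP-27 Java interfaces: the class and method names below are hypothetical, and only the bounded case is shown.

```python
from collections import deque


class SplitEnumerator:
    """Job manager side in this model: discovers splits and hands them out."""

    def __init__(self, splits):
        self.pending = deque(splits)

    def next_split(self):
        # Returns None once a bounded input has no splits left.
        return self.pending.popleft() if self.pending else None


class SourceReader:
    """Task manager side in this model: requests splits and reads their records."""

    def __init__(self, enumerator):
        self.enumerator = enumerator

    def read_all(self):
        records = []
        split = self.enumerator.next_split()
        while split is not None:
            records.extend(split)  # a split is modeled as a finite list of records
            split = self.enumerator.next_split()
        return records


# Bounded input: a fixed set of three finite splits.
reader = SourceReader(SplitEnumerator([[1, 2], [3], [4, 5, 6]]))
records = reader.read_all()
```

Because the reader only depends on `next_split`, a different discovery policy can be plugged in by swapping the enumerator. An unbounded source would correspond to an enumerator that keeps discovering splits and never returns `None`.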
The complex timestamp assigner and watermark generator are built and run transparently in the source reader module. If you want to implement a new source connector, you do not need to re-implement the timestamp assigner and watermark generator.
The existing source connectors of Flink will be re-implemented based on the new architecture. Compatibility with legacy sources will still be maintained in several Flink versions. You can develop new sources by following the instructions in the release notes.
(5) PyFlink Ecosystem
Python is widely used in machine learning and data analytics. Flink 1.9.0 and later are compatible with the Python ecosystem. Python and Flink are integrated into PyFlink, which empowers Python with the real-time distributed processing capabilities of Flink. PyFlink integrated with Flink 1.9.0 or 1.10.0 supports the Python Table API and user-defined functions (UDFs). PyFlink integrated with Flink 1.11.0 supports the Python ecosystem library Pandas and can be integrated with SQL DDL and clients. In addition, the performance of Python UDFs is significantly improved.
In PyFlink integrated with Flink 1.9.0 and 1.10.0, a Python UDF processes only one data item during each call and requires serialization and deserialization in Java and Python. This causes excessive overhead. Flink 1.11.0 allows you to define and use custom, vectorized Python UDFs in Table and SQL jobs. You only need to set the udf_type parameter to pandas in the UDF decorator. This provides the following advantages:
- A UDF can process N data items during each call.
- The data format is based on Apache Arrow, which significantly reduces the serialization and deserialization overhead between Java and Python processes.
- When you use Python, you can develop high-performance Python UDFs based on common Python libraries for data analytics, such as NumPy and Pandas.
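The benefit of processing N data items per call can be shown with a stdlib-only model that counts invocations. This is not the PyFlink API: the helper names and the batch size are hypothetical, and plain lists stand in for the Arrow-backed batches that a vectorized UDF would receive.

```python
def call_per_row(udf, rows):
    """Row-at-a-time: one call (and one serialization round trip) per row."""
    calls = 0
    results = []
    for row in rows:
        results.append(udf(row))
        calls += 1
    return results, calls


def call_vectorized(batch_udf, rows, batch_size):
    """Vectorized: one call per batch of up to `batch_size` rows."""
    calls = 0
    results = []
    for start in range(0, len(rows), batch_size):
        results.extend(batch_udf(rows[start:start + batch_size]))
        calls += 1
    return results, calls


rows = list(range(10))
row_results, row_calls = call_per_row(lambda x: x + 1, rows)
batch_results, batch_calls = call_vectorized(
    lambda xs: [x + 1 for x in xs], rows, batch_size=4
)
```

Both variants compute the same results, but the vectorized one crosses the call boundary three times instead of ten, which is where the per-call overhead is saved.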
PyFlink integrated with Flink 1.11.0 also supports the following features:
- You can seamlessly convert a Pandas dataframe to a PyFlink table and vice versa. This feature is provided in FLIP-120 and it makes the Pandas ecosystem easier to use and more compatible.
- You can define and use Python user-defined table functions (UDTFs) (FLINK-14500) in tables and SQL. Java and Scala UDTFs are no longer required.
- The performance of Python UDFs is optimized with Cython (FLIP-121), achieving 30 times the performance of Python UDFs in PyFlink integrated with Flink 1.10.0.
- You can define custom metrics for Python UDFs (FLIP-112) to monitor and debug UDF execution more easily.
The preceding sections explain API-related enhancements that allow you to develop jobs more easily in Flink. The following sections explain the changes made to the execution engine in Flink 1.11.0.
Enhancements Related to Production Availability and Stability
(1) Support for Application Mode and Kubernetes Enhancements
Flink versions earlier than 1.11.0 support the following two runtime modes:
- Session Mode: A cluster is started in advance so that all jobs share the cluster’s resources at runtime. This avoids the extra overhead that arises from starting an independent cluster for each job. However, the session mode hampers isolation. If a job causes the container of a task manager to fail, all jobs in this container are restarted. Each job is managed by an independent job manager, and all job managers run in the same process. This can easily lead to a load bottleneck.
- Per-Job Mode: An independent cluster is started for each job according to resource demand. This solves the problem of poor isolation in session mode. In addition, the job manager for each job runs in an independent process. This significantly reduces the load.
In both modes, you need to run code on your client to compile and generate a JobGraph and then commit it to the cluster for execution. In this process, you need to download the required JAR package and upload it to the cluster. This can easily lead to load bottlenecks for the client and network, especially when you use a shared client.
To solve this problem, Flink 1.11.0 introduces the application mode (FLIP-85). In this mode, a cluster is started for each application, and all jobs that belong to the application run in the cluster. The processes of JobGraph generation and job commit are transferred from the client to the different job managers. The load generated by the download and upload operations is distributed to various clusters. This avoids load bottlenecks on a single client.
You can use the application mode through bin/flink run-application. This mode is supported by YARN and Kubernetes. YARN applications transfer the job runtime dependencies from clients to job managers by using YARN local resources. Kubernetes applications allow you to create an image that contains your JAR package and dependencies. Task managers are automatically created based on jobs. The whole cluster is released after job execution is completed. Therefore, application mode provides better isolation than the session mode. Kubernetes no longer supports per-job mode. In a sense, application mode replaces per-job mode when committing jobs in a cluster.
Flink 1.11.0 provides native support for Kubernetes and improves many basic features (FLINK-14460) that are now ready for use in production environments. These features include node selectors, labels, annotations, and tolerations. For easier integration with Hadoop, Flink 1.11.0 supports automatically mounting Hadoop configurations based on environment variables.
(2) Optimization of Checkpoints and Savepoints
Checkpoints and savepoints are among the most sophisticated and competitive features of Flink. The community is extremely careful when making any modifications to these two features. The most recent major versions contained few functional and architectural adjustments related to the two features. However, the mailing list includes user feedback and complaints about checkpoints and savepoints. For example, some checkpoints remain unfinished for a long time and ultimately fail, and savepoints sometimes become unavailable after jobs are restarted. Flink 1.11.0 solves these problems to improve production availability and stability.
In Flink versions earlier than 1.11.0, the metadata and state data of savepoints are stored in two different directories. If you transfer state data from the storage directory, it is difficult for you to identify the mapping relationship between metadata and state data. This may cause accidental deletion of the directory and complicate the directory cleanup process. Flink 1.11.0 stores metadata and state data in the same directory for ease of data transfer and reuse. In Flink versions earlier than 1.11.0, metadata references state data by using absolute paths. This makes the referenced state data unavailable after it is transferred from the original directory and its absolute path is changed. This problem is solved in Flink 1.11.0 by having metadata reference state data by using relative paths (FLINK-5763). This makes it easier to manage, maintain, and reuse savepoints.
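The difference between absolute and relative references can be reproduced with a small stdlib-only experiment. It is a simplified model of the idea, not Flink's savepoint format: the JSON metadata layout, the `state-0` file name, and the helper functions are assumptions made for this sketch.

```python
import json
import shutil
import tempfile
from pathlib import Path


def write_savepoint(savepoint_dir: Path, state: bytes, relative: bool):
    """Write a state file plus a metadata file that points at it."""
    savepoint_dir.mkdir(parents=True)
    state_file = savepoint_dir / "state-0"
    state_file.write_bytes(state)
    ref = state_file.name if relative else str(state_file.resolve())
    (savepoint_dir / "_metadata").write_text(json.dumps({"state": ref}))


def read_savepoint(savepoint_dir: Path) -> bytes:
    """Resolve the reference against the directory that holds the metadata."""
    ref = json.loads((savepoint_dir / "_metadata").read_text())["state"]
    # Joining with an absolute path yields that absolute path unchanged.
    return (savepoint_dir / ref).read_bytes()


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    write_savepoint(root / "old" / "sp-rel", b"counter=42", relative=True)
    write_savepoint(root / "old" / "sp-abs", b"counter=42", relative=False)
    shutil.move(str(root / "old"), str(root / "new"))  # relocate both savepoints

    relative_ok = read_savepoint(root / "new" / "sp-rel") == b"counter=42"
    try:
        read_savepoint(root / "new" / "sp-abs")
        absolute_ok = True
    except FileNotFoundError:
        absolute_ok = False
```

After the move, the savepoint with a relative reference is still readable, while the one with an absolute reference points at a path that no longer exists.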
In actual production environments, checkpoints often remain unfinished for a long time and ultimately fail due to a timeout error. Job failover results in the replay of a lot of historical data. Job execution does not progress for a long time, and end-to-end latency increases. Flink 1.11.0 optimizes checkpoints and accelerates checkpoint execution in a variety of ways. The result is lightweight checkpoints that can be executed in minutes or even seconds.
The checkpoint coordinator now notifies tasks of any canceled checkpoints (FLINK-8871). This avoids the burden on the system caused when tasks continue to execute checkpoints that have been canceled. After tasks stop executing canceled checkpoints, they can quickly proceed to new checkpoints triggered by the coordinator, which helps avoid checkpoint failures due to timeouts. This notification also makes it easier to enable local recovery by default, because it allows tasks to promptly clean up expired checkpoint resources.
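The effect of the cancellation notice can be modeled in a few lines of plain Python. This is a conceptual sketch rather than Flink's task implementation: the `Task` class and its method names are hypothetical.

```python
class Task:
    """A task that skips snapshot work for checkpoints it knows were canceled."""

    def __init__(self):
        self.aborted = set()   # checkpoint IDs the coordinator has canceled
        self.completed = []    # checkpoint IDs this task actually snapshotted

    def notify_checkpoint_aborted(self, checkpoint_id):
        self.aborted.add(checkpoint_id)

    def run_checkpoint(self, checkpoint_id):
        if checkpoint_id in self.aborted:
            return False       # skip: nobody is waiting for this snapshot anymore
        self.completed.append(checkpoint_id)
        return True


task = Task()
task.notify_checkpoint_aborted(7)                     # coordinator cancels checkpoint 7
outcomes = [task.run_checkpoint(cp) for cp in (7, 8)]
```

Without the notification, the task would spend time on checkpoint 7 before it could start checkpoint 8.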
In back pressure scenarios, a large number of buffers accumulate on a data link. As a result, checkpoint barriers are placed after buffers and cannot be promptly processed and aligned by tasks. This significantly slows down the checkpoint execution progress. Flink 1.11.0 solves this problem through the following two approaches:
1. Reduce the total number of buffers on a data link (FLINK-16428) so checkpoint barriers can be processed and aligned as soon as possible.
- The upstream output end controls the threshold (the backlog) of how many buffers can be accumulated per subpartition. This prevents too many buffers from accumulating on a single link when the load is unbalanced.
- The default upstream and downstream buffer configurations are modified appropriately, without affecting network throughput.
- The basic protocols for upstream and downstream data transmission are adjusted so that zero exclusive buffers can be configured for a single data link without causing deadlock. This decouples the total number of buffers from job parallelism. A custom buffer ratio can be defined as needed to balance throughput against checkpoint execution progress.
Some of the preceding optimizations are completed in Flink 1.11.0. The remaining optimizations will be completed in the next version.
2. Support unaligned checkpoints (FLIP-76), which solve the problem of checkpoint barrier alignment in back pressure scenarios. This approach was designed as early as Flink 1.10.0. However, it involves major modifications to many modules, and its implementation and threading models are complex. We tested and compared the performance of two solutions by using the POC and prototype methods before we determined the final solution. We built the minimum viable product (MVP) version in Flink 1.11.0. This is the only major feature that Flink 1.11.0 provides for the execution engine. The basic ideas are listed below:
- Checkpoint barrier buffers are transmitted independently of data buffers and they do not wait in the input and output queues for processing. This decouples checkpoint barriers from the computing capabilities of operators. Only network latency affects the inter-node transmission of checkpoint barriers, and the latency is negligible.
- The checkpoints between the input links of each operator can be executed before barrier alignment is completed. Checkpoint execution is triggered when the first checkpoint barrier arrives. This further accelerates checkpoint execution, and the overall execution progress is not affected by latency on a single link.
- To maintain consistency with aligned checkpoint semantics, snapshots of all unprocessed input and output data buffers are persisted as channel states when a checkpoint is executed. The channel states are restored together with operator states during failover. With aligned checkpoints, all data ahead of a checkpoint barrier has been processed, so operator states already reflect that data. With unaligned checkpoints, the data ahead of a checkpoint barrier that had not yet been processed is replayed from the channel states when the job restarts after failover, which brings the operator states to the same point. The two approaches are therefore consistent in terms of state restoration. In-flight buffers are persisted in the asynchronous checkpointing phase, and only lightweight buffer references are taken in the synchronous phase. This does not take up much of the operators’ computing time and does not affect throughput.
The unaligned checkpoint approach can significantly accelerate checkpoint execution under serious back pressure, because checkpoint duration then depends mainly on system storage performance rather than on overall computing throughput. This is equivalent to decoupling computing from storage. However, the unaligned checkpoint approach has certain limitations. For example, it increases the overall state size and causes extra storage I/O overhead. Therefore, this approach is not suitable for scenarios with I/O bottlenecks.
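The consistency argument in the list above can be checked with a toy model of an operator that sums records from two input channels. This is a conceptual sketch, not Flink's implementation: the function names, the `BARRIER` marker, and the sample records are hypothetical.

```python
BARRIER = "BARRIER"


def records_before_barrier(channel):
    """Records queued ahead of the barrier on one input channel."""
    return channel[:channel.index(BARRIER)]


def aligned_snapshot(operator_sum, channels):
    """Aligned: process everything ahead of every barrier, then snapshot operator state only."""
    for channel in channels:
        operator_sum += sum(records_before_barrier(channel))
    return {"operator_state": operator_sum, "channel_state": []}


def unaligned_snapshot(operator_sum, channels):
    """Unaligned: snapshot immediately and persist the overtaken records as channel state."""
    return {
        "operator_state": operator_sum,
        "channel_state": [records_before_barrier(c) for c in channels],
    }


def restore(snapshot):
    """Restore operator state, then replay the persisted in-flight records."""
    total = snapshot["operator_state"]
    for in_flight in snapshot["channel_state"]:
        total += sum(in_flight)
    return total


# The operator has already summed records worth 10; more records are queued.
channels = [[3, 4, BARRIER, 100], [5, BARRIER, 200]]
aligned = restore(aligned_snapshot(10, channels))
unaligned = restore(unaligned_snapshot(10, channels))
```

Both restores arrive at the same operator state. The unaligned snapshot is larger because it carries the three in-flight records, which mirrors the extra state size and storage I/O that the approach trades for speed.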
In Flink 1.11.0, the unaligned checkpoint mode is not the default mode, so it must be manually enabled. In addition, it takes effect only in exactly-once mode. The unaligned savepoint mode is currently not supported because savepoints involve job rescaling and channel states do not currently support state splitting. This will be supported in later versions. Therefore, the aligned savepoint mode is still in use and may take a long time in back pressure scenarios.
An increasing number of Chinese contributors have participated in the development of the core features of Flink 1.11.0. This signals the vigorous growth of the Flink ecosystem in China. For example, contributors from Tencent participated in the development of the Kubernetes and checkpoint features. Contributors from ByteDance participated in the development of tables, SQL, and engine networks. I hope that more companies will participate in the Flink open-source community and share their experience in different fields. This will keep Flink at the forefront of technology and benefit more users.
A major version of Flink is being planned after the release of the “minor version” 1.11.0. I believe that the next major version will offer many important features.