Edited by Gao Yun and Cheng Hequn. Reviewed by Wang Zhijiang.
Flink 1.11 is about to be officially released! To satisfy your curiosity, we invited core developers from Flink to discuss the features of Flink 1.11. Compared to version 1.10, Flink 1.11 has been improved in many aspects. The team is committed to further improving the availability and performance of Flink.
This article will describe the new features, improvements, and important changes of Flink 1.11 as well as Flink’s future development plans. For more information, see the corresponding FLIP or JIRA pages and stay tuned for our subsequent special live broadcasts.
Cluster Deployment and Resource Management
1. [FLIP-85] Supports the Application Mode
Currently, Flink allows you to use a separate client to create a JobGraph and submit a job. During actual use, the download of JAR packages for jobs occupies a large amount of bandwidth on client machines and requires you to start a separate client process that occupies non-managed resources. To resolve these problems, Flink 1.11 provides an application mode, in which the master node takes over JobGraph generation and job submission.
You can enable the application mode by running bin/flink run-application. Currently, the application mode is supported for Flink deployments on YARN and on Kubernetes. In the YARN application mode, the dependencies required for running tasks are passed from the client to the Flink Master by using YARN local resources. The job is then submitted on the Flink Master. The Kubernetes application mode allows you to build an image that contains your JAR package and dependencies. It automatically creates TaskManagers (TMs) based on the job and terminates the entire cluster after the job completes.
2. [FLINK-13938] [FLINK-17632] Flink YARN: Supports Remote Caching of JAR Packages from Flink lib Directories and the Use of Remote JAR Packages for Creating Jobs
In versions earlier than 1.11, JAR packages in the Flink lib directory had to be uploaded each time a job was submitted on YARN, consuming additional storage space and communication bandwidth. Flink 1.11 allows you to provide multiple remote lib directories. Flink then caches the files from these directories on YARN nodes, avoiding unnecessary JAR package uploads and downloads. In this way, job submission and startup are accelerated. The sample code is:
./bin/flink run -m yarn-cluster -d \
-yD yarn.provided.lib.dirs=hdfs://myhdfs/flink/lib,hdfs://myhdfs/flink/plugins \
In addition, Flink 1.11 also allows you to create jobs by directly using JAR packages on a remote file system. In this way, the overhead caused by JAR package downloads is further reduced. The sample code is:
./bin/flink run-application -p 10 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://myhdfs/flink/lib" \
3. [FLINK-14460] Enhances Kubernetes Features on Flink
In version 1.11, Flink on Kubernetes supports the application mode proposed in FLIP-85, which provides better isolation than the session mode.
Flink 1.11 also adds support for Kubernetes features such as Node Selector, Label, Annotation, and Toleration. To facilitate integration with Hadoop, Flink also allows the system to automatically mount Hadoop configurations based on environment variables.
4. [FLIP-111] Unifies Docker Images
Previously, the Flink project provided several different Dockerfiles for creating Flink Docker images. These are now consolidated in the apache/flink-docker project.
5. [FLINK-15911] Supports Separated Local Bind Listener Network Ports and External Access Addresses and Ports
In some scenarios, such as Docker and NAT port mapping scenarios, the local network addresses and ports seen by the JobManager (JM) and TM processes may differ from the addresses and ports that other processes use to reach them externally. Previously, Flink did not allow you to set different local and remote addresses for the TM and JM. As a result, Flink ran into problems in NAT networks such as Docker, and the exposure scope of listener ports could not be restricted.
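Flink 1.11 introduces separate parameters for local and remote listener addresses and ports. The following configuration options set the remote (externally visible) addresses and ports:
jobmanager.rpc.address
jobmanager.rpc.port
taskmanager.host
taskmanager.rpc.port
taskmanager.data.port
The following configuration options set the local bind addresses and ports:
jobmanager.bind-host
jobmanager.rpc.bind-port
taskmanager.bind-host
taskmanager.rpc.bind-port
taskmanager.data.bind-port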
1. [FLINK-16614] Unifies Memory Resource Configurations on JM
A major change in Flink 1.10 was that the memory model and configuration rules of TM were redefined. Flink 1.11 further adjusts the memory model and configuration rules of JM to align them with those of TM.
For more information about the memory configuration rules, see the relevant user documentation.
2. [FLIP-108] Supports Scheduling of Extended Resources such as GPUs
As machine learning and deep learning develop, an increasing number of Flink jobs embed machine learning or deep learning models, creating demand for GPU resources. In versions earlier than 1.11, Flink could not manage extended resources, such as GPU resources. To resolve this problem, Flink 1.11 provides a unified framework for managing extended resources and builds in support for GPU resources based on this framework.
For the extended-resource management framework and GPU resource management configurations, see the Public Interface section of the FLIP-108 page. The community is currently working on the corresponding user documentation, which will be available in the future.
3. [FLINK-16605] Supports the Setting of the Maximum Number of Slots for Batch Jobs
To prevent Flink batch jobs from occupying excessive resources, Flink 1.11 introduces a new configuration item: slotmanager.number-of-slots.max. This parameter allows you to limit the maximum number of slots in the entire Flink cluster. This parameter is recommended only for batch table and SQL jobs that use the Blink Planner.
1. [FLIP-103] Improves the Display of JM and TM Logs on the WebUI
Previously, only .log and .out logs could be read on the WebUI. However, other files, such as GC logs, may also exist in the log directory. The new version of the WebUI allows you to access all files in the log directory and to reload, download, and display logs in full-screen mode.
2. [FLIP-99] Displays More Historical Failover Exceptions
Previously, the WebUI could display only 20 historical failover exceptions for a single job. If job failover occurs frequently, the initial exception, which is most likely the root cause, is soon pushed out of the list, making troubleshooting more difficult. To address this problem, the new WebUI version provides a paginated display, allowing you to view more historical exceptions.
3. [FLINK-14816] Supports Thread Dump on Pages
A thread dump is vital for troubleshooting some job problems. In versions earlier than 1.11, if you needed a thread dump, you had to log on to the machine where the TM was located. In Flink 1.11, the WebUI integrates this feature by adding a Thread Dump tab, allowing you to obtain a thread dump of the TM directly through the WebUI.
Source & Sink
1. [FLIP-27] Adds the Source API
FLIP-27 is a major feature in Flink 1.11. The conventional Flink source API had some problems. For example, different sources had to be implemented for streaming jobs and batch jobs, no unified data partition discovery logic was available, the source implementer had to process the locking logic, and no public architecture was provided. As a result, source developers had to manually handle multithreading problems and other problems. These problems increased the difficulty in implementing sources in Flink.
FLIP-27 introduces a brand new source API. This API provides unified data partition discovery, management, and other features. This allows you to focus on the partition information reading logic and the data reading logic without needing to deal with complex thread synchronization. In this way, source implementation is greatly simplified and more built-in features can be provided for sources in the future.
2. [FLINK-11395] StreamingFileSink: Supports the Avro and ORC Formats
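Flink 1.11 adds support for two common file formats, Avro and ORC, to StreamingFileSink. For ORC, for example (stream is assumed here to be an existing DataStream<Record>):
OrcBulkWriterFactory<Record> factory = new OrcBulkWriterFactory<>(
        new RecordVectorizer(schema), writerProps, new Configuration());
// Attach a bulk-format sink that writes ORC files to outDir
stream.addSink(StreamingFileSink
        .forBulkFormat(new Path(outDir.toURI()), factory)
        .build());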
1. [FLINK-5763] Modifies the Structure of Savepoint Files to Make Savepoints Self-Contained and Portable
Flink 1.11 replaces the absolute paths of savepoint files with relative paths, allowing you to move a savepoint without manually modifying the paths in its metadata. Note: This feature is unavailable when entropy injection is enabled for the S3 file system.
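The effect of relative paths can be illustrated with a minimal sketch. It does not read Flink's actual savepoint metadata format; the function name, directory names, and file names are hypothetical and only show why a relative reference survives a move while an absolute one does not.

```python
from pathlib import PurePosixPath


def resolve_state_file(savepoint_dir, recorded_path):
    """Resolve a state file reference found in (hypothetical) savepoint metadata."""
    recorded = PurePosixPath(recorded_path)
    if recorded.is_absolute():
        # Absolute style: the reference stays pinned to the original location.
        return recorded
    # Relative style: the reference follows the savepoint directory.
    return PurePosixPath(savepoint_dir) / recorded


# The savepoint was written to /savepoints/savepoint-1 and later moved.
new_dir = "/archive/savepoint-1"
absolute_ref = "/savepoints/savepoint-1/state-file-a"
relative_ref = "state-file-a"

moved_absolute = resolve_state_file(new_dir, absolute_ref)
moved_relative = resolve_state_file(new_dir, relative_ref)
print(moved_absolute)  # still points into the old directory
print(moved_relative)  # points into the new directory
```

With absolute references, the metadata would have to be edited after every move. With relative references, the directory can be copied as a unit.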
2. [FLINK-8871] Adds a Callback for Checkpoint Failures and Notifies the TM of Checkpoint Failures
Before Flink 1.11, the TM could be notified only of checkpoint success. Flink 1.11 adds a mechanism for notifying the TM of checkpoint failures. This mechanism not only allows you to cancel an ongoing checkpoint, but also allows you to receive the corresponding notifications through the notifyCheckpointAborted API added to CheckpointListener.
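One way an operator might use such a callback is to clean up resources it prepared for a checkpoint that never completed. The following is a plain-Python sketch, not Flink code: the method names mirror the Java callbacks notifyCheckpointComplete and notifyCheckpointAborted, while the PendingFileTracker class and its file naming are hypothetical.

```python
class PendingFileTracker:
    """Hypothetical helper that keeps one temporary file name per checkpoint."""

    def __init__(self):
        self.pending = {}    # checkpoint id -> temporary file name
        self.committed = []  # files made visible after a successful checkpoint
        self.discarded = []  # files dropped after an aborted checkpoint

    def snapshot_state(self, checkpoint_id):
        # Prepare a temporary file that belongs to this checkpoint.
        self.pending[checkpoint_id] = f"part-{checkpoint_id}.tmp"

    def notify_checkpoint_complete(self, checkpoint_id):
        name = self.pending.pop(checkpoint_id, None)
        if name is not None:
            self.committed.append(name)

    def notify_checkpoint_aborted(self, checkpoint_id):
        # Without this callback, the temporary file would linger until a
        # later checkpoint succeeded.
        name = self.pending.pop(checkpoint_id, None)
        if name is not None:
            self.discarded.append(name)


tracker = PendingFileTracker()
tracker.snapshot_state(1)
tracker.snapshot_state(2)
tracker.notify_checkpoint_complete(1)
tracker.notify_checkpoint_aborted(2)
print(tracker.committed, tracker.discarded)
```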
3. [FLINK-12692] Supports Disk Spilling in HeapKeyedStateBackend
This feature is not incorporated into the code of Flink 1.11, but you can try it out by downloading it. Heap StateBackend achieves better performance by maintaining states as Java objects. However, the memory occupied by Heap StateBackend was previously uncontrollable, resulting in severe GC problems.
To resolve this problem, SpillableKeyedStateBackend was released to support data spilling to disk, allowing StateBackends to limit their memory size. For more information about SpillableKeyedStateBackend, see FLINK-12692.
4. [FLINK-15507] Enables Local Recovery for RocksDBStateBackend by Default
Enabling local recovery by default speeds up failover.
5. Changes the Default Value of the state.backend.fs.memory-threshold Parameter to 20 KB
This feature is still under development but will probably be included in Flink 1.11. The state.backend.fs.memory-threshold parameter determines when the FsStateBackend writes state data out of memory into separate files. Previously, the default value of 1 KB often led to a large number of small files and affected state access performance. Therefore, in Flink 1.11, the default value is changed to 20 KB. Note: State smaller than the threshold is stored inline in the checkpoint metadata, so this change may increase the memory usage of the JM, especially when the parallelism of operators is high or union state is used.
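The threshold decision can be sketched as follows. This is a simplified model, not the FsStateBackend implementation: the function name, the labels, and the handling of the exact boundary value are assumptions made for illustration.

```python
MEMORY_THRESHOLD_BYTES = 20 * 1024  # the 20 KB default described above


def place_state(state_bytes, threshold=MEMORY_THRESHOLD_BYTES):
    """Return where a serialized state chunk goes in this simplified model."""
    if len(state_bytes) < threshold:
        # Small chunks travel with the checkpoint metadata (held by the JM).
        return "inline-in-metadata"
    # Larger chunks are written out as their own files.
    return "separate-file"


chunks = {
    "tiny": bytes(512),
    "medium": bytes(4 * 1024),
    "large": bytes(64 * 1024),
}
for threshold in (1024, MEMORY_THRESHOLD_BYTES):
    placement = {name: place_state(data, threshold) for name, data in chunks.items()}
    print(threshold, placement)
```

In this model, the 4 KB chunk becomes a separate small file under the old 1 KB threshold but is kept inline under the 20 KB threshold, which shows both the benefit (fewer small files) and the cost (more data in the metadata).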
Table & SQL
1. [FLIP-65] Optimizes the Type Inference Mechanism in UDFs of the Table API
Compared with the previous type inference mechanism, the new mechanism can provide more type information about input parameters, allowing you to implement more flexible processing logic. Currently, this feature supports user-defined functions (UDFs) and user-defined table functions (UDTFs) but does not support user-defined aggregate functions (UDAFs).
2. [FLIP-84] Optimizes the TableEnvironment API
Flink 1.11 provides the following enhancements to the TableEnvironment API:
- Previously, sqlUpdate executed DDL statements immediately upon reception but executed DML statements only when env.execute was called. In Flink 1.11, both DDL statements and DML statements are executed when executeSql is called.
- Queries that return results are supported, such as SHOW TABLES and EXPLAIN statements.
- Caching multiple SQL statements for execution is supported.
- The collect method is added to allow you to obtain query execution results.
3. [FLIP-93] Supports JDBC Catalog and PostgreSQL Catalog
In versions earlier than 1.11, you had to manually copy database table schemas to Flink when you used Flink to read data from or write data to relational databases or read change logs. This process was tedious and prone to errors, greatly increasing the cost of using Flink. Flink 1.11 provides catalog management that is based on JDBC and PostgreSQL so Flink can automatically read table schemas, reducing the need for manual operations.
4. [FLIP-105] Supports ChangeLog Sources
Users have long asked for a Change Data Capture (CDC) mechanism to import dynamic data from external systems, such as MySQL BinLog and Kafka Compacted Topic, into Flink and to write the Update and Retract streams in Flink to external systems. Flink 1.11 adds support for reading and writing CDC data. Currently, Flink supports the Debezium and Canal CDC formats.
5. [FLIP-95] Optimizes the TableSource and TableSink APIs
Flink 1.11 simplifies the structure of the current TableSource and TableSink APIs, providing the basis for supporting the CDC feature. This avoids the dependency on the DataStream API and resolves the problem where only the Blink Planner supported efficient source and sink implementations. For more information about the changes in the APIs, see the FLIP-95 page.
6. [FLIP-122] Modifies the Configuration Items of Connectors
FLIP-122 reorganizes the “with” configuration items of Table Connector and SQL Connector. Due to historical reasons, some “with” configuration items were redundant or inconsistent. For example, all configuration items were prefixed with “connector,” but their naming rules differed. The reorganization resolves these redundancy and inconsistency issues. Note: The existing configuration items are still available. For the new configuration items, see the FLIP-122 page.
7. [FLIP-113] Supports Dynamic Table Properties for Flink SQL
Dynamic table properties allow you to modify the configuration items of a table at the point where you use it, freeing you from the need to re-declare the table’s DDL statement after changing configuration items. As shown in the following code, dynamic properties allow you to use a hint such as /*+ OPTIONS('k1'='v1') */ to override property values from the DDL statement.
SELECT *
FROM EMP /*+ OPTIONS('k1'='v1', 'k2'='v2') */
JOIN DEPT /*+ OPTIONS('a.b.c'='v3', 'd.e.f'='v4') */
ON EMP.deptno = DEPT.deptno
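Conceptually, the hint acts as a per-query overlay on the options declared in the DDL statement. The sketch below models only that override behavior with plain dictionaries. The function name and the option values are hypothetical, and it does not reflect how the planner processes hints internally.

```python
def effective_options(ddl_options, hint_options):
    """Merge per-query hint options over the options declared in the DDL."""
    merged = dict(ddl_options)   # copy, so the declared options stay untouched
    merged.update(hint_options)  # hint values win on key conflicts
    return merged


ddl = {"k1": "from-ddl", "k2": "from-ddl", "k3": "from-ddl"}
hint = {"k1": "v1", "k2": "v2"}
print(effective_options(ddl, hint))
```

The declared options are left unchanged, so another query on the same table without a hint still sees the original values.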
8. [FLIP-115] Supports Hive in Flink SQL
- FileSystem Connectors provide support for the CSV, ORC, Parquet, JSON, and Avro formats. In addition, Flink SQL provides full support for the Batch FileSystem Connector and Streaming FileSystem Connector.
- Flink SQL supports the Hive streaming sink.
9. [FLIP-123] Supports Hive-Compatible DDL and DML Statements
FLIP-123 provides support for the Hive dialect, allowing you to perform operations by executing Hive DDL and DML statements.
1. [FLINK-15670] Kafka Shuffle: Uses Kafka as the Job Message Bus to Provide a Mechanism for Simultaneously Exchanging and Storing Data between Operators
Flink Kafka Shuffle provides a DataStream API that allows you to use Kafka as a message bus between operators and provides a mechanism for simultaneously exchanging and storing data. This mechanism has the following advantages:
- Shuffled data can be reused.
- When a job recovers from a failure, only the persisted data is involved. This avoids restarting the entire graph while preserving exactly-once semantics.
This mechanism can supplement failure recovery for large-scale streaming jobs until Flink’s ongoing failover refactoring work is completed.
2. [FLIP-126] Optimizes the WatermarkAssigner API of Sources
Note: The work has been completed, but whether this feature will be released in Flink 1.11 is still under discussion. The new WatermarkAssigner API integrates the two previous types of watermark APIs, AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. This makes it easier to support watermark insertion during source implementation in the future.
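To see how one interface can cover both the periodic and the punctuated style, consider the following plain-Python model. It assumes a generator with two hooks, one called per event and one called periodically, which mirrors the general shape of the unified design. The class names, the event layout, and the simplified watermark formula are hypothetical and are not Flink's implementation.

```python
class MaxTimestampMinusDelay:
    """Periodic style: track the largest timestamp, emit on the periodic hook."""

    def __init__(self, max_delay_ms):
        self.max_delay_ms = max_delay_ms
        self.max_timestamp = None

    def on_event(self, event, timestamp, emit):
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp

    def on_periodic_emit(self, emit):
        if self.max_timestamp is not None:
            emit(self.max_timestamp - self.max_delay_ms)


class MarkerBased:
    """Punctuated style: emit immediately when an event carries a marker."""

    def on_event(self, event, timestamp, emit):
        if event.get("watermark_marker"):
            emit(timestamp)

    def on_periodic_emit(self, emit):
        pass  # nothing to emit between events


def run(generator, events, emit_every):
    """Feed events and call the periodic hook after every `emit_every` events."""
    watermarks = []
    for index, event in enumerate(events, start=1):
        generator.on_event(event, event["ts"], watermarks.append)
        if index % emit_every == 0:
            generator.on_periodic_emit(watermarks.append)
    return watermarks


events = [
    {"ts": 1000},
    {"ts": 3000, "watermark_marker": True},
    {"ts": 2000},
    {"ts": 5000},
]
periodic = run(MaxTimestampMinusDelay(max_delay_ms=500), events, emit_every=2)
punctuated = run(MarkerBased(), events, emit_every=2)
print(periodic, punctuated)
```

Both generators plug into the same driver, so a source only needs to know about one interface.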
3. [FLIP-92] Supports Multi-Input Operators
Flink 1.11 supports multi-input operators, but no complete DataStream API has been provided for this feature. To use this feature, you need to manually create a MultipleInputTransformation:
MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
PyFlink & ML
1. [FLINK-15636] Supports Execution of Python UDFs in Flink Planner’s Batch Mode
Previously, Python UDFs could be executed in both the stream mode and the batch mode of the Blink Planner, but only in the stream mode of the Flink Planner. With this feature, Python UDFs can be executed in both the stream mode and the batch mode of the two planners.
2. [FLINK-14500] Supports Python UDTFs
A UDTF takes one input row and can output multiple rows. Python UDTFs can be executed in both the stream mode and the batch mode of the two planners.
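The one-in, many-out behavior maps naturally onto a Python generator. The sketch below shows only the function body in plain Python. The function name is hypothetical, and the PyFlink registration step, where the function would be wrapped as a UDTF with declared input and result types, is intentionally left out.

```python
def split_words(line):
    """Table-function-style logic: one input row in, zero or more rows out."""
    for word in line.split():
        # Each yielded tuple represents one output row: (word, length).
        yield word, len(word)


rows = list(split_words("hello flink world"))
print(rows)
```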
3. [FLIP-121] Optimizes the Execution Efficiency of Python UDFs by Using Cython
Flink uses Cython to optimize the computational logic of the serialization and deserialization operations of the Coder module and the computational logic of the Operation module. The end-to-end performance is dozens of times higher than in Flink 1.10.
4. [FLIP-97] Supports Pandas UDFs
Pandas UDFs use pandas.Series as the input and output type and support batch data processing. Generally, Pandas UDFs perform better than common UDFs because they reduce the serialization and deserialization overhead of data exchange between Java and Python processes. In addition, Pandas UDFs process data in batches, reducing the number and overhead of calls to Python UDFs. Furthermore, Pandas UDFs allow you to conveniently use Pandas-related Python libraries.
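The reduction in call count can be illustrated without PyFlink or Pandas. In this sketch, a plain list stands in for pandas.Series, and the function names and the batch size are hypothetical. It only counts how often the user function is invoked in each style.

```python
def add_one_scalar(value):
    """Row-at-a-time style: invoked once per row."""
    return value + 1


def add_one_vectorized(values):
    """Batch style: invoked once per batch (a list stands in for pandas.Series)."""
    return [value + 1 for value in values]


def chunked(rows, batch_size):
    """Split the rows into consecutive batches of at most `batch_size`."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


rows = list(range(10))

scalar_calls = 0
scalar_result = []
for value in rows:
    scalar_result.append(add_one_scalar(value))
    scalar_calls += 1

batch_calls = 0
batch_result = []
for batch in chunked(rows, batch_size=4):
    batch_result.extend(add_one_vectorized(batch))
    batch_calls += 1

print(scalar_calls, batch_calls)
```

Both styles produce the same result, but the batch style crosses the function-call boundary far less often, which is where the saving comes from.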
5. [FLIP-120] Supports Conversion between PyFlink Table and Pandas DataFrame
In Flink 1.11, you can use the to_pandas() method on a table object to obtain a corresponding Pandas DataFrame object, or use the from_pandas() method on a TableEnvironment to convert a Pandas DataFrame object to a table object.
import pandas as pd
import numpy as np

# Create a PyFlink Table (t_env is an existing TableEnvironment)
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")

# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
6. [FLIP-112] Supports User-Defined Metrics in Python UDFs
Currently, Flink supports four types of custom metrics: Counters, Gauges, Meters, and Distributions. In addition, Flink allows you to define the User Scope and User Variables of the corresponding metrics.
7. [FLIP-106] Supports Python UDFs in SQL DDL and SQL Clients
Previously, Python UDFs could be used only in the Python Table API. With Flink’s support for registering Python UDFs in SQL DDL, SQL users can conveniently use Python UDFs. Flink also supports the use of Python UDFs in SQL Clients, allowing you to register Python UDFs and manage their dependencies in SQL Clients.
8. [FLIP-96] Supports the Python Pipeline API
Flink 1.9 introduced the ML Pipeline API to enhance the ease of use and scalability of Flink ML. Due to Python’s wide use in ML areas, FLIP-96 provides a set of corresponding Python Pipeline APIs to help Python users.
1. [FLIP-76] Supports Unaligned Checkpoints
In Flink’s current checkpoint mechanism, each operator can create snapshots only after it receives aligned barriers from all upstream operators. Then, the operator continues to send the barriers downstream. When backpressure occurs, it may take a long time to pass barriers from upstream operators to downstream operators, resulting in checkpoint timeout.
To address this problem, Flink 1.11 adds the unaligned checkpoint mechanism. After unaligned checkpoints are enabled, soon after a barrier is received, Flink can perform a checkpoint and save the data being transmitted between upstream and downstream operators to a snapshot in the form of a state. This greatly shortens the checkpoint completion time without depending on the processing capability of the operators and resolves the problem where checkpoints have timed out due to backpressure.
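The difference between the two mechanisms can be sketched with a toy model of one operator's input buffers. This is an illustration under simplifying assumptions, not Flink's implementation: all barriers are already queued, output buffers are ignored, and the function and record names are hypothetical.

```python
BARRIER = "BARRIER"


def aligned_snapshot(channels):
    """Aligned mode: records queued ahead of a barrier are processed first."""
    must_process = [rec for ch in channels for rec in ch[:ch.index(BARRIER)]]
    return {"processed_before_snapshot": must_process, "stored_in_snapshot": []}


def unaligned_snapshot(channels):
    """Unaligned mode: snapshot at once, persist the overtaken records as state."""
    overtaken = [rec for ch in channels for rec in ch[:ch.index(BARRIER)]]
    return {"processed_before_snapshot": [], "stored_in_snapshot": overtaken}


# Input buffers of one operator under backpressure: channel 0 has a backlog.
channels = [
    ["a1", "a2", "a3", "a4", BARRIER, "a5"],
    ["b1", BARRIER, "b2"],
]
aligned = aligned_snapshot(channels)
unaligned = unaligned_snapshot(channels)
print(len(aligned["processed_before_snapshot"]), "records delay the aligned snapshot")
print(len(unaligned["stored_in_snapshot"]), "records are stored by the unaligned snapshot")
```

The same records appear in both results. The aligned mode pays for them in waiting time, while the unaligned mode pays for them in snapshot size.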
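To enable the unaligned checkpoint mechanism, you can call env.getCheckpointConfig().enableUnalignedCheckpoints(), where env is the job’s StreamExecutionEnvironment.
2. [FLINK-13417] Supports ZooKeeper 3.5
Flink 1.11 can be integrated with ZooKeeper 3.5, allowing you to use some new ZooKeeper features, such as SSL.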
3. [FLINK-16408] Supports Slot-Level ClassLoader Reuse
Flink 1.11 modifies the loading logic of the ClassLoader on the TM. Previously, a new ClassLoader was created after each failover. In Flink 1.11, a ClassLoader is cached as long as any slot occupied by the target job is available. This modification has a certain impact on the semantics of job failover because static fields are not reloaded after a failover. However, it protects the JVM metaspace from out of memory (OOM) errors caused by the creation of a large number of ClassLoaders.
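The caching rule can be modeled as a reference count per job. The following plain-Python sketch is hypothetical throughout (class name, method names, and string stand-ins for loaders) and only illustrates the lifetime rule: the cached object survives as long as the job holds at least one slot.

```python
class ClassLoaderCache:
    """Hypothetical per-job cache that lives as long as the job holds any slot."""

    def __init__(self):
        self.loaders = {}      # job id -> stand-in for a ClassLoader
        self.slot_counts = {}  # job id -> number of slots currently held
        self.created = 0       # how many loaders were created in total

    def acquire_slot(self, job_id):
        if job_id not in self.loaders:
            self.created += 1
            self.loaders[job_id] = f"loader-{job_id}-{self.created}"
        self.slot_counts[job_id] = self.slot_counts.get(job_id, 0) + 1
        return self.loaders[job_id]

    def release_slot(self, job_id):
        self.slot_counts[job_id] -= 1
        if self.slot_counts[job_id] == 0:
            # Last slot released: drop the loader so it can be collected.
            del self.slot_counts[job_id]
            del self.loaders[job_id]


cache = ClassLoaderCache()
first = cache.acquire_slot("job-a")
second = cache.acquire_slot("job-a")
cache.release_slot("job-a")                   # one task fails, one slot remains
after_failover = cache.acquire_slot("job-a")  # reuses the cached loader
cache.release_slot("job-a")
cache.release_slot("job-a")                   # all slots released
fresh = cache.acquire_slot("job-a")           # a new loader is created
print(first, after_failover, fresh)
```

Because the loader is reused across the failover in this model, anything it holds, such as static state, is also carried over, which matches the semantic change noted above.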
4. [FLINK-15672] Upgrades the Log System Log4j to 2.x
Flink 1.11 upgrades the log system Log4j to 2.x. This resolves some problems with Log4j 1.x and adds support for some new features of Log4j 2.x.
5. [FLINK-10742] Reduces the Number of Data Copies and Memory Usage at the TM Receiver
Flink 1.11 reuses the memory of Flink’s buffers to store data received from the network on the downstream side. In this way, memory copies from the Netty layer to Flink’s buffers are reduced, and the resulting additional direct memory overhead is also reduced. This lowers the probability of direct memory OOM errors for online jobs and the probability that a container will be terminated due to memory overuse.
These are the new features of Flink 1.11. The community will continue to share more relevant technical knowledge in the future.