Sneak Peek: Apache Flink 1.11 Is Coming Soon!


Edited by Gao Yun and Cheng Hequn. Reviewed by Wang Zhijiang.

Flink 1.11 is about to be officially released! To satisfy your curiosity, we invited core developers from Flink to discuss the features of Flink 1.11. Compared to version 1.10, Flink 1.11 has been improved in many aspects. The team is committed to further improving the availability and performance of Flink.

This article will describe the new features, improvements, and important changes of Flink 1.11 as well as Flink’s future development plans. For more information, see the corresponding FLIP or JIRA pages and stay tuned for our subsequent special live broadcasts.

Cluster Deployment and Resource Management

Currently, Flink allows you to use a separate client to create a JobGraph and submit a job. During actual use, the download of JAR packages for jobs occupies a large amount of bandwidth on client machines and requires you to start a separate client process that occupies non-managed resources. To resolve these problems, Flink 1.11 provides an application mode, in which the master node takes over JobGraph generation and job submission.

You can enable the application mode by running bin/flink run-application. Currently, the application mode supports Flink deployed in YARN mode and Flink deployed in Kubernetes mode. In the YARN application mode, the dependencies required for running tasks are passed from the client to the Flink Master by using local resources on YARN. Then, the tasks are submitted on the Flink Master. The Kubernetes application mode allows you to build an image that contains your JAR package and dependencies. It automatically creates TaskManagers (TMs) based on the job and terminates the entire cluster after the job is completed.

In versions earlier than 1.11, JAR packages in the Flink lib directory had to be uploaded each time a job was submitted on YARN, consuming additional storage space and communication bandwidth. Flink 1.11 allows you to provide multiple remote lib directories. Then, Flink caches files from the directories to YARN nodes, avoiding unnecessary JAR package uploads and downloads. In this way, commit and launch are accelerated. The sample code is:

./bin/flink run -m yarn-cluster -d \
-yD yarn.provided.lib.dirs=hdfs://myhdfs/flink/lib,hdfs://myhdfs/flink/plugins \

In addition, Flink 1.11 also allows you to create jobs by directly using JAR packages on a remote file system. In this way, the overhead caused by JAR package downloads is further reduced. The sample code is:

./bin/flink run-application -p 10 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://myhdfs/flink/lib" \

In version 1.11, Flink on Kubernetes supports the application mode proposed in FLIP-85, which has better isolation than the session mode.

Flink 1.11 also adds support for Kubernetes features such as Node Selector, Label, Annotation, and Toleration. To facilitate integration with Hadoop, Flink can also automatically mount Hadoop configurations based on environment variables.

Previously, the Flink project provided different Dockerfiles to create Flink Docker images. Currently, these Docker images are incorporated into the apache/flink-docker[1] project.

In some scenarios, such as Docker and NAT port mapping scenarios, the local network addresses and ports seen by the JobManager (JM) and TM processes may differ from the addresses and ports that other processes use to access them externally. Previously, Flink did not allow you to set different local and remote addresses for TM and JM. As a result, Flink ran into problems in NAT networks such as Docker, and the exposure scope of listener ports could not be restricted.

Flink 1.11 introduces different parameters for local and remote listener addresses and ports. The following parameters are used to configure remote listener addresses and ports:

  • jobmanager.rpc.address
  • jobmanager.rpc.port
  • taskmanager.rpc.port

The following parameters are used to configure local listener addresses and ports:

  • jobmanager.bind-host
  • jobmanager.rpc.bind-port
  • taskmanager.bind-host
  • taskmanager.rpc.bind-port
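As a rough illustration of how the two groups of parameters fit together, the following Python sketch renders a flink-conf.yaml style fragment for a JobManager behind NAT. The host name and port numbers are made-up values, and render_flink_conf is a hypothetical helper that is not part of Flink.

```python
# Hypothetical values for a JobManager that runs behind NAT.
external = {
    "jobmanager.rpc.address": "flink-jm.example.com",  # address other processes connect to
    "jobmanager.rpc.port": 6123,                       # port other processes connect to
}
local = {
    "jobmanager.bind-host": "0.0.0.0",   # interface the process itself listens on
    "jobmanager.rpc.bind-port": 16123,   # port the process itself listens on
}


def render_flink_conf(*sections):
    """Render key/value pairs as 'key: value' lines."""
    lines = []
    for section in sections:
        for key, value in section.items():
            lines.append(f"{key}: {value}")
    return "\n".join(lines)


conf_text = render_flink_conf(external, local)
print(conf_text)
```

The point of the split is that the externally advertised address and port no longer have to match what the process binds to locally.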

A major change in Flink 1.10 was that the memory model and configuration rules[2] of TM were redefined. Flink 1.11 further adjusts the memory model and configuration rules of JM to align them with those of TM.

For more information about the memory configuration rules, see the relevant user documentation[3].

As machine learning and deep learning develop, an increasing number of Flink jobs embed machine learning or deep learning models, creating demand for GPU resources. In versions earlier than 1.11, Flink could not manage extended resources, such as GPU resources. To resolve this problem, Flink 1.11 provides a unified framework for managing extended resources and builds in support for GPU resources based on this framework.

For the extended-resource management framework and GPU resource management configurations, see the Public Interface section on the corresponding FLIP page. The community is currently working on the corresponding user documentation, which will be available in the future.

To prevent Flink batch jobs from occupying excessive resources, Flink 1.11 introduces a new configuration item: slotmanager.number-of-slots.max. This parameter allows you to limit the maximum number of slots in the entire Flink cluster. This parameter is recommended only for batch table and SQL jobs that use the Blink Planner.

Previously, only .log and .out logs could be read on the WebUI. However, other files, such as GC logs, may also exist in the log directory. The new version of the WebUI allows you to access all logs in the log directory and to reload, download, and display them in full-screen mode.

Previously, the WebUI could only display 20 historical failover exceptions for a single job. If job failover occurs frequently, the initial exception, which is most likely the root cause, is soon pushed out of the list, making troubleshooting more difficult. To address this problem, the new WebUI version provides a paginated display, allowing you to view more historical exceptions.


Thread Dump is vital for troubleshooting some job problems. In versions earlier than 1.11, if you needed a Thread Dump, you had to log on to the machine where the TM was located. In Flink 1.11, the WebUI adds a Thread Dump tab, allowing you to obtain a Thread Dump of the TM directly through the WebUI.


Source & Sink

FLIP-27 is a major feature in Flink 1.11. The conventional Flink source API had some problems. For example, different sources had to be implemented for streaming jobs and batch jobs, no unified data partition discovery logic was available, the source implementer had to process the locking logic, and no public architecture was provided. As a result, source developers had to manually handle multithreading problems and other problems. These problems increased the difficulty in implementing sources in Flink.

FLIP-27 introduces a brand new source API. This set of API operations provides unified data partition discovery, management, and other features. This allows you to focus on the partition information reading logic and the data reading logic without needing to deal with complex thread synchronization. In this way, source implementation is greatly simplified and more built-in features can be provided for sources in the future.
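The separation between partition discovery and data reading can be pictured with a toy model. The Python sketch below is not the FLIP-27 API: SplitEnumerator and SourceReader here are simplified, hypothetical stand-ins, and the whole thing runs in a single thread. It only shows the division of labor, where one component hands out partitions (splits) and readers contain nothing but reading logic.

```python
from collections import deque


class SplitEnumerator:
    """Discovers splits and hands them out to readers on request."""

    def __init__(self, splits):
        self._pending = deque(splits)

    def next_split(self):
        # Return the next unassigned split, or None when all are assigned.
        return self._pending.popleft() if self._pending else None


class SourceReader:
    """Reads records from whatever splits the enumerator assigns."""

    def __init__(self, enumerator):
        self._enumerator = enumerator

    def read_all(self):
        records = []
        while True:
            split = self._enumerator.next_split()
            if split is None:
                return records
            records.extend(split)


# Three hypothetical splits, each a small list of records.
enumerator = SplitEnumerator([[1, 2], [3], [4, 5]])
readers = [SourceReader(enumerator), SourceReader(enumerator)]
records = sorted(r for reader in readers for r in reader.read_all())
print(records)
```

Because assignment lives in one place, the reader code needs no locking of its own in this model.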

Flink 1.11 adds support for two common file formats, Avro and ORC, to StreamingFileSink.




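// schema, writerProps, and outDir are assumed to be defined elsewhere.
OrcBulkWriterFactory<Record> factory = new OrcBulkWriterFactory<>(
    new RecordVectorizer(schema), writerProps, new Configuration());
StreamingFileSink<Record> sink = StreamingFileSink
    .forBulkFormat(new Path(outDir.toURI()), factory)
    .build();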

State Management

Flink 1.11 replaces the absolute paths of savepoint files with relative paths, allowing you to move a savepoint without manually modifying the paths in its metadata. Note: This feature is unavailable after Entropy Injection is enabled in the S3 file system.
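To see why relative paths make a savepoint relocatable, consider the following Python sketch. It is a simplified model, not Flink's metadata format: the file names, directories, and the resolve_state_files helper are all hypothetical.

```python
import posixpath


def resolve_state_files(savepoint_dir, relative_paths):
    """Resolve relative entries against the directory the savepoint lives in."""
    return [posixpath.join(savepoint_dir, path) for path in relative_paths]


# Hypothetical relative entries stored in the savepoint metadata.
metadata_entries = ["state-0001", "state-0002"]

before_move = resolve_state_files("hdfs:///savepoints/sp-1", metadata_entries)
after_move = resolve_state_files("hdfs:///backup/sp-1", metadata_entries)
print(before_move)
print(after_move)
```

The metadata entries are identical in both cases; only the directory they are resolved against changes.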

Before Flink 1.11 was released, the TM could be notified of checkpoint success. In Flink 1.11, a mechanism for notifying the TM of checkpoint failures is added. This mechanism not only allows you to cancel an ongoing checkpoint, but also allows you to receive corresponding notifications by using the notifyCheckpointAborted API added to CheckpointListener.
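The shape of the two notifications can be sketched as follows. This is a Python model for illustration only: the real interface is the Java CheckpointListener, and ToyCoordinator and RecordingListener are hypothetical names that do not exist in Flink.

```python
class RecordingListener:
    """Records which checkpoints completed and which were aborted."""

    def __init__(self):
        self.events = []

    def notify_checkpoint_complete(self, checkpoint_id):
        self.events.append(("complete", checkpoint_id))

    def notify_checkpoint_aborted(self, checkpoint_id):
        # With an abort notification, resources prepared for this
        # checkpoint can be cleaned up right away.
        self.events.append(("aborted", checkpoint_id))


class ToyCoordinator:
    """Hypothetical stand-in that reports checkpoint outcomes to listeners."""

    def __init__(self, listeners):
        self._listeners = listeners

    def finish(self, checkpoint_id, success):
        for listener in self._listeners:
            if success:
                listener.notify_checkpoint_complete(checkpoint_id)
            else:
                listener.notify_checkpoint_aborted(checkpoint_id)


listener = RecordingListener()
coordinator = ToyCoordinator([listener])
coordinator.finish(1, success=True)
coordinator.finish(2, success=False)
print(listener.events)
```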

Heap StateBackend achieves better performance by maintaining states as Java objects. However, the memory occupied by Heap StateBackend was previously uncontrollable, resulting in severe GC problems.

To resolve this problem, SpillableKeyedStateBackend was released to support data spilling to disk, allowing StateBackends to limit their memory size. This feature is not incorporated into the code of Flink 1.11, but you can download it and try it out.

After Local Recovery is enabled by default, failover can be accelerated.

The state.backend.fs.memory-threshold parameter determines when the FsStateBackend needs to write state data out of memory. Previously, the default value of 1 KB often led to a large number of small files and affected state access performance. Therefore, in Flink 1.11, the default value is changed to 20 KB. This change is still under development but will probably be included in Flink 1.11. Note: This change may increase the memory usage of the JM, especially when operator parallelism is high or union state is used[4].
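The trade-off between small files and JM memory can be made concrete with a small model. The Python sketch below assumes that state chunks smaller than the threshold are kept inline with the checkpoint metadata while larger chunks are written to separate files. The chunk sizes and the place_state_chunks helper are hypothetical.

```python
def place_state_chunks(chunk_sizes, threshold_bytes):
    """Split chunks into those kept inline and those written as files."""
    inline, files = [], []
    for size in chunk_sizes:
        if size < threshold_bytes:
            inline.append(size)
        else:
            files.append(size)
    return inline, files


# Hypothetical state chunk sizes in bytes.
chunks = [512, 2 * 1024, 8 * 1024, 64 * 1024]

inline_1k, files_1k = place_state_chunks(chunks, 1024)
inline_20k, files_20k = place_state_chunks(chunks, 20 * 1024)

print(len(files_1k), sum(inline_1k))    # files written and inline bytes at 1 KB
print(len(files_20k), sum(inline_20k))  # files written and inline bytes at 20 KB
```

In this model, raising the threshold reduces the number of files but increases the number of bytes carried inline, which matches the note about higher JM memory usage.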

Table & SQL

Compared with the previous type inference mechanism, the new type inference mechanism can provide more type information about input parameters, allowing you to implement more flexible processing logic. Currently, this feature supports user-defined functions (UDFs) and user-defined table functions (UDTFs) but does not support user-defined aggregate functions (UDAFs).

Flink 1.11 provides the following enhancements to the TableEnvironment API:

  1. Previously, sqlUpdate executed DDL statements immediately upon reception but executed DML statements only when env.execute was executed. In Flink 1.11, both DDL statements and DML statements are executed when env.executeSql is executed.
  2. Queries that return results are supported, such as SHOW TABLES and EXPLAIN statements.
  3. Caching multiple SQL statements for execution is supported.
  4. The collect method is added to allow you to obtain query execution results.

In versions earlier than 1.11, you had to manually copy database table schemas to Flink when you used Flink to read data from or write data to relational databases or read change logs. This process was tedious and prone to errors, greatly increasing the cost of using Flink. Flink 1.11 provides catalog management that is based on JDBC and PostgreSQL so Flink can automatically read table schemas, reducing the need for manual operations.

Users have long asked for a Change Data Capture (CDC) mechanism to import dynamic data from external systems, such as MySQL BinLog and Kafka Compacted Topic, into Flink and write the Update and Retract streams in Flink to external systems. Flink 1.11 adds support for reading and writing CDC data. Currently, Flink supports the Debezium and Canal CDC formats.

Flink simplifies the structure of the current TableSource and TableSink APIs, providing the basis for supporting the CDC feature. This avoids the dependency on the DataStream API and resolves the problem where only the Blink Planner supported efficient source and sink implementations. For more information about the changes in the APIs, see the corresponding FLIP page.

FLIP-122 reorganizes the “with” configuration items of Table Connector and SQL Connector. Due to historical reasons, some “with” configuration items are redundant or inconsistent. For example, all configuration items are prefixed with “connector,” but the naming rules of the configuration items are different. The reorganized configuration items resolve these redundancy and inconsistency issues. Note: The existing configuration items are still available. For the new configuration items, see the FLIP-122 page.

Dynamic table properties allow you to dynamically modify the configuration items of a table when you use the table, freeing you from the need to re-declare the table’s DDL statements after changing configuration items. As shown in the following code, dynamic properties allow you to add a /*+ OPTIONS('k1'='v1') */ hint after a table name to override property values in DDL statements.

SELECT * FROM
  EMP /*+ OPTIONS('k1'='v1', 'k2'='v2') */
  JOIN
  DEPT /*+ OPTIONS('a.b.c'='v3', 'd.e.f'='v4') */
  ON EMP.deptno = DEPT.deptno
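The override behavior can be pictured as a dictionary merge. This is only a sketch: effective_options is a hypothetical helper, the option names are placeholders, and it assumes that hint values take precedence over DDL values key by key.

```python
def effective_options(ddl_options, hint_options):
    """Merge table options, letting OPTIONS hint values override DDL values."""
    merged = dict(ddl_options)
    merged.update(hint_options)
    return merged


# Placeholder options declared in the table's DDL.
ddl = {"k1": "v0", "k3": "v3"}
# Placeholder options supplied through an OPTIONS hint in the query.
hint = {"k1": "v1", "k2": "v2"}

options = effective_options(ddl, hint)
print(options)
```

The DDL itself stays unchanged; the hint only affects the query that carries it.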

Flink 1.11 also enhances the FileSystem and Hive connectors:

  1. FileSystem Connectors provide support for the CSV, ORC, Parquet, JSON, and Avro formats. In addition, Flink SQL provides full support for the Batch FileSystem Connector and Streaming FileSystem Connector.
  2. Flink SQL supports the Hive streaming sink.

FLIP-123 provides support for the Hive dialect, allowing you to perform operations by executing Hive DDL and DML statements.

DataStream API

Flink Kafka Shuffle provides a DataStream API that allows you to use Kafka as a message bus between operators and provides a mechanism for simultaneously exchanging and storing data. This mechanism has the following advantages:

  1. Shuffled data can be reused.
  2. When a job performs failure recovery, only persisted data is involved. This avoids the restart of the entire graph and is in line with the exactly-once semantics.

This mechanism can supplement failure recovery for large-scale streaming jobs before the ongoing failover refactoring job of Flink is completed.

Note: The work has been completed, but whether this feature will be released in Flink 1.11 is still under discussion. The new WatermarkAssigner API integrates the two previous types of Watermark APIs: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. This makes it easier to support watermark insertion during source implementation in the future.
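One way to picture how a single interface can cover both styles is a generator with one hook that runs per event and one hook that runs periodically. The Python sketch below is a model only: the class, its method names, and the is_marker flag are hypothetical and are not taken from the Flink API.

```python
class ToyWatermarkGenerator:
    """Hypothetical generator with a per-event hook and a periodic hook."""

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_timestamp = None

    def on_event(self, timestamp, is_marker=False):
        # Track the highest timestamp seen so far.
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp
        # Punctuated style: emit a watermark immediately on a marker event.
        return timestamp if is_marker else None

    def on_periodic_emit(self):
        # Periodic style: emit a watermark that trails the highest timestamp.
        if self.max_timestamp is None:
            return None
        return self.max_timestamp - self.max_delay


generator = ToyWatermarkGenerator(max_delay=5)
first = generator.on_event(10)
second = generator.on_event(7)
periodic = generator.on_periodic_emit()
punctuated = generator.on_event(20, is_marker=True)
print(first, second, periodic, punctuated)
```

A purely periodic assigner would leave the per-event hook silent, and a purely punctuated one would leave the periodic hook silent.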

Flink 1.11 supports multi-input operators, but no complete DataStream API has been provided for this feature. To use this feature, you need to manually create MultipleInputTransformation and MultipleConnectedStreams.

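// The output type and parallelism below are assumed values; SumAllInputOperatorFactory is user-defined.
MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
    "My Operator",
    new SumAllInputOperatorFactory(),
    BasicTypeInfo.LONG_TYPE_INFO,
    1);
// Call transform.addInput(...) per input and env.addOperator(transform) first.
new MultipleConnectedStreams(env).transform(transform);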

PyFlink & ML

Previously, Python UDFs could be executed in both stream mode and the batch mode of the Blink Planner, but only the stream mode of the Flink Planner. After this feature is released, Python UDFs can be executed in both the stream mode and the batch mode of the two planners.

A UDTF takes one input row and can output multiple rows. Python UDTFs can be executed in both the stream mode and the batch mode of the two planners.

Flink uses Cython to optimize the computational logic of the serialization and deserialization operations of the Coder module and the computational logic of the Operation module. The end-to-end performance is dozens of times higher than in Flink 1.10.

Pandas UDFs use pandas.Series as the input and output type and support batch data processing. Generally, Pandas UDFs have better performance than common UDFs because they reduce the serialization and deserialization overhead of data exchange between Java and Python processes. In addition, Pandas UDFs can process data in batches, reducing the number and overhead of calls to Python UDFs. Furthermore, Pandas UDFs allow you to conveniently use Pandas-related Python libraries.
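The effect of batching on the number of UDF calls can be shown without PyFlink. The sketch below uses plain Python lists in place of pandas.Series, and all function names and the batch size are hypothetical. It only counts calls and does not measure serialization cost.

```python
def plus_one_row(value):
    """Row-at-a-time function: called once per row."""
    return value + 1


def plus_one_batch(values):
    """Batch function: called once per batch of rows."""
    return [value + 1 for value in values]


def run_row_at_a_time(rows):
    results = [plus_one_row(row) for row in rows]
    return results, len(rows)


def run_batched(rows, batch_size):
    results, calls = [], 0
    for start in range(0, len(rows), batch_size):
        results.extend(plus_one_batch(rows[start:start + batch_size]))
        calls += 1
    return results, calls


rows = list(range(10))
row_results, row_calls = run_row_at_a_time(rows)
batch_results, batch_calls = run_batched(rows, batch_size=4)
print(row_calls, batch_calls)
```

Both paths produce the same results, but the batched path crosses the function boundary far less often.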

In Flink 1.11, you can use the to_pandas() method on a table object to obtain a corresponding Pandas DataFrame object or use the from_pandas() method on a TableEnvironment object to convert a Pandas DataFrame object to a table object.

import pandas as pd
import numpy as np
# t_env is assumed to be an existing TableEnvironment.
# Create a PyFlink Table from a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()

Currently, Python UDFs support four types of custom metrics: Counters, Gauges, Meters, and Distributions. In addition, you can define the User Scope and User Variables of the corresponding metrics.

Previously, Python UDFs could only be used in the Python table API. With Flink’s support for the registration of Python UDFs in SQL DDL, SQL users can conveniently use Python UDFs. Flink also supports the use of Python UDFs in SQL Clients, allowing you to register Python UDFs and manage their dependencies in SQL Clients.

Flink 1.9 introduced the ML Pipeline API to enhance the ease of use and scalability of Flink ML. Due to Python’s wide use in ML areas, FLIP-96 provides a set of corresponding Python Pipeline APIs to help Python users.

Runtime Optimizations

In Flink’s current checkpoint mechanism, each operator can create snapshots only after it receives aligned barriers from all upstream operators. Then, the operator continues to send the barriers downstream. When backpressure occurs, it may take a long time to pass barriers from upstream operators to downstream operators, resulting in checkpoint timeout.

To address this problem, Flink 1.11 adds the unaligned checkpoint mechanism. After unaligned checkpoints are enabled, soon after a barrier is received, Flink can perform a checkpoint and save the data being transmitted between upstream and downstream operators to a snapshot in the form of a state. This greatly shortens the checkpoint completion time without depending on the processing capability of the operators and resolves the problem where checkpoints have timed out due to backpressure.

To enable the unaligned checkpoint mechanism, call env.getCheckpointConfig().enableUnalignedCheckpoints().
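The difference between the two mechanisms can be illustrated with a toy model of one operator that has two input channels. The Python sketch below is a deliberate simplification and not Flink's implementation: the channel contents, the BARRIER marker, and both functions are hypothetical.

```python
BARRIER = "B"


def records_ahead_of_barrier(channel):
    """Records queued in a channel in front of its checkpoint barrier."""
    return channel[:channel.index(BARRIER)]


def aligned_checkpoint(channels):
    # Aligned: every queued record in front of a barrier must be
    # processed before the operator can take its snapshot.
    queued = [r for ch in channels for r in records_ahead_of_barrier(ch)]
    return {"processed_before_snapshot": queued, "in_flight_state": []}


def unaligned_checkpoint(channels):
    # Unaligned: the snapshot is taken right away, and the queued
    # records are saved as part of the checkpoint state instead.
    queued = [r for ch in channels for r in records_ahead_of_barrier(ch)]
    return {"processed_before_snapshot": [], "in_flight_state": queued}


# Channel 0 has its barrier at the head; channel 1 is backed up.
channels = [[BARRIER, 7], [1, 2, 3, BARRIER]]
aligned = aligned_checkpoint(channels)
unaligned = unaligned_checkpoint(channels)
print(aligned)
print(unaligned)
```

Under backpressure the queue in front of the slow barrier can be long, which is why the aligned variant can time out while the unaligned variant trades that wait for a larger snapshot.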

Flink 1.11 can be integrated with ZooKeeper 3.5, allowing you to use some new ZooKeeper features, such as SSL.

Flink 1.11 has modified the loading logic of the ClassLoader on the TM. Previously, a ClassLoader was created after each failover. In Flink 1.11, a ClassLoader is cached as long as any slot occupied by the target job is available. This modification has a certain impact on the semantics of job failover because static fields are not reloaded after a failover. However, it protects the JVM metaspace from out of memory (OOM) errors caused by the creation of a large number of ClassLoaders.
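The caching rule can be sketched as a cache keyed by job and reference-counted by slots. The following Python model is an illustration under that assumption. ClassLoaderCache is a hypothetical class, and a plain object stands in for a real ClassLoader.

```python
class ClassLoaderCache:
    """Keeps one loader per job for as long as the job holds any slot."""

    def __init__(self):
        self._loaders = {}     # job_id -> cached loader
        self._slot_count = {}  # job_id -> number of slots held by the job
        self.created = 0       # how many loaders were created in total

    def acquire_slot(self, job_id):
        if job_id not in self._loaders:
            self._loaders[job_id] = object()  # stand-in for a ClassLoader
            self.created += 1
        self._slot_count[job_id] = self._slot_count.get(job_id, 0) + 1
        return self._loaders[job_id]

    def release_slot(self, job_id):
        self._slot_count[job_id] -= 1
        if self._slot_count[job_id] == 0:
            # Last slot released: drop the cached loader.
            del self._slot_count[job_id]
            del self._loaders[job_id]


cache = ClassLoaderCache()
first = cache.acquire_slot("job-a")
second = cache.acquire_slot("job-a")
cache.release_slot("job-a")               # one task fails over...
after_failover = cache.acquire_slot("job-a")  # ...and reuses the cached loader
print(cache.created)
```

Because the loader survives the failover in this model, anything held in static fields survives too, which is the semantic change described above.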

Flink 1.11 upgrades the log system Log4j to 2.x. This resolves some problems with Log4j 1.x and adds support for some new features of Log4j 2.x.

Flink 1.11 reuses the memory of Flink’s buffer to store data received on the downstream network. In this way, memory copies from the Netty layer to Flink’s buffer are reduced and the resulting additional overhead of the direct memory is also reduced. This lowers the probability of direct memory OOM for online jobs and the probability that a container will be terminated due to memory overuse.

These are the new features of Flink 1.11. The community will continue to share more relevant technical knowledge in the future.


