Apache Flink 1.9.0 was officially released on August 22, 2019. It is the first release since Blink, Alibaba’s internal version of Flink, was merged into the official Apache Flink codebase.
As a result, this release brings some significant changes. Notable features are batch-style recovery for batch jobs and a preview of the new Blink-based query engine for Table API and SQL queries. The release also provides the State Processor API, one of the most frequently requested features, which lets you flexibly read and write savepoints with Flink DataSet jobs. It further includes a reworked WebUI and previews of Flink’s new Python Table API and its integration with the Apache Hive ecosystem.
From the very beginning, the goal of the Apache Flink project has been to develop a stream processing system that can unify and power many forms of real-time and offline data processing applications, as well as event-driven applications. In this release, the Apache Flink community, together with Alibaba Cloud, has taken a huge step toward that goal by integrating Blink’s stream and batch processing capabilities under a unified runtime.
This article is a reference for anyone who is interested in this update and wants to know what to expect. It gives an overview of the major new features, improvements, and important changes in this release, and also looks at future development plans.
Note: The binary distribution and source artifacts for Flink 1.9 are now available from the Downloads page of the Flink project, along with the updated documentation. Flink 1.9 is API-compatible with previous 1.x releases for APIs annotated with the @Public annotation. You can share your ideas with the community through Flink mailing lists or JIRA.
New Features and Improvements
Fine-grained Batch Job Recovery (FLIP-1)
In this new release, the time to recover a batch job (whether it is a DataSet, Table API, or SQL job) from a task failure has been significantly reduced.
Until Flink 1.9, a task failure in a batch job was recovered by canceling all tasks and restarting the entire job. That is, the job had to start from scratch and all progress was lost.
This version of Flink, however, can keep intermediate results at the edges of network shuffles and use this data to recover only the tasks that are affected by a failure. The unit of recovery is a failover region, which is the set of tasks that are connected through pipelined data exchanges. The batch-shuffle connections of a job define the boundaries of its failover regions. For more details, see FLIP-1.
To use this new failover strategy, make sure that you have the
jobmanager.execution.failover-strategy: region entry in your
Note: The configuration of the 1.9 distribution package has that entry by default, but when reusing a configuration file from previous setups, you have to add it manually.
The “Region” failover strategy described above also speeds up and improves the recovery of “embarrassingly parallel” streaming jobs, namely, jobs without any shuffle such as keyBy() or rebalance. When such a job is recovered, only the tasks of the affected pipeline (failover region) are restarted. For all other streaming jobs, the recovery behavior is the same as in prior Flink versions.
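The idea of a failover region can be illustrated with a minimal Python sketch. It is not Flink’s implementation: the task names, the edge labels "pipelined" and "blocking", and both function names are assumptions made for illustration. Tasks joined by pipelined exchanges are grouped into one region, and blocking (batch-shuffle) edges act as region boundaries. As a simplification, the sketch restarts only the region that contains the failed task.

```python
from collections import defaultdict


def failover_regions(tasks, edges):
    """Group tasks into failover regions.

    tasks: iterable of task names (hypothetical identifiers).
    edges: iterable of (upstream, downstream, kind) tuples, where kind is
           "pipelined" or "blocking" (a batch shuffle).
    """
    parent = {task: task for task in tasks}

    def find(task):
        # Follow parent links to the representative, halving the path as we go.
        while parent[task] != task:
            parent[task] = parent[parent[task]]
            task = parent[task]
        return task

    # Only pipelined exchanges merge tasks; blocking edges are boundaries.
    for upstream, downstream, kind in edges:
        if kind == "pipelined":
            parent[find(upstream)] = find(downstream)

    regions = defaultdict(set)
    for task in parent:
        regions[find(task)].add(task)
    return list(regions.values())


def tasks_to_restart(failed_task, regions):
    """Return the region that contains the failed task (simplified model)."""
    for region in regions:
        if failed_task in region:
            return region
    raise KeyError(failed_task)


# Two parallel source -> map pipelines feeding one reducer through a batch shuffle.
tasks = ["source-0", "map-0", "source-1", "map-1", "reduce-0"]
edges = [
    ("source-0", "map-0", "pipelined"),
    ("source-1", "map-1", "pipelined"),
    ("map-0", "reduce-0", "blocking"),
    ("map-1", "reduce-0", "blocking"),
]
regions = failover_regions(tasks, edges)
print(sorted(sorted(region) for region in regions))
print(sorted(tasks_to_restart("map-1", regions)))
```

In this example, a failure of map-1 restarts only source-1 and map-1. Without the blocking edges, as in an embarrassingly parallel streaming job, each source-to-map pipeline would still form its own region.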
State Processor API (FLIP-43)
Until Flink 1.9, accessing the state of a job from the outside was limited to the (still) experimental Queryable State. This release introduces a new, powerful library to read, write and modify state snapshots by using the DataSet API. In practice, this means:
- Flink job states can be bootstrapped by reading data from external systems, for example, external databases, and converting it into a savepoint.
- States in savepoints can be queried by using any of Flink’s batch APIs (DataSet, Table, SQL), for example, to analyze relevant state patterns or to check for discrepancies in state when auditing or troubleshooting applications.
- The schema of state in savepoints can be migrated offline, whereas the previous approach required online migration on schema access.
- Invalid data in savepoints can be identified and corrected.
The new State Processor API covers all variations of snapshots: savepoints, full checkpoints, and incremental checkpoints. For more details, see FLIP-43.
Stop-with-Savepoint (FLIP-34)
“Cancel-with-savepoint” is a common operation for stopping or restarting, as well as forking or updating, Flink jobs. However, the existing implementation did not fully guarantee that data transferred to external storage systems was persisted for exactly-once sinks. To improve the end-to-end semantics when stopping a job, Flink 1.9 introduces a new SUSPEND mode that stops a job with a savepoint while ensuring the consistency of the output data. You can suspend a job with the Flink CLI client as follows:
bin/flink stop -p [:targetSavepointDirectory] :jobId
The final job state is set to FINISHED upon success, which allows you to easily verify whether the operation failed. For more information, see FLIP-34.
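If you script this operation, the command above can be assembled programmatically. The following is a minimal sketch in Python that only builds the argument list in the form shown above. The function name, the job ID, and the savepoint directory are hypothetical, and the sketch assumes that the command is run from the Flink distribution directory.

```python
import shlex


def build_stop_command(job_id, savepoint_dir, flink_bin="bin/flink"):
    """Build the argument list for `bin/flink stop -p <dir> <jobId>`."""
    if not job_id or not savepoint_dir:
        raise ValueError("job_id and savepoint_dir are required in this sketch")
    return [flink_bin, "stop", "-p", savepoint_dir, job_id]


# Hypothetical job ID and savepoint directory, for illustration only.
command = build_stop_command("a1b2c3d4", "/tmp/savepoints")
print(" ".join(shlex.quote(arg) for arg in command))
```

The resulting list can be passed to subprocess.run(command) on a machine where the Flink CLI is available. Keeping the arguments in a list avoids shell quoting problems with unusual directory names.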
Flink WebUI Refactoring
After a discussion about modernizing the WebUI of Flink, the community decided to refactor this component by using the latest stable version of Angular, which was Angular 7.0 or later at the time. This redesigned version is the default in 1.9.0. However, Flink 1.9.0 includes a link that allows you to switch to the older WebUI.
Note: Given these major changes, the old version of the WebUI is not guaranteed to have the same features as the new version in the future. The old WebUI will be deprecated when the new version reaches a stable status.
Preview of the New Blink SQL Query Processor
With the integration of Alibaba’s Blink into Apache Flink, the community worked on integrating Blink’s query optimizer and runtime for the Table API and SQL query features into Flink. The first step was to refactor the monolithic flink-table module into smaller modules (see FLIP-32). This resulted in clear layering and well-defined interfaces among the Java and Scala API modules, the optimizer, and the runtime modules.
The next step was extending the Blink planner to implement the new optimizer interface. As a result, two pluggable query processors are now available to run Table API and SQL statements: the pre-1.9 Flink processor and the new Blink-based query processor. The Blink-based query processor offers better SQL coverage, with full TPC-H support in 1.9. TPC-DS support is planned for the next release.
The Blink-based query processor also improves batch query performance through more extensive query optimization (cost-based plan selection and more optimization rules), improved code generation, and tuned operator implementations. For streaming, it enables some long-awaited new features, such as dimension table join, TopN, and deduplication, along with optimizations for solving data skew in aggregation scenarios and more useful built-in functions.
Note: The semantics and set of supported operations of the two query processors are mostly, but not fully aligned. For more details, see the Release Notes.
However, the integration of Blink’s query processor is not yet complete. The pre-1.9 processor is still the default processor in Flink 1.9 and is recommended for production environments.
You can enable the Blink processor by configuring the EnvironmentSettings when creating a TableEnvironment. The selected processor must be on the classpath of the running Java process. In cluster setups, both query processors are automatically loaded onto the classpath by default. When you run a query from your IDE, you need to explicitly add a planner dependency to your project.
Other Improvements to the Table API and SQL
Besides the exciting progress around the Blink planner, the community also worked on a whole set of other improvements to these interfaces, including:
- Scala-free Table API and SQL for Java Users (see FLIP-32)
As part of the refactoring and splitting of the flink-table module, two separate API modules for Java and Scala were created. For Scala users, nothing really changes. Java users, however, can now use the Table API and SQL without pulling in Scala dependencies.
- Refactoring of the Table API and SQL Type System (see FLIP-37)
The community implemented a new data type system to detach the Table API from the TypeInformation class in Flink and improve its compliance with the SQL standard. This work is still in progress and expected to be completed in the next release. In Flink 1.9, UDFs are not ported to the new type system yet.
- Multi-column and Multi-row Transformations for the Table API (see FLIP-29)
The functionality of the Table API was extended with a set of transformations that support multi-row and multi-column inputs and outputs. These transformations significantly ease the implementation of processing logic that would be cumbersome to implement with relational operators.
- New, Unified Catalog APIs
The community refactored and replaced some existing catalog APIs to provide a unified way to process internal and external catalogs. This effort was mainly initiated in preparation for the Hive integration. Additionally, the rework improves the overall convenience of managing catalog metadata in Flink.
- DDL Support in the SQL API (see FLINK-10232)
Before version 1.9, Flink SQL only supported DML statements such as SELECT and INSERT. External tables, specifically table sources and sinks, had to be registered by using Java/Scala code or configuration files. In Flink 1.9, the community added support for SQL DDL statements to register and remove tables, specifically the CREATE TABLE and DROP TABLE commands. However, the community did not add stream-specific syntax extensions to define timestamp extraction and watermark generation. Full support for streaming scenarios is planned for the next release.
A Preview of the Hive Integration (FLINK-10556)
Apache Hive is widely used in the Hadoop ecosystem to store and query large amounts of structured data. Besides being a query processor, Hive features a catalog called Metastore to manage and organize large datasets. A common integration point for query processors is to integrate with Hive Metastore so that Hive can be used for data management.
Recently, the community started implementing an external catalog for the Flink Table API and SQL that connects to the Hive Metastore. In Flink 1.9, you can query and process the various data formats that are stored in Hive. Moreover, the Hive integration includes support for using Hive UDFs in Flink Table API or SQL queries. For more details, see FLINK-10556.
Previously, tables defined in the Table API and SQL were always temporary. The new catalog connector also allows you to persist in Metastore a table that is created with a SQL DDL statement. This means that you can connect to Metastore and register a table, for example, a table that is backed by a Kafka topic. From then on, you can query that table whenever your catalog is connected to Metastore.
Note: The Hive support in Flink 1.9 is experimental. The community plans to stabilize these features in the next release and is looking forward to your feedback.
Preview of the New Python Table API (FLIP-38)
This release also introduces the first version of the Python Table API (see FLIP-38). This marks a start towards the community’s goal of bringing full-fledged Python support to Flink. The feature was designed as a slim Python API wrapper around the Table API, basically translating Python Table API method calls into Java Table API calls. In Flink 1.9, the Python Table API currently does not support UDFs and only enables standard relational operations. Support for UDFs in Python is on the roadmap of future releases.
If you’d like to try the new Python API, you need to manually install PyFlink. You can follow the walkthrough in the documentation and start exploring on your own. The community is currently preparing a pyflink Python package that can be installed through pip.
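The “slim wrapper” design described in this section can be pictured with a small, self-contained Python sketch. It does not use PyFlink: FakeJavaTable is a hypothetical stand-in for the Java-side Table object that simply records the calls it receives, and the Table class shows how each Python method forwards to its Java counterpart.

```python
class FakeJavaTable:
    """Hypothetical stand-in for the Java-side Table; records received calls."""

    def __init__(self, calls=None):
        self.calls = list(calls or [])

    def select(self, fields):
        return FakeJavaTable(self.calls + [("select", fields)])

    def filter(self, predicate):
        return FakeJavaTable(self.calls + [("filter", predicate)])


class Table:
    """Thin Python wrapper: every method delegates to the wrapped object."""

    def __init__(self, j_table):
        self._j_table = j_table

    def select(self, fields):
        # No relational logic lives in Python; the call is forwarded as-is.
        return Table(self._j_table.select(fields))

    def filter(self, predicate):
        return Table(self._j_table.filter(predicate))


# Hypothetical column names, for illustration only.
orders = Table(FakeJavaTable())
result = orders.filter("amount > 10").select("user, amount")
print(result._j_table.calls)
```

Because the wrapper holds no query logic of its own, planning and execution stay on the Java side. This is also why a wrapper of this kind can offer relational operations before it offers Python UDFs, which would need Python code to run during execution.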
Important Changes
- The Table API and SQL are now part of the default configuration of the Flink distribution. Before, the Table API and SQL had to be enabled by moving the corresponding JAR file from
- The machine learning library (flink-ml) has been removed in preparation for FLIP-39.
- The old DataSet and DataStream Python APIs have been removed and the new Python API introduced in FLIP-38 is recommended.
- Flink can be compiled and run on Java 9. Some components that interact with external systems, such as connectors, file systems, and reporters, may not work because the corresponding projects may have skipped Java 9 support.
You can review the Release Notes for a more detailed overview of changes and new features if you plan to upgrade from an existing version of Flink.