By Yang Kete ‘Rooney’.
Only a month or so ago, on August 22, Apache Flink 1.9.0 was officially released. This release is a big deal, mainly because it merges Blink, Alibaba’s in-house version of Flink, into Apache Flink.
This update was several months in the making. As early as January, Alibaba announced that it would open-source Blink and contribute its code to Apache Flink. The merger was far from simple: around 1,500,000 lines of code were modified since the previous release. It involved not only major structural changes but also major updates and enhancements to Flink’s feature set.
In this article, we will go over some of the major changes and new features in this newly released version of Flink. But before we get into anything else, let’s briefly review why Alibaba chose to open-source Blink, what that meant for developers, and what it means for the future of both Alibaba’s Blink and Apache Flink:
- Open-sourcing Blink is mainly about handing community developers a large body of core code for new features, performance optimizations, and stability improvements that Alibaba has accumulated in stream computing and batch processing. In short, Blink’s core code is built on the open-source Flink engine and was developed to serve Alibaba Group’s internal businesses.
- Blink is open-sourced as a branch: once it is fully open-sourced, it becomes a branch under the Apache Flink project.
- The goal of open-sourcing Blink is not to create another active project but to make Flink better. Community developers, including you, can see exactly how Blink was written and implemented, and use those insights to merge Blink features into Flink more efficiently.
Between Alibaba’s announcement that it would open-source Blink and the release of Flink 1.9, more than six months passed. In the end, the original goal Alibaba set for open-sourcing Blink and merging its features into Flink has come to fruition, thanks in large part to strong community support along the way. Although some of Blink’s features have yet to be merged into Flink, the enhancements the merger has already delivered are clear.
First, I would like to share the following figures comparing Flink 1.9.0 with previous versions:
In terms of resolved issues and code commits, 1.9.0 matches or exceeds the two previous versions combined. In terms of contributors, Flink has attracted far more community contributors than ever before, many of whom are probably from China.
Upgrades to the Underlying Architecture
Generally speaking, in IT, when a system changes a lot, it is usually because its underlying architecture has changed or been upgraded. This release of Flink is no exception: Flink has taken a big step toward stream-batch integration. First, let’s look at the architecture diagram of earlier Flink versions:
Readers familiar with Flink will know the architecture diagram on the left well. Simply put, on top of its distributed streaming execution engine, Flink provides two independent APIs, the DataStream API and the DataSet API, for describing stream computing jobs and batch processing jobs respectively. On top of these two APIs sits a unified stream-batch API, namely the Table API & SQL. Users can therefore describe a stream or batch job with the same Table API program or SQL, but they still need to tell the Flink engine at run time whether to run it as a stream or as a batch. The optimizer in the Table layer then optimizes the program into either a DataStream job or a DataSet job.
However, looking closely at the underlying implementation of DataStream and DataSet, we find that the two APIs actually have little in common. They have independent translation and optimization processes, and they use completely different tasks at run time. These inconsistencies cause problems for both users and developers.
Users must choose between the two APIs when writing jobs. The two APIs are semantically different and also support different sets of connectors, which inevitably causes problems. Although the Table module unifies them at the API level, its underlying implementation is still based on DataStream and DataSet, so it is affected by the same inconsistencies.
For developers, because the two stacks are largely independent, reusing code is hard. New features often have to be developed twice, and each API has a long development path, so a change is essentially an end-to-end modification. This greatly reduces development efficiency. If two independent technology stacks coexist for a long time, they not only waste manpower but may eventually slow down the development of all Flink features.
Based on initial explorations with Blink, Alibaba developers held close discussions with other community developers and together settled on the future technical architecture of Flink.
In future Flink versions, developers at Apache and Alibaba, along with other community developers, will discard the DataSet API. The user-facing APIs will be split into the DataStream API, which describes the physical execution plan, and the Table API & SQL, which describe the relational plan. The DataStream API offers more of a “WYSIWYG” (What You See Is What You Get) experience: users describe and orchestrate the relationships between operators themselves, and the engine does little to interfere or optimize. The Table API & SQL keep their current style and provide relational expressions; the engine optimizes according to the user’s intent and selects the optimal execution plan. It is worth mentioning that both APIs will support stream computing and batch processing in the future. Beneath the two user APIs, they share the same technology stack in the implementation layer: for example, a unified DAG data structure to describe jobs, a unified StreamOperator to write operator logic, and a unified streaming distributed execution engine.
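To make "one operator abstraction for both modes" concrete, here is a minimal Python sketch under simplifying assumptions. It is not Flink’s API: `CountingOperator`, `run_job`, and `unbounded_clicks` are hypothetical names, a Python list stands in for a bounded (batch) input, and an infinite generator stands in for an unbounded (streaming) one. The point is only that the operator logic is written once and the same code path drives both inputs.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple


class CountingOperator:
    """Hypothetical operator that emits a running count per key.

    Its process_element logic is identical whether the input is bounded
    (a batch) or unbounded (a stream).
    """

    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}

    def process_element(self, key: str, emit: Callable[[Tuple[str, int]], None]) -> None:
        self.counts[key] = self.counts.get(key, 0) + 1
        emit((key, self.counts[key]))


def run_job(source: Iterable[str], operator: CountingOperator,
            limit: Optional[int] = None) -> List[Tuple[str, int]]:
    """Drive one operator over any iterable source, bounded or not.

    `limit` exists only so this sketch can sample an unbounded source;
    a real engine would keep running until the job is cancelled.
    """
    out: List[Tuple[str, int]] = []
    for i, element in enumerate(source):
        if limit is not None and i >= limit:
            break
        operator.process_element(element, out.append)
    return out


def unbounded_clicks() -> Iterator[str]:
    """Stand-in for an endless stream, such as a message queue."""
    i = 0
    while True:
        yield "a" if i % 2 == 0 else "b"
        i += 1


# Batch: a finite list.  Stream: an infinite generator, sampled here.
print(run_job(["a", "b", "a"], CountingOperator()))
# [('a', 1), ('b', 1), ('a', 2)]
print(run_job(unbounded_clicks(), CountingOperator(), limit=4))
# [('a', 1), ('b', 1), ('a', 2), ('b', 2)]
```

In a real engine, the difference between the two modes would show up in scheduling, shuffling, and fault tolerance rather than in the operator code, which is the reuse the unified architecture aims for.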
Changes to Table API and SQL Queries
When Blink was open-sourced, its Table module was already built on the new underlying architecture design for Flink. Therefore, in Flink 1.9.0, the Table module was naturally the first to adopt the adjusted architecture. However, to minimize the impact on users of earlier versions, developers at both Alibaba and Apache still needed a way for the two architectures to coexist.
To this end, community developers made a series of changes: splitting the Table module (FLIP-32; a FLIP, or Flink Improvement Proposal, records a proposal for a major change to Flink), sorting out the dependencies between the Java and Scala APIs, and introducing a Planner interface that supports multiple Planner implementations. A Planner is responsible for the concrete optimization of Table jobs and their translation into execution graphs. This let developers move all the original implementation into the Flink Planner and put the code that connects to the new architecture into the Blink Planner.
The Query Processor in the figure is the implementation of the Planner.
This approach has several benefits. It not only makes the split Table module cleaner but, more importantly, leaves the user experience of earlier versions unaffected.
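The Planner split can be pictured as a small interface with interchangeable implementations, chosen once when the table environment is created. The Python sketch below is illustrative only: `Planner`, `LegacyPlanner`, `BlinkStylePlanner`, `TableEnvironment`, and `create_table_environment` are hypothetical names, and "translation" is reduced to returning a labeled string rather than building an execution graph. In Flink 1.9’s Java/Scala Table API, the corresponding choice is made through `EnvironmentSettings` (for example `useOldPlanner()` or `useBlinkPlanner()`).

```python
from abc import ABC, abstractmethod
from typing import Dict, Type


class Planner(ABC):
    """Hypothetical planner: optimizes a query and translates it for execution."""

    @abstractmethod
    def translate(self, query: str, streaming: bool) -> str:
        ...


class LegacyPlanner(Planner):
    # Models the pre-1.9 path: streaming -> DataStream job, batch -> DataSet job.
    def translate(self, query: str, streaming: bool) -> str:
        target = "DataStream" if streaming else "DataSet"
        return f"{target} job for: {query}"


class BlinkStylePlanner(Planner):
    # Models the new path: both modes compile to one unified operator DAG.
    def translate(self, query: str, streaming: bool) -> str:
        mode = "streaming" if streaming else "batch"
        return f"unified DAG ({mode}) for: {query}"


PLANNERS: Dict[str, Type[Planner]] = {"old": LegacyPlanner, "blink": BlinkStylePlanner}


class TableEnvironment:
    """User programs only see this class, so swapping planners leaves them unchanged."""

    def __init__(self, planner: Planner, streaming: bool) -> None:
        self._planner = planner
        self._streaming = streaming

    def sql_query(self, query: str) -> str:
        return self._planner.translate(query, self._streaming)


def create_table_environment(planner_name: str, streaming: bool) -> TableEnvironment:
    return TableEnvironment(PLANNERS[planner_name](), streaming)


query = "SELECT word, COUNT(*) FROM words GROUP BY word"
for name in ("old", "blink"):
    print(create_table_environment(name, streaming=False).sql_query(query))
```

The design choice being modeled is that the user-facing class depends only on the abstract interface, so the old and new code paths can live side by side without either leaking into user programs.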
In version 1.9.0, developers have already merged most of the SQL features open-sourced from Blink. These new features and performance optimizations were refined through years of repeated iteration in Alibaba’s internal scenarios, and I believe they can take Flink to a higher level.
In addition to the architecture upgrade, the Table module has also undergone significant refactoring and added new features in 1.9.0, including:
1. FLIP-37: To refactor the Table API type system.
2. FLIP-29: To add APIs for multi-row and multi-column operations to the Table module.
3. FLINK-10232: To implement preliminary SQL DDL support.
4. FLIP-30: To implement a brand-new unified Catalog API.
5. FLIP-38: To add a Table API for Python.
With these new features and subsequent fixes and improvements, the Flink Table API & SQL will play an increasingly important role in the future.
Improvements to Batch Processing
Flink’s batch processing has made significant progress in version 1.9.0: building on the architecture adjustment, this release adds several improvements to batch processing.
First and foremost is reducing the cost of error recovery for batch jobs: FLIP-1 (Fine Grained Recovery from Task Failures). The low number of this FLIP shows that the optimization was proposed long ago, and its unfinished part is finally implemented in version 1.9.0. In the new version, if an error occurs in a batch job, Flink first calculates the scope affected by the error, known as the Failover Region. In a batch job, some nodes send data downstream over the network in pipelined mode, while others first save their output in blocking mode, after which downstream nodes read the stored data. Tasks connected by pipelined exchanges belong to the same region and must restart together, but if an operator’s output has already been completely saved, that operator does not need to be rerun. In this way, error recovery is confined to a relatively small scope.
In the extreme case, if a job stores data on disk at every point where a shuffle is needed, its behavior resembles MapReduce and Spark. Flink, however, supports more flexible usage: you can control whether each shuffle connects the two sides directly over the network or goes through files on disk.
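The failover-region idea can be sketched as a graph computation: tasks joined by pipelined edges must restart together, while a blocking edge whose output is fully persisted acts as a cut. The Python sketch below assumes a simplified model in which every persisted blocking result survives the failure, so upstream regions never need to rerun; the task names and the `failover_regions` and `tasks_to_restart` helpers are hypothetical, not Flink code.

```python
from typing import Dict, List, Set, Tuple

# (upstream task, downstream task, "pipelined" or "blocking")
Edge = Tuple[str, str, str]


def failover_regions(tasks: List[str], edges: List[Edge]) -> List[Set[str]]:
    """Group tasks into regions connected only by pipelined edges (union-find)."""
    parent: Dict[str, str] = {t: t for t in tasks}

    def find(t: str) -> str:
        while parent[t] != t:
            parent[t] = parent[parent[t]]  # path halving keeps trees shallow
            t = parent[t]
        return t

    for up, down, kind in edges:
        if kind == "pipelined":  # blocking edges never merge regions
            parent[find(up)] = find(down)

    groups: Dict[str, Set[str]] = {}
    for t in tasks:
        groups.setdefault(find(t), set()).add(t)
    return list(groups.values())


def tasks_to_restart(failed: str, tasks: List[str], edges: List[Edge]) -> Set[str]:
    """Only the region containing the failed task is rerun."""
    for region in failover_regions(tasks, edges):
        if failed in region:
            return region
    raise KeyError(failed)


tasks = ["source", "map", "agg", "sink"]
edges = [("source", "map", "pipelined"),
         ("map", "agg", "blocking"),
         ("agg", "sink", "pipelined")]
print(sorted(tasks_to_restart("sink", tasks, edges)))  # ['agg', 'sink']
```

Making every edge blocking turns each task into its own region, the MapReduce-like extreme described above; making every edge pipelined collapses the job into a single region, so any failure restarts everything.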
File-based shuffle naturally raises the question of whether shuffle implementations can be made pluggable. They can, and the community is working in this direction: FLIP-31 (Pluggable Shuffle Service). For example, developers could use YARN’s Auxiliary Service as a shuffle implementation, or even write a distributed service dedicated to shuffling data for batch tasks. Facebook has recently shared some work in this area. At Alibaba, developers have used this architecture to process hundreds of terabytes of data in a single job. With such a plug-in mechanism, Flink can easily connect to these more efficient and flexible implementations, so that shuffle, a long-standing problem in batch processing, can be better solved.
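A pluggable shuffle comes down to the engine coding against a small interface and loading an implementation chosen by configuration. The sketch below is a hypothetical Python model, not the FLIP-31 interfaces: `ShuffleService`, `InMemoryShuffle`, `FileShuffle`, `load_shuffle_service`, and the `shuffle.impl` / `shuffle.dir` keys are invented for illustration. The file-based variant persists each partition to disk, so a consumer can read it after the producer is gone, which is the property the failover discussion above relies on.

```python
import json
import os
import tempfile
from abc import ABC, abstractmethod
from typing import Dict, List


class ShuffleService(ABC):
    """Hypothetical interface the engine codes against."""

    @abstractmethod
    def write(self, partition: int, records: List[str]) -> None: ...

    @abstractmethod
    def read(self, partition: int) -> List[str]: ...


class InMemoryShuffle(ShuffleService):
    # Data lives only as long as this object, like a pipelined exchange.
    def __init__(self) -> None:
        self._data: Dict[int, List[str]] = {}

    def write(self, partition: int, records: List[str]) -> None:
        self._data.setdefault(partition, []).extend(records)

    def read(self, partition: int) -> List[str]:
        return list(self._data.get(partition, []))


class FileShuffle(ShuffleService):
    # Each partition is persisted as a JSON file, so it outlives the producer.
    def __init__(self, directory: str) -> None:
        self._dir = directory

    def _path(self, partition: int) -> str:
        return os.path.join(self._dir, f"partition-{partition}.json")

    def write(self, partition: int, records: List[str]) -> None:
        existing = self.read(partition)
        with open(self._path(partition), "w") as f:
            json.dump(existing + records, f)

    def read(self, partition: int) -> List[str]:
        if not os.path.exists(self._path(partition)):
            return []
        with open(self._path(partition)) as f:
            return json.load(f)


def load_shuffle_service(config: Dict[str, str]) -> ShuffleService:
    """Pick an implementation from configuration; callers never see which."""
    if config.get("shuffle.impl", "memory") == "file":
        return FileShuffle(config["shuffle.dir"])
    return InMemoryShuffle()


def demo_persistence() -> List[str]:
    """Write through one instance, then read through a fresh one,
    as a restarted downstream task would."""
    with tempfile.TemporaryDirectory() as d:
        producer = load_shuffle_service({"shuffle.impl": "file", "shuffle.dir": d})
        producer.write(0, ["a"])
        producer.write(0, ["b"])
        return FileShuffle(d).read(0)


print(demo_persistence())  # ['a', 'b']
```

Swapping in a remote or YARN-hosted service would mean adding another `ShuffleService` subclass; the calling code would not change.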
Improvements to Stream Processing
Stream computing is still Flink’s main field, so version 1.9.0 also brings improvements here. This version adds a very practical feature: FLIP-43 (State Processor API). Access to Flink’s state data, and to the savepoints composed of that state data, has long been a popular request among community users. Flink versions before 1.9.0 introduced the Queryable State feature, but its application scenarios were limited and its results were not ideal, so few people used it. The State Processor API, by contrast, provides much more flexible access and lets users do the following:
- Bootstrap state: read data from external systems in advance, convert it into the Flink savepoint format, and start the Flink job from that savepoint. This avoids many cold-start problems.
- Analyze state: use the State Processor API to analyze state data directly. State has always been a “black box” for users, who cannot tell whether the data stored in it is correct or whether any anomalies have occurred. With this API, users can analyze state just like any other data.
- Repair dirty data: if a piece of dirty data contaminates your state, you can use this API to fix it.
- Migrate state: if a user modifies a job’s logic but wants to reuse most of the original job’s state with some fine-tuning, this API can do the corresponding work.
All of the above are very common requirements and problems in stream computing, and this flexible API can address them. I am therefore personally very optimistic about its prospects.
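The four uses listed above share one pattern: treat a savepoint as ordinary data, transform it, and produce a new savepoint. The Python sketch below models only that pattern, with a nested dictionary (operator uid to key to value) standing in for keyed state. `bootstrap`, `find_anomalies`, `repair`, and `migrate` are hypothetical helpers; the real State Processor API is a Java/Scala library built on Flink’s DataSet API, and none of its classes appear here.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical model of a savepoint: operator uid -> {key: value}
Savepoint = Dict[str, Dict[str, int]]


def bootstrap(uid: str, rows: Iterable[Tuple[str, int]]) -> Savepoint:
    """Build initial state from external data, avoiding a cold start."""
    state: Dict[str, int] = {}
    for key, value in rows:
        state[key] = state.get(key, 0) + value
    return {uid: state}


def find_anomalies(sp: Savepoint, uid: str) -> List[str]:
    """Analyze state like any other dataset: here, flag negative counters."""
    return sorted(k for k, v in sp.get(uid, {}).items() if v < 0)


def repair(sp: Savepoint, uid: str) -> Savepoint:
    """Return a new savepoint with dirty (negative) counters reset to zero."""
    fixed = {k: max(v, 0) for k, v in sp.get(uid, {}).items()}
    return {**sp, uid: fixed}


def migrate(sp: Savepoint, old_uid: str, new_uid: str) -> Savepoint:
    """Carry state over to a renamed operator in a modified job."""
    out = {k: dict(v) for k, v in sp.items() if k != old_uid}
    out[new_uid] = dict(sp.get(old_uid, {}))
    return out


sp = bootstrap("counter", [("a", 3), ("b", -2), ("a", 1)])
print(sp)                               # {'counter': {'a': 4, 'b': -2}}
print(find_anomalies(sp, "counter"))    # ['b']
fixed = repair(sp, "counter")
print(migrate(fixed, "counter", "counter-v2"))  # {'counter-v2': {'a': 4, 'b': 0}}
```

Each step returns a new savepoint and leaves its input untouched, mirroring how a corrected savepoint would be written alongside the original rather than overwriting it.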
Speaking of savepoints, I would also like to mention another practical feature developed by the community: FLIP-34 (Stop with Savepoint). As we all know, Flink performs checkpoints periodically and maintains a global state snapshot. Suppose a user actively suspends a job between two checkpoints and restarts it later. In this case, Flink reads the last successfully saved global snapshot and resumes computing from the data that followed it. This guarantees that the state is exactly right, with nothing missing or counted twice, but records the sink already received after that snapshot are emitted again, producing duplicates. With this feature, Flink takes a global snapshot while suspending the job and stores it as a savepoint. The next time the job starts, it resumes from this savepoint, so the sink does not receive unexpected duplicate data. However, this does not solve duplicate output to the sink caused by automatic failover while the job is running.
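The difference between resuming from the last periodic checkpoint and resuming from a stop-with-savepoint snapshot can be shown with a small simulation. This Python sketch assumes a simplified model with hypothetical `run` and `simulate` helpers: the source is a list of records, a checkpoint just records the source offset, and the sink is a non-transactional list that keeps everything it receives.

```python
from typing import List


def run(records: List[str], start: int, stop: int, sink: List[str],
        checkpoint_every: int) -> int:
    """Emit records[start:stop] to the sink; return the last checkpointed offset.

    A checkpoint is taken after every `checkpoint_every` records, counting
    absolute positions in `records`.
    """
    last_checkpoint = start
    for offset in range(start, stop):
        sink.append(records[offset])
        if (offset + 1) % checkpoint_every == 0:
            last_checkpoint = offset + 1
    return last_checkpoint


def simulate(records: List[str], suspend_at: int, checkpoint_every: int,
             stop_with_savepoint: bool) -> List[str]:
    """Suspend the job at `suspend_at`, restart it, and return what the sink saw."""
    sink: List[str] = []
    last_checkpoint = run(records, 0, suspend_at, sink, checkpoint_every)
    # Stop-with-savepoint snapshots exactly where the job was suspended;
    # a plain suspend can only fall back to the last periodic checkpoint.
    resume_from = suspend_at if stop_with_savepoint else last_checkpoint
    run(records, resume_from, len(records), sink, checkpoint_every)
    return sink


records = [f"r{i}" for i in range(6)]
print(simulate(records, suspend_at=3, checkpoint_every=2, stop_with_savepoint=False))
# ['r0', 'r1', 'r2', 'r2', 'r3', 'r4', 'r5']  (r2 is emitted twice)
print(simulate(records, suspend_at=3, checkpoint_every=2, stop_with_savepoint=True))
# ['r0', 'r1', 'r2', 'r3', 'r4', 'r5']
```

The model covers only a user-initiated suspend; a crash while the job is running still falls back to the last checkpoint, which is the limitation noted above.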
Apache Hive has always been an important force in the Hadoop ecosystem, and integrating with Hive is essential to promoting Flink’s batch features. During the development of version 1.9.0, we were very pleased to have two Apache Hive PMC members help drive the integration of Flink and Hive.
The first problem to solve was reading Hive data with Flink. With the unified Catalog API proposed in FLIP-30, Flink now fully supports access to the Hive Metastore (HMS). We have also added a Hive connector, which currently supports formats such as CSV, SequenceFile, ORC, and Parquet. To read Hive tables directly from Flink, users only need to configure how HMS is accessed. In addition, Flink is compatible with custom Hive functions, so Hive User-Defined Functions (UDFs), User-Defined Table-Generating Functions (UDTFs), and User-Defined Aggregate Functions (UDAFs) can run directly in Flink SQL.
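The unified Catalog API is what lets the Hive Metastore sit alongside Flink’s own metadata: each catalog is registered under a name, and a table can be addressed as `catalog.database.table`, with the current catalog and database filling in any omitted parts. The Python sketch below models only that name resolution. `Catalog`, `CatalogManager`, and the catalog names are hypothetical, and a real Hive catalog would fetch schemas from the Hive Metastore rather than from an in-memory dictionary.

```python
from typing import Dict, List


class Catalog:
    """Hypothetical catalog: database name -> table name -> column names."""

    def __init__(self, name: str, databases: Dict[str, Dict[str, List[str]]]) -> None:
        self.name = name
        self.databases = databases


class CatalogManager:
    """Resolves 1-, 2-, or 3-part table names against registered catalogs."""

    def __init__(self, current_catalog: str, current_database: str) -> None:
        self.catalogs: Dict[str, Catalog] = {}
        self.current_catalog = current_catalog
        self.current_database = current_database

    def register(self, catalog: Catalog) -> None:
        self.catalogs[catalog.name] = catalog

    def resolve(self, identifier: str) -> List[str]:
        parts = identifier.split(".")
        if len(parts) == 1:    # table -> current catalog and database
            parts = [self.current_catalog, self.current_database] + parts
        elif len(parts) == 2:  # database.table -> current catalog
            parts = [self.current_catalog] + parts
        elif len(parts) != 3:
            raise ValueError(f"bad identifier: {identifier}")
        catalog, database, table = parts
        try:
            return self.catalogs[catalog].databases[database][table]
        except KeyError:
            raise LookupError(f"table not found: {'.'.join(parts)}") from None


mgr = CatalogManager(current_catalog="builtin", current_database="default")
mgr.register(Catalog("builtin", {"default": {"clicks": ["user", "url"]}}))
mgr.register(Catalog("myhive", {"sales": {"orders": ["id", "amount"]}}))
print(mgr.resolve("clicks"))               # ['user', 'url']
print(mgr.resolve("myhive.sales.orders"))  # ['id', 'amount']
```

Because queries only see resolved names, a Hive-backed catalog and a built-in one can be used in the same SQL statement without the query caring where each table’s metadata lives.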
Flink currently provides only simple write support, allowing data to be inserted into new tables with the INSERT INTO statement. Hive compatibility has always been a high priority for the Flink community, and continued improvements are expected in future versions of Flink.
In summary, Flink 1.9.0 was finally launched after more than six months of intense development. Along the way, many Chinese developers and users joined the Flink community and contributed a massive amount of code, a promising start for Flink. In the future, we will continue to invest in the functionality and ecosystem of the Flink community and to popularize Flink in China and throughout the world.