What’s All Involved with Blink Merging with Apache Flink?

By Yang Kete ‘Rooney’.

Only a month or so ago, on August 22, the newest version of Apache Flink, 1.9, was officially released. This version is a big deal: it represents the merger of Alibaba's in-house Flink fork, Blink, into Apache Flink.

This update has been several months in the making. As early as January, Alibaba announced that Blink would become open source and that its code would be contributed to Apache Flink. The merger was far from simple: around 1,500,000 lines of code have been modified since the previous release, involving not only major structural changes but also major updates and enhancements to Flink's feature set.

In this article, we will go over some of the major changes and new features included in this newly released version of Flink. But before we get into anything else, let's briefly review some key points about why Alibaba chose to make Blink open source, what it means for developers, and what it means for the future of Alibaba's Blink as well as Apache Flink:

  • The move to open-source Blink is mainly about contributing to community developers the large body of core code for new features, performance optimization, and stability improvements that Alibaba has accumulated in stream computing and batch processing. In short, Blink's core code is based on the open-source Flink engine and has been shaped by the demands of the Alibaba Group's internal businesses.
  • Blink is open-sourced in the form of a branch; that is, it becomes a branch under the Apache Flink project once it is made fully open source.
  • The goal of open-sourcing Blink is not for it to become another active project, but to make Flink better. That is, community developers (including you) can study exactly how Blink's code was written and implemented, and then use these insights to merge Blink's features into Flink more efficiently.

Between when Alibaba announced that it would be making Blink open source and when Flink 1.9 was released, a span of around six months passed. With all things said and done, the original vision and goal that Alibaba set forth in open-sourcing Blink and merging its features into Flink have come to fruition, thanks of course to much community support along the way. Although some of Blink's features have yet to be merged into Flink, the powerful enhancements brought by the merger are already clear.

First, I would like to share the following figures on the comparison between Flink 1.9.0 and previous versions:

[Figures: comparison of resolved issues, code commits, and contributors between Flink 1.9.0 and previous releases]

In terms of the number of resolved issues and the number of code commits, 1.9.0 matches or exceeds the two previous versions combined. As for the number of contributors, Flink has attracted far more community contributors than ever before, many of whom are probably from China.

Upgrades to the Underlying Architecture

[Figure: Flink architecture diagrams; the existing architecture is shown on the left]

Readers familiar with Flink will know the architecture diagram on the left very well. Simply put, Flink has two relatively independent APIs, the DataStream API and the DataSet API, on top of its distributed streaming execution engine to describe stream computing and batch processing jobs respectively. On top of these two APIs, a stream-batch unified API is provided, namely the Table API & SQL. Users can write the same Table API program or SQL to describe both stream and batch jobs, but they need to tell the Flink engine at runtime whether to run it as a stream or as a batch. The optimizer of the Table layer then optimizes the program into a DataStream job or a DataSet job accordingly.
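
Concretely, before the new architecture this choice is made by picking the table environment. A minimal sketch using the 1.9 Java API (the SQL and table names in the comment are hypothetical):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class ChooseStreamOrBatch {
        public static void main(String[] args) {
            // Batch: Table API & SQL programs are optimized into a DataSet job.
            ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);

            // Streaming: the very same Table/SQL logic is optimized into a DataStream job.
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);

            // Identical SQL can be issued against tables registered on either environment,
            // e.g. sqlQuery("SELECT item, COUNT(*) FROM orders GROUP BY item"),
            // and the Table-layer optimizer translates it into a DataSet or a DataStream job.
        }
    }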

However, if we look closely at the implementation details in the underlying layers of DataStream and DataSet, we find that the two APIs do not actually have much in common: they have independent translation and optimization processes, and they use completely different task implementations at runtime. Such inconsistencies are problematic for both users and developers.

Users need to choose between the two APIs when writing jobs, yet the two APIs are not only semantically different but also support different sets of connectors, which inevitably causes problems. And although the Table module is unified at the API level, its underlying implementation is still based on DataStream and DataSet, so it is affected by the same inconsistencies.

For developers, since the two stacks are relatively independent, reusing code is a challenge. New features often have to be implemented twice, once for each API, and each API has a long development path that amounts to an end-to-end modification, which greatly reduces development efficiency. If two independent technology stacks coexist for a long time, they not only waste manpower but may eventually slow the development of all Flink features.

Based on some initial explorations in Blink, Alibaba developers held close discussions with the community and together determined the future technical architecture of Flink.

In future Flink versions, developers from Alibaba and the wider Apache Flink community will discard the DataSet API. The user-facing APIs will be the DataStream API, which describes the physical execution plan, and the Table API & SQL, which describes the relational plan. The DataStream API gives users a "WYSIWYG" (What You See Is What You Get) experience: users describe and orchestrate the relationships between operators themselves, and the engine does not interfere or optimize much. The Table API & SQL keeps its current style and provides a relational-expression API; the engine optimizes based on the user's intent and selects the optimal execution plan. It is worth mentioning that both APIs will provide stream computing and batch processing features in the future. Beneath these two user APIs, the implementation layer shares the same technology stack: a unified DAG data structure to describe jobs, a unified StreamOperator to write operator logic, and a unified streaming distributed execution engine.
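
To make the difference in style concrete, here is a minimal, self-contained sketch (with a made-up in-memory input) of the same aggregation written both ways: in the DataStream API the user wires the operators explicitly, while in SQL the user only states the intent and the planner chooses the execution plan.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class TwoStyles {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            DataStream<Tuple2<String, Integer>> words = env.fromElements(
                    Tuple2.of("hello", 1), Tuple2.of("flink", 1), Tuple2.of("hello", 1));

            // DataStream API: "what you see is what you get"; the user chains the operators.
            words.keyBy(0).sum(1).print();

            // Table API & SQL: declare the intent and let the planner pick the plan.
            tEnv.registerDataStream("words", words, "word, cnt");
            Table totals = tEnv.sqlQuery("SELECT word, SUM(cnt) FROM words GROUP BY word");
            tEnv.toRetractStream(totals, Row.class).print();

            env.execute("two-styles");
        }
    }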

Changes to Table API and SQL Queries

For this purpose, the community has made a series of efforts, including splitting the Table module (FLIP-32, where FLIP stands for Flink Improvement Proposal, the mechanism for recording proposals for major changes to Flink), sorting out the dependencies of the Java and Scala APIs, and proposing the Planner interface to support multiple Planner implementations. The Planner is responsible for the concrete optimization and for translating Table jobs into execution graphs. The original implementation has been moved into the Flink Planner, while the code that connects to the new architecture goes into the Blink Planner.

[Figure: the structure of the Table module after the split]

The Query Processor in the figure is the implementation of the Planner.

This approach has a lot of benefits. It not only makes the Table module clearer after being split, but also, more importantly, does not affect the user experience of earlier versions.
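
Concretely, a 1.9.0 user opts into one planner or the other when creating the table environment. A minimal sketch of the two documented entry points:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class PlannerChoice {
        public static void main(String[] args) {
            // Keep the pre-1.9 behavior: the original Flink planner in streaming mode.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings oldPlanner = EnvironmentSettings.newInstance()
                    .useOldPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment legacyTableEnv = StreamTableEnvironment.create(env, oldPlanner);

            // Opt into the newly merged Blink planner, here in batch mode.
            EnvironmentSettings blinkBatch = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inBatchMode()
                    .build();
            TableEnvironment blinkTableEnv = TableEnvironment.create(blinkBatch);

            // Both environments accept the same Table API & SQL programs;
            // only the planner that optimizes and translates them differs.
        }
    }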

In version 1.9.0, developers have already merged most of the SQL features that were open-sourced from Blink. These are new features and performance optimizations that have been refined over the past few years through repeated iteration in Alibaba's internal scenarios, and I believe they can bring Flink to a higher level.

In addition to the architecture upgrade, the Table module has also undergone significant refactoring and added new features in 1.9.0, including:

1. FLIP-37: To refactor the Table API type system.

2. FLIP-29: To add the API for multi-row and multi-column operations to the Table module.

3. FLINK-10232: To implement preliminary SQL DDL support (a brief sketch follows this list).

4. FLIP-30: To implement the brand-new unified Catalog API.

5. FLIP-38: To add the Table API for Python.
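
As a rough sketch of the preliminary DDL support from item 3, here a CSV-backed table is declared and then queried with plain SQL. The table name, file path, and connector/format property keys are illustrative (they follow the 1.9-era connector properties), so check the documentation of your version:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class DdlSketch {
        public static void main(String[] args) {
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inBatchMode()
                    .build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);

            // Preliminary SQL DDL (FLINK-10232): declare a table over a CSV file.
            tableEnv.sqlUpdate(
                    "CREATE TABLE orders (" +
                    "  item VARCHAR," +
                    "  amount INT" +
                    ") WITH (" +
                    "  'connector.type' = 'filesystem'," +
                    "  'connector.path' = 'file:///tmp/orders.csv'," +
                    "  'format.type' = 'csv'" +
                    ")");

            // The declared table can then be used like any other table.
            Table result = tableEnv.sqlQuery("SELECT item, SUM(amount) FROM orders GROUP BY item");
        }
    }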

With these new features and subsequent fixes and improvements, the Flink Table API & SQL will play an increasingly important role in the future.

Improvements to Batch Processing

The first and foremost improvement is reducing the cost of error recovery in batch processing: FLIP-1 (Fine Grained Recovery from Task Failures). The low number of this FLIP shows that the optimization was proposed long ago, and the unfinished parts of FLIP-1 are finally implemented in version 1.9.0. In the new version, if an error occurs in a batch job, Flink first calculates the scope affected by the error, called the Failover Region. In a batch job, some nodes transmit data to downstream nodes directly over the network through a pipeline, while other nodes first save their output data in blocking mode, and the downstream then reads the stored data. If an operator's output has been completely saved, there is no need to rerun that operator, which keeps error recovery confined to a relatively small range.
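
To opt into this behavior, the failover strategy is switched to region-based recovery through configuration. A sketch for a local test setup; on a real cluster the same key ("jobmanager.execution.failover-strategy: region") would go into flink-conf.yaml:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class RegionFailoverSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Restart only the affected Failover Region instead of the whole job.
            conf.setString("jobmanager.execution.failover-strategy", "region");

            // Local playground only; clusters normally read this from flink-conf.yaml.
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            env.setParallelism(2);
            // ... define the batch job here ...
        }
    }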

In extreme cases, if data is stored to disk at every point where a Shuffle is needed, the behavior is similar to that of MapReduce and Spark. Flink simply supports more advanced usage: you can control whether each Shuffle is connected directly over the network or through files stored on disk.

With file-based Shuffle in place, it is natural to ask whether the Shuffle implementation can be turned into a plug-in. It can, and the community is working in exactly this direction: FLIP-31 (Pluggable Shuffle Service). For example, developers can use Yarn's Auxiliary Service as a Shuffle implementation, or even write a dedicated distributed service to shuffle data for batch tasks. Facebook has recently shared some work in this area, and at Alibaba developers have used this architecture to support processing hundreds of terabytes of data in a single job. With such a plug-in mechanism, Flink can easily connect to these more efficient and flexible implementations, so that Shuffle, a long-standing problem of batch processing, can be better solved.
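
For completeness, a sketch of how a different shuffle implementation would be selected. The configuration key follows FLIP-31, and the factory class named here is purely hypothetical; leaving the key unset keeps Flink's default Netty-based shuffle:

    import org.apache.flink.configuration.Configuration;

    public class ShuffleFactorySketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // FLIP-31: the shuffle service is chosen through a pluggable factory class.
            // "com.example.shuffle.DistributedShuffleServiceFactory" is a hypothetical custom implementation.
            conf.setString("shuffle-service-factory.class",
                    "com.example.shuffle.DistributedShuffleServiceFactory");
            // On a cluster this setting would normally live in flink-conf.yaml.
        }
    }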

Improvements to Stream Processing

The headline feature on the stream processing side is the new State Processor API (FLIP-43), which lets batch jobs read, write, and modify the state stored in Flink savepoints. It opens up several possibilities:

  1. Users can use this API to read data from other external systems in advance, convert it into the Flink savepoint format, and then start the Flink job from that savepoint. In this way, many cold-start problems can be avoided.
  2. Users can analyze state data directly. State data has always been a "black box" for users: they do not know whether the data stored in it is right or wrong, or whether any exceptions have occurred. With this API, users can analyze state data just like any other data.
  3. Users can repair dirty data. If a piece of dirty data contaminates the state, this API can also be used to fix and correct the problem.
  4. Users can migrate state. Suppose a user modifies the job logic and wants to reuse most of the original job's state while making some fine-tuning adjustments; this API can complete the corresponding work.

All of the above are very common requirements and problems in the stream computing field, and they can all be solved through this flexible API. Therefore, I am personally very optimistic about the application prospects of this API.
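
A minimal sketch of reading existing state with the State Processor API (the savepoint path, operator uid, and state name are placeholders; the API ships in the flink-state-processor-api module):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;

    public class InspectSavepoint {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Load an existing savepoint as a batch data source.
            ExistingSavepoint savepoint = Savepoint.load(
                    env, "hdfs:///flink/savepoints/savepoint-abc123", new MemoryStateBackend());

            // Read the list state registered under a given operator uid and state name,
            // then analyze it like any other DataSet.
            DataSet<Long> counters = savepoint.readListState("my-operator-uid", "counter-state", Types.LONG);
            counters.print();
        }
    }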

Speaking of savepoints, I would also like to mention another practical feature developed by the community: FLIP-34 (Stop with Savepoint). As we all know, Flink performs checkpoints periodically and maintains a global state snapshot. Suppose a user actively suspends a job between two checkpoints and then restarts it later. Flink automatically reads the last successfully saved global state snapshot and starts recomputing the data after that snapshot. This ensures that the state data is neither too much nor too little, but the Sink has already received output for the replayed data, so it sees duplicates. With this feature, Flink takes a global snapshot while suspending the job and stores it as a savepoint. The next time the job is started, it starts from this savepoint, so the Sink will not receive unexpected duplicate data. However, this does not solve the problem of duplicate output to the Sink caused by automatic failover while the job is running.
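
In 1.9 this is exposed through the command-line client, roughly as follows (the target directory passed to -p and the job id are placeholders; run flink stop --help to confirm the exact flags of your version):

    ./bin/flink stop -p hdfs:///flink/savepoints <jobId>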

Integrating Hive

The first thing to solve is reading Hive data with Flink. With the help of the unified Catalog API proposed in FLIP-30, Flink can now fully access the Hive Metastore. At the same time, a connector for Hive has been added; formats such as CSV, Sequence File, ORC, and Parquet are currently supported. To read Hive tables directly from Flink, users only need to configure how the Hive Metastore (HMS) is accessed. In addition, Flink's compatibility with custom Hive functions allows User-Defined Functions (UDFs), User-Defined Table Functions (UDTFs), and User-Defined Aggregate Functions (UDAFs) to run directly in Flink SQL.
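
A minimal sketch of wiring this up in 1.9 (the catalog name, hive-site.xml directory, Hive version, and table name are all placeholders; the catalog class ships in the flink-connector-hive module):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class ReadHiveSketch {
        public static void main(String[] args) {
            // The Blink planner in batch mode is a common setup for reading Hive tables.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inBatchMode()
                    .build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);

            // Point the catalog at the Hive Metastore via the directory holding hive-site.xml.
            HiveCatalog hive = new HiveCatalog(
                    "myhive",           // catalog name (placeholder)
                    "default",          // default database
                    "/etc/hive/conf",   // directory containing hive-site.xml (placeholder)
                    "2.3.4");           // Hive version the connector is built against

            tableEnv.registerCatalog("myhive", hive);
            tableEnv.useCatalog("myhive");

            // Existing Hive tables can now be queried with ordinary Flink SQL.
            Table snapshot = tableEnv.sqlQuery("SELECT * FROM some_hive_table");
        }
    }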

On the writing side, Flink currently provides only simple support and only allows inserting data with the INSERT INTO statement. Hive compatibility has always been a high priority for the Flink community, and continuous improvements can be expected in future versions of Flink.

Summary

Flink 1.9.0 is the first release to carry the results of the Blink merger: an upgraded underlying architecture that moves stream and batch processing toward one shared stack, a restructured Table module with the newly contributed Blink planner and a batch of SQL features, fine-grained failure recovery and a pluggable Shuffle service for batch jobs, the State Processor API and Stop with Savepoint for stream jobs, and initial Hive integration. Some of Blink's features are still on their way into Flink, but the direction set by the merger is clear, and future releases can be expected to keep building on it.
