By Chen Shouyuan, Senior Product Expert, and Gordon Tai, Founding Team Member
Definition, Architecture, and Principles of Flink
Apache Flink is a distributed processing engine for big data that performs stateful or stateless computations over both bound and unbound data streams. It’s also deployed in various cluster environments for fast computations over data of different sizes.
To better understand Flink, you need to know basic Flink processing semantics such as Streams, State, and Time as well as APIs that provide significant flexibility and convenience.
- Streams: Streams are divided into two types: bounded streams and unbounded streams. Unbounded streams have a start but no defined end. Bounded streams have a defined start and an end. Unbounded stream data continuously increases over time and the computations are continuous with no end. Contrary to unbounded streams, bounded stream data has a fixed size and an end when you complete the computations.
- State: State is data information generated during computations and plays a significant role in fault tolerance, failure recovery, and checkpoints. In essence, stream computing is incremental processing. Therefore, stream computing requires keeping and querying a state continuously. Also, to ensure the exactly-once semantics, data needs to be written into the state. Persistent storage ensures exactly-once when the entire distributed system fails or crashes. This is another state role.
- Time: Time in Flink includes Event time, Ingestion time, and Processing time. Because unbounded data processed in Flink is a continuous process, time is an important basis to judge upon whether the state experiences latency and whether data is processed on time.
- API: APIs are usually divided into three layers` from top to bottom: SQL/Table API, DataStream API, and ProcessFunction. APIs show powerful expressiveness and abstraction. However, the closer it is to the SQL layer, the weaker the expressiveness. On the contrary, APIs in the ProcessFunction layer has strong expressiveness and support various flexible operations, but the abstraction capability is smaller.
The Flink architecture has the four following features:
- Unified Data Processing Framework: Flink has a unified framework for processing bounded and unbounded data streams.
- Flexible Deployment: The underlying layer of Flink supports many resource schedulers, including YARN and Kubernetes. The built-in Flink standalone scheduler also supports flexible deployment.
- High Scalability: Scalability is very important for distributed systems. During Alibaba’s Double 11 events, Flink helps to process large amounts of data and delivers good performance while processing up to 1.7 billion data entries per second.
- Excellent Stream Processing Performance: The biggest advantage of Flink over is that it completely abstracts state semantics into the framework and supports reading state locally. This avoids a large amount of network I/O and significantly improves the performance of reading and saving state.
We will provide special courses on this topic later. This section only describes operations and maintenance (O&M) and service monitoring in Flink.
- Thanks to the consistent checkpoints in the Flink implementation, Flink has a 24/7 available Service Oriented Architecture (SOA). Checkpoints are the core mechanism to handle fault tolerance in Flink, which regularly records the state of Operators during computations and generates snapshots for persistent storage. When a Flink job fails and crashes, Flink allows you to selectively restore the job from checkpoints to ensure computational consistency.
- Flink provides functions and interfaces for monitoring, O&M, as well as built-in Web UI. It also provides DAG graphs and various metrics for running jobs and helping you manage job state.
Flink Scenario: Data Pipeline
The core application scenario of Data Pipelines is moving data and cleansing or processing a portion of the data during the move. On the left side of the entire architecture, the diagram is Periodic extract, transform, load (ETL,) which enables stream ETL or real-time ETL and allows you to subscribe to messages in the message queue and process them. After cleansing, write the data into the downstream database or file system in real-time.
For better understanding, consider the following scenario examples:
- Real-time Data Warehouse
While creating a data warehouse in the upstream, you may need real-time Stream ETL. During this process, data is cleansed or expanded in real-time. After data cleansing, it’s written into the downstream real-time data warehouse This ensures the timeliness of data queries and real-time implementation of data collection, data processing, and querying in the downstream.
- Search Engine
Take Taobao for example. When a seller releases a new item, a message stream is generated in the background. When passing the message into the Flink system, data is processed and expanded in real-time. Then a real-time index is built on the processed and expanded data before writing it into the search engine. Therefore, when a Taobao seller puts a new item on sale, search for that item is done through the search engine just a few seconds or minutes later.
Flink Scenario: Data Analytics
The figure shows Batch Analytics on the left side and Streaming Analytics on the right. Batch Analysis uses methods such as MapReduce, Hive, and Spark Batch to analyze and process jobs and generate offline reports. Streaming Analytics uses streaming analysis engines such as Storm and Flink to process and analyze data in real-time. Using Streaming Analytics in scenarios such as real-time dashboards and reports is more common.
Flink Scenario: Data-Driven
To a certain extent, all real-time data processing or stream processing is data-driven. Stream computing is essentially data-driven computing. A common Flink application scenario has to do with risk control systems. When a risk control system needs to process a variety of complex rules, data-driven computing allows writing rules and logic into APIs in a DataStream or ProcessFunction and then abstracting logic into the entire Flink engine.
When external data streams or events enter the stream, corresponding rules are triggered. This is how data-driven computing works. After specific rules are triggered, a data-driven application starts processing or creates alerts, which are distributed to the downstream system to generate a business notification. Employing data-driven computing is a typical Flink scenario for processing complex events.
Stateful Stream Processing
Traditional Batch Processing
In traditional batch processing, time is the basis of creating different batches; you continuously collect data and then batch operations are performed periodically. However, consider a scenario where you need to count the number of event conversions per hour.
If events span beyond the defined time splits, traditional batch processing will bring the intermediate calculation results to the next batch. Also, by receiving events in reverse order, tradition batch processing also brings intermediate states to the results of the next batch. Therefore, traditional batch processing is not ideal for some scenarios.
Features of Ideal Solutions
- State: An ideal solution ensures that the engine is capable of accumulating and maintaining state. The cumulative state represents all events that have been received in the past and affects the output.
- Time: With time, the engine has a mechanism to control data integrity. After receiving data, computational results are returned.
- Results Available in Real-time: Producing real-time results is an ideal solution. More importantly, a new model for processing continuous data is needed to process real-time data to fully comply with the characteristics of continuous data.
In simple words, an endless data source continuously collects data and code serves as the basic logic for data processing; results are generated and returned after the data from the data source is processed by code.
Distributed Stream Processing
Assume that Input Streams have multiple users and each user has its ID. To calculate the occurrences of each user, transfer the occurrence events of the same user to the same operation code — this is similar to the Group By operation on other batch jobs. Therefore, like streams, it is also required to implement partitions, set the corresponding keys and flow the same keys to the same computation instance for the same operation.
Stateful Distributed Stream Processing
As shown in the figure, the code defines variable X, which is read and written during data processing. When returning the final output result, the output can be determined based on variable X which affects the final output result. In this process, the first key point is that the state is co-partitioned by the key.
Similar to the preceding user occurrence example, the same key will flow to the computation instance. The number of occurrences is called state, which accumulates on the same computation instance together with events having the same key.
This is equivalent to state co-partitioned with the input stream by key. When partitions enter the stream, the cumulative state of the steam also becomes a co-partition. The second key point is the embedded local state backend.
When using a stateful distributed stream processing engine, the cumulative state becomes large over time. If there are too many keys, the state may exceed the memory load of a single node. At this point, a state backend must be available to maintain the state. If the backend works normally, in-memory maintains state by itself.
Advantages of Apache Flink
State and Fault Tolerance
When we consider fault tolerance, we may think of exactly-once fault tolerance. Whether it is state accumulated, when applications perform computations, each input event reflects state or state changes. If there are multiple modifications, results generated from the data engine may be not reliable.
- How to ensure that the state has the exactly-once fault-tolerance guarantee?
- How to produce global consistent snapshots for multiple operators with the local state in distributed scenarios?
- How to create a snapshot without interrupting the operation?
Exactly-Once Fault Tolerance Guarantee in Simple Scenarios
For example, if the number of occurrences of a user is not accurately calculated and does not have the exactly-once guarantee, the result is not reliable. Before considering precise fault-tolerance guarantees, let’s consider some very simple use cases.
For instance, while writing unbounded stream data, a single processor performs computations.
The state is accumulated for each completed computation. To ensure that the processor produces exactly-once fault tolerance, create a snapshot after each piece of data is processed post the state changes. The snapshot is included in the queue and compared with the corresponding state to ensure consistent snapshots, which ensure exactly-once guarantee.
Distributed State and Fault Tolerance
As a distributed processing engine, Flink generates only one global consistent snapshot when performing multiple operations on the local state. To create a global consistent snapshot without interrupting the operations, use distributed state and fault tolerance.
- Global Consistency Snapshot
When operators perform operations on individual nodes in a distributed environment, snapshots of individual date records are processed continuously. The operation flows through all operation values. After all the operation values are changed, you can see the state of each operation value and the position of the corresponding operation. Create a consistent snapshot, so a global consistent snapshot is the extended form of a consistent snapshot in simple scenarios.
- Failure Recovery
First, let’s take a look at Checkpoints. As mentioned before, the local state backend of each Operator must maintain the state. After generating a checkpoint, it’s transferred to the shared DFS every time. When one processor fails, recover the state of all the operation values from the three checkpoints and reset to the corresponding positions. Checkpoints allow processors to implement exactly-once in distributed environments.
The method for continuously generating global consistency snapshots without interrupting the operation is based on the simple Lamport algorithm. Flink continuously injects checkpoint barriers in a data stream, for example, Checkpoint Barrier N-1. Checkpoint barrier N means that the data in the range is for Checkpoint Barrier N.
Assume that you need to generate a Checkpoint Barrier N, but the job manager in Flink actually triggers checkpoints. After triggering the checkpoints, Checkpoint barriers generate from the data source. As the figure shows when the job starts to create Checkpoint Barrier N, Checkpoint Barrier N needs to gradually fill in the table in the lower-left corner.
As shown in the figure, when some events are marked red and Checkpoint Barrier N is also red, it implies that the Checkpoint Barrier N is responsible for these events or the corresponding data. The data or events in the white part after Checkpoint Barrier N do not belong to Checkpoint Barrier N.
When the data source receives Checkpoint Barrier N, it’s saving its own state first. For example, in Apache Kafka, the state of the data source is its partition in Kafka and will also be written into the table above. When Operator 1 starts to calculate the data for Checkpoint Barrier N and Checkpoint Barrier N flows to Operator 1 with the data, Operator 1 also reflects all the data of Checkpoint Barrier N in the state. When Checkpoint Barrier N is received, it creates a snapshot of the checkpoint.
After creating the snapshot, Operator 2 receives all data, searches for the data of Checkpoint Barrier N, and directly reflects the status. When the state receives Checkpoint Barrier N, it is directly written to Checkpoint N. At this point, Checkpoint Barrier N has filled in the entire table, which is a distributed snapshot. Use distributed snapshots for fault tolerance to recover the state.
When a node fails, recover jobs on that node from the previous checkpoint. The process continues: Creating multiple checkpoints at the same time and Checkpoint Barrier N has flown to job manager 2. The Flink job manager triggers other checkpoints such as checkpoint N+1 and checkpoint N+2. With this mechanism, continuous checkpoints generation is done without interrupting the operation.
State maintenance means to maintain the state value by using code. A local state backend is required to support a very large state value.
In the figure, the
getRuntimeContext().getState(desc); API can be used to register the state in Flink. Flink provides multiple state backends. After the API registers the state, state is read by using a state backend. Flink has two different state values and two different state backends.
- JVM Heap State Backend: This backend is suitable for maintaining a small number of states. The JVM Heap state backend performs Java object reads/writes each time an operator value reads the state, without having high overhead. There is a need for serialization when a checkpoint needs to put the local state of individual operation values into distributed snapshots.
- RocksDB State Backend: It is an out-of-core state backend. When a user reads the state by using the local state backend of the runtime, you can perform and maintain the state in the disk. Accordingly, you require serialization and deserialization each time the state is read. Upon snapshotting, you only need to serialize the application. Serialized data is transferred to the central shared DFS.
Flink currently supports the 2 state backends, memory state backend and state backend with the resource disk. Choose a proper state backend according to the number of states when maintaining state.
Different Types of Time
Before the emergence of Flink and other advanced stream processing engines, processing engines for big data only supported processing time. Assume there’s a defined hourly event time window. For example, if running on processing time, data engines perform operations on the data received between 3 o’clock and 4 o’clock. When making reports or analyzing results, we want to know the output results of the data generated between 3 o’clock and 4 o’clock. To do this, use event time.
As shown in the figure, event time is the time that each event occurred on its producing device and carried a timestamp, used for operations. An example of what event time processing does could be when the initial queue receives data and divides data into one batch every hour.
Event Time Processing
In event time, the time stamp for an event is used for re-bucketing. Put the data between 3 o’clock and 4 o’clock into the corresponding bucket for this time and then generate the results. Therefore, event time and processing time are two different concepts.
An important feature of the event, time is that it records the time when an engine returns the operation results. Stream engine runs and collects data around the clock. For example, a window operator is performing operations in a pipeline and results are generated every hour. In this case, the main role of event time is determining when to return the window operation values to indicate that the expected data is received.
Flink implements event time based on watermarks. Watermarks are also a special event in Flink. The core principle of watermarks is that the Watermark(t) declares that event time has reached time t in that stream, so you’re not receiving any more elements from the stream with a timestamp t.
Watermarks help to accurately estimate the end time for receiving data. For example, assume that the duration between the expected time of receiving data and returning time is delayed by five minutes.
When all the window operators in Flink search for the data between 3 o’clock and 4 o’clock, the required data is collected only after it collects data at 4:05 due to latency. Next the results of the data between 3 o’clock and 4 o’clock are returned. The results within this time period are part of watermarks.
Save and Migrate State
Since stream processing applications run all the time, we need to consider several important O&M factors.
- When changing application logic or fixing bugs, how to move the state of the previous execution to a new execution?
- How to redefine the parallelism level?
- How to upgrade the version number of the operation cluster?
Checkpoints perfectly answer the preceding questions. Additionally, Flink provides another term called Savepoint. A manually generated checkpoint is called a savepoint. Checkpoints are different from savepoints. Flink generates checkpoints continuously for a stateful application by using distributed snapshots. Savepoints record the state of all operators in a stream application.
The preceding figure shows two savepoints: Savepoint A and Savepoint B. To change the underlying code logic, fix bugs, upgrade Flink versions or redefine applications or parallelism of computations, the first thing to do is produce Savepoints.
Manually inject Checkpoint barriers flow to all pipelines producing distributed snapshots, which are savepoints. Savepoints can be stored anywhere. After making the changes, Flink supports resuming and running directly from savepoints.
Note that when resuming from savepoints, the time continues while making application changes. For example, Kafka continuously collects data. When resuming from savepoints, savepoints hold the time when checkpoints were produced and the corresponding positions in Kafka. Therefore, it is necessary to restore the latest data. Event time ensures completely consistent results irrespective of operations.
Assume that operations after recovery use Process event time and the window is an hour. Include all operation results in a single window within 10 minutes. However, if you’ve used the event time, the process is similar to the Bucketing operation. Bucketing ensures that the time of re-operations and the results in the window are consistent regardless of how many operations are to be performed.
This article covers the definition, architecture, and basic principles of Apache Flink and explains some basic concepts about stream computing for big data. Additionally, it provides a recap of the evolution of big data processing methods and principles of stateful stream processing.
Finally, from the perspective of challenges to stateful stream processing, we analyze the natural advantages of Apache Flink, one of the best stream computing engines worldwide. We hope that now you have a clear understanding of the basic concepts about stream processing engines for big data and find it easier to use Flink.