Flink Checkpoints Principles and Practices: Flink Advanced Tutorials

By Tang Yun (Chagan)

Relationship between Checkpoints and States

A checkpoint in Apache Flink is a global operation: the checkpoint coordinator triggers it at the source nodes, and from there it propagates to all downstream nodes. As shown in the red box in the following figure, a total of 569,027 checkpoints were triggered and all of them completed successfully.

State is the data that a checkpoint persistently backs up. As shown in the red box in the following figure, the state size of checkpoint 569116 is 9.17 KB.

What Is a State?

Let’s start with what a state is. The following figure shows a typical word count program. It reads data from local port 9000 and counts how often each word arrives on that port. If we run Netcat locally and enter “hello world” on the command-line interface (CLI), what output will we get?

The output shows a count of 1 for both “hello” and “world”.

If we enter “hello world” on the CLI again, what kind of output will we get?

This time the output shows a count of 2 for both words. When “hello world” is entered the second time, Flink knows that each word has already appeared once. It knows this because it keeps the previous word counts in a type of state called keyed state.

Let’s review the word count code. Calling keyBy creates a KeyedStream that partitions the records by key, which is a prerequisite for using keyed state. The sum method then relies on the built-in StreamGroupedReduce operator, which keeps the running result for each key in keyed state.
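
To make the idea concrete, here is a minimal plain-Java model of what keyed state does in this job. It is not Flink code. The HashMap stands in for the keyed state that Flink keeps in its state backend, and the class name KeyedWordCountModel is hypothetical. Each word is a key. Processing a word reads that key's previous count, increments it, and writes it back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyed state in the word count job (NOT Flink's API).
// The map stands in for keyed state: one running count per key (word).
class KeyedWordCountModel {
    private final Map<String, Integer> countsByKey = new HashMap<>();

    // Processes one input line and returns the emitted (word,count) pairs.
    List<String> processLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue; // split() yields "" when the line starts with a separator
            }
            // Read this key's previous count, add 1, and write it back.
            int updated = countsByKey.getOrDefault(word, 0) + 1;
            countsByKey.put(word, updated);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        KeyedWordCountModel job = new KeyedWordCountModel();
        System.out.println(job.processLine("hello world")); // [(hello,1), (world,1)]
        System.out.println(job.processLine("hello world")); // [(hello,2), (world,2)]
    }
}
```

On the second line of input, each word finds the count left behind by the first line. That is exactly the role keyed state plays in the Flink job.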

What Is a Keyed State?

A keyed state has two features:

  • It can only be used in functions and operators on a KeyedStream, such as keyed user-defined functions (UDFs) and window state.
  • Keyed state is already partitioned (sharded): each key belongs to exactly one partition of the keyed state.

The keyBy semantics make partitioning easier to understand. The following figure shows three parallel instances (subtasks) on the left and three on the right. The keyBy interface distributes the incoming words, as shown in the left part of the figure. For example, when we enter “hello world”, the hash algorithm always routes the word “hello” to the subtask in the lower-right corner.
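
The routing can be sketched in plain Java. The sketch assumes that, as in Flink, a key is first mapped to one of maxParallelism key groups, and that each subtask owns a contiguous range of key groups. Flink also applies a murmur hash to key.hashCode(). This sketch replaces that step with Math.floorMod for brevity, so the subtask it picks for “hello” will not necessarily match the figure. The class name and the maxParallelism value of 128 are illustrative.

```java
// Simplified model of how keyBy routes a key to a downstream subtask (not Flink code).
class KeyRoutingModel {
    // Key -> key group in [0, maxParallelism). Flink murmur-hashes key.hashCode()
    // first; this sketch uses the raw hash code for brevity.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> subtask index in [0, parallelism); each subtask owns a
    // contiguous range of key groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int route(Object key, int maxParallelism, int parallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // illustrative value
        int parallelism = 3;      // three downstream subtasks, as in the figure
        for (String word : new String[] {"hello", "world", "hello"}) {
            System.out.println(word + " -> subtask " + route(word, maxParallelism, parallelism));
        }
    }
}
```

The key property is determinism. The same key always yields the same key group and therefore the same subtask, so all updates to one key's state happen in one place.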

What Is an Operator State?

  • An operator state, also called a non-keyed state, is bound to only one operator instance.
  • A common type of operator state is the source state, which records the offset up to which the source has consumed its input.

The following figure shows the word count code that uses an operator state.

calls the class. It uses an operator state of the list state type. The following figure classifies states by type.

States can also be classified by whether Flink manages them directly:

  • Managed state is managed by Flink, which takes care of storing and serializing it. All the states mentioned earlier are managed states.
  • Raw state is kept by the operator in its own data structures and is written to checkpoints through output streams that Flink provides. Flink sees raw state only as a sequence of bytes.

We recommend using managed state in production environments, and managed state is the focus of this article.

How to Use States in Flink

The following figure shows how to use the keyed state in code. Consider the class used by the sum method in the preceding word count code as an example.
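
The following plain-Java sketch shows the general pattern a grouped reduce follows with keyed state. It reads the current value for the key, combines it with the new record, and writes the result back. This is roughly the read-reduce-update logic of StreamGroupedReduce, but the code is a simplified illustration, not its source. KeyScopedValueState is a hypothetical stand-in that mimics the value()/update() methods of Flink's ValueState; it is not the real interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for a keyed ValueState handle: every call is scoped to
// the current key, which the runtime sets before handing a record to the operator.
class KeyScopedValueState<K, V> {
    private final Map<K, V> backend = new HashMap<>(); // stands in for the state backend
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }

    V value() { return backend.get(currentKey); } // null if this key has no state yet

    void update(V newValue) { backend.put(currentKey, newValue); }
}

// Toy grouped-reduce operator in the spirit of StreamGroupedReduce (not its code).
class GroupedReduceModel<K, T> {
    private final KeyScopedValueState<K, T> state = new KeyScopedValueState<>();
    private final BinaryOperator<T> reducer;

    GroupedReduceModel(BinaryOperator<T> reducer) { this.reducer = reducer; }

    // Processes one record for the given key and returns the value to emit.
    T processElement(K key, T record) {
        state.setCurrentKey(key); // done by the Flink runtime, not by user code
        T current = state.value();
        T result = (current == null) ? record : reducer.apply(current, record);
        state.update(result);
        return result;
    }
}
```

Use Integer::sum as the reducer and feed it ("hello", 1), ("world", 1), ("hello", 1): the returned values are 1, 1, and 2. The operator never looks up the key itself. The runtime sets the current key, and the same state handle then reads and writes that key's entry.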

The following figure shows a detailed description of the class and illustrates how to use the operator state in code.
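
Here is a minimal sketch of the same idea for operator state, using the source offset mentioned earlier. The source keeps its read position in a list, which stands in for an operator ListState<Long>. It copies the position into that list when a checkpoint is taken and reads it back on restart. The method names echo snapshotState and initializeState from Flink's CheckpointedFunction interface. OffsetSourceModel and its simplified signatures are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy source whose read position is kept as operator (non-keyed) state.
class OffsetSourceModel {
    private final List<String> log;                           // hypothetical replayable input
    private final List<Long> offsetState = new ArrayList<>(); // stands in for ListState<Long>
    private long offset;                                      // next position to read

    OffsetSourceModel(List<String> log) {
        this.log = log;
    }

    // Emits the next record, or null when the input is exhausted.
    String poll() {
        if (offset >= log.size()) {
            return null;
        }
        return log.get((int) offset++);
    }

    // On checkpoint: replace the list state's contents with the current offset.
    void snapshotState() {
        offsetState.clear();
        offsetState.add(offset);
    }

    // On (re)start: restore the offset from checkpointed state, or start at 0.
    void initializeState(List<Long> restored) {
        offset = restored.isEmpty() ? 0L : restored.get(0);
    }

    // What the checkpoint would persist for this source instance.
    List<Long> checkpointedState() {
        return new ArrayList<>(offsetState);
    }
}
```

Suppose the source reads a and b, takes a snapshot (which stores offset 2), then reads c and fails. A restarted source restored from that snapshot resumes with c.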

Checkpoint Execution

Before introducing the checkpoint execution mechanism, we need to understand how states are stored, because states are what a checkpoint persistently backs up.

Classification of State Backends

The following figure shows three built-in state backends in Flink. and keep states in the Java heap at runtime. persistently saves data as files to remote storage only when a checkpoint is executed. uses RocksDB (an LSM-tree-based store that combines memory and disk) to store states.

The following are the two ways to execute :

  • Support for asynchronous checkpoints (default): The storage format is CopyOnWriteStateMap.
  • Support for synchronous checkpoints only: The storage format is NestedStateMap.
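
A copy-on-write structure makes asynchronous checkpoints possible. The synchronous part of a snapshot only hands out a reference, and an asynchronous thread then writes that frozen view to storage while processing continues. The sketch below shows the principle with a hypothetical, single-threaded CopyOnWriteStateModel that copies the whole map on the first write after a snapshot. Flink's CopyOnWriteStateMap copies at a much finer granularity, so treat this only as an illustration of the idea.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified copy-on-write state table (not Flink's CopyOnWriteStateMap).
class CopyOnWriteStateModel<K, V> {
    private Map<K, V> current = new HashMap<>();
    private boolean sharedWithSnapshot = false;

    V get(K key) {
        return current.get(key);
    }

    void put(K key, V value) {
        if (sharedWithSnapshot) {
            // A snapshot still references this map, so copy before mutating it.
            current = new HashMap<>(current);
            sharedWithSnapshot = false;
        }
        current.put(key, value);
    }

    // Synchronous part of a checkpoint: O(1), no copying. A writer can then
    // serialize the returned read-only view while processing continues.
    Map<K, V> snapshot() {
        sharedWithSnapshot = true;
        return Collections.unmodifiableMap(current);
    }
}
```

Run put("hello", 1), then snapshot(), then put("hello", 2). The snapshot still reports 1, while the live table reports 2.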

When is used in MemoryStateBackend, the serialized state of a checkpoint may be at most 5 MB by default.

For , each state is stored in its own column family. The keyGroup, key, and namespace are serialized together to form the key under which the data is stored in the database.
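
The sketch below builds a composite key in a hypothetical layout: a 2-byte key-group prefix, a 4-byte key length, the key bytes, and the namespace bytes (for example, a window identifier). Flink's actual serialization differs in its details; for instance, the prefix width depends on the maximum parallelism. The point is the ordering. The key group comes first, and RocksDB's default comparator orders keys bytewise, so all entries of one key group occupy a contiguous key range.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical RocksDB key layout: [key group: 2 bytes][key length: 4 bytes][key][namespace].
// Flink's real layout differs in details; this only illustrates the key-group prefix.
class CompositeKeyModel {
    static byte[] compositeKey(int keyGroup, String key, String namespace) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] ns = namespace.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + k.length + ns.length); // big-endian by default
        buf.putShort((short) keyGroup); // key group first: one key group = one contiguous key range
        buf.putInt(k.length);           // length prefix keeps the key/namespace boundary unambiguous
        buf.put(k);
        buf.put(ns);
        return buf.array();
    }

    // Unsigned lexicographic comparison, the order RocksDB's default comparator uses.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

compareBytes confirms that a key in key group 3 sorts before any key in key group 4, whatever the key itself is. This ordering is what makes range scans over a single key group possible.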

Detailed Explanation of Checkpoint Execution

This section walks through the checkpoint execution process step by step. As shown in the following figure, a checkpoint coordinator is on the left, a Flink job consisting of two source nodes and one sink node is in the middle, and persistent storage is on the right. In most scenarios, the persistent storage is provided by the Hadoop Distributed File System (HDFS).

Step 1) The checkpoint coordinator triggers a checkpoint to all source nodes.

Step 2) The source nodes broadcast a barrier downstream. Barriers are at the core of the mechanism: they play the role of the markers in the Chandy-Lamport distributed snapshot algorithm, of which Flink's checkpointing is a variant. A downstream task executes its checkpoint only after it has received the barrier from all of its inputs.
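
Here is a minimal sketch of the barrier bookkeeping in a task with several inputs. The task records which inputs have delivered the barrier for the current checkpoint, and it snapshots its state only when every input has. BarrierTrackerModel is hypothetical. It abandons an incomplete checkpoint when a barrier for a different checkpoint arrives, and it ignores timeouts and cancellation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical barrier bookkeeping for one task with several input channels.
class BarrierTrackerModel {
    private final int numInputs;
    private final Set<Integer> inputsWithBarrier = new HashSet<>();
    private long currentCheckpointId = -1;

    BarrierTrackerModel(int numInputs) {
        this.numInputs = numInputs;
    }

    // Returns true when this barrier is the last one missing, i.e. the task
    // should now take its local snapshot and forward the barrier downstream.
    boolean onBarrier(int inputChannel, long checkpointId) {
        if (checkpointId != currentCheckpointId) {
            // Barrier for a different checkpoint: start tracking it from scratch
            // (an incomplete earlier checkpoint is simply abandoned here).
            currentCheckpointId = checkpointId;
            inputsWithBarrier.clear();
        }
        inputsWithBarrier.add(inputChannel);
        if (inputsWithBarrier.size() == numInputs) {
            inputsWithBarrier.clear();
            return true;
        }
        return false;
    }
}
```

With two inputs, the barrier from input 0 returns false. The barrier from input 1 then returns true, and at that point the task takes its local snapshot.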

Step 3) After backing up their state, the tasks send the address of the backup data, namely the state handle, to the checkpoint coordinator.

Step 4) After receiving the barriers from both upstream inputs, the downstream sink node takes its local snapshot. The following figure shows how a RocksDB incremental checkpoint is executed. RocksDB first flushes all of its data to disk, as shown by the red triangle. Flink then persistently backs up only the files that have not yet been uploaded, as shown by the purple triangle.
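
The incremental part can be modeled as set arithmetic over file names, because RocksDB's SST files are immutable once written. A checkpoint uploads only the files that no earlier checkpoint has uploaded, and it simply references the rest. IncrementalUploadModel and the file names are hypothetical. The sketch assumes that every upload succeeds and every checkpoint completes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of choosing which SST files an incremental checkpoint uploads.
class IncrementalUploadModel {
    // Files already stored remotely by earlier (assumed completed) checkpoints.
    private final Set<String> alreadyUploaded = new HashSet<>();

    // Given the SST files in the local RocksDB snapshot, returns the ones that
    // must be uploaded now; the others are referenced from earlier checkpoints.
    List<String> filesToUpload(List<String> localSstFiles) {
        List<String> toUpload = new ArrayList<>();
        for (String file : localSstFiles) {
            if (!alreadyUploaded.contains(file)) {
                toUpload.add(file);
            }
        }
        alreadyUploaded.addAll(toUpload); // assumes the uploads and the checkpoint succeed
        return toUpload;
    }
}
```

The first checkpoint uploads 000001.sst and 000002.sst. A flush then produces 000003.sst, and the second checkpoint uploads only that new file.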

Step 5) After executing its checkpoint, the sink node returns the state handle to the checkpoint coordinator.

Step 6) After receiving the state handles of all tasks, the checkpoint coordinator considers the checkpoint globally complete and writes a checkpoint metadata file to persistent storage.
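
The coordinator's bookkeeping for Steps 3 to 6 can be sketched as a pending checkpoint. It waits for an acknowledgement, carrying a state handle, from every task, and only then produces its metadata. PendingCheckpointModel, the task names, and the handle strings are hypothetical. The real metadata file uses a binary format, not a string.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical coordinator-side view of one in-flight checkpoint.
class PendingCheckpointModel {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final Map<String, String> stateHandles = new HashMap<>();

    PendingCheckpointModel(long checkpointId, Set<String> tasks) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasks);
    }

    // Steps 3 and 5: a task reports its state handle. Returns true once every
    // task has acknowledged, i.e. the checkpoint is globally complete (Step 6).
    boolean acknowledge(String task, String stateHandle) {
        if (notYetAcknowledged.remove(task)) {
            stateHandles.put(task, stateHandle);
        }
        return notYetAcknowledged.isEmpty();
    }

    // Step 6: summary of what the metadata file would reference (sorted for stable output).
    String metadata() {
        if (!notYetAcknowledged.isEmpty()) {
            throw new IllegalStateException("waiting for " + notYetAcknowledged);
        }
        return "checkpoint " + checkpointId + " -> " + new TreeMap<>(stateHandles);
    }
}
```

acknowledge returns true only after source-1, source-2, and sink have all acknowledged, and only then does metadata() succeed. Calling metadata() earlier throws IllegalStateException.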

EXACTLY_ONCE Semantics of Checkpoints

To provide EXACTLY_ONCE semantics, Flink aligns barriers. Once a barrier has arrived on one input, further data from that input is cached in an input buffer until the barriers from all inputs have arrived. That data is processed only after alignment completes. With AT_LEAST_ONCE semantics, Flink processes incoming data directly without caching it, so some data may be processed more than once after a restore. The following figure shows a schematic diagram of checkpoint alignment, taken from the official Flink documentation.
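
A small simulation shows the difference between the two modes. In the hypothetical AlignmentModel below, a task's "state" is simply the list of records it has processed. Input 0 delivers a, its barrier, and then b; input 1 delivers c and then its barrier. In EXACTLY_ONCE mode, b waits in the buffer until the second barrier arrives, so the snapshot contains only a and c. In AT_LEAST_ONCE mode, b is processed immediately and ends up in the snapshot. But input 0's source position was recorded before b, so after a restore b would be replayed and counted twice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of aligned (EXACTLY_ONCE) vs. non-buffering (AT_LEAST_ONCE) barrier handling.
class AlignmentModel {
    private final int numInputs;
    private final boolean exactlyOnce;
    private final Set<Integer> barrierReceived = new HashSet<>();
    private final Deque<String> alignmentBuffer = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>(); // the task's "state"
    private List<String> snapshot;                            // state captured for the checkpoint

    AlignmentModel(int numInputs, boolean exactlyOnce) {
        this.numInputs = numInputs;
        this.exactlyOnce = exactlyOnce;
    }

    void onRecord(int input, String record) {
        if (exactlyOnce && barrierReceived.contains(input)) {
            alignmentBuffer.add(record); // post-barrier data: hold it until alignment ends
        } else {
            processed.add(record);       // AT_LEAST_ONCE never holds data back
        }
    }

    void onBarrier(int input) {
        barrierReceived.add(input);
        if (barrierReceived.size() == numInputs) {
            snapshot = new ArrayList<>(processed); // all barriers in: take the snapshot
            barrierReceived.clear();
            while (!alignmentBuffer.isEmpty()) {
                processed.add(alignmentBuffer.poll()); // then drain the buffer
            }
        }
    }

    List<String> snapshot() { return snapshot; }

    // The interleaving described above; returns the checkpointed state.
    static List<String> runScenario(boolean exactlyOnce) {
        AlignmentModel task = new AlignmentModel(2, exactlyOnce);
        task.onRecord(0, "a");
        task.onBarrier(0);
        task.onRecord(0, "b"); // arrives after input 0's barrier
        task.onRecord(1, "c");
        task.onBarrier(1);
        return task.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(runScenario(true));  // [a, c]
        System.out.println(runScenario(false)); // [a, b, c]
    }
}
```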

On its own, the checkpoint mechanism only ensures EXACTLY_ONCE semantics for Flink's internal computation and state. End-to-end exactly-once delivery also requires support from the source and sink nodes.
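
To see why the sink must cooperate, here is a minimal model of a sink that publishes output only when the checkpoint covering it completes. This is the two-phase commit pattern that Flink's TwoPhaseCommitSinkFunction base class is built around. TransactionalSinkModel is hypothetical and does not use that class. It leaves out real external transactions, timeouts, and the recovery of pending transactions from state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-phase-commit style sink: output becomes visible only on commit.
class TransactionalSinkModel {
    private List<String> openTransaction = new ArrayList<>();                     // writes since the last barrier
    private final Map<Long, List<String>> pendingByCheckpoint = new HashMap<>(); // sealed, not yet visible
    private final List<String> visibleOutput = new ArrayList<>();               // what external readers see

    void write(String record) {
        openTransaction.add(record);
    }

    // Phase 1, on the checkpoint barrier: seal the open transaction under this checkpoint.
    void preCommit(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, openTransaction);
        openTransaction = new ArrayList<>();
    }

    // Phase 2, once the coordinator reports the checkpoint as complete: publish.
    void commit(long checkpointId) {
        List<String> tx = pendingByCheckpoint.remove(checkpointId);
        if (tx != null) {
            visibleOutput.addAll(tx);
        }
    }

    // On failure, writes not covered by any checkpoint are dropped; the replayed
    // source re-emits them after the restore.
    void discardOpenTransaction() {
        openTransaction = new ArrayList<>();
    }

    List<String> visibleOutput() {
        return new ArrayList<>(visibleOutput);
    }
}
```

Record a becomes visible only after commit(1). Record b, written after the barrier, is dropped on failure and re-emitted by the replayed source, so readers never see it twice.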

Differences between Savepoints and Checkpoints

Both savepoints and checkpoints can be used to resume jobs. The following table lists the differences between them.

Original Source:
