Flink Checkpoints Principles and Practices: Flink Advanced Tutorials

By Tang Yun (Chagan)

Relationship between Checkpoints and States

A state is the data that a checkpoint persistently backs up. As shown within the red box in the following figure, the state size of Checkpoint 569116 is 9.17 KB.

What Is a State?

When we enter “hello world” on the CLI of the word count job, the output includes (hello, 1) and (world, 1).

If we enter “hello world” on the CLI again, what kind of output will we get?

The output includes (hello, 2) and (world, 2). When “hello world” is entered for the second time, Flink knows that each word has already appeared once because it checks a type of state called keyed state, which stores the previous word counts.

Let’s review the word count code. Calling keyBy creates a KeyedStream that partitions records by key, and a KeyedStream is a prerequisite for using keyed state. The sum method is then implemented by the built-in StreamGroupedReduce operator, which keeps the running sum for each key in keyed state.
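
To make the role of keyed state concrete, here is a minimal plain-Java sketch (no Flink dependency) of what the keyed sum does for each incoming word. The class names KeyedWordCount and KeyedWordCountDemo are hypothetical, and the HashMap only stands in for keyed state; in Flink, StreamGroupedReduce keeps the running value in state that the state backend manages and scopes to the current key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the keyed "sum" in the word count job. The HashMap
// plays the role of keyed state: one running count per key (word).
class KeyedWordCount {
    private final Map<String, Integer> countPerKey = new HashMap<>();

    // Processes one input line and returns the (word, count) records emitted for it.
    List<String> processLine(String line) {
        List<String> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // blank input yields no words
            }
            // merge() adds 1 to the stored count, or stores 1 for a word seen for the first time.
            int updated = countPerKey.merge(word, 1, Integer::sum);
            out.add("(" + word + ", " + updated + ")");
        }
        return out;
    }
}

class KeyedWordCountDemo {
    public static void main(String[] args) {
        KeyedWordCount job = new KeyedWordCount();
        System.out.println(job.processLine("hello world")); // [(hello, 1), (world, 1)]
        System.out.println(job.processLine("hello world")); // [(hello, 2), (world, 2)]
    }
}
```

The second call reports a count of 2 only because the first call's counts were kept in (simulated) state; without state, every line would start from zero.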

What Is a Keyed State?

  • It can be used only in functions and operators applied to a KeyedStream, such as keyed user-defined functions (UDFs) and windows.
  • Keyed state is already partitioned (sharded): each key has its own state, and that state belongs to exactly one parallel task.

The keyBy semantics make the concept of partitioning easier to understand. The following figure shows three parallel tasks on the left and three on the right. The tasks on the left use keyBy to distribute incoming words. For example, when we enter “hello world”, the hash algorithm always routes the word “hello” to the task in the lower-right corner.
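
The routing that keyBy performs can be sketched as a two-step mapping: key to key group, then key group to parallel task. This is a simplified sketch under stated assumptions: KeyRouting is a hypothetical name, the key group is computed here with a plain Math.floorMod of hashCode() (Flink additionally applies a murmur hash, so real task assignments differ), and 128 is only an example maximum parallelism. The second step mirrors the range-based formula Flink uses, which gives each task a contiguous range of key groups.

```java
// Hypothetical, simplified model of keyBy routing: key -> key group -> task.
final class KeyRouting {
    private KeyRouting() {}

    // Simplification: Flink also murmur-hashes hashCode() before taking the modulo.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are split into contiguous ranges, one range per parallel task.
    static int taskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int taskFor(Object key, int maxParallelism, int parallelism) {
        return taskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

Because the result depends only on the key and the two parallelism settings, “hello” is always sent to the same task, which is what lets that task own the state for “hello”.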

What Is an Operator State?

  • A common type of operator state is source state, which records the offset that a source has read up to.

The following figure shows the word count code that uses an operator state.

The fromElements method is implemented by the FromElementsFunction class, which uses an operator state of the list state type. The following figure shows a classification of states by type.

States can also be classified by whether they are managed directly by Flink:

  • Managed state is managed by Flink, which takes care of storing and serializing it. All the states mentioned earlier are managed states.
  • Raw state is kept by the operator in its own data structures. During a checkpoint, the operator writes it into streams provided by Flink, and Flink sees it only as a sequence of bytes.
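
The practical difference is who serializes the data. As a rough sketch of raw state, the hypothetical RawCountsCodec below writes a word count map to bytes and reads it back by itself; the ByteArrayOutputStream stands in for the stream Flink would provide during a snapshot. With managed state, Flink would do this serialization for you with the state's registered serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical raw-state codec: the operator owns serialization in both directions,
// and the checkpoint only ever sees the resulting bytes.
final class RawCountsCodec {
    private RawCountsCodec() {}

    static byte[] write(Map<String, Long> counts) {
        // Stand-in for the output stream Flink would hand to the operator during a snapshot.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(counts.size());
            for (Map.Entry<String, Long> entry : counts.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, Long> read(byte[] raw) {
        Map<String, Long> counts = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                counts.put(in.readUTF(), in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counts;
    }
}
```

Any change to the map's layout now requires changing both methods by hand, which is one reason managed state is the safer default.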

We recommend managed state for production environments, and it is the focus of this article.

How to Use States in Flink

The following figure describes the FromElementsFunction class in detail and illustrates how to use operator state in code.
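
The idea behind FromElementsFunction can be modeled in plain Java: the operator's list state holds a single number, the count of elements emitted so far, and after a restore the source skips that many elements. This is a sketch, not Flink's code: the class ElementsSource and its method names are hypothetical, and a java.util.List<Integer> stands in for Flink's ListState<Integer>.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replayable source modeled on the idea behind FromElementsFunction.
class ElementsSource<T> {
    private final List<T> elements;
    private int numElementsEmitted;
    // Stand-in for an operator ListState<Integer>; it holds at most one entry.
    private final List<Integer> listState = new ArrayList<>();

    ElementsSource(List<T> elements) {
        this.elements = new ArrayList<>(elements);
    }

    // Called when the task starts: restore the emitted count if a checkpoint exists.
    void initializeState(List<Integer> restoredState) {
        numElementsEmitted = 0;
        if (restoredState != null) {
            for (int count : restoredState) {
                numElementsEmitted = count;
            }
        }
    }

    // Emits up to maxCount elements, continuing after the ones already emitted.
    List<T> emit(int maxCount) {
        List<T> out = new ArrayList<>();
        while (numElementsEmitted < elements.size() && out.size() < maxCount) {
            out.add(elements.get(numElementsEmitted++));
        }
        return out;
    }

    // Called when a checkpoint is taken: replace the list contents with the current count.
    List<Integer> snapshotState() {
        listState.clear();
        listState.add(numElementsEmitted);
        return new ArrayList<>(listState); // the copy stands in for what gets persisted
    }
}
```

In a run where "a" and "b" are emitted, a checkpoint is taken, and "c" is emitted before a failure, a source restored from that checkpoint emits "c" again. Results inside Flink stay exactly-once because every downstream state is rolled back to the same checkpoint.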

Checkpoint Execution

Classification of State Backends

HeapKeyedStateBackend, which both MemoryStateBackend and FsStateBackend use for keyed state, can run in two modes:

  • Asynchronous snapshots (the default): state is stored in a CopyOnWriteStateMap.
  • Synchronous snapshots only: state is stored in a NestedStateMap.
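
Asynchronous snapshots need a consistent view of the state while the task keeps writing to it, and copy-on-write provides that view. The sketch below is deliberately coarse: the hypothetical CowStateMap copies the whole table on the first write after a snapshot, whereas Flink's CopyOnWriteStateMap works at a finer granularity, copying individual entries lazily based on version numbers. The sketch also assumes a single writer thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, coarse copy-on-write map: snapshot() is O(1), and the first write
// after a snapshot copies the table so the snapshot's view never changes.
class CowStateMap<K, V> {
    private Map<K, V> current = new HashMap<>();
    private boolean sharedWithSnapshot = false;

    // Returns a read-only view that an asynchronous snapshot thread can serialize.
    Map<K, V> snapshot() {
        sharedWithSnapshot = true;
        return Collections.unmodifiableMap(current);
    }

    void put(K key, V value) {
        if (sharedWithSnapshot) {
            // Copy before mutating so the table handed to the snapshot stays untouched.
            current = new HashMap<>(current);
            sharedWithSnapshot = false;
        }
        current.put(key, value);
    }

    V get(K key) {
        return current.get(key);
    }
}
```

With a synchronous-only structure such as NestedStateMap, the same consistency is achieved by copying or serializing the state while processing is paused, which is why that mode cannot snapshot asynchronously.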

When HeapKeyedStateBackend is used with MemoryStateBackend, each individual state that is serialized during a checkpoint is limited to 5 MB by default.
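
The default limit works out to 5 * 1024 * 1024 = 5,242,880 bytes. The hypothetical StateSizeGuard below only sketches the kind of check involved: it compares a serialized state's length with the limit and fails if the state is too large. In Flink, the limit can be raised through a MemoryStateBackend constructor argument.

```java
// Hypothetical guard mirroring MemoryStateBackend's default per-state size limit.
final class StateSizeGuard {
    static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024; // 5,242,880 bytes

    private StateSizeGuard() {}

    // Rejects a serialized state that is larger than the configured maximum.
    static void checkSize(byte[] serializedState, int maxStateSize) {
        if (serializedState.length > maxStateSize) {
            throw new IllegalStateException("State size " + serializedState.length
                + " bytes exceeds the maximum of " + maxStateSize + " bytes");
        }
    }
}
```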

In RocksDBKeyedStateBackend, each state is stored in its own column family, and the key group, key, and namespace are serialized together to form the RocksDB key.
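
Such a composite key can be sketched as a byte array that starts with the key group, followed by the key and the namespace. The layout below is an illustration, not Flink's exact on-disk format: the hypothetical CompositeKey uses UTF-8 strings instead of the state's type serializers and adds a 4-byte key length of its own choosing, and the 1-or-2-byte key-group prefix (2 bytes once the maximum parallelism exceeds 128) is our assumption about Flink's layout. Putting the key group first keeps all keys of one key group next to each other in RocksDB's sorted key order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical composite-key layout: [key group][key length][key][namespace].
final class CompositeKey {
    private CompositeKey() {}

    // Assumption: one prefix byte covers up to 128 key groups, otherwise two.
    static int prefixBytes(int maxParallelism) {
        return maxParallelism > 128 ? 2 : 1;
    }

    static byte[] serialize(int keyGroup, String key, String namespace, int maxParallelism) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] nsBytes = namespace.getBytes(StandardCharsets.UTF_8);
        int prefix = prefixBytes(maxParallelism);
        ByteBuffer buf = ByteBuffer.allocate(prefix + 4 + keyBytes.length + nsBytes.length);
        // ByteBuffer is big-endian, so byte order matches numeric key-group order.
        if (prefix == 2) {
            buf.putShort((short) keyGroup);
        } else {
            buf.put((byte) keyGroup);
        }
        buf.putInt(keyBytes.length); // keeps the key/namespace boundary unambiguous
        buf.put(keyBytes);
        buf.put(nsBytes);
        return buf.array();
    }
}
```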

Detailed Explanation of Checkpoint Execution

Step 1) The checkpoint coordinator sends a checkpoint trigger to all source nodes.
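
A minimal sketch of this step, assuming a hypothetical TriggeringCoordinator that records the messages it would send: each new checkpoint gets a fresh, increasing ID, and only source tasks are triggered directly. Non-source tasks are reached later by the barriers that the sources emit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical coordinator that models only Step 1: which tasks receive the trigger.
class TriggeringCoordinator {
    private final List<String> sourceTasks;
    private long nextCheckpointId = 1;
    // Trigger messages "sent" so far, recorded as "<checkpointId>-><task>".
    final List<String> sentTriggers = new ArrayList<>();

    TriggeringCoordinator(List<String> sourceTasks) {
        this.sourceTasks = new ArrayList<>(sourceTasks);
    }

    // Starts a new checkpoint with a fresh, increasing ID and triggers only the sources.
    long triggerCheckpoint() {
        long checkpointId = nextCheckpointId++;
        for (String source : sourceTasks) {
            sentTriggers.add(checkpointId + "->" + source);
        }
        return checkpointId;
    }
}
```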

Step 2) The source nodes broadcast a barrier downstream. The barrier is the core of Flink's variant of the Chandy-Lamport distributed snapshot algorithm. A downstream task executes its checkpoint only after it has received barriers from all of its inputs.
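
Barrier alignment for a task with several inputs can be sketched as follows. The hypothetical BarrierAligner models aligned checkpoints: once a barrier arrives on an input, later records from that input are buffered; when barriers have arrived on all inputs, the task takes its snapshot and then processes the buffered records. Forwarding the barrier and writing the snapshot are reduced to recording the checkpoint ID.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of aligned barriers in one task with several input channels.
class BarrierAligner {
    private final int numInputs;
    private final Set<Integer> channelsWithBarrier = new HashSet<>();
    private final List<String> bufferedRecords = new ArrayList<>();
    final List<String> processed = new ArrayList<>();
    final List<Long> snapshotsTaken = new ArrayList<>();

    BarrierAligner(int numInputs) {
        this.numInputs = numInputs;
    }

    void onRecord(int channel, String record) {
        if (channelsWithBarrier.contains(channel)) {
            // This input is already past the barrier: hold its records back so they
            // cannot leak into the snapshot of the current checkpoint.
            bufferedRecords.add(record);
        } else {
            processed.add(record);
        }
    }

    void onBarrier(int channel, long checkpointId) {
        channelsWithBarrier.add(channel);
        if (channelsWithBarrier.size() == numInputs) {
            // Barriers from all inputs have arrived: take the snapshot (and, in a real
            // task, forward the barrier downstream), then release the buffered records.
            snapshotsTaken.add(checkpointId);
            channelsWithBarrier.clear();
            processed.addAll(bufferedRecords);
            bufferedRecords.clear();
        }
    }
}
```

If a record that arrived after a barrier were processed before the snapshot, its effect would be in the checkpoint and it would also be replayed after a restore; buffering prevents that double counting.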

Step 3) After backing up its state, each task sends the address of the backup data, known as the state handle, to the checkpoint coordinator.
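
A state handle is a small pointer to the backed-up data rather than the data itself. In this sketch, the hypothetical TaskSnapshotWriter writes a task's serialized state to a directory that stands in for persistent storage and returns a FileStateHandle (also hypothetical) holding the path and size. The chk-<checkpoint ID> directory name follows the naming Flink uses for checkpoint directories; the file name and handle fields are illustrative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical state handle: where the snapshot lives and how big it is.
final class FileStateHandle {
    final Path path;
    final long sizeBytes;

    FileStateHandle(Path path, long sizeBytes) {
        this.path = path;
        this.sizeBytes = sizeBytes;
    }
}

final class TaskSnapshotWriter {
    private TaskSnapshotWriter() {}

    // Persists the serialized state and returns only a handle, which is what the
    // task reports to the checkpoint coordinator.
    static FileStateHandle persist(Path checkpointDir, long checkpointId, String taskName, byte[] state) {
        try {
            Path dir = Files.createDirectories(checkpointDir.resolve("chk-" + checkpointId));
            Path file = dir.resolve(taskName);
            Files.write(file, state);
            return new FileStateHandle(file, state.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```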

Step 4) After receiving the barriers from both upstream inputs, the downstream sink node takes its local snapshot. The following figure shows how a RocksDB incremental checkpoint is executed. RocksDB first flushes its in-memory data to disk, so that all data resides in files on disk (the red triangle). Flink then persistently backs up only the files that have not been uploaded yet (the purple triangle).
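
The core of the incremental step is a set difference. RocksDB's SST files are immutable once written, so a file uploaded by an earlier checkpoint can simply be referenced again. The hypothetical IncrementalUpload helper below identifies files only by name, which is a simplification.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for the incremental part of a RocksDB checkpoint.
final class IncrementalUpload {
    private IncrementalUpload() {}

    // Files that are new since the last uploaded checkpoint and must be uploaded now.
    static Set<String> filesToUpload(Set<String> localSstFiles, Set<String> alreadyUploaded) {
        Set<String> newFiles = new TreeSet<>(localSstFiles);
        newFiles.removeAll(alreadyUploaded);
        return newFiles;
    }

    // Files the new checkpoint can reference without uploading them again.
    static Set<String> filesToReuse(Set<String> localSstFiles, Set<String> alreadyUploaded) {
        Set<String> reused = new TreeSet<>(localSstFiles);
        reused.retainAll(alreadyUploaded);
        return reused;
    }
}
```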

Step 5) After executing its checkpoint, the sink node returns the state handle to the checkpoint coordinator.

Step 6) After receiving the state handles of all tasks, the checkpoint coordinator considers the checkpoint globally complete and writes a checkpoint metadata file to persistent storage.
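
The coordinator's bookkeeping for Steps 3 to 6 can be sketched as a pending checkpoint that collects one state handle per task. The hypothetical PendingCheckpoint below treats the checkpoint as complete only when every task has acknowledged, and completedMetadata() returns what would go into the metadata file: the handles, not the state data. State handles are plain strings here for simplicity.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical pending checkpoint held by the coordinator.
class PendingCheckpoint {
    final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final Map<String, String> stateHandles = new HashMap<>();

    PendingCheckpoint(long checkpointId, Set<String> allTasks) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(allTasks);
    }

    // Records one task's state handle; returns true once every task has acknowledged.
    boolean acknowledge(String task, String stateHandle) {
        if (notYetAcknowledged.remove(task)) {
            stateHandles.put(task, stateHandle);
        }
        return notYetAcknowledged.isEmpty();
    }

    // Content of the metadata file: one handle per task, not the state itself.
    Map<String, String> completedMetadata() {
        if (!notYetAcknowledged.isEmpty()) {
            throw new IllegalStateException(
                "Checkpoint " + checkpointId + " is still waiting for " + notYetAcknowledged);
        }
        return Collections.unmodifiableMap(new HashMap<>(stateHandles));
    }
}
```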

EXACTLY_ONCE Semantics of Checkpoints

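The checkpoint mechanism alone guarantees exactly-once semantics only for the state inside Flink's computation. End-to-end exactly-once additionally requires support from the sources, which must be able to replay data, and from the sinks, which must write transactionally or idempotently.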
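
On the sink side, a common way to get end-to-end exactly-once is a two-phase commit tied to checkpoints; Flink's DataStream API has provided TwoPhaseCommitSinkFunction for this pattern. The plain-Java TwoPhaseCommitSink below is a hypothetical simulation, not that API: records go into an open transaction, the transaction is pre-committed when the checkpoint barrier arrives, and it becomes visible only after the coordinator reports the checkpoint as complete. Recovery of pre-committed transactions after a failure is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of a two-phase-commit sink driven by checkpoints.
class TwoPhaseCommitSink {
    // Output that external readers can see (for example, rows in a table).
    final List<String> committedOutput = new ArrayList<>();
    // Records written in the currently open transaction; not visible yet.
    private List<String> openTransaction = new ArrayList<>();
    // Pre-committed transactions, keyed by the checkpoint that sealed them.
    private final TreeMap<Long, List<String>> preCommitted = new TreeMap<>();

    void invoke(String record) {
        openTransaction.add(record);
    }

    // Phase 1 (barrier arrives): seal the open transaction and start a new one.
    void snapshotState(long checkpointId) {
        preCommitted.put(checkpointId, openTransaction);
        openTransaction = new ArrayList<>();
    }

    // Phase 2 (checkpoint globally complete): commit this transaction and any older
    // pending ones, in case an earlier completion notification was missed.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> ready = preCommitted.headMap(checkpointId, true);
        for (List<String> transaction : ready.values()) {
            committedOutput.addAll(transaction);
        }
        ready.clear(); // removes the committed entries from preCommitted
    }

    // On failure: drop the open transaction; the source will replay its records.
    void abortOpenTransaction() {
        openTransaction.clear();
    }
}
```

Readers of the output never see records from a checkpoint that did not complete, so records replayed by the source after a failure do not show up twice.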

Differences between Savepoints and Checkpoints

Original Source:
