Best Practices and Tips for Working with Flink State: Flink Advanced Tutorials

Image for post
Image for post

By Tang Yun(Cha Gan)

Flink State Overview

State in stream computing, such as in Flink, is considered to be the information that operators must remember about past input as data flows through the system. As shown in the following diagram, when an application receives data events, it transforms the events to downstream operators and performs arbitrary computations that involve reading data from or writing data to the state. Local state backends maintain all states in local memory or within an embedded key-value store.

Image for post
Image for post

There are four primary areas of difference in the two basic kinds of Flink state- Keyed State and Operator State.

1) currentKey: There is no currentKey in Operator State. However, there is always a currentKey in Keyed State that matches the state value.
2) On-heap/Off-heap store: Operator State is always stored on-heap, whereas keyed state backends support the use of both on-heap and off-heap memory to store state objects. Only Keyed State has the option of being stored in RocksDB.
3) Manual/Automatic snapshotting and recovery: For an operator state, you must take snapshots and restore from snapshots manually, whereas the snapshotting and recovery of a keyed state are implemented automatically by Flink, with the snapshots not exposed to the user.
4) Large/Small state size: In general, operator states are of smaller sizes than keyed states. Note that these rules are based on practice and are not applied universally.

The following figure gives an overview of the state backends classification. At the top, there are three types of state backends. The section in green describes the state backends that hold data internally as objects on the Java heap. The yellow color indicates objects stored in off-heap memory.

Image for post
Image for post

FsStateBackend and RocksDBStateBackend are recommended for production environments.

  • FsStateBackend: This state backend provides better performance. With FsStateBackend, local states are maintained as on-heap objects, which also means that users may potentially run into OutOfMemory errors. It does not support incremental checkpoints.
  • RocksDBStateBackend: This state backend allows much larger state sizes, making it a good choice for most applications.

RocksDB is an open-source database for key-value data that is based on a log-structured merge-tree (LSM tree) data structure. It is a widely used component in big data systems. As keyed states are essentially key-value maps, they are serialized and maintained as key-value pairs in RocksDB. The following table lists the formats for WindowState and ValueState in state entries across RocksDB. The keys and values are stored as serialized bytes within RocksDB.

Image for post
Image for post

Every distinct state of the operator is stored in a separate column family within RocksDB, and every state includes its own set of write buffer and block cache. WindowState and ValueState in the preceding table belong to two different column families.

The following table includes the descriptions and recommended actions for some key RocksDB configuration options. You will find more suggestions on RocksDB configuration from the Alibaba Cloud Community.

Image for post
Image for post

Best Practices and Tips

  • Use long lists with caution

The following is an example of StateMetaInfo as part of checkpoint acknowledgment messages sent to JobMaster upon checkpointing an operator state.

Image for post
Image for post

A common real-world use case of operator state in Flink is to maintain current offsets for Kafka partitions in Kafka sources. Since operator states are not organized into key groups, in order to change parallelism while restoring, Kafka must use an offset to maintain the position of the next message to be sent to a consumer. The circled section represents an array of partition offsets.

A big offset list can be dozens of megabytes. When it reports to the checkpoint coordinator (Flink’s JobMaster) with high parallelism, memory issues with JobMaster often occur. Some users even store operator states as a blacklist. But the blacklist turns out to be too big and overloads JobMaster upon checkpointing, and eventually causes program execution failure.

  • Use UnionListState correctly

UnionListState is widely used in Kafka connectors. It represents the state as a list of entries and is a variation where each sub-task gets the complete list of elements when a function is restoring from a checkpoint, as shown in the following figure.

Image for post
Image for post

UnionListState enables each operator to get the complete list of state elements upon recovery. Keep in mind that the tasks should not process the complete set, but a portion of the elements for the next snapshot. Otherwise, the state might continually grow in size as the job keeps being restarted.

  • How to clear application states?

The state.clear() method clears only the state for the currently active key. To clear all states, use applyToAllKeys.

Image for post
Image for post

If you only need to clear expired states, use the state time-to-live (TTL) feature instead.

  • How to handle huge values in RocksDB

As RocksDB’s JNI bridge API is based on byte[], the maximum supported size per value is 2³¹ bytes. Given what MapState holds is a map of keys and values, not a collection of all values added to the state, MapState is used as a replacement for ListState or ValueState in case the records get too big for the RocksDB JNI bridge.

  • How to monitor the overall performance of RocksDB State Backend

Enable RocksDB native metrics. Starting from Flink 1.10, Flink by default configures RocksDB’s memory allocation to the amount of managed memory of TaskManager. Keep this feature activated, open RocksDB native metrics, and watch for state.backend.rocksdb.metrics.block-cache-usage, state.backend.rocksdb.metrics.mem-table-flush-pending, state.backend.rocksdb.metrics.num-running-compactions, and state.backend.rocksdb.metrics.num-running-flushes, as they are important performance indicators.

The following are some example dashboard panels of Flink’s metric system in Flink 1.10 or later versions.

Image for post
Image for post

The following figure includes the same dashboard panels of Flink’s metric system but in Flink versions earlier than version 1.10, or in Flink 1.10 or later versions but with state.backend.rocksdb.memory.managed deactivated.

Image for post
Image for post
  • How to optimize the memory allocation of RocksDB in container environments

In Flink versions earlier than version 1.10, each state has its own block cache and write buffer. As a result, the amount of allocated memory by RocksDB grows infinitely if too many states are created in the operator. It’s problematic because the process may run out of the memory budget of the container, and the process will get killed. Users must take into account the number of RocksDB instances per slot and the number of states per RocksDB instance in order to track memory usage. However, this is highly impractical and difficult to carry out.

As mentioned earlier, Flink manages RocksDB’s memory automatically starting from version 1.10, which means that RocksDB is set up to limit native memory allocation to the size of the managed memory. Due to the implementation of certain RocksDB caches (will discuss this in another article), you may still need to use native metrics to monitor the block cache size. Also, you may choose to allocate the memory to increase taskmanager.memory.task.off-heap.size to free up more native memory.

  • Set an appropriate checkpoint interval

Theoretically, Flink supports very short checkpoint intervals. However, in actual production, very frequent checkpoints cause high overhead during regular processing. Additionally, mutual exclusion is instituted between checkpointing and record processing, which means a short interval may compromise the overall application performance. Hence, if you need to alleviate the workload on the distributed file system, we recommend setting an appropriate checkpoint interval.

  • Specify a proper timeout period

The default timeout period for checkpoints is 10 minutes. You need to set a longer duration for large states. The worst-case scenario is that checkpoints are realized through distributed snapshots faster than they are deleted by a single JobMaster, which may severely impact cluster storage. if checkpoint timeouts are frequent, we recommend extending the timeout period.

Original Source:

Written by

Follow me to keep abreast with the latest technology news, industry insights, and developer trends.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store