Apache Flink Fundamentals: State Management and Fault Tolerance

Basics of State Management

What Is State?

  • Deduplication: For example, the upstream system emits duplicate data, and you want to remove the duplicates before the data reaches the downstream system. To do so, you must know which records have already been seen and which have not. This requires recording all primary keys and checking whether the primary key of each incoming record is already among them.
  • Window Computing: For example, suppose you want to count how many times a specific Nginx log API is accessed per minute, so one window is computed every minute. For the 08:00–08:01 window, the data received during the first 59 seconds must be kept in memory. It is retained while the window is open, and the result is emitted when the window fires at 08:01. Data in windows that have not yet been triggered is also state.
  • Machine Learning/Deep Learning: A trained model and its parameters are also state. Each time machine learning processes a new data set, it uses that data to update the model.
  • Historical Data Access: To compare today’s data with yesterday’s data, you must have access to historical data. Since reading data from external systems each time for the data comparison consumes too many resources, it is efficient to put the historical data into the state.
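To make the first two scenarios concrete, here is a minimal plain-Java sketch of the state they need. It keeps a set of primary keys already seen (deduplication) and per-minute counts for windows that have not fired yet (window computing). This is a toy model, not Flink's API. The class and method names are hypothetical, and each record is assumed to be reduced to a primary key or a non-negative millisecond timestamp. In a Flink job, this data would live in Flink-managed state so that it survives failures.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of two stateful computations (hypothetical names, NOT Flink API).
class StatefulExamples {

    // Deduplication state: every primary key seen so far.
    private final Set<String> seenKeys = new HashSet<>();

    // Window state: counts for one-minute windows that have not fired yet,
    // keyed by window start time in milliseconds.
    private final Map<Long, Long> pendingWindowCounts = new HashMap<>();

    /** Returns true if the record is new and should be forwarded downstream. */
    boolean dedup(String primaryKey) {
        // Set.add returns false when the key is already present.
        return seenKeys.add(primaryKey);
    }

    /** Counts one access in the one-minute tumbling window containing timestampMillis. */
    void countAccess(long timestampMillis) {
        long windowStart = timestampMillis - Math.floorMod(timestampMillis, 60_000L);
        pendingWindowCounts.merge(windowStart, 1L, Long::sum);
    }

    /**
     * Fires the window starting at windowStart: returns its count and removes it
     * from state, because a triggered window no longer needs to be kept.
     */
    long fireWindow(long windowStart) {
        Long count = pendingWindowCounts.remove(windowStart);
        return count == null ? 0L : count;
    }

    /** Number of windows still held in state. */
    int pendingWindows() {
        return pendingWindowCounts.size();
    }
}
```

Note that the key set grows with every distinct primary key, and every open window stays in memory until it fires. This growth is why the size and durability of state matter in long-running jobs.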

Why is State Management Necessary?

  • 24/7 operation and high reliability
  • No data loss or duplication, with exactly-once computation
  • Real-time data output with low latency

Ideal State Management

  • Ease of Use: Flink provides rich data structures, various forms of state organization, and simple extension interfaces, to simplify state management and make it more convenient.
  • High Efficiency: Real-time jobs generally require low latency and fast recovery from failures. When processing capacity is insufficient, you must be able to scale out, and state backups must not affect the performance of the jobs that are currently running.
  • High Reliability: Flink supports state persistence, exactly-once semantics (no data loss or duplication), and automatic fault tolerance. For example, to ensure high availability, if a node fails, another node automatically takes over without manual intervention.

Flink State Types and Use Cases

Managed State and Raw State

  • State Management Mode: Managed State is maintained by the Flink runtime, which stores it, recovers it automatically, and optimizes its memory management. Raw State is managed and serialized by users themselves. Flink does not know the data structures stored in Raw State; only users do. Therefore, Raw State must be serialized into a storable form.
  • State Data Structures: Managed State supports known data structures such as Value, List, and Map. Raw State only supports byte arrays, so all state must be converted to binary byte arrays.
  • Use Cases: Managed State is applicable in most cases, while Raw State is only recommended when Managed State can’t meet specific requirements, for example, customizing operators.
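Because Flink sees Raw State only as bytes, converting it is the user's job. The following minimal sketch assumes the user's state is a hypothetical Map<String, Long> of counters. It encodes that map into a byte array with java.io streams and decodes it back. The class name and the [size][key][value]... layout are illustrative choices, and the sketch does not show how the bytes are handed to Flink's checkpoint streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical user-side codec for raw state: Flink would only see the bytes.
class RawStateCodec {

    /** Serializes a map of counters to bytes laid out as [size][key][value]... */
    static byte[] serialize(Map<String, Long> counters) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(counters.size());
            for (Map.Entry<String, Long> e : counters.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        }
        // Closing the DataOutputStream flushed everything into the buffer.
        return bytes.toByteArray();
    }

    /** Rebuilds the map from bytes written by serialize(). */
    static Map<String, Long> deserialize(byte[] data) throws IOException {
        Map<String, Long> counters = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                long value = in.readLong();
                counters.put(key, value);
            }
        }
        return counters;
    }
}
```

With Managed State, Flink handles this serialization for you through its own serializers. That difference is why Raw State is reserved for cases Managed State cannot cover.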

Keyed State and Operator State

Use Cases of Keyed State

  • ValueState stores a single value per key. For example, in a WordCount scenario, the word is the key and the state is the word's count. The value can be of any type, such as a number or a string. ValueState offers two access methods, corresponding to set and get: update(T) sets the value and T value() retrieves it.
  • MapState stores a map. Entries are added with put and deleted with remove. Note that the keys inside a MapState are different from the key of the Keyed State: the Keyed State key selects which map is used, and the map keys identify entries within that map.
  • ListState stores a list of elements. You can append elements with add and replace the whole list with update.
  • ReducingState, AggregatingState, and ListState share the same parent interface (AppendingState), so all three provide add and get. However, ReducingState keeps only a single value. Its add method does not append the element to a list. Instead, it combines the element with the current value, and the result becomes the new state of the ReducingState.
  • AggregatingState differs from ReducingState in its interface. For ReducingState, the elements added with add(T) are of the same type as the value retrieved with T get(). For AggregatingState, the input type IN and the output type OUT can differ.
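The semantics of these state types can be illustrated without Flink. The sketch below uses hypothetical in-memory stand-ins whose method names mirror those in the text (update/value, add/get). They are not Flink's classes. Each object stores one entry per key, and setCurrentKey mimics how Flink scopes keyed state to the key of the record being processed. MapState and ListState are omitted for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-ins for keyed state (NOT Flink classes): one entry per key.
class ToyKeyedState<K, S> {
    protected final Map<K, S> perKey = new HashMap<>();
    protected K currentKey;

    /** Mimics Flink scoping state to the key of the current record. */
    void setCurrentKey(K key) { this.currentKey = key; }
}

/** ValueState-like: update(T) sets the value, value() gets it (null if unset). */
class ToyValueState<K, T> extends ToyKeyedState<K, T> {
    void update(T value) { perKey.put(currentKey, value); }
    T value() { return perKey.get(currentKey); }
}

/** ReducingState-like: add(T) folds into a single stored value of the same type T. */
class ToyReducingState<K, T> extends ToyKeyedState<K, T> {
    private final BinaryOperator<T> reducer;

    ToyReducingState(BinaryOperator<T> reducer) { this.reducer = reducer; }

    void add(T value) { perKey.merge(currentKey, value, reducer); }
    T get() { return perKey.get(currentKey); }
}

/** AggregatingState-like: add(IN) updates an accumulator ACC, get() returns OUT. */
class ToyAggregatingState<K, IN, ACC, OUT> extends ToyKeyedState<K, ACC> {
    private final Supplier<ACC> createAcc;
    private final BiFunction<ACC, IN, ACC> addFn;
    private final Function<ACC, OUT> resultFn;

    ToyAggregatingState(Supplier<ACC> createAcc,
                        BiFunction<ACC, IN, ACC> addFn,
                        Function<ACC, OUT> resultFn) {
        this.createAcc = createAcc;
        this.addFn = addFn;
        this.resultFn = resultFn;
    }

    void add(IN value) {
        ACC acc = perKey.getOrDefault(currentKey, createAcc.get());
        perKey.put(currentKey, addFn.apply(acc, value));
    }

    OUT get() {
        ACC acc = perKey.get(currentKey);
        return acc == null ? null : resultFn.apply(acc);
    }
}
```

For example, a ToyReducingState built with Integer::sum turns add(3) followed by add(4) into a single stored value, 7. A ToyAggregatingState can instead accept Long inputs and return a Double average, which shows the IN/OUT distinction described above.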

Fault Tolerance and Failure Recovery

How to Save and Restore State

  • Call env.enableCheckpointing(1000) on the job execution environment to set the interval between two Checkpoints to 1000 ms (1 s). More frequent Checkpoints mean less data needs to be reprocessed during recovery, but each Checkpoint also consumes I/O resources.
  • Set the Checkpoint mode. Setting EXACTLY_ONCE makes Flink align Barriers, which avoids message loss or duplication.
  • setMinPauseBetweenCheckpoints(500) sets the minimum pause between Checkpoints to 500 ms. For example, suppose a Checkpoint takes 700 ms. With a Checkpoint every 1000 ms, the next Checkpoint would start only 300 ms after the previous one completes. Because that gap is shorter than the 500 ms minimum pause, Flink waits an additional 200 ms before starting the next Checkpoint. This prevents overly frequent Checkpoints from slowing down processing.
  • setCheckpointTimeout sets the Checkpoint timeout. For example, with a timeout of 1 minute, a Checkpoint that does not complete within 1 minute times out and is discarded.
  • setMaxConcurrentCheckpoints sets the maximum number of Checkpoints that can be in progress at the same time. Set this parameter according to your requirements.
  • enableExternalizedCheckpoints specifies that Checkpoints are retained after the job is canceled. By default, Checkpoints are deleted when the entire job is canceled. A retained Checkpoint can then serve as a job-level recovery point, similar to a Savepoint.
  • Trigger Management: Flink triggers and manages Checkpoints automatically, whereas users trigger and manage Savepoints manually.
  • Usage: Checkpoints allow fast recovery when tasks encounter exceptions, such as network jitter or timeouts. Savepoints, on the other hand, enable scheduled backups and let you stop and resume jobs, for example to modify code or adjust concurrency.
  • Features: Checkpoints are lightweight, implement automatic recovery from job failures and are deleted by default after job completion. Savepoints, on the other hand, are persistent and saved in a standard format. They allow code or configuration changes. Resuming a job from a Savepoint requires manual initiation and path specification.
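A small simulation shows how the 1000 ms interval interacts with setMinPauseBetweenCheckpoints(500). This is a simplified model, not Flink's scheduler. It assumes every Checkpoint takes the same fixed duration and only one runs at a time. Under those assumptions, the next Checkpoint starts at whichever is later: the previous start plus the interval, or the previous end plus the minimum pause. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of Checkpoint triggering (assumptions: fixed duration,
// one Checkpoint at a time). Not Flink's actual scheduling code.
class CheckpointScheduleModel {

    /**
     * Returns the start times (ms) of the first `count` Checkpoints.
     * Next start = max(previous start + interval, previous end + min pause).
     */
    static List<Long> startTimes(long intervalMs, long durationMs, long minPauseMs, int count) {
        List<Long> starts = new ArrayList<>();
        long start = 0L;
        for (int i = 0; i < count; i++) {
            starts.add(start);
            long end = start + durationMs;
            // The min pause only matters when a Checkpoint runs long.
            start = Math.max(start + intervalMs, end + minPauseMs);
        }
        return starts;
    }
}
```

With a 1000 ms interval, a 700 ms Checkpoint, and a 500 ms minimum pause, the model yields start times 0, 1200, and 2400 ms. The gap after each Checkpoint is 500 ms instead of 300 ms, which is the extra 200 ms wait. With a 300 ms Checkpoint, the minimum pause is never the limiting factor, so Checkpoints start every 1000 ms.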

Available State Backends


Original Source:




Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com
