Apache Flink Fundamentals: State Management and Fault Tolerance
By Sun Mengyao and compiled by Han Fei
Basics of State Management
What Is State?
Before we dive into the basics of state management, let’s consider an example of stateless computing. Suppose you want to compute consumption latency for a message queue: a producer continuously writes messages into the queue, and multiple consumers read messages from it simultaneously. As the preceding figure shows, the producer has written 16 messages, with the offset standing at 15. The three consumers consume data at different speeds. The fastest consumer has consumed 13 messages, while the slower consumers have consumed about 8 messages.
By how many messages does each consumer lag behind in real time? The preceding figure provides an input and output example. Each input entry carries a timestamp, the position the producer has written to, and the position each consumer has read to at that point in time.
As shown in the figure, the producer has written 15 messages and the three consumers have read 10, 7, and 12 messages respectively. How do you convert the progress of the producer and consumers into the information shown on the right side of the figure, namely that Consumer 0 is 5 messages behind, Consumer 1 is 8 messages behind, and Consumer 2 is 3 messages behind? In Flink, a Map operation does this. The Map operation reads each entry and subtracts each consumer's position from the producer's position to determine how far each consumer lags behind the producer. The results are emitted continuously as the Map operation processes new entries.
Under this computing model, the output remains the same no matter how many times the same entry is computed, because a single input entry already contains all the required information. The consumption latency is the number of messages written by the producer minus the number of messages already read by a consumer. Since the progress of both the producer and the consumers is found in a single entry, the same input always yields the same output. This is a stateless computation.
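The stateless Map described above can be sketched in a few lines of plain Python. This is only an illustration, not Flink code: the record layout (the `producer` and `consumers` fields) and the function name `consumer_lag` are assumptions made for this example, and the numbers are the ones from the figure description.

```python
from typing import Dict, List, Tuple


def consumer_lag(record: Dict) -> List[Tuple[str, int]]:
    """Stateless map: everything it needs is inside the single input record."""
    producer_offset = record["producer"]
    return [
        (name, producer_offset - consumed)
        for name, consumed in record["consumers"].items()
    ]


# Hypothetical input entry: producer wrote 15, consumers read 10, 7, and 12.
record = {
    "timestamp": 1,
    "producer": 15,
    "consumers": {"consumer-0": 10, "consumer-1": 7, "consumer-2": 12},
}

print(consumer_lag(record))
# Processing the same record again gives the same result: no state is involved.
print(consumer_lag(record) == consumer_lag(record))
```

Because the function reads nothing outside its argument, replaying a record after a failure cannot change the result.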
Now, what is stateful computing?
Consider access log counting as an example. Suppose you have an Nginx access log in which each entry represents a request and records where the request comes from and which URL it accesses. The task is to count how many times each URL is accessed, that is, how many times each API is invoked. In the simplified input and output, there are three Nginx log entries. The first entry records a GET /api/a request at a certain point in time, the second records a POST /api/b request, and the third records another GET /api/a request. The first output entry shows that /api/a has been accessed once, the second shows that /api/b has been accessed once, and the third shows that /api/a has been accessed twice. The two /api/a log entries have the same input but different outputs (a count of 1 for one and a count of 2 for the other), which shows that the output may differ even when the input is the same. The output depends on how many times the requested URL has been accessed cumulatively.
For the first /api/a request, the cumulative access count before it is 0, so the output count is 1. For the second /api/a request, the API has already been accessed once, so the cumulative count for /api/a is 2. A single entry contains information only about the current access, not about all accesses. To produce the result, you also need the cumulative API access count, namely, the state.
In this computing model, operators combine incoming data with stored state to perform complex computation tasks and generate output data. While processing, operators read the stored state, and each incoming record updates that state in real time. Thus, if you feed in 200 records, you get 200 results.
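As a rough illustration of the stateful model, the following plain-Python sketch counts accesses per URL and emits one result per input. The class name `UrlCounter` and the simplified "METHOD URL" log format are assumptions for this example; it does not use any Flink API.

```python
from collections import defaultdict
from typing import Dict, Tuple


class UrlCounter:
    """Stateful operator sketch: the per-URL counts are the state."""

    def __init__(self) -> None:
        self.counts: Dict[str, int] = defaultdict(int)

    def process(self, log_line: str) -> Tuple[str, int]:
        # The simplified log line is assumed to be "<METHOD> <URL>".
        _method, url = log_line.split()
        self.counts[url] += 1          # the input updates the state
        return url, self.counts[url]   # the output depends on the state


counter = UrlCounter()
logs = ["GET /api/a", "POST /api/b", "GET /api/a"]
outputs = [counter.process(line) for line in logs]
print(outputs)
```

The first and third inputs are identical, yet their outputs differ, which is exactly what makes the computation stateful.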
Next, which scenarios require state information?
The following four common scenarios demand state information.
- Deduplication: For instance, the upstream system produces duplicate data and you want to remove the duplicates before the data reaches the downstream system. To remove duplicates, you must know which data has already been seen and which has not. Therefore, you need to record all primary keys and check whether the primary key of each inbound entry is already among them.
- Window Computing: For example, to count how many times a specific Nginx log API is accessed per minute, you need to compute a window every minute. For the 08:00–08:01 window, the data from the first 59 seconds must be kept in memory. In other words, the data is retained for the duration of the window and the result for the whole window is emitted when the window triggers at 08:01. Data in windows that have not yet triggered is also state.
- Machine Learning/Deep Learning: The trained model and its parameters are also state. Each time a data set is used, it provides feedback that updates the model.
- Historical Data Access: To compare today’s data with yesterday’s data, you must have access to historical data. Since reading data from external systems each time for the data comparison consumes too many resources, it is efficient to put the historical data into the state.
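Taking the deduplication scenario from the list above, a minimal sketch looks like the following. The set of seen primary keys is the state; the function name and the `(key, payload)` record shape are assumptions for illustration, and the sketch ignores the memory limits and fault tolerance that the rest of this article is concerned with.

```python
from typing import Hashable, Iterable, Iterator, Set, Tuple


def deduplicate(
    records: Iterable[Tuple[Hashable, str]]
) -> Iterator[Tuple[Hashable, str]]:
    """Forward each primary key only once; `seen` is the state."""
    seen: Set[Hashable] = set()
    for key, payload in records:
        if key in seen:
            continue          # duplicate: the key was recorded earlier
        seen.add(key)         # remember the key for later records
        yield key, payload


upstream = [(1, "a"), (2, "b"), (1, "a"), (3, "c"), (2, "b")]
print(list(deduplicate(upstream)))
```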
Why is State Management Necessary?
The most direct and common state management method is to put data in memory. For example, in a Wordcount scenario, words are the input, and Count is the output. The process continuously accumulates the input to count.
Streaming jobs have the following requirements.
- 24/7 operation and high reliability
- No data loss and no duplicates, with each record computed exactly once
- Real-time data output with minimal latency
However, keeping state purely in memory makes these requirements hard to meet. Memory is limited. To perform computations over a 24-hour time window, all data within the 24-hour period must be kept in memory, which may exhaust it. Additionally, 24/7 service requires high availability.
To cope with downtime, you must consider how to implement backup and recovery. You also need to consider scale-out. If a website does not receive many visits, the program that counts the number of visits to each API may run in single-thread mode. If the website experiences a surge of visits and a single node struggles to process all the access data, you must scale out by adding several nodes. In that case, another problem arises: how do you evenly distribute the state to the newly added nodes?
Therefore, putting all the data into memory is not the best solution for state management.
Ideal State Management
To implement ideal state management, you need to meet the following three requirements.
- Ease of Use: Flink provides rich data structures, various forms of state organization, and simple extension interfaces, to simplify state management and make it more convenient.
- High Efficiency: Real-time jobs generally require low latency and fast recovery in case of failures. When processing capacity is insufficient, you must be able to scale out, and backups must not affect the performance of the jobs currently being processed.
- High Reliability: Flink supports state persistence, semantics that guarantee no data loss and no duplicates, and automated fault tolerance. For example, to ensure high availability, if a node fails, another node automatically takes over without manual intervention.
Flink State Types and Use Cases
Managed State and Raw State
Managed State is state that Flink manages automatically, while Raw State is state that users manage themselves and whose data structures are not visible to Flink. The key differences between the two are as follows:
- State Management Mode: The Flink runtime maintains Managed State. It stores the state, recovers it automatically, and optimizes its memory management. Users manage and serialize Raw State themselves. Flink does not know the data structures stored in Raw State; only users do, so they must serialize Raw State into storable data structures.
- State Data Structures: Managed State supports known data structures such as Value, List, and Map. Raw State only supports byte arrays, so all state must be converted to binary byte arrays.
- Use Cases: Managed State is applicable in most cases, while Raw State is only recommended when Managed State can’t meet specific requirements, for example, when implementing custom operators.
Keyed State and Operator State
Managed State has two types: Keyed State and Operator State. In the Flink stream model, the keyBy operation converts a DataStream into a KeyedStream.
Each key corresponds to one state, which means that an operator instance processes multiple keys and accesses the corresponding states. This is Keyed State. You can only use Keyed State in operators on a KeyedStream. Thus, you cannot use Keyed State if a program does not have a keyBy operation.
In contrast, Operator State is available in all operators and is a good fit for data sources. It is often used in sources, for example, FlinkKafkaConsumer. Unlike Keyed State, where each key has its own state, with Operator State one operator instance corresponds to one state. When the number of parallel instances changes, Keyed State migrates among instances along with its keys. For example, if only one instance exists, the state of both /api/a and /api/b is stored in that instance; if requests grow and scaling out becomes necessary, the state of /api/a and the state of /api/b may be stored on different nodes. Since Operator State is not tied to keys, you must choose how to redistribute it when the number of parallel instances changes.
Two built-in redistribution schemes are available: even-split redistribution and union redistribution. In terms of access, Keyed State is accessed through the RuntimeContext, which requires the operator to be a Rich Function. To use Operator State, a stateful function implements the CheckpointedFunction interface or the ListCheckpointed interface. In terms of data structures, Keyed State supports ValueState, ListState, ReducingState, AggregatingState, and MapState, while Operator State supports fewer data structures, for example, ListState.
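The difference between the two redistribution schemes can be sketched as follows. This is a simplified model in plain Python, not Flink's implementation: the function names are invented for this example, and the exact assignment of items to instances under even-split (contiguous chunks here) is an assumption.

```python
from typing import List, TypeVar

T = TypeVar("T")


def even_split(old_states: List[List[T]], new_parallelism: int) -> List[List[T]]:
    """Concatenate all list states, then divide them evenly among the new instances."""
    merged = [item for state in old_states for item in state]
    base, extra = divmod(len(merged), new_parallelism)
    result, start = [], 0
    for i in range(new_parallelism):
        size = base + (1 if i < extra else 0)  # first `extra` instances get one more
        result.append(merged[start:start + size])
        start += size
    return result


def union(old_states: List[List[T]], new_parallelism: int) -> List[List[T]]:
    """Every new instance receives the complete list and picks what it needs."""
    merged = [item for state in old_states for item in state]
    return [list(merged) for _ in range(new_parallelism)]


# Hypothetical list state of two source instances, rescaled to three instances.
old = [["p0", "p1"], ["p2", "p3"]]
print(even_split(old, 3))
print(union(old, 3))
```

Even-split keeps each item on exactly one instance, whereas union trades extra state transfer for the freedom to let each instance decide which items it keeps.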
Use Cases of Keyed State
Keyed State is classified into many types. The preceding figure shows the relationship between several types of Keyed State. The first-level sub-types include ValueState, MapState, and AppendingState. AppendingState has a sub-type called MergingState, which is further divided into three sub-types: ListState, ReducingState, and AggregatingState. This inheritance hierarchy leads to differences in their access methods and data structures.
Let’s take a look at the differences between types of Keyed State.
- ValueState stores a single value. For example, in Wordcount scenarios, words are keys and the state is the word count. The single value can be a number or a string. There are two access methods, one to set and one to get: set the value using update(T) and retrieve it using T value().
- The data type of MapState is a map, and you can use methods such as put and remove. Note that the key of MapState is different from the key of the Keyed State.
- The data type of ListState is a list, and you can use methods such as add and update.
- ReducingState, AggregatingState, and ListState share the same parent type. However, ReducingState keeps a single value: the add method does not append the new element to a list; instead, it merges the new element into the single reduced result.
- AggregatingState differs from ReducingState in the interface. For ReducingState, the elements added by using add(T) are of the same type as the elements retrieved by using T get(). For AggregatingState, the input is IN and the output is OUT.
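To make the interface difference in the list above concrete, here is a small plain-Python sketch. The class names and constructor parameters are invented for this illustration and only mimic the add/get behavior described in the text; they are not Flink's classes, and details such as what get returns on empty state are simplified.

```python
from typing import Callable, Generic, Optional, Tuple, TypeVar

T = TypeVar("T")
IN = TypeVar("IN")
ACC = TypeVar("ACC")
OUT = TypeVar("OUT")


class ReducingStateSketch(Generic[T]):
    """add(T) and get() -> T use the same type; only one value is kept."""

    def __init__(self, reduce_fn: Callable[[T, T], T]) -> None:
        self._reduce_fn = reduce_fn
        self._value: Optional[T] = None

    def add(self, value: T) -> None:
        # Fold the new element into the single stored result.
        self._value = value if self._value is None else self._reduce_fn(self._value, value)

    def get(self) -> Optional[T]:
        return self._value


class AggregatingStateSketch(Generic[IN, ACC, OUT]):
    """add(IN) and get() -> OUT may use different types."""

    def __init__(
        self,
        create_acc: Callable[[], ACC],
        add_fn: Callable[[ACC, IN], ACC],
        result_fn: Callable[[ACC], OUT],
    ) -> None:
        self._add_fn = add_fn
        self._result_fn = result_fn
        self._acc = create_acc()

    def add(self, value: IN) -> None:
        self._acc = self._add_fn(self._acc, value)

    def get(self) -> OUT:
        return self._result_fn(self._acc)


def _avg(acc: Tuple[int, int]) -> Optional[float]:
    total, count = acc
    return total / count if count else None


total = ReducingStateSketch(lambda a, b: a + b)   # int in, int out
average = AggregatingStateSketch(                 # int in, float out
    lambda: (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    _avg,
)
for v in (3, 5):
    total.add(v)
    average.add(v)
print(total.get(), average.get())
```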
The following section uses ValueState as an example to show how to use it in a state machine scenario.
The preceding snapshot only shows a snippet of code. To see the complete source code, click here
The figure shows only the core logic and omits the content of the main method for the Flink job, the input, the output, and some custom configuration items.
The sample includes two DataStreams: an event stream that loads data through env.addSource, and an alert stream. First, the system applies keyBy on the sourceAddress and then flatMap with a newly created StateMachineMapper. The StateMachineMapper is a state machine, that is, a device that represents different states and the transitions between them.
Consider a simple shopping scenario. When a buyer places an order and the system generates an order entry, the state is Pending Payment. When the payment succeeds, the state changes from Pending Payment to Paid and Pending Dispatch. After shipment, the order state changes from Paid and Pending Dispatch to Being Delivered. When the buyer accepts the item, the state changes from Being Delivered to Accepted. Note that an order cancellation event may happen at any time during the process, and once the order is canceled, the final state is Cancelled.
How do you write a state machine in Flink? The implementation needs a RichFlatMapFunction, because Keyed State is obtained through getRuntimeContext, and getRuntimeContext is only available in a Rich Function. Therefore, the open method obtains currentState by calling getState. The currentState holds the current state. For instance, after order placement, the currentState is Pending Payment. Once the initialization is complete, the currentState shows that the order is created. The next step is to implement the flatMap method. The flatMap method first defines a state variable and reads a value from currentState. Next, it checks whether the value is empty. If the state for the sourceAddress is empty, the order is in the initial state of a newly created order, which is Pending Payment.
In that case, State.Initial is assigned to state. It’s critical to note that state here is a local variable, not the state managed by Flink; its value is extracted from the managed state. Another local variable, nextState, holds the result of the transition, that is, the effect of the incoming event on the state. For example, on a successful payment event, the order state changes to Paid and Pending Dispatch. The latest order state is then available in nextState.
In addition, it is necessary to check whether the new state is legitimate. For example, if the buyer wants to cancel an accepted order, the order ends up in an illegitimate state because an accepted order cannot be canceled.
If the state is legitimate, the next consideration is whether further state transitions are possible. For example, if the order state is Cancelled, no other state can follow and the system clears the state. The clear operation is common to all Managed Keyed States in Flink. It deletes the stored information.
If a state is neither illegitimate nor final, more state transitions may occur later. In this case, the current order state must be updated. This completes the initialization, value extraction, update, and clearing of the ValueState. The role of the state machine throughout the process is to emit illegitimate states so that downstream systems can process them easily. Other state types are used in a similar way.
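The control flow walked through above (read the state, fall back to the initial state, apply the transition, then alert, clear, or update) can be sketched in plain Python. This is not the Flink sample itself: the dictionary stands in for a keyed ValueState, and the state names, event names, and the `OrderStateMachine` class are assumptions derived from the shopping scenario.

```python
from typing import Dict, Optional, Tuple

INITIAL = "PENDING_PAYMENT"
TERMINAL = {"CANCELLED"}

# Hypothetical transition table for the shopping scenario.
TRANSITIONS: Dict[Tuple[str, str], str] = {
    ("PENDING_PAYMENT", "pay"): "PAID_PENDING_DISPATCH",
    ("PAID_PENDING_DISPATCH", "ship"): "BEING_DELIVERED",
    ("BEING_DELIVERED", "accept"): "ACCEPTED",
    ("PENDING_PAYMENT", "cancel"): "CANCELLED",
    ("PAID_PENDING_DISPATCH", "cancel"): "CANCELLED",
    ("BEING_DELIVERED", "cancel"): "CANCELLED",
}


class OrderStateMachine:
    def __init__(self) -> None:
        # Stands in for keyed ValueState: one value per key (order id).
        self.current: Dict[str, str] = {}

    def process(self, order_id: str, event: str) -> Optional[Tuple[str, str, str]]:
        """Return an alert tuple for an illegitimate transition, else None."""
        # Local variable, not the managed state itself.
        state = self.current.get(order_id, INITIAL)
        next_state = TRANSITIONS.get((state, event))
        if next_state is None:
            return (order_id, state, event)      # illegitimate: emit an alert
        if next_state in TERMINAL:
            self.current.pop(order_id, None)     # final state: clear
        else:
            self.current[order_id] = next_state  # otherwise: update
        return None


machine = OrderStateMachine()
alerts = [machine.process("o1", e) for e in ("pay", "ship", "accept", "cancel")]
cancelled = machine.process("o2", "cancel")
print(alerts, cancelled, machine.current)
```

Only the attempt to cancel the already accepted order produces an alert; the canceled order leaves no state behind.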
Fault Tolerance and Failure Recovery
How to Save and Restore State
Flink maintains state mainly by using the Checkpoint mechanism. Checkpoints are distributed snapshots that back up the application state. For more information about how distributed snapshots are implemented, check out the second course.
Now, another critical issue is how to restore jobs from distributed Checkpoint snapshots in case of job failures. Assume that one of the three machines that run a job suddenly fails. In this case, you must move its processes or threads to the other two active machines. In addition, all the tasks in the job need to roll back to the state of the last Checkpoint, and the job continues from that point.
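The rollback described above can be illustrated with a single-process toy model. It assumes a replayable source (a list indexed by offset) and stores the offset together with the state in each snapshot; the class `CountingJob` and its methods are invented for this sketch and do not reflect how Flink coordinates distributed snapshots.

```python
from typing import Dict, List, Tuple


class CountingJob:
    """Word count over a replayable source, with snapshot and rollback."""

    def __init__(self, source: List[str]) -> None:
        self.source = source
        self.offset = 0
        self.counts: Dict[str, int] = {}
        self._snapshot: Tuple[int, Dict[str, int]] = (0, {})

    def step(self) -> None:
        word = self.source[self.offset]
        self.counts[word] = self.counts.get(word, 0) + 1
        self.offset += 1

    def checkpoint(self) -> None:
        # Snapshot the source position together with a copy of the state.
        self._snapshot = (self.offset, dict(self.counts))

    def restore(self) -> None:
        # Roll back both the state and the read position.
        offset, counts = self._snapshot
        self.offset, self.counts = offset, dict(counts)

    def run_to_end(self) -> None:
        while self.offset < len(self.source):
            self.step()


job = CountingJob(["a", "b", "a", "c", "a"])
job.step(); job.step()
job.checkpoint()          # snapshot after two records
job.step(); job.step()    # progress that is lost in the simulated failure
job.restore()             # roll back to the last checkpoint
job.run_to_end()          # the source replays records from the saved offset
print(job.counts)
```

Because the offset and the counts are restored together, the replayed records are counted once, not twice.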
To use Checkpoints for recovery, data sources must support data retransmission. After recovery from Checkpoints, Flink provides two consistency semantics: exactly-once and at-least-once. Which of the two applies is determined during checkpointing by whether Barriers are aligned: exactly-once if they are aligned, at-least-once if they are not.
There is no need to align Barriers if a job processes data on a single thread. If only one stream is involved in a Checkpoint, it is easy to restore the job to the previous state from the Checkpoint at any time. In the case of multiple nodes, if the Barrier from one input stream has arrived while the Barrier from another input stream has not, and the state in memory is saved at that moment, the two streams are not aligned. In this case, duplicate data may appear between the two streams during recovery.
The following steps configure Checkpoints in code.
- Call env.enableCheckpointing on the job execution environment and pass 1000, which sets the interval between two Checkpoints to 1 second. More frequent Checkpoints mean less data to reprocess during recovery, but Checkpoints also consume I/O resources.
- Set a mode for Checkpoints. Set EXACTLY_ONCE so that Barriers are aligned, which avoids message loss or duplication.
- setMinPauseBetweenCheckpoints(500) means that the minimum pause between Checkpoints is 500 ms. For example, suppose a Checkpoint takes 700 ms. With a Checkpoint every 1000 ms, the next Checkpoint would normally start only 300 ms after the previous one finishes. Since 300 ms is shorter than the 500 ms minimum pause, the next Checkpoint waits an additional 200 ms. This prevents processing from slowing down due to overly frequent Checkpoints.
- setCheckpointTimeout sets the timeout interval of Checkpoints. In this example, a Checkpoint that does not complete within 1 minute times out and is discarded.
- setMaxConcurrentCheckpoints sets how many Checkpoints may be in progress at the same time. Set this parameter as per the requirements.
- enableExternalizedCheckpoints specifies that Checkpoints are retained after job cancellation. By default, Checkpoints are deleted when the entire job is canceled. Checkpoints are job-level savepoints.
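The interplay between the checkpoint interval and the minimum pause in the list above comes down to simple arithmetic. The helper below is a simplified model written for this article, not Flink's scheduler: the function name and parameters are assumptions, and the defaults mirror the 1000 ms interval and 500 ms pause used in the example.

```python
def next_checkpoint_start(
    prev_start_ms: int,
    prev_duration_ms: int,
    interval_ms: int = 1000,
    min_pause_ms: int = 500,
) -> int:
    """Earliest start of the next checkpoint under both constraints."""
    scheduled = prev_start_ms + interval_ms                            # regular schedule
    earliest_allowed = prev_start_ms + prev_duration_ms + min_pause_ms  # end + pause
    return max(scheduled, earliest_allowed)


# A 700 ms checkpoint leaves only 300 ms before the regular schedule,
# so the next one is pushed back by 200 ms.
slow = next_checkpoint_start(prev_start_ms=0, prev_duration_ms=700)
# A 300 ms checkpoint leaves 700 ms, so the regular schedule applies.
fast = next_checkpoint_start(prev_start_ms=0, prev_duration_ms=300)
print(slow, fast)
```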
As mentioned earlier, in addition to failure recovery, you may need to make manual adjustments and redistribute state. To manually adjust concurrency, you must restart the job, and after the restart the job reports that its Checkpoints no longer exist.
Now, how to restore job data?
Flink allows retaining Checkpoints in external media when you cancel a job. Additionally, Flink provides the Savepoint mechanism.
Similar to Checkpoints, Savepoints allow saving state to external media. When a job fails, restore it from the external media. Let’s take a look at the differences between Savepoints and Checkpoints.
- Trigger Management: While Flink automatically triggers and manages Checkpoints, users trigger and manually manage Savepoints.
- Usage: Checkpoints allow fast recovery when tasks encounter exceptions, including network jitter or timeouts. Savepoints, on the other hand, enable scheduled backups and allow you to stop and resume jobs, for example, to modify code or adjust concurrency.
- Features: Checkpoints are lightweight, implement automatic recovery from job failures and are deleted by default after job completion. Savepoints, on the other hand, are persistent and saved in a standard format. They allow code or configuration changes. Resuming a job from a Savepoint requires manual initiation and path specification.
Available State Backends
One of the state backends is MemoryStateBackend. To construct a MemoryStateBackend, set the maximum state size (maxStateSize) and configure whether to use asynchronous snapshots. The state is stored in the memory of the TaskManager node (the execution node). Due to memory capacity limits, the maxStateSize of a single state defaults to 5 MB.
Note that maxStateSize must not exceed akka.framesize, which is 10 MB by default. Since Checkpoints are stored in the memory of the JobManager, the Checkpoint size can't be larger than the memory of the JobManager.
Use MemoryStateBackend for local development, debugging, and jobs that hold little state, for example, ETL jobs, and in cases where the JobManager is unlikely to fail or its failure has little impact. However, MemoryStateBackend is not recommended in production.
Another state backend is the FsStateBackend, which is based on a file system. To construct an FsStateBackend, pass a file path and specify whether to use asynchronous snapshots. The FsStateBackend also holds state data in the memory of the TaskManager, but unlike MemoryStateBackend, it doesn’t have the 5 MB size limit.
In terms of the capacity limit, the state size on a single TaskManager doesn’t exceed the memory size of the TaskManager and the total size doesn’t exceed the capacity of the configured file system. The FsStateBackend is applicable for jobs with a large state such as aggregation at the minute-window level, join, and jobs requiring high-availability setups.
The third state backend is RocksDBStateBackend. RocksDB is a key-value store. Similar to other storage systems for key-value data, the state is first put into memory. When the memory is about to run out, the state is written to disk. Note that RocksDB does not support synchronous Checkpoints, so the constructor does not include a synchronous snapshot option.
However, the RocksDBStateBackend is currently the only backend that supports incremental Checkpoints. This means that only the incremental state changes are written, instead of the full state each time. Checkpoints are stored in external file systems (local file systems or HDFS). The state size of a single TaskManager is limited to the total size of its memory and disk.
The maximum size of a Key is 2 GB. The total size is not larger than the capacity of the configured file system. RocksDBStateBackend is applicable for jobs with a large state (ideally, jobs that do not require high read/write performance), aggregation at the day-window level, and jobs requiring high-availability setups.
This article explains the basic concepts of Flink state management and fault tolerance and answers the following questions:
(1) Why Do We Need State?
As mentioned earlier, stateful jobs require stateful logic because the data records they process are correlated and a single data record doesn’t contain all the information. Therefore, state is required to implement the business logic.
(2) Why Is State Management Necessary?
State management is essential because real-time jobs run around the clock and need to cope with the impact of unexpected factors.
(3) How to Select State Type and State Storage Methods?
To choose the proper types of state and state backends, users must first understand the business scenario at hand. Then, consider the advantages and limitations of the individual schemes and make the right choice based on the actual requirements.