Apache Flink Fundamentals: State Management and Fault Tolerance

By Sun Mengyao and compiled by Han Fei

Basics of State Management

What Is State?

Image for post
Image for post

Before we dive into the basics of state management, let’s consider an example of stateless computing. Suppose in consumption latency computing there’s a message queue wherein a producer continuously writes messages into the consumer queue and from which multiple consumers read messages simultaneously. As the preceding figure shows, the producer writes 16 messages, with the offset standing at 15. The three consumers consume data at different speeds. The fastest consumer consumes 13 messages, while slower consumers consume about 8 messages.

By how many messages does each consumer lag behind in real-time? The preceding figure provides an input and output example. For the input, each point in time has a timestamp. The timestamps infer the position where the producer writes messages as well as the positions where each consumer reads at the given point in time.

As shown in the figure, the producer has written 15 messages and the three consumers have read 10, 7, and 12 messages respectively. A question arises — how to convert the progress of the producer and consumers into the schematic information shown on the right side? Consumer 0 is 5 messages behind; Consumer 1 is 8 messages behind; Consumer 2 is 3 messages behind. According to Flink design principles, the Map operation is required at this point. The Map operation reads the messages and deducts the number of messages to detect by how many messages each consumer lags behind the producer. The final result is obtained as the Map operation continues.

Under this computing model, the output result remains the same, irrespective of how many times the same message is computed; because a single input entry already contains all the required information. The number of messages produced by the producer minus the number of messages already read by consumers is the consumption latency. As the consumption by both, the producer and consumers are found in a single entry, the same input means the same output, which is a stateless computation.

Image for post
Image for post

Now, what is stateful computing?

Consider the access log count as an example. Assuming you have an Nginx access log, where each log represents a request and records where the request comes from and which URL is accessed. Now, count how many times each URL is accessed, that is, how many times each API is invoked. As the simplified input and output shows, a total of three Nginx logs are generated. The first input entry requests GET /api/a at a certain point in time; the second log records Post /api/b at a certain point in time; the third entry requests GET /api/a. From the three Nginx logs, the first output entry shows that /api/a is accessed once.

The second output entry shows that /api/b is accessed once, and the third output entry shows that api/a is accessed twice. The two /api/a Nginx logs have the same input but different outputs (for one log, the count is 1; for the other, the count is 2), indicating that the output may be different although the input is the same. The output depends on how many times the requested API URL was accessed cumulatively.

For the first input, the cumulative access is 0, so the count is 1. for the second input, the API was accessed once before, so the cumulative count of the accesses to /api/a is 2. A single entry contains information about the current access instead of all the accesses. To obtain all the information, you also need the cumulative API access count, namely, state.

The computing model is to combine data and operators for performing complicated computation tasks and generating output data. In the process, operators access the stored state. Additionally, the impact of the current data on the state updates in real-time. Thus, if you enter 200 pieces of data, you get 200 results as a return.

Image for post
Image for post

Next, which scenarios require state information?

The following four common scenarios demand state information.

  • Deduplication: For instance, data in the upstream system is duplicate and you want to remove duplicate data while the data appears in the downstream system. To remove duplicates, you must know which data already exists and which data has not been stored before. Therefore, it requires you to record all primary keys and see whether an inbound data entry already exists in the primary key.

Why is State Management Necessary?

Image for post
Image for post

The most direct and common state management method is to put data in memory. For example, in a Wordcount scenario, words are the input, and Count is the output. The process continuously accumulates the input to count.

Streaming jobs have the following requirements.

  • 24/7 operation and high reliability

However, to meet the preceding requirements, some problems may occur in memory management. There is a limit to memory. To perform computations in a 24 hour time window, the data within the 24 hour period must be put in memory, which may lead to insufficient memory. Additionally, to ensure 24/7 service, there’s a need to implement high availability.

To overcome the downtime challenges, you must consider how to implement back up and recovery. You also need to consider scale-out. If a website does not receive many visits, the program that counts the number of visits to each API may run in single-thread mode. If the website experiences a surge of visits and a single node struggles to process all the access data, you must implement scale-out by adding several nodes. In such a case, another problem arises — how to evenly distribute the data state to newly-added nodes?

Therefore, to put all the data into memory is not the best solution for state management.

Ideal State Management

Image for post
Image for post

To implement ideal state management, you need to meet the following three requirements.

  • Ease of Use: Flink provides rich data structures, various forms of state organization, and simple extension interfaces, to simplify state management and make it more convenient.

Flink State Types and Use Cases

Managed State and Raw State

Image for post
Image for post

Managed State refers to an automatically managed Flink state, while the raw state is an inherent state whose data structures are not visible to Flink. Following are the key differences between both the states:

  • State Management Mode: Flink runtime maintains the Managed State. It stores, auto-recovers and optimizes for memory management. Users manage and serialize Raw State themselves. Flink does not know the data structures stored in the Raw State. Only users know the data structures. There is a need to serialize Raw State into storable data structures.

Keyed State and Operator State

Image for post
Image for post

Further, the Managed State has two types- Keyed State and Operator State. In the Flink Stream model, the keyBy operation converts a DataStream into a KeyedStream.

Each key corresponds to a state which implies that an Operator instance processes multiple keys and accesses corresponding states, leading to Keyed State. You can only use Keyed State in operators on a KeyedStream. Thus, you cannot use a KeyedStream if a program does not have the keyBy operation.

On the contrary, Operator State is available in all operators and provides a better matching method than data sources. Operator State is often applicable in sources, for example, FlinkKafkaConsumer. Unlike the Keyed State, one Operator instance corresponds to a state. As the number of concurrency instances changes, the state migrates among instances along with keys. For example, if only one instance exists, both /api/a and /api/b are stored in the instance; if the requests grow and scaling becomes necessary, the state of /api/a and /api/b will be stored on different nodes. Since Operator State is not relative to keys when the concurrency instances change, it is necessary to select how to redistribute state.

Two built-in redistribution schemes are available, even-split redistribution and union redistribution. With reference to access, RuntimeContext implementation extends access to Keyed State. This requires an Operator to be a rich function. To use Operator State, a stateful function implements the CheckpointedFunction interface or the ListCheckpointed interface. In terms of data structures, Keyed State supports data structures such as ValueState, ListState, ReducingState, AggregatingState, and MapState, while Operator State supports fewer data structures, for example, ListState.

Use Cases of Keyed State

Image for post
Image for post

Keyed State is classified into many types. The preceding figure shows the relationship between several types of Keyed State. The first-level sub-types include ValueState, MapState, and AppendingState. AppendingState also has a sub-type called MergingState. MergingState is further divided into three sub-types. ListState, ReducingState, and AggregatingState. This inheritance causes differences in their access methods and data structures.

Image for post
Image for post

Let’s take a look at the differences between types of Keyed State.

  • ValueState stores a single value. For example, in Wordcount scenarios, words are keys and State is the word count. The single value is either a number or string. Two types of access interfaces are used, get and set. Set value using update(T) and retrieve using T value().
Image for post
Image for post

The following section uses ValueState as an example to show how to use it in a state machine scenario.

The preceding snapshot only shows a snippet of code. To see the complete source code, click here

The figure only shows the mainstay information and omits the content of the main methods and main functions for Flink jobs, input, output, and some personalized configuration items.

The sample includes two DataStreams. An event that loads data through env.addSource and an alert. First, the system uses Keyby for the sourceAddress and then flatMap for creating a StateMachineMapper. The StateMachineMapper is a state machine. A state machine is a device that represents different states and changes in the state.

Consider a simple shopping scenario, when a buyer places an order and the system generates an order entry, the state is Pending Payment. When the payment is made successfully, the event state changes from Pending Payment to Paid and Pending Dispatch. Post shipment, the order state changes from Paid and Pending Dispatch to Being Delivered. When the buyer accepts the item, the state changes from Being Delivered to Accepted. Note that in the process, the order cancelation event may happen at any time and once the order cancels, the final event state changes to Cancelled.

How does Flink write state machines? To implement this, it needs RichFlatMapFunction. We need the Keyed State getRuntimeContext and the getRuntimeContext process requires a Rich Function. Therefore, use an open method to obtain the currentState and implement the getState. The currentState shows the current state. For instance, After order placement, the currentState is Pending Payment. Once the initialization is complete, the currentState shows that the order is created. The next step requires the implementation of flatMap method. The flatMap first defines a state and obtains a value from the currentState. Next, an evaluation is done to determine whether the value is empty. If the sourceAddress state is empty, the order has the initial state of the created order which is Pending Payment.

Assign State.Initial to State. It’s critical to note that here the state is a local variable rather than just a state managed in Flink. Extract its value from the state. Add another variable locally and perform the transition. The consequent impact on the state is visible and results in an event of a successful payment. The order state changes to Paid and Pending Dispatch. Obtain the latest order state from the nextState.

In addition, it is necessary to judge whether the state is legitimate. For example, if the buyer wants to cancel the accepted order, the order will have an illegitimate state because an accepted order cannot be canceled.

If a state is legitimate, another consideration is whether the state conversion continues. For example, if the order state is Cancelled, other states are impossible and the system clears the state. The clear operation is a common method for all Managed Keyed States in Flink. This operation implies the deletion of information.

If a state is neither an illegitimate state nor a final state, more state conversions may occur later. In this case, the current order state must update to complete the initialization, value extraction, and clearance of the ValueState. The role of the state machine throughout the process is to distribute illegitimate states so that processing is easy and simple in downstream systems. Other states also use a similar method.

Fault Tolerance and Failure Recovery

How to Save and Restore State

Image for post
Image for post

Flink maintains states mainly by using the Checkpoint mechanism. Checkpoints are distributed snapshots that back up the application state. For more information about how to implement distributed snapshots, check out the second course .

Now, another critical issue is how to restore jobs using distributed Checkpoint snapshots in case of job failures. Assume that one of the three machines that run a job suddenly fails. In this case, you must move the processes or threads to the other two active machines. In addition, all the tasks in the job need to roll back to the last checkpoint state and the job requires to continue from that point.

To use Checkpoints for recovery, data sources must support data retransmission. After recovery from checkpoints, Flink provides two consistency semantics; exactly-once and at-least-once. During the process of implementing checkpoints, determine whether to use exactly-once or at-least-once based on the alignment of Barriers. Use exactly-once if it is aligned and use at-least-once if it is not aligned.

There is no need to align Barriers if job processes on a single thread. If a process involves only one Checkpoint, it is easy to restore a job to the previous state from the Checkpoint at any time. In the case of multiple nodes, if the Barriers of one data record arrives while the Barriers of another data record is yet to arrive with the state being stored in memory, the two streams are not aligned. In this case, duplication may arise between the two streams during recovery.

Consider the following steps to implement checkpoints across code.

  • Pass 1000 from the job execution environment, env.enableCheckpointing, which indicates that the interval between the two Checkpoints is 1 sec. Frequent Checkpoints imply lesser data recovery. Meanwhile, Checkpoints consume some I/O resources.

As mentioned earlier, in addition to failure recovery, make manual adjustments, if necessary and redistribute states. To manually adjust concurrency, you must restart jobs with notifications stating that Checkpoints no longer exist.

Now, how to restore job data?

Flink allows retaining Checkpoints in external media when you cancel a job. Additionally, Flink also provides the SavePoint mechanism.

Image for post
Image for post

Similar to Checkpoints, Savepoints allow saving state to external media. When a job fails, restore it from the external media. Let’s take a look at the differences between Savepoints and Checkpoints.

  • Trigger Management: While Flink automatically triggers and manages Checkpoints, users trigger and manually manage Savepoints.

Available State Backends

Image for post
Image for post

One of the state backends is MemoryStateBackend. To construct a MemoryStateBackend, set the maximum StateSize and configure whether to use asynchronous snapshots. The TaskManager node (the memory of the execution node) stores the state backend. Due to memory capacity limits, the maxStateSize of a single State defaults to 5 MB.

Note that maxStateSize <= akka.framesize is 10 MB by default. Since the JobManager memory stores Checkpoints, the checkpoint size can't be larger than the memory of the JobManager.

Use MemoryStateBackend for local development, debugging, and jobs that hold little state, for example, when ETL and JobManager is not likely to fail or their failures have little impact. However, MemoryStateBackend is not recommended in production.

Image for post
Image for post

Another state backend is the FsStateBackend on a file system. To construct a FsStateBackend, pass a file path and specify whether to use asynchronous snapshots. The FsStateBackend also holds state data in the memory of the TaskManager, but unlike MemoryStateBackend, it doesn’t have the 5 MB size limit.

In terms of the capacity limit, the state size on a single TaskManager doesn’t exceed the memory size of the TaskManager and the total size doesn’t exceed the capacity of the configured file system. The FsStateBackend is applicable for jobs with a large state such as aggregation at the minute-window level, join, and jobs requiring high-availability setups.

Image for post
Image for post

The third state backend is RocksDBStateBackend. RocksDB is a key-value store. Similar to other storage systems for key-value data, the state is first put into memory. When the memory is about to run up, the state is written to disks. Note that RocksDB does not support synchronous Checkpoints. The synchronous snapshot option is not included in the constructor.

However, the RocksDBStateBackend is currently the only backend that supports incremental Checkpoints. This suggests that users only write incremental state changes, without having to write all the states each time. The external file systems (local file systems or HDFS) stores the Checkpoints. The state size of a single TaskManager is limited to the total size of its memory and disk.

The maximum size of a Key is 2 GB. The total size is not larger than the capacity of the configured file system. RocksDBStateBackend is applicable for jobs with a large state (ideally, jobs that do not require high read/write performance), aggregation at the day-window level, and jobs requiring high-availability setups.

Summary

This article explains the basic concepts of Flink state management and fault tolerance and answers the following questions:

(1) Why Do We Need State?

As mentioned earlier, jobs with state require stateful logic because data correlation exists between such jobs and a single data record doesn’t show all the information. Therefore, the state is required to meet business logic.

(2) Why Is State Management Necessary?

State management is essential because real-time jobs run around the clock and necessitate to cope up with the impact of unexpected factors.

(3) How to Select State Type and State Storage Methods?

This article clearly explains that to choose the proper types of state and state backends, firstly users must understand different business scenarios. Then, consider the advantages and limitations of individual schemes and make the right choice based on the actual requirements.

Original Source:

Written by

Follow me to keep abreast with the latest technology news, industry insights, and developer trends.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store