Optimization of Apache Flink for Large-State Scenarios

  • Common load balancing policies
  • How the Flink source code selects a state disk for RocksDB, and the problems this causes
  • Solutions and their respective advantages and disadvantages

1. Background

Apache Flink supports multiple types of state backends. When the state is large, the RocksDBStateBackend is the only practical choice.
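
For reference, a job enables the RocksDB state backend roughly like this with the Flink 1.8 API. This is a minimal sketch: the checkpoint URI is a placeholder, and the flink-statebackend-rocksdb dependency must be on the classpath.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableRocksDB {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoints are written to this URI (a placeholder here); the local
        // RocksDB working directories are configured separately, as shown below.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}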

Use Multiple Disks to Share the Workload

RocksDB stores data both in memory and on disk. When the state is large, disk usage is high, and if RocksDB is read frequently, disk I/O will restrict the performance of Flink jobs. One mitigation is to spread RocksDB's local directories across multiple disks, for example:

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb

2. Common Load Balancing Policies

Assume 12 disks are allocated to RocksDB and only three parallel instances need disks, with each instance using one disk. Even so, there is a fairly high probability that two parallel instances will share the same disk, and a small probability that all three will share the same disk. When instances share a disk, the performance of the Flink job is easily restricted by disk I/O.
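
To see why, assume each of the three instances picks one of the 12 disks uniformly at random; the collision probability then follows from a short birthday-problem calculation:

public class CollisionOdds {
    public static void main(String[] args) {
        int disks = 12, instances = 3;
        // P(all distinct) = (12/12) * (11/12) * (10/12) ≈ 0.764,
        // so at least two instances share a disk about 23.6% of the time,
        // even with four times as many disks as instances.
        double allDistinct = 1.0;
        for (int i = 0; i < instances; i++) {
            allDistinct *= (double) (disks - i) / disks;
        }
        System.out.printf("P(at least one shared disk) = %.3f%n", 1 - allDistinct);
    }
}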

Hash Policy

Under a hash policy, the system hashes some key of each job and uses the result to distribute the workload across multiple workers. In the preceding scenario, that means the RocksDB directories used by multiple slots are mapped to multiple disks to share the workload. However, hash collisions may occur: different parallel instances may produce the same hashCode, or produce different hashCode values that are still mapped to the same disk once the remainder is taken from dividing the hashCode by the number of disks.
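
A minimal sketch of the idea follows; the class, method, and parameter names here are hypothetical, not Flink API.

import java.io.File;

class HashDiskSelector {
    // Map a slot's key to one of the local directories by hashing.
    static File selectDirByHash(String slotKey, File[] localDirs) {
        // Mask off the sign bit so the index is never negative. Distinct keys
        // can still map to the same index; that is the collision described above.
        int index = (slotKey.hashCode() & Integer.MAX_VALUE) % localDirs.length;
        return localDirs[index];
    }
}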

Random Policy

The random policy generates a random number for each Flink job and allocates the workload to a randomly chosen worker, which here means the workload is placed on a randomly chosen disk. However, just like hash values, random numbers can collide, so multiple instances may still end up on the same disk. (As Section 3 shows, this is the policy actually used in the Flink source code.)

Round Robin Policy

The round robin policy is easy to understand: workers receive jobs in turn. When a Flink job applies for a RocksDB directory for the first time, directory 1 is allocated; the second time, directory 2 is allocated, and so on. This policy allocates jobs most evenly: if it is used, the number of jobs on any two disks differs by at most 1.

Minimum Load Policy or Least Response Time Policy

This policy allocates jobs based on the response time of each worker. A short response time suggests a light load, so more jobs can be allocated to that worker. In the preceding scenario, the analogue is to check the I/O usage of each disk: low I/O usage indicates a relatively idle disk that can accept more jobs, as in the sketch below.
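
A minimal sketch of the selection step, assuming the per-disk I/O usage has somehow already been measured (Section 4 discusses why measuring it is the hard part):

class LeastLoadedDisk {
    // ioUsage[i] is the measured I/O usage of disk i, in [0, 1];
    // the array is assumed to be non-empty.
    static int leastLoaded(double[] ioUsage) {
        int best = 0;
        for (int i = 1; i < ioUsage.length; i++) {
            if (ioUsage[i] < ioUsage[best]) {
                best = i;                // remember the least busy disk so far
            }
        }
        return best;
    }
}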

Weight Allocation Policy

This policy assigns a different weight to each worker. A worker with a higher weight is allocated more jobs, and the number of jobs allocated to a worker is in direct proportion to its weight, as in the sketch below.
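
A minimal sketch of weighted selection; the weights themselves are assumed to be given, and how to derive sensible weights for disks is left open.

import java.util.Random;

class WeightedSelector {
    // Pick worker i with probability weights[i] / sum(weights);
    // weights must contain at least one positive value.
    static int selectByWeight(int[] weights, Random rnd) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        int r = rnd.nextInt(total);      // uniform in [0, total)
        for (int i = 0; i < weights.length; i++) {
            r -= weights[i];
            if (r < 0) {
                return i;                // r fell inside worker i's weight band
            }
        }
        throw new IllegalStateException("weights must contain a positive value");
    }
}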

3. How Disks Are Allocated in the Flink Source Code

When Flink 1.8.1 is used in production, some disks may be allocated multiple parallel instances while other disks are allocated none. The cause is the random policy used in the source code, shown below. In most cases each disk is allocated only one instance, and there is only a low probability that multiple instances land on one disk; it is exactly this low-probability case that needs to be resolved.

/** Base paths for RocksDB directory, as initialized.
 * These are the 12 local RocksDB directories that we specified in the preceding section. */
private transient File[] initializedDbBasePaths;

/** The index of the next directory to use from {@link #initializedDbBasePaths}.
 * If nextDirectory is 2, the directory at index 2 of initializedDbBasePaths
 * is used as the storage directory of RocksDB. */
private transient int nextDirectory;

// In the lazyInitializeForJob method, the following line determines the index of the directory to use next.
// It generates a random number in the range [0, initializedDbBasePaths.length):
// if initializedDbBasePaths.length is 12, the generated number ranges from 0 to 11.
nextDirectory = new Random().nextInt(initializedDbBasePaths.length);

4. Most Suitable Policies and the Challenges They Pose

When a large number of jobs exist, the random and hash policies can ensure that each worker carries roughly the same number of jobs. However, when only a few jobs exist, for example when 20 jobs are allocated to 10 workers using the random algorithm, some workers may receive no jobs at all while others receive three or four. In that case, the random and hash policies cannot resolve the problem of uneven allocation of RocksDB directories to disks. What about the round robin policy or the minimum load policy?
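
A quick simulation of that example illustrates the skew; the worker and job counts are taken from the paragraph above, and the printed distribution varies from run to run.

import java.util.Arrays;
import java.util.Random;

public class RandomSkew {
    public static void main(String[] args) {
        int workers = 10, jobs = 20;
        int[] load = new int[workers];
        Random rnd = new Random();
        for (int j = 0; j < jobs; j++) {
            load[rnd.nextInt(workers)]++;   // each job picks a worker uniformly at random
        }
        // A typical run prints something like [2, 0, 4, 1, 3, 2, 0, 3, 1, 4]:
        // some workers are idle while others carry three or four jobs.
        System.out.println(Arrays.toString(load));
    }
}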

Round Robin Policy

The round robin policy can resolve the preceding problem in the following way:

// Define the following field in the RocksDBStateBackend class:
private static final AtomicInteger DIR_INDEX = new AtomicInteger(0);

// Change the policy for allocating nextDirectory to the following code.
// Each time a directory is allocated, DIR_INDEX is incremented by 1, and the
// remainder of dividing it by the total number of directories is the index used.
nextDirectory = DIR_INDEX.getAndIncrement() % initializedDbBasePaths.length;

// DIR_INDEX is static, so it is shared within one TaskManager process. To keep
// every TaskManager from starting at directory 0 and piling onto the same first
// disks, initialize DIR_INDEX with a random value instead:
private static final AtomicInteger DIR_INDEX = new AtomicInteger(new Random().nextInt(100));

Minimum Load Policy

When a TaskManager starts, this policy monitors the average I/O usage over the previous one to five minutes of every disk that hosts a RocksDB local directory, filters out the disks with high I/O usage, and preferentially selects the disks with low average I/O usage. Among those low-I/O disks, a round robin policy is still applied, as in the sketch below.
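
A rough sketch of that combination, not Flink code: the class, the threshold, and the ioUsage input are all assumptions, and obtaining a reliable usage measurement is exactly the challenge listed next.

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class LowIoDiskSelector {
    private static final double IO_THRESHOLD = 0.5;      // assumed cutoff; see the challenges below
    private static final AtomicInteger RR_INDEX = new AtomicInteger(0);

    // dirs[i] is a RocksDB local directory; ioUsage[i] is its measured average
    // I/O usage in [0, 1] over the last few minutes.
    static File select(File[] dirs, double[] ioUsage) {
        List<File> candidates = new ArrayList<>();
        for (int i = 0; i < dirs.length; i++) {
            if (ioUsage[i] < IO_THRESHOLD) {             // keep the lightly loaded disks
                candidates.add(dirs[i]);
            }
        }
        if (candidates.isEmpty()) {                      // all disks busy: fall back to all of them
            candidates = new ArrayList<>(Arrays.asList(dirs));
        }
        // Round robin among the remaining low-I/O disks.
        return candidates.get(RR_INDEX.getAndIncrement() % candidates.size());
    }
}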

Challenges

  • When a Flink job starts, only the current I/O usage of the disk can be obtained. Is this instantaneous value a reliable measure of average disk I/O?
  • It is impractical to measure the I/O usage for one minute during the startup of a Flink job.
  • Some users do not want to depend on an external monitoring system to measure I/O usage. Versatility must also be taken into account.
  • Assume that we have measured the I/O usage of all disks during the last minute. How can we determine the disk with the lowest disk I/O?
  • Disks with low average I/O usage also need to adopt the round robin policy.
  • It is difficult to determine which disks to view as having low average I/O usage. Is a difference of 10%, 20%, or 30% enough?
  • Different new jobs have different disk usage requirements, so it is difficult to determine how they should be allocated.

5. Summary

This article analyzed the problems that arise when Flink jobs run with large state and presented several solutions, along with their respective advantages and disadvantages.
