Optimization of Apache Flink for Large-State Scenarios

  • How to configure Apache Flink when using a large state
  • Common load balancing policies
  • Problems caused by the way the Flink source code selects a state disk for RocksDB
  • Solutions and their respective advantages and disadvantages

1. Background

Use Multiple Disks to Share the Workload

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb

2. Common Load Balancing Policies

Hash Policy
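
A hash policy maps a key deterministically to one of the targets, so the same key always lands on the same target. The following is a minimal sketch of that idea and is not taken from Flink: the class name `HashPolicy`, the method `pick`, and the string keys are illustrative assumptions.

```java
/** Illustrative only: maps a key to one of n targets by hashing. Not part of Flink. */
final class HashPolicy {
    /** Returns an index in [0, targetCount) that is stable for a given key. */
    static int pick(String key, int targetCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), targetCount);
    }

    public static void main(String[] args) {
        int disks = 12; // same count as the 12 directories configured earlier
        for (String key : new String[] {"operator-a", "operator-b", "operator-a"}) {
            System.out.println(key + " -> disk " + pick(key, disks));
        }
    }
}
```

The same key always maps to the same index, but nothing guarantees that a small number of different keys spreads evenly across the targets.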

Random Policy

Round Robin Policy

Minimum Load Policy or Least Response Time Policy

Weight Allocation Policy
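
A weight allocation policy gives each target a weight and sends it a share of the work proportional to that weight; for example, a larger or faster disk could be given a higher weight. The sketch below is one possible way to express this and is not Flink code: the class `WeightedPolicy`, its methods, and the idea of drawing a "ticket" are hypothetical names chosen for illustration.

```java
import java.util.Random;

/** Illustrative only: picks a target with probability proportional to its weight. */
final class WeightedPolicy {
    private final int[] weights;
    private final int totalWeight;

    WeightedPolicy(int... weights) {
        int sum = 0;
        for (int w : weights) {
            if (w < 0) throw new IllegalArgumentException("weights must be non-negative");
            sum += w;
        }
        if (sum == 0) throw new IllegalArgumentException("at least one weight must be positive");
        this.weights = weights.clone();
        this.totalWeight = sum;
    }

    /** Maps a ticket in [0, totalWeight) to the index whose weight range contains it. */
    int pickForTicket(int ticket) {
        if (ticket < 0 || ticket >= totalWeight) {
            throw new IllegalArgumentException("ticket out of range: " + ticket);
        }
        int remaining = ticket;
        for (int i = 0; i < weights.length; i++) {
            if (remaining < weights[i]) return i;
            remaining -= weights[i];
        }
        throw new IllegalStateException("unreachable for a valid ticket");
    }

    /** Draws a random ticket, so index i is chosen with probability weights[i] / totalWeight. */
    int pick(Random random) {
        return pickForTicket(random.nextInt(totalWeight));
    }
}
```

With weights 1 and 3, ticket 0 selects index 0 and tickets 1 to 3 select index 1, so the second target receives about three quarters of the picks.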

3. Allocate Disks in the Source Code

/**
 * Base paths for RocksDB directory, as initialized.
 * These are the 12 local RocksDB directories specified in the preceding section.
 */
private transient File[] initializedDbBasePaths;

/**
 * The index of the next directory to be used from {@link #initializedDbBasePaths}.
 * For example, if nextDirectory is 2, the directory at index 2 of
 * initializedDbBasePaths is used as the RocksDB storage directory.
 */
private transient int nextDirectory;

// In the lazyInitializeForJob method, the following line determines the index of the directory to be used next.
// It generates a random number bounded by initializedDbBasePaths.length.
// If initializedDbBasePaths.length is 12, the random number ranges from 0 to 11.
nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
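
To see why a random starting index can load disks unevenly, the small simulation below counts how many RocksDB instances land on each directory. It is a sketch, not Flink code, and it assumes that every instance picks its directory independently and uniformly at random, which simplifies the snippet above. The class `RandomAllocationDemo` and the method `allocate` are hypothetical names.

```java
import java.util.Random;

/** Illustrative simulation, not Flink code: counts instances per directory. */
final class RandomAllocationDemo {
    /** Assigns `instances` RocksDB instances to `dirCount` directories uniformly at random. */
    static int[] allocate(int instances, int dirCount, Random random) {
        int[] perDir = new int[dirCount];
        for (int i = 0; i < instances; i++) {
            perDir[random.nextInt(dirCount)]++;
        }
        return perDir;
    }

    public static void main(String[] args) {
        // 12 instances over 12 directories; the result differs from run to run.
        int[] perDir = allocate(12, 12, new Random());
        for (int d = 0; d < perDir.length; d++) {
            System.out.println("dir " + d + ": " + perDir[d] + " instance(s)");
        }
    }
}
```

Under this assumption, the chance that 12 instances end up on 12 different directories is 12!/12^12, roughly 0.005%, so almost every run leaves some directories shared by several instances and others unused.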

4. Most Suitable Policies and the Challenges They Pose

Round Robin Policy

// Define the following field in the RocksDBStateBackend class:
private static final AtomicInteger DIR_INDEX = new AtomicInteger(0);
// Replace the random assignment of nextDirectory with the following line. getAndIncrement() returns the current value of DIR_INDEX and then increases it by 1; the remainder of that value divided by the total number of directories is the index to use.
nextDirectory = DIR_INDEX.getAndIncrement() % initializedDbBasePaths.length;
// Alternatively, define DIR_INDEX in the RocksDBStateBackend class with a random starting value instead of 0:
private static final AtomicInteger DIR_INDEX = new AtomicInteger(new Random().nextInt(100));
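
The round robin idea can be tried outside Flink with a small stand-alone class. This is a sketch under assumptions: `RoundRobinDirAllocator` and its constructor parameters are hypothetical, and the use of `Math.floorMod` instead of `%` is a defensive choice added here so that the index stays non-negative if the counter ever overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative stand-alone version of the round robin idea; not the Flink class itself. */
final class RoundRobinDirAllocator {
    private final AtomicInteger counter;
    private final int dirCount;

    RoundRobinDirAllocator(int dirCount, int startOffset) {
        if (dirCount <= 0) throw new IllegalArgumentException("dirCount must be positive");
        this.dirCount = dirCount;
        this.counter = new AtomicInteger(startOffset);
    }

    /** Returns the next directory index; safe to call from several threads. */
    int next() {
        // floorMod (rather than %) keeps the index non-negative if the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), dirCount);
    }

    public static void main(String[] args) {
        RoundRobinDirAllocator allocator = new RoundRobinDirAllocator(12, 0);
        for (int i = 0; i < 14; i++) {
            System.out.print(allocator.next() + " ");
        }
        System.out.println();
    }
}
```

With 12 directories and a start offset of 0, the first 14 calls yield 0 through 11 and then 0 and 1 again, so consecutive allocations never share a directory until all 12 have been used.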

Minimum Load Policy

Challenges

  • When a Flink job starts, only the current I/O usage of the disk can be obtained. Is this instantaneous value a reliable measure of average disk I/O?
  • It is impractical to measure the I/O usage for one minute during the startup of a Flink job.
  • Some users do not want to depend on an external monitoring system to measure I/O usage. Versatility must also be taken into account.
  • Assume that we have measured the I/O usage of all disks during the last minute. How can we determine the disk with the lowest disk I/O?
  • Several disks may have similarly low average I/O usage, and allocation among them still needs a round robin policy; otherwise, every new allocation would go to the same disk.
  • It is difficult to determine which disks to view as having low average I/O usage. Is a difference of 10%, 20%, or 30% enough?
  • Different new jobs have different disk usage requirements, so it is difficult to determine how they should be allocated.
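
The combination of "lowest load" and "round robin among the low-load disks" mentioned in the list above can be sketched as follows. This is only an illustration and does not resolve the challenges: it assumes that an average I/O usage per disk (in percent) is already available from somewhere, and the class `LeastLoadDirSelector`, its methods, and the tolerance parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: least-load selection with a tolerance band and round robin inside it. */
final class LeastLoadDirSelector {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final double tolerancePercent;

    LeastLoadDirSelector(double tolerancePercent) {
        this.tolerancePercent = tolerancePercent;
    }

    /** Indices of disks whose average I/O usage is within the tolerance of the minimum. */
    static List<Integer> candidates(double[] avgIoUsagePercent, double tolerancePercent) {
        double min = Double.MAX_VALUE;
        for (double usage : avgIoUsagePercent) {
            min = Math.min(min, usage);
        }
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < avgIoUsagePercent.length; i++) {
            if (avgIoUsagePercent[i] - min <= tolerancePercent) {
                result.add(i);
            }
        }
        return result;
    }

    /** Picks the next disk: round robin over the current low-load candidates. */
    int pick(double[] avgIoUsagePercent) {
        if (avgIoUsagePercent.length == 0) throw new IllegalArgumentException("no disks");
        List<Integer> low = candidates(avgIoUsagePercent, tolerancePercent);
        return low.get(Math.floorMod(counter.getAndIncrement(), low.size()));
    }
}
```

With usages of 80, 12, 15, 60, and 30 percent and a tolerance of 10 percentage points, disks 1 and 2 qualify, and successive picks alternate between them. The choice of tolerance remains arbitrary here, which is exactly the difficulty raised in the list.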

5. Summary

Alibaba Cloud
