Optimization of Apache Flink for Large-State Scenarios

1. Background

Use Multiple Disks to Share the Workload

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb
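The value above is a single comma-separated list, and each entry becomes one candidate base directory for RocksDB instances. The sketch below shows how such a value turns into the `File[]` array that the allocation code later indexes into. It assumes commas are the only separator. `LocalDirParser.parseLocalDirs` is a hypothetical helper, not a Flink API; Flink does its own parsing internally.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a comma-separated localdir value into base directories. */
final class LocalDirParser {

    static File[] parseLocalDirs(String value) {
        List<File> dirs = new ArrayList<>();
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) { // skip empty entries, e.g. from a trailing comma
                dirs.add(new File(trimmed));
            }
        }
        return dirs.toArray(new File[0]);
    }

    public static void main(String[] args) {
        // Rebuild the same 12-directory value as the configuration line above.
        StringBuilder value = new StringBuilder();
        for (int i = 1; i <= 12; i++) {
            if (i > 1) {
                value.append(',');
            }
            value.append("/data").append(i).append("/flink/rocksdb");
        }
        File[] dirs = parseLocalDirs(value.toString());
        System.out.println(dirs.length + " base directories, first: " + dirs[0].getPath());
    }
}
```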

2. Common Load Balancing Policies

Hash Policy
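With a hash policy, a key taken from the request is hashed and reduced modulo the number of targets. The same key therefore always lands on the same target. The minimal sketch below assumes string keys and a fixed list of targets. The class name `HashPolicy` and the example key are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class HashPolicy {
    private final List<String> targets;

    HashPolicy(List<String> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("at least one target is required");
        }
        this.targets = new ArrayList<>(targets);
    }

    /** Same key, same target. floorMod keeps the index non-negative for negative hash codes. */
    String select(String key) {
        return targets.get(Math.floorMod(key.hashCode(), targets.size()));
    }

    public static void main(String[] args) {
        HashPolicy policy = new HashPolicy(
                List.of("/data1/flink/rocksdb", "/data2/flink/rocksdb", "/data3/flink/rocksdb"));
        String first = policy.select("job-1/operator-42");
        String again = policy.select("job-1/operator-42");
        System.out.println(first + " selected twice: " + first.equals(again));
    }
}
```

The policy is stateless and stable per key. However, a small or skewed key set can pile several keys onto one target, and changing the number of targets remaps most keys.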

Random Policy
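A random policy picks a target uniformly at random on every call. It needs no shared state, but over a small number of picks the spread can be noticeably uneven. Below is a minimal sketch with a hypothetical `RandomPolicy` class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

final class RandomPolicy {
    private final List<String> targets;
    private final Random random;

    RandomPolicy(List<String> targets, Random random) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("at least one target is required");
        }
        this.targets = new ArrayList<>(targets);
        this.random = random;
    }

    /** Uniform index in [0, targets.size()). */
    int selectIndex() {
        return random.nextInt(targets.size());
    }

    String select() {
        return targets.get(selectIndex());
    }

    public static void main(String[] args) {
        List<String> dirs = List.of("/data1/flink/rocksdb", "/data2/flink/rocksdb", "/data3/flink/rocksdb");
        RandomPolicy policy = new RandomPolicy(dirs, new Random());
        int[] counts = new int[dirs.size()];
        for (int i = 0; i < 6; i++) {
            counts[policy.selectIndex()]++;
        }
        // Six picks over three directories: an even [2, 2, 2] split happens only about 12% of the time.
        System.out.println(Arrays.toString(counts));
    }
}
```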

Round Robin Policy
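A round robin policy hands out targets in a fixed cyclic order, driven by a shared counter. Below is a minimal thread-safe sketch with the hypothetical class name `RoundRobinPolicy`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinPolicy {
    private final List<String> targets;
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinPolicy(List<String> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("at least one target is required");
        }
        this.targets = new ArrayList<>(targets);
    }

    String select() {
        // getAndIncrement is atomic, so concurrent callers never receive the same counter value.
        // floorMod keeps the index valid even after the counter wraps past Integer.MAX_VALUE.
        return targets.get(Math.floorMod(counter.getAndIncrement(), targets.size()));
    }

    public static void main(String[] args) {
        RoundRobinPolicy policy = new RoundRobinPolicy(List.of("/data1", "/data2", "/data3"));
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            order.append(policy.select()).append(' ');
        }
        System.out.println(order.toString().trim()); // prints "/data1 /data2 /data3 /data1"
    }
}
```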

Minimum Load Policy or Least Response Time Policy
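A minimum load or least response time policy sends the next request to the target whose current metric is lowest. The two variants differ only in which metric is compared. The minimal sketch below assumes the caller already collects one metric value per target, such as active requests or average response time. `LeastLoadPolicy.selectLeast` is a hypothetical name.

```java
final class LeastLoadPolicy {

    /** Returns the index of the smallest metric value; ties go to the lowest index. */
    static int selectLeast(double[] metrics) {
        if (metrics.length == 0) {
            throw new IllegalArgumentException("at least one target is required");
        }
        int best = 0;
        for (int i = 1; i < metrics.length; i++) {
            if (metrics[i] < metrics[best]) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[] activeRequests = {4, 1, 3};            // minimum load: fewest active requests
        double[] avgResponseMillis = {12.5, 30.0, 8.2}; // least response time: fastest target
        System.out.println(selectLeast(activeRequests) + " " + selectLeast(avgResponseMillis)); // prints "1 2"
    }
}
```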

Weight Allocation Policy
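A weight allocation policy gives each target a share of requests proportional to its weight. One assumed use is a larger weight for a faster or larger disk. The sketch below uses smooth weighted round robin, one common way to implement weights and the variant nginx uses for weighted upstreams. `SmoothWeightedRoundRobin` is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;

final class SmoothWeightedRoundRobin {
    private final String[] targets;
    private final int[] weights;
    private final int[] current; // running score per target
    private final int totalWeight;

    SmoothWeightedRoundRobin(String[] targets, int[] weights) {
        if (targets.length == 0 || targets.length != weights.length) {
            throw new IllegalArgumentException("need one positive weight per target");
        }
        int sum = 0;
        for (int w : weights) {
            if (w <= 0) {
                throw new IllegalArgumentException("weights must be positive");
            }
            sum += w;
        }
        this.targets = targets.clone();
        this.weights = weights.clone();
        this.current = new int[weights.length];
        this.totalWeight = sum;
    }

    /** Adds each weight to its score, picks the highest score, then charges the winner the total weight. */
    synchronized String select() {
        int best = 0;
        for (int i = 0; i < current.length; i++) {
            current[i] += weights[i];
            if (current[i] > current[best]) {
                best = i;
            }
        }
        current[best] -= totalWeight;
        return targets[best];
    }

    public static void main(String[] args) {
        SmoothWeightedRoundRobin swrr =
                new SmoothWeightedRoundRobin(new String[] {"a", "b", "c"}, new int[] {5, 1, 1});
        List<String> picks = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            picks.add(swrr.select());
        }
        System.out.println(picks); // prints [a, a, b, a, c, a, a]
    }
}
```

Over each cycle of total-weight picks, every target is chosen exactly as often as its weight. The heavy target's picks are interleaved rather than bunched together.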

3. Allocate Disks in the Source Code

/**
 * Base paths for RocksDB directory, as initialized.
 * These are the 12 local RocksDB directories configured in the preceding section.
 */
private transient File[] initializedDbBasePaths;

/**
 * The index of the next directory to be used from {@link #initializedDbBasePaths}.
 * For example, if nextDirectory is 2, initializedDbBasePaths[2] is used as the storage directory of RocksDB.
 */
private transient int nextDirectory;

// In the lazyInitializeForJob method, this line determines the index of the directory to be used next.
// It draws a random number in [0, initializedDbBasePaths.length); with 12 directories, the result ranges from 0 to 11.
nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
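Suppose each slot initializes its own copy of the backend. Then each RocksDB instance effectively starts from a random directory. Under the simplifying assumption that k instances each pick one of n directories independently and uniformly, the odds of an even spread are easy to compute. The class name `RandomPlacementOdds` is hypothetical.

```java
final class RandomPlacementOdds {

    /** Probability that k independent uniform picks over n directories all land on different directories. */
    static double allDistinct(int n, int k) {
        double p = 1.0;
        for (int i = 0; i < k; i++) {
            p *= (double) (n - i) / n; // pick i+1 must avoid the i directories already used
        }
        return p;
    }

    /** Expected number of directories that receive no instance at all. */
    static double expectedIdleDirs(int n, int k) {
        return n * Math.pow(1.0 - 1.0 / n, k);
    }

    public static void main(String[] args) {
        int n = 12;
        int k = 12;
        System.out.printf("P(all distinct) = %.2e%n", allDistinct(n, k));
        System.out.printf("expected idle directories = %.2f%n", expectedIdleDirs(n, k));
    }
}
```

For 12 instances on 12 directories, a perfectly even spread has probability 12!/12^12, about 5.4 × 10^-5. On average about 4.2 of the 12 directories receive no instance, so other directories end up hosting two or more.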

4. Most Suitable Policies and the Challenges They Pose

Round Robin Policy

// Define the following field in the RocksDBStateBackend class:
private static final AtomicInteger DIR_INDEX = new AtomicInteger(0);

// Change the policy for allocating nextDirectory to the following. Each allocation increments DIR_INDEX by 1 and
// takes the remainder modulo the number of directories; Math.floorMod keeps the result non-negative after int overflow.
nextDirectory = Math.floorMod(DIR_INDEX.getAndIncrement(), initializedDbBasePaths.length);

// Improved definition: replace the DIR_INDEX field above with one that starts at a random value,
// so that TaskManager processes on the same machine are less likely to all start from the first directory.
private static final AtomicInteger DIR_INDEX = new AtomicInteger(new Random().nextInt(100));
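A static counter is shared only within one JVM, so each TaskManager process keeps its own DIR_INDEX. The sketch below compares per-process counters that all start at 0 with counters that start at a random value. It uses assumed numbers: 4 TaskManagers on one machine, 3 RocksDB instances each, and 12 directories. `TaskManagerSim` is a hypothetical stand-in for one JVM.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

final class PerProcessCounterDemo {

    /** Hypothetical stand-in for one TaskManager JVM holding its own static DIR_INDEX. */
    static final class TaskManagerSim {
        private final AtomicInteger dirIndex;

        TaskManagerSim(int initialValue) {
            dirIndex = new AtomicInteger(initialValue);
        }

        int nextDirectory(int numDirs) {
            return Math.floorMod(dirIndex.getAndIncrement(), numDirs);
        }
    }

    /** Counts RocksDB instances per directory when every process allocates slotsPerTm times. */
    static int[] place(TaskManagerSim[] tms, int slotsPerTm, int numDirs) {
        int[] perDir = new int[numDirs];
        for (TaskManagerSim tm : tms) {
            for (int s = 0; s < slotsPerTm; s++) {
                perDir[tm.nextDirectory(numDirs)]++;
            }
        }
        return perDir;
    }

    public static void main(String[] args) {
        int numDirs = 12;
        int tmCount = 4;
        int slotsPerTm = 3;
        Random random = new Random();
        TaskManagerSim[] zeroStart = new TaskManagerSim[tmCount];
        TaskManagerSim[] randomStart = new TaskManagerSim[tmCount];
        for (int i = 0; i < tmCount; i++) {
            zeroStart[i] = new TaskManagerSim(0);
            randomStart[i] = new TaskManagerSim(random.nextInt(100));
        }
        System.out.println("start at 0:   " + Arrays.toString(place(zeroStart, slotsPerTm, numDirs)));
        System.out.println("random start: " + Arrays.toString(place(randomStart, slotsPerTm, numDirs)));
    }
}
```

With a start of 0, every process fills directories 0, 1, and 2, giving [4, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0]. Random starting offsets usually spread the 12 instances over more directories, but they do not guarantee an even spread.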

Minimum Load Policy
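For disks, a minimum load policy can approximate load as the number of RocksDB instances currently placed on each directory, then choose the directory with the fewest. The minimal sketch below assumes instance count is an acceptable proxy for disk load. `LeastUsedDirectoryAllocator` and its `acquire`/`release` methods are hypothetical and not part of Flink.

```java
import java.io.File;
import java.util.Arrays;

final class LeastUsedDirectoryAllocator {
    private final File[] dirs;
    private final int[] instanceCount; // RocksDB instances currently placed on each directory

    LeastUsedDirectoryAllocator(File[] dirs) {
        if (dirs.length == 0) {
            throw new IllegalArgumentException("at least one directory is required");
        }
        this.dirs = dirs.clone();
        this.instanceCount = new int[dirs.length];
    }

    /** Picks the directory with the fewest live instances (lowest index on ties) and records it. */
    synchronized int acquire() {
        int best = 0;
        for (int i = 1; i < instanceCount.length; i++) {
            if (instanceCount[i] < instanceCount[best]) {
                best = i;
            }
        }
        instanceCount[best]++;
        return best;
    }

    /** Must be called when the instance on directory `index` is disposed, or the counts drift. */
    synchronized void release(int index) {
        if (instanceCount[index] == 0) {
            throw new IllegalStateException("nothing allocated on " + dirs[index]);
        }
        instanceCount[index]--;
    }

    synchronized int[] snapshot() {
        return instanceCount.clone();
    }

    public static void main(String[] args) {
        LeastUsedDirectoryAllocator alloc = new LeastUsedDirectoryAllocator(
                new File[] {new File("/data1"), new File("/data2"), new File("/data3")});
        int a = alloc.acquire();
        int b = alloc.acquire();
        int c = alloc.acquire();
        alloc.release(b);        // the instance on directory 1 is closed
        int d = alloc.acquire(); // directory 1 is now the least loaded again
        System.out.println(a + " " + b + " " + c + " " + d + " " + Arrays.toString(alloc.snapshot())); // prints "0 1 2 1 [1, 1, 1]"
    }
}
```

Unlike round robin, this needs a release call whenever an instance is closed. Its counts also live in one JVM, so they do not see instances created by other TaskManager processes on the same machine.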

Challenges

5. Summary


Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com
