Trillions of Bytes of Data Per Day! Application and Evolution of Apache Flink in Kuaishou

1) Scenarios and Scale of Apache Flink in Kuaishou

1.1 Apache Flink Scenarios in Kuaishou

  • 80% monitoring: Collects statistics on various data metrics in real time, and generates alerts to assist the business team with real-time analysis and monitoring.
  • 15% data processing: Cleanses, splits, and joins data, such as splitting and cleansing the data of large topics.
  • 5% real-time service processing: Processes service data in real time based on the specific business logic, such as real-time data scheduling.
  • Quality monitoring of short videos and live streams: Collects statistics on live streaming quality metrics in real time, including the volume of live streams, the lag rate, and the live streaming startup failure rate on both the audience side and the broadcaster side.
  • User growth analysis: Collects statistics on the growth of new users in each distribution channel in real time and adjusts the distribution volume for each channel in real time based on the results.
  • Real-time data processing: Joins the ad display streams and clickstreams in real time and splits client logs.
  • Live streaming traffic scheduling for CDNs: Monitors the quality of each CDN in real time and adjusts the ratio of traffic allocated to each CDN provider.

1.2 Scale of Flink Clusters in Kuaishou

2) Technical Evolution of Flink in Kuaishou

  • We optimized the performance in specific scenarios, such as interval joins.
  • We improved stability by controlling the speed of data sources, improving the stability of the JobManager, and resolving frequent job failures.
  • We built a job hosting platform.

2.1 Performance Optimization in Specific Scenarios

2.1.2 Optimization of Interval Join

2.1.2.1 How Interval Join Works

KeyedStreamA.intervalJoin(KeyedStreamB)
    .between(Time.minutes(0), Time.minutes(20))
    .process(joinFunction);
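
For reference, a slightly fuller sketch of the same call is shown below. It assumes Flink's DataStream API (KeyedStream, ProcessJoinFunction, Time, and Collector imports) and two placeholder streams keyedA and keyedB of type Tuple2<String, Long> (join key, event timestamp) that are already keyed by the join key; the names and types are illustrative, not Kuaishou's production code.

// keyedA, keyedB: KeyedStream<Tuple2<String, Long>, String>, keyed by the join key,
// running with event-time semantics.
keyedA
    .intervalJoin(keyedB)
    // For each element a of stream A, join the elements b of stream B that share the
    // same key and satisfy a.timestamp <= b.timestamp <= a.timestamp + 20 minutes.
    .between(Time.minutes(0), Time.minutes(20))
    .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
        @Override
        public void processElement(Tuple2<String, Long> left,
                                   Tuple2<String, Long> right,
                                   Context ctx,
                                   Collector<String> out) {
            // Emit one joined record per matching pair.
            out.collect(left.f0 + ": A@" + left.f1 + " joined B@" + right.f1);
        }
    });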

2.1.2.2 Select a Storage Policy for States

  • FsStateBackend: The state is held in memory; when a checkpoint occurs, it is persisted to Hadoop Distributed File System (HDFS).
  • RocksDBStateBackend: The state is stored in an embedded RocksDB instance, and incremental checkpoints are supported. This backend is suitable for ultra large states (see the sketch below).
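
As a minimal sketch (the HDFS checkpoint path and checkpoint interval are placeholders), enabling the RocksDB backend with incremental checkpoints looks roughly like this:

import java.io.IOException;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // "true" enables incremental checkpoints: only newly created SST files are
        // uploaded to HDFS on each checkpoint, which suits ultra large states.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000);  // checkpoint every 60 s (illustrative value)
    }
}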

2.1.2.3 Analysis of RocksDB Access Performance Problems

  • After a job had been running for a period of time, backpressure occurred, and the throughput decreased.
  • Using jstack, we found that the program was frequently stuck in RocksDB get requests.
  • Using top, we found that individual threads consistently kept a CPU core fully loaded.

2.1.2.4 Optimize the Access Performance of RocksDB

  • Replace prefix scans with full-key scans. In the interval join scenario, the exact bounds of the data to be accessed are known, so the upper and lower bound keys of a query can be constructed precisely. The full key takes the form keyGroupId+joinKey+ts, with ts in [lower, upper].
  • Enable range queries in RocksDB. The iterator can then seek directly to the lower bound and stop at the upper bound, which avoids scanning and validating irrelevant data (a sketch follows this list).
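
The idea can be illustrated with a RocksJava sketch; the key layout (2-byte key group, join key bytes, 8-byte big-endian timestamp) and the class and method names here are assumptions for illustration, not Flink's internal code.

import java.nio.ByteBuffer;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;

public class BoundedScanSketch {

    // Assumed layout: keyGroupId (2 bytes) | joinKey | timestamp (8 bytes, big-endian),
    // so lexicographic byte order matches numeric order for non-negative timestamps.
    static byte[] buildKey(int keyGroupId, byte[] joinKey, long timestamp) {
        return ByteBuffer.allocate(2 + joinKey.length + 8)
                .putShort((short) keyGroupId)
                .put(joinKey)
                .putLong(timestamp)
                .array();
    }

    static void scanRange(RocksDB db, int keyGroupId, byte[] joinKey, long lower, long upper) {
        byte[] lowerKey = buildKey(keyGroupId, joinKey, lower);
        byte[] upperKey = buildKey(keyGroupId, joinKey, upper + 1);  // exclusive upper bound
        try (Slice upperBound = new Slice(upperKey);
             ReadOptions readOptions = new ReadOptions().setIterateUpperBound(upperBound);
             RocksIterator it = db.newIterator(readOptions)) {
            // Seek directly to the lower bound and stop at the upper bound, instead of
            // prefix-scanning and then filtering out-of-range entries.
            for (it.seek(lowerKey); it.isValid(); it.next()) {
                byte[] value = it.value();
                // ... process the joined record(s) stored under this timestamp ...
            }
        }
    }
}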

2.1.2.5 Resolve the High Disk Load in RocksDB

  • Our Flink cluster nodes were compute-optimized machines with large memory but only a single HDD. When the cluster had few nodes, a single node had to run four or five containers of a job, all writing to the one HDD.
  • The RocksDB backend frequently performed compactions, which caused write amplification. At the same time, checkpoints also wrote data to the disk.
  • We tuned the RocksDB parameters to reduce the I/O of a compaction process. After optimization, the total I/O volume decreased by about 50%.
  • We added a RocksDB configuration package for large states at the Flink framework layer, so that RocksDBStateBackend can specify various RocksDB parameters and tuning them becomes easier (see the configuration sketch after this list).
  • In the future, we will consider storing states through shared storage. This will further reduce the total I/O and allow quick checkpoints and recovery.
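
For illustration only, RocksDB parameters can be plugged into the backend through Flink's OptionsFactory; the sketch below shows the mechanism, and the concrete values are placeholders rather than Kuaishou's tuned configuration.

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;

public class LargeStateOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // Fewer concurrent background compactions means less simultaneous disk I/O.
        return currentOptions.setMaxBackgroundCompactions(2);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
                .setCompactionStyle(CompactionStyle.LEVEL)
                // Larger SST files and level sizes reduce how often data is rewritten.
                .setTargetFileSizeBase(256 * 1024 * 1024L)
                .setMaxBytesForLevelBase(1024 * 1024 * 1024L);
    }
}

// Usage: rocksDbStateBackend.setOptions(new LargeStateOptionsFactory());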

2.2 Stability Improvement

2.2.1 Source Speed Control

Source Speed Control Policy
  • Each SourceTask reports its speed status, in the form of <id,ctime,watermark,speed>, to the JobManager.
  • A SourceCoordinator is introduced in the JobManager. It has a global view of the reported speeds, formulates a speed control policy accordingly, and delivers that policy to the SourceTasks.
  • The SourceTask executes the corresponding speed control logic based on the speed adjustment information delivered by the JobManager.
  • A significant advantage of this method is that, when a DAG includes sub-graphs, the sources of different sub-graphs do not affect each other.
Details of the Source Speed Control Policy
  • The SourceTask periodically reports a status to the JobManager. The default interval is 10 seconds.
  • The report content is in the form of <id,clocktime,watermark,speed>.
  • Speed control threshold: the watermark of the fastest concurrent read minus the watermark of the slowest concurrent read > Δt (5 minutes by default). A speed control policy is formulated only when this threshold is exceeded.
  • Global prediction: TargetWatermark of each concurrent read = Base + Speed × Time. The SourceCoordinator first performs global prediction to estimate the watermark that each concurrent read can reach by the next interval.
  • Global decision-making: TargetWatermark = predicted watermark of the slowest concurrent read + Δt/2. Based on the global prediction result, the SourceCoordinator takes the predicted watermark of the slowest concurrent read, adds a margin of Δt/2 to it, and uses the resulting target watermark to determine the global speed control policy for the next cycle.
  • Delivery of speed control information: The speed control information is in the form of <targetTime,targetWatermark>, which includes the next target point in time and the target watermark. The SourceCoordinator sends this global decision to all SourceTasks.
  • After obtaining the speed control information in the form of <targetTime,targetWatermark>, the SourceTask performs speed control.
  • Take KafkaSource as an example. When fetching data, the KafkaFetcher checks its current progress against the speed control information to decide whether to wait, which throttles the read speed (see the sketch after this list). Other factors considered in this solution are as follows:
  • Speed control is applied only to jobs that use event time (EventTime).
  • A job can specify whether to enable source speed control policies.
  • The sources of different sub-graphs of a DAG must not affect each other.
  • The delivery of checkpoint barriers must not be affected.
  • The transmission speed of the data sources is not constant. Therefore, this solution also considers how to avoid a sudden surge in watermarks.
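
The throttling on the SourceTask side can be pictured with the hypothetical sketch below; the speed control mechanism is Kuaishou's internal modification, not a public Flink API, so the class and method names are illustrative.

public class SpeedControlSketch {

    // Delivered by the (hypothetical) SourceCoordinator once per control cycle;
    // targetWatermark = predicted watermark of the slowest concurrent read + Δt/2.
    static class SpeedControlInfo {
        final long targetTime;       // end of the current control cycle (wall-clock ms)
        final long targetWatermark;  // event-time point this subtask should not pass

        SpeedControlInfo(long targetTime, long targetWatermark) {
            this.targetTime = targetTime;
            this.targetWatermark = targetWatermark;
        }
    }

    // Called by the fetcher before emitting the next batch of records.
    static void maybeWait(long currentWatermark, SpeedControlInfo ctrl)
            throws InterruptedException {
        // If this subtask has already reached the target watermark, pause until the
        // next control cycle so slower subtasks can catch up. Checkpoint barriers
        // and processing-time jobs are not throttled.
        while (currentWatermark >= ctrl.targetWatermark
                && System.currentTimeMillis() < ctrl.targetTime) {
            Thread.sleep(50);
        }
    }
}
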
Source Speed Control Result

2.2.2 Stability of the JobManager

2.2.3 Solutions to Frequent Job Failure

  • For disk errors, we added a DiskChecker to the TaskManager to check disk health. If a disk error occurs, the TaskManager exits automatically (a sketch of this idea follows the list).
  • For nodes where TaskManager errors occur frequently, we allow users to specify a policy that blacklists the faulty nodes. In addition, we added a software blacklist mechanism that instructs YARN not to schedule any container onto the blacklisted nodes.
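
A disk health probe of this kind could look roughly like the sketch below; DiskChecker is Kuaishou's internal component, so this code is a hypothetical illustration of the idea (write and read back a probe file in each local directory, and exit the process if any disk fails).

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

public class DiskHealthProbe {

    // Local directories used by the TaskManager (spill files, RocksDB, shuffle data, ...).
    private final List<String> localDirs;

    public DiskHealthProbe(List<String> localDirs) {
        this.localDirs = localDirs;
    }

    // Returns false if any directory cannot be written to and read back.
    public boolean allDisksHealthy() {
        for (String dir : localDirs) {
            try {
                Path probe = Paths.get(dir, ".disk-health-probe");
                Files.write(probe, "ok".getBytes(StandardCharsets.UTF_8));
                Files.readAllBytes(probe);
                Files.deleteIfExists(probe);
            } catch (IOException e) {
                return false;  // treat this disk as faulty
            }
        }
        return true;
    }

    // Invoked periodically; in the real system the TaskManager exits so that the
    // containers are rescheduled onto healthy nodes.
    public void exitIfUnhealthy() {
        if (!allDisksHealthy()) {
            System.exit(-1);
        }
    }
}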

2.3 Platform Building

2.3.1 Qingteng Job Hosting Platform

2.3.2 Optimization of the Problem Locating Process

3) Future Plans

Original Source:
