How Can Tens of Millions of Data Records Be Processed Every Second?

The system must meet the following requirements:

  1. Real-time data collection and analysis, complex computing, and persistence of analysis results.
  2. The capability to process diversified data, including application logs, host performance monitoring indicators, and call link graphs.
  3. High reliability: the system must tolerate failures without losing data.
  4. High performance and low latency: the system must process tens of millions of data records per second with a latency of less than three seconds.

Input and Output Definitions

To gain a better grasp of the system and how it works, it is important to first clarify exactly what the system's inputs and outputs are.

Architecture Design

Now let’s dive into the architecture design of the system. When the system is up and running, logs and monitoring data are generated continuously, and each data record carries its own timestamp. In many ways, the real-time transmission of such data can be likened to water flowing through different pipelines.

Collection Mechanism

For the collection phase, Xianyu’s technical architecture uses Alibaba Cloud’s Log Service, which includes the Logtail and LogHub components. Xianyu selected Logtail as its collection client because it offers high performance, high reliability, and a flexible plug-in extension mechanism, which allows Xianyu to customize its own collection plug-ins and gather various kinds of data in a timely manner.

Transmission Mechanism

LogHub can be regarded as a data publish-and-subscribe component with functions similar to those of Kafka. However, LogHub is a more stable and secure data transmission channel than Kafka.

Preprocessing Mechanism

For the preprocessing part, Blink, the stream processing component of Alibaba Cloud’s StreamCompute, preprocesses the real-time data. Open-source stream computing products include JStorm, Spark Streaming, and Flink; Blink is Alibaba Cloud’s enhanced version of the open-source Flink. Blink is the most appropriate choice for this process because it has an excellent state management mechanism that ensures reliable real-time computing and provides complete SQL statements that make stream computing easier.

Computing and Persistence Mechanism

After the data is preprocessed, call link aggregation logs and host monitoring data are generated. All of the host monitoring data is stored in a time series database (TSDB) for subsequent statistical analysis. Because its storage structure is designed specifically for time-indexed indicator data, a TSDB is particularly suitable for storing and querying time series data. The call link aggregation logs are provided to the complex event processing (CEP) or graph service, which analyzes the diagnosis model. The CEP or graph service is an application developed in-house by Xianyu for analyzing models, processing complex data, and exchanging data with external services. In addition, it uses a relational database to aggregate graph data in a timely manner.
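
To make the persistence step concrete, the sketch below writes one host monitoring point to a TSDB over HTTP, assuming an OpenTSDB-compatible /api/put endpoint; the metric name, tag values, and endpoint URL are illustrative assumptions, not details from Xianyu’s system.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TsdbWriter {
    public static void main(String[] args) throws Exception {
        // One data point: metric name, millisecond timestamp, value, and tags.
        String point = String.format(
                "{\"metric\":\"host.cpu.util\",\"timestamp\":%d,"
                + "\"value\":73.5,\"tags\":{\"host\":\"app-server-01\"}}",
                System.currentTimeMillis());

        // POST to an OpenTSDB-compatible write endpoint (URL is illustrative).
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://tsdb.example.internal:8242/api/put"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(point))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("TSDB write status: " + response.statusCode());
    }
}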

Design and Performance Optimization

Collection Process

Logtail collects logs and indicator data. The following figure shows the overall collection process.

  • Inputs: obtain data.
  • Processors: process obtained data.
  • Aggregators: aggregate data.
  • Flushers: export data to a specified sink.
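
Conceptually, these four stages form a small pipeline: each collection cycle reads raw data, transforms it, batches it, and exports it. The Java sketch below models that flow for illustration only; Logtail plug-ins are not written against these interfaces, and every type name here is hypothetical.

import java.util.List;

// Hypothetical interfaces mirroring the four Logtail pipeline stages.
interface Input      { List<String> read(); }                   // obtain raw data
interface Processor  { List<String> process(List<String> r); }  // transform records
interface Aggregator { List<String> aggregate(List<String> r); }// batch records
interface Flusher    { void flush(List<String> batch); }        // export to a sink

class CollectionPipeline {
    private final Input input;
    private final Processor processor;
    private final Aggregator aggregator;
    private final Flusher flusher;

    CollectionPipeline(Input in, Processor p, Aggregator a, Flusher f) {
        this.input = in; this.processor = p; this.aggregator = a; this.flusher = f;
    }

    // One collection cycle: read -> process -> aggregate -> flush.
    void runOnce() {
        flusher.flush(aggregator.aggregate(processor.process(input.read())));
    }
}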

Transmission Process

The transmission process is as follows: Logtail writes data to LogHub, LogHub transmits the data, and Blink consumes it. For the entire process to work, you only need to set a proper number of partitions: the number of partitions must be greater than or equal to the number of concurrent read tasks in Blink, so that no Blink task is left running without any data to process.
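
As a minimal sketch of that constraint, the Flink-style job below caps the source parallelism at the partition (shard) count; LogHubSource is a hypothetical stand-in for whatever LogHub source connector the job actually uses.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class TransmissionJob {

    // Hypothetical stand-in for a real LogHub source connector.
    static class LogHubSource implements SourceFunction<String> {
        private volatile boolean running = true;
        @Override public void run(SourceContext<String> ctx) throws Exception {
            while (running) { ctx.collect("log-record"); Thread.sleep(1000); }
        }
        @Override public void cancel() { running = false; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        int shardCount = 16; // number of LogHub partitions (shards)

        // Keep source parallelism <= shard count so that every reader task
        // owns at least one shard and none of them runs without data.
        env.addSource(new LogHubSource())
           .setParallelism(shardCount)
           .print();

        env.execute("loghub-to-blink");
    }
}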

Preprocessing Process

For the preprocessing part of the process, Blink is used for data preprocessing. Its design and optimizations are a bit more complicated, so let’s discuss them in detail. They can be broken up into five parts.

1. Compile an Efficient Computing Process

Blink is a stateful StreamCompute framework that is well suited to real-time aggregation, joins, and other operations. The preprocessing job works with two data flows:

  1. Service request entry logs: used to filter out the request data with errors.
  2. Logs of calls to other intermediate links: joined with the preceding flow on the traceID to filter out the request data that the failing services depend on (see the join sketch after this list).
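
As a rough illustration of that join, the sketch below uses the open-source Flink DataStream interval join keyed by traceID, tolerating up to one minute of skew between the two flows (matching the latency bound discussed next). The record type and field names are invented for this example; the actual Xianyu job runs on Blink and is not public.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class LinkJoin {

    // Simplified log record; field names are invented for this example.
    public static class LogRecord {
        public String traceId;
        public String payload;
    }

    // Join erroneous entry logs with intermediate-link logs on traceID,
    // tolerating up to one minute of skew between the two flows.
    public static DataStream<String> joinFlows(DataStream<LogRecord> errorEntries,
                                               DataStream<LogRecord> midLinkLogs) {
        return errorEntries
                .keyBy(r -> r.traceId)
                .intervalJoin(midLinkLogs.keyBy(r -> r.traceId))
                .between(Time.minutes(-1), Time.minutes(1))
                .process(new ProcessJoinFunction<LogRecord, LogRecord, String>() {
                    @Override
                    public void processElement(LogRecord entry, LogRecord link,
                                               Context ctx, Collector<String> out) {
                        // Emit the pair of records that belong to one failing request.
                        out.collect(entry.traceId + ": " + entry.payload
                                + " <- " + link.payload);
                    }
                });
    }
}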

2. Set a Proper State Lifecycle

When joining flows, Blink caches intermediate data in state and then matches incoming data against it. If the state lifecycle is too long, the accumulated data can become large and performance will suffer. If the state lifecycle is too short, delayed data may fail to be associated. Therefore, you always need to set a proper state lifecycle. The real-time data processing system allows a maximum data latency of one minute, so the state lifecycle is set to 60,000 milliseconds to match.

Use Niagara as the state backend and set the state lifecycle in milliseconds:
state.backend.type=niagara
state.backend.niagara.ttl.ms=60000
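
For comparison, open-source Flink expresses the same idea per state descriptor rather than as job-level configuration. The sketch below, with an invented state name, gives cached join data a one-minute time-to-live, mirroring the 60000 ms Niagara TTL above.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class JoinStateTtl {
    public static ValueStateDescriptor<String> describeJoinCache() {
        // Expire cached join entries one minute after they are written.
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.minutes(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<String> desc =
                new ValueStateDescriptor<>("joinCache", String.class);
        desc.enableTimeToLive(ttl);
        return desc;
    }
}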

3. Enable MicroBatch or MiniBatch

MicroBatch and MiniBatch are both micro-batch processing methods, differing in their triggering mechanisms. In principle, they first cache data and then trigger processing in batches, which reduces state accesses, improves throughput, and compresses the size of the output data.

Enable miniBatch for join operations:
blink.miniBatch.join.enabled=true
Retain the following miniBatch latency configuration when MicroBatch is used:
blink.miniBatch.allowLatencyMs=5000
Set the maximum number of data records cached in each batch to prevent out-of-memory (OOM) errors:
blink.miniBatch.size=20000

4. Replace Static Rebalancing with Dynamic Rebalancing

When Blink runs a job, the computing capabilities of downstream tasks may be unbalanced, leaving some nodes overloaded while others sit idle. With dynamic rebalancing, Blink checks the number of buffers queued in each subpartition and writes data to lightly loaded subpartitions, implementing dynamic load balancing. Compared with the static rebalancing policy, this enhances load balancing among tasks and improves job performance when the computing capabilities of downstream tasks are uneven.

Enable dynamic rebalancing:
task.dynamic.rebalance.enabled=true

5. Customize Output Plug-ins

After the data is associated, the data on a request link needs to be sent to the downstream graph analysis nodes as one data packet. In the past, a messaging service was used to transmit this data, but it had disadvantages for this scenario, so Xianyu customized a Blink output plug-in that delivers the aggregated link data to the downstream nodes and notifies them through MetaQ.
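
The following is a minimal sketch of what such a custom sink could look like in open-source Flink terms. The delivery call and the MetaQNotifier client are hypothetical stand-ins; the actual Xianyu plug-in and its transport are not public.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class LinkPacketSink extends RichSinkFunction<String> {

    // Hypothetical client that publishes a "packet ready" notice to MetaQ.
    private transient MetaQNotifier notifier;

    @Override
    public void open(Configuration parameters) {
        notifier = new MetaQNotifier("graph-analysis-topic"); // invented topic name
    }

    @Override
    public void invoke(String linkPacket, Context context) {
        deliverToGraphNode(linkPacket);   // hypothetical direct delivery
        notifier.notifyPacketReady();     // tell downstream a packet is ready
    }

    private void deliverToGraphNode(String packet) {
        // Placeholder for the real transport (for example, an HTTP call).
    }

    // Stub standing in for a real MetaQ producer client.
    static class MetaQNotifier {
        MetaQNotifier(String topic) { /* connect to the topic */ }
        void notifyPacketReady()    { /* publish the notification */ }
    }
}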

Graph Aggregation and Computing

After receiving a notification from MetaQ, the complex event processing (CEP) or graph computing service node generates a real-time diagnosis result based on the request link data and the monitoring data of the dependent environment. The following figure shows a simplified diagnosis result.

Benefits of the System

Since the system went online, the whole real-time data processing link has had a latency of less than three seconds, and the Xianyu server now takes less than five seconds to locate a problem, greatly improving problem location efficiency.

Future Prospects

Currently, this newly implemented system enables Xianyu to process tens of millions of data records per second. In the future, the automatic problem location service will be applied to more business scenarios within Alibaba, and the data volume will multiply, imposing higher requirements on efficiency and cost. Planned improvements include:

  1. Automatic reduction or compression of processed data.
  2. Complex model analysis and computing in Blink to reduce the number of I/O operations and improve performance.
  3. Multi-tenant data isolation.
