Architecture Evolution and Application Scenarios of Real-time Warehouses in the Cainiao Supply Chain



By Jia Yuanqiao (Yuanqiao), Senior Data Technology Expert with Cainiao

During the forum on real-time data warehouses at the Flink Forward Asia Conference, Jia Yuanqiao, a senior data technology expert from the Cainiao data and planning department, discussed the evolution of Cainiao’s supply chain data team in terms of real-time data technology architecture, covering data models, real-time computing, and data services. He also discussed typical real-time application cases in supply chain scenarios and the Flink implementation solution.

First, let’s take a brief look at the technical architecture for real-time data adopted by Cainiao in 2016, which can be broken down into three aspects: data models, real-time computing, and data services.

  • Data models: Cainiao initially used a demand-driven siloed development model. This involved high computing costs and did not support reuse. It also led to data consistency problems. In addition, the overall data model was not layered, and the internal model layers of business lines were confusing. This resulted in a very high cost of data usage.
  • Real-time computing: In practice, we used Alibaba’s JStorm and Spark Streaming for real-time computing. Both could meet the real-time computing requirements of most scenarios. However, in some complex scenarios, such as logistics and supply chains, the implementation was very complicated and development costs were high. At the same time, it was difficult to balance functionality, performance, stability, and fast fault recovery.
  • Data services: Data was mainly stored in different types of databases, such as HBase, MySQL, and AnalyticDB (ADB). However, many of the operations personnel do not query databases frequently, and the cost of using databases directly is high. This is especially true for NoSQL databases. Moreover, data usage was not controlled, and capabilities such as hot spot blocking, permission control, and end-to-end monitoring were absent.
Image for post

In response to these problems, Cainiao performed a major upgrade to its data technology architecture in 2017, which will be described in detail below.

Data Model Upgrade

In the previous development model, business lines were developed independently, and problems common to different business lines were not considered. However, in logistics scenarios, the requirements for many functions are similar, which often resulted in a waste of resources. To address this issue, we first abstracted the common intermediary data layer (highlighted in blue on the left). Then, individual business lines could shunt data to their own intermediary data layers (highlighted in yellow on the right) based on the common intermediary data layer.

Image for post

This business line shunting is implemented by a preset common shunting task. The shunting tasks, originally performed downstream, are completed by a common task upstream. This way, the preset common shunting models are fully reused for significant computing resource savings.
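The idea of one preset upstream shunting task can be sketched in plain Python. This is only an illustration, not Cainiao's implementation: the `biz_line` field, the record layout, and the business line names are assumptions made for the example.

```python
from collections import defaultdict

def shunt(records, biz_lines):
    """Split one common stream into per-business-line streams in a single pass.

    Hypothetical rule: every record carries a "biz_line" tag.
    """
    out = defaultdict(list)
    for rec in records:
        line = rec["biz_line"]
        if line in biz_lines:
            out[line].append(rec)
    return dict(out)

# Hypothetical records from the common intermediary data layer.
common_layer = [
    {"order_id": "LP1", "biz_line": "domestic"},
    {"order_id": "LP2", "biz_line": "import"},
    {"order_id": "LP3", "biz_line": "export"},
    {"order_id": "LP4", "biz_line": "import"},
]

streams = shunt(common_layer, {"domestic", "import", "export"})
import_orders = [rec["order_id"] for rec in streams["import"]]
```

The common stream is scanned once here, instead of once per downstream business line, which is where the resource savings would come from.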

Image for post

Next, we will look at a data model upgrade case: the real-time data model of Cainiao’s supply chain. The left side of the following figure shows the common intermediary data layer, including Cainiao’s horizontal logistics orders, dashboard logistics details, and common granularity data. Based on this, Cainiao implements preset common shunting, separating out personalized common intermediary data layers for individual business lines from logistics orders and logistics details. These intermediary layers include domestic supply chains, import supply chains, and export supply chains. Based on the common logic used for shunting and the personalized TT messages of the different business lines, we can produce an intermediary business data layer for each business line. For example, an import supply chain might shunt logistics orders and logistics details from the common business line, but store customs information and trunk line information in its own business line TT. Then, we can use this information to create an intermediary business data layer for this business line. By taking advantage of this design and introducing real-time model design specifications and real-time development specifications, it is much easier to use data models.

Image for post

Computing Engine Upgrade

  • Functions provided by Flink are well-suited to the needs of supply chain scenarios. Cainiao has adopted Flink SQL, whose syntax is easy to use and standardized, which greatly improves development efficiency.
  • In addition, the state-based retraction mechanism in Flink supports order cancellation and configuration modification in supply chain scenarios.
  • The CEP function introduced later simplifies the implementation of real-time timeout statistics in logistics and supply chain scenarios.
  • Automatic optimization solutions, such as AutoScaling, reduce the complexity and cost of resource allocation for Cainiao.
  • Quasi-intelligent functions, such as hybrid batch and stream operations, can better meet the actual needs of Cainiao businesses.
Image for post

Now, we will discuss three computing engine upgrade cases.

Case 1: State-based Retraction

One issue is that the LP3 order in the table is valid at the beginning (the cancellation state at 00:18:00 should be N; the table is incorrect). However, the order is canceled in the end (the cancellation state for the last line should be Y; the table is incorrect). In this case, the order is considered invalid and should not be included in the statistics.

In addition, we must pay attention to delivery company changes. The delivery company for LP1 at 00:01:00 was tmsA, but it then changed to tmsB and later to tmsC. The result in the upper-right corner is what computing methods without retraction (such as Storm or incremental processing) produce: the LP1 records for tmsA, tmsB, and tmsC are all included in the statistics. However, tmsA and tmsB did not deliver the order, so the result is wrong. The correct result is shown in the table in the lower-right corner of the figure.

To address this scenario, Flink provides a built-in state-based retraction mechanism to help ensure accurate statistics when stream messages are withdrawn.

Image for post

The following figure shows a pseudo-code implementation of the retraction mechanism. First, we use the built-in last_value function in Flink SQL to obtain the last non-null value for each aggregate key. For the LP1 order in the preceding table, the result obtained by using last_value is tmsC, which is the correct value. Note that the retraction mechanism in Flink is triggered when any change is made to the gmt_create, plan_tms, or is_cancel field for last_value statistics on the left side.
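To make the retraction behavior concrete, here is a small plain-Python simulation. It is not Flink code and not the pseudo-code from the figure: the class name, the message order, and the LP2 row are assumptions chosen for the example. It keeps the last value per order (mirroring what last_value does) and withdraws an order's previous contribution before applying the new one.

```python
from collections import Counter

class RetractingOrderCount:
    """Counts valid orders per delivery company, retracting stale contributions."""

    def __init__(self):
        self.last = {}           # order_id -> (plan_tms, is_cancel), latest values seen
        self.counts = Counter()  # plan_tms -> number of valid orders

    def on_message(self, order_id, plan_tms, is_cancel):
        prev = self.last.get(order_id)
        if prev is not None and prev[1] == "N":
            self.counts[prev[0]] -= 1   # retract the old contribution
        if is_cancel == "N":
            self.counts[plan_tms] += 1  # apply the new contribution
        self.last[order_id] = (plan_tms, is_cancel)

    def result(self):
        return {tms: n for tms, n in self.counts.items() if n > 0}

# Hypothetical message sequence: LP1 changes company twice, LP3 is canceled.
messages = [
    ("LP1", "tmsA", "N"),
    ("LP2", "tmsB", "N"),
    ("LP1", "tmsB", "N"),
    ("LP3", "tmsA", "N"),
    ("LP1", "tmsC", "N"),
    ("LP3", "tmsA", "Y"),
]

agg = RetractingOrderCount()
for msg in messages:
    agg.on_message(*msg)

# Counting every non-canceled message without retraction over-counts.
naive = Counter(tms for _, tms, cancel in messages if cancel == "N")
```

With retraction, LP1 is counted only for tmsC and the canceled LP3 drops out, while the naive count still credits tmsA and tmsB for orders they did not deliver.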

Image for post

Case 2: Timeout Statistics

Ultimately, Cainiao chose Flink Timer Service to meet these needs. Specifically, Cainiao overrides the processElement method of ProcessFunction at the underlying Flink layer. In this method, the original message is stored in Flink state, and each primary key is stored only once. Once endNode actually runs, the state message becomes invalid, and a timeout message is delivered. In addition, we override the onTimer method. This method is mainly responsible for reading the state message upon each timeout and then delivering the messages that are still valid in the state. Then, we can count the number of orders with timeout messages by joining this downstream timeout stream with the normal stream.

Image for post

The following figure shows the pseudo-code implementation of Flink Timer Service timeout statistics.

We must first create an execution environment and construct a ProcessFunction (for accessing keyed state and timers).

Next, we must write the processElement function, which tells state what kind of data to store and registers a timer with timerService for each message that may time out. In the code, timingHour stores the timeout period, which is six hours in this example. Then, we register the timer with timerService.

Finally, we write the onTimer function, which is used to read the state data upon timeout and send timeout messages.
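The three steps above can be mimicked in plain Python to show the control flow. This is a simulation, not the Flink API: the class, the "start" and "end" node names, and the `advance_to` helper that stands in for the passage of time are all assumptions. It also assumes that an order whose end node arrives before the timer fires produces no timeout message.

```python
import heapq

TIMEOUT_SECONDS = 6 * 60 * 60  # six hours, as in the example above

class TimeoutDetector:
    def __init__(self, timeout=TIMEOUT_SECONDS):
        self.timeout = timeout
        self.state = {}   # order_id -> start timestamp, stored once per key
        self.timers = []  # min-heap of (fire_time, order_id)

    def process_element(self, order_id, node, ts):
        """Store the first message per key and register a timer for it."""
        if node == "start" and order_id not in self.state:
            self.state[order_id] = ts
            heapq.heappush(self.timers, (ts + self.timeout, order_id))
        elif node == "end":
            self.state.pop(order_id, None)  # end node arrived: invalidate state

    def on_timer(self, fire_time, order_id):
        """Emit a timeout message only if the stored state is still valid."""
        start = self.state.get(order_id)
        if start is not None and start + self.timeout == fire_time:
            del self.state[order_id]
            return (order_id, fire_time)
        return None

    def advance_to(self, now):
        """Fire every timer due at or before `now` (stand-in for the clock)."""
        fired = []
        while self.timers and self.timers[0][0] <= now:
            message = self.on_timer(*heapq.heappop(self.timers))
            if message is not None:
                fired.append(message)
        return fired

detector = TimeoutDetector()
detector.process_element("LP1", "start", 0)
detector.process_element("LP2", "start", 100)
detector.process_element("LP1", "end", 3600)   # LP1 finishes within six hours
timeouts = detector.advance_to(100 + TIMEOUT_SECONDS)
```

Only LP2 produces a timeout message here, because LP1's state was invalidated before its timer fired.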

Image for post

Case 3: From Manual Optimization to Intelligent Optimization

Image for post

Cainiao currently uses the latest version of Flink, which provides intelligent features to solve the data hot spot problem:

  • MiniBatch: Originally, every time a new data entry came in, the query and write operations were performed on state. This function can aggregate data before writing it to or reading it from state, reducing the query pressure on state.
  • LocalGlobal: Similar to Map-stage aggregation in Hive, this parameter can aggregate data in the read stage to easily cope with count hot spots.
  • PartialFinal: In more complex scenarios, such as those with count_distinct hot spots, this parameter can be used to implement two aggregation operations, similar to the dual Reduce operations in Hive.
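The three optimizations above can be illustrated with a short plain-Python sketch. It does not use Flink and does not reproduce Flink's actual implementation: the function names, the batch size, and the bucketing by `hash` are assumptions made to show the general shape of each technique.

```python
from collections import Counter

def minibatch_count(events, batch_size):
    """MiniBatch idea: pre-aggregate a batch in memory, then touch state
    once per distinct key in the batch instead of once per event."""
    state, state_accesses = Counter(), 0
    for i in range(0, len(events), batch_size):
        buffered = Counter(events[i:i + batch_size])
        for key, n in buffered.items():
            state[key] += n
            state_accesses += 1
    return dict(state), state_accesses

def local_global_count(partitions):
    """LocalGlobal idea: count locally per partition, then merge globally."""
    total = Counter()
    for partial in (Counter(p) for p in partitions):
        total.update(partial)
    return dict(total)

def partial_final_count_distinct(values, buckets):
    """PartialFinal idea: spread values over buckets, deduplicate inside
    each bucket, then sum the per-bucket distinct counts."""
    partial = [set() for _ in range(buckets)]
    for v in values:
        partial[hash(v) % buckets].add(v)
    return sum(len(s) for s in partial)

events = ["hot"] * 8 + ["cold"] * 2
counts, accesses = minibatch_count(events, batch_size=5)
_, accesses_unbatched = minibatch_count(events, batch_size=1)
merged = local_global_count([events[:5], events[5:]])
distinct_users = partial_final_count_distinct(["u1", "u2", "u1", "u3"], buckets=4)
```

In this toy input, batching cuts state accesses from ten to three while the counts stay the same, and the bucketed distinct count is exact because each value always lands in the same bucket.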

Intelligent functions also provide help in resource configuration. During real-time ETL, we need to define the data definition language (DDL), write SQL statements, and then configure resources. To solve the resource configuration problem, Cainiao previously configured each node, including setting the concurrency, whether out-of-order message operations are involved, CPU, and memory. This approach was very complicated, and it is still impossible to predict the resource consumption of some nodes in advance. Currently, Flink provides an ideal optimization solution to this problem:

  • Major promotion scenarios: In these scenarios, Cainiao estimates the queries per second (QPS) in advance, configures this value in the job, and restarts the job. After the restart, Flink automatically performs a stress test to measure the QPS that each node can handle.
  • Routine scenarios: The peak QPS in routine scenarios may be significantly lower than in promotion scenarios. However, it is still complicated to configure the QPS job by job. To address this issue, Flink provides the AutoScaling intelligent tuning function. In addition to pre-setting the QPS and obtaining the required resources through stress testing, it can automatically estimate the required resources based on upstream QPS data. This greatly simplifies the resource configuration process and allows developers to focus on business logic development.
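As a rough illustration of estimating resources from upstream QPS, the sketch below derives a parallelism from a measured per-task throughput. This is a back-of-the-envelope formula assumed for the example, not how AutoScaling works internally; the function name, the headroom percentage, and the cap are all hypothetical.

```python
def estimate_parallelism(upstream_qps, per_task_qps, headroom_pct=20, max_parallelism=1024):
    """Hypothetical sizing rule: enough tasks to absorb upstream QPS plus headroom.

    Integer arithmetic is used so the ceiling division is exact.
    """
    numerator = upstream_qps * (100 + headroom_pct)
    denominator = 100 * per_task_qps
    needed = -(-numerator // denominator)  # ceiling division
    return max(1, min(needed, max_parallelism))

promotion = estimate_parallelism(upstream_qps=50_000, per_task_qps=4_000)
routine = estimate_parallelism(upstream_qps=1_000, per_task_qps=4_000)
```

Here `per_task_qps` plays the role of the figure a stress test would produce, and `upstream_qps` the observed or forecast input rate.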
Image for post

Data Service Upgrade

Image for post

As an implementer of Cainiao data services, Tiangong also provides many functions relevant to businesses. Now, we will look at several specific cases.

Case 1: NoSQL to TgSQL

Image for post

Case 2: Cross-source Data Query

Image for post

Case 3: Service Assurance Upgrade

To address this problem, Tiangong middleware provides a data assurance feature. In addition to active/standby switchover, this feature also provides active/standby active-active, dynamic load balancing, hot spot service blocking, and whitelist throttling.

For switchover, consider a scenario such as the one shown above, with physical and logical tables on the left and right sides: one logical table can be mapped to both the active and standby links. When the active link fails, you can switch to the standby link with a single click.

In addition, promotions involve some very important services, such as dashboards and internal statistical analysis. These services use the active and standby links simultaneously. In this case, it is not appropriate to direct all reads and writes to just one of the databases, so we expect traffic to be distributed across both links. Tiangong implements the active/standby active-active function, which directs heavy traffic to the active link and light traffic to the standby link.

When a task on the active link is affected, the task is moved to the standby link to proceed. When complex and slow queries impact the performance of the overall task, such hot spot services are blocked.
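The routing behavior described in this case can be sketched as follows. This is a minimal plain-Python model, not Tiangong's API: the class, the link names, the 80/20 split, and the explicit `sample` argument (used instead of a random draw so the behavior is reproducible) are assumptions.

```python
class LogicalTable:
    """One logical table mapped to an active and a standby physical link."""

    def __init__(self, active, standby, active_weight=0.8):
        self.links = {"active": active, "standby": standby}
        self.healthy = {"active": True, "standby": True}
        self.active_weight = active_weight  # share of traffic sent to the active link
        self.blocked = set()                # callers blocked as hot spot services

    def route(self, caller, sample):
        """Return the physical link for a request, or None if the caller is blocked.

        `sample` is a number in [0, 1); a real router might draw it at random.
        """
        if caller in self.blocked:
            return None
        if not self.healthy["active"]:
            return self.links["standby"]    # switchover to the standby link
        if not self.healthy["standby"]:
            return self.links["active"]
        role = "active" if sample < self.active_weight else "standby"
        return self.links[role]

table = LogicalTable(active="hbase_a", standby="hbase_b")
heavy = table.route("dashboard", sample=0.1)
light = table.route("dashboard", sample=0.9)

table.healthy["active"] = False            # the active link fails
after_failover = table.route("dashboard", sample=0.1)

table.blocked.add("slow_report")           # block a slow, expensive caller
blocked_result = table.route("slow_report", sample=0.1)
```

Callers only ever address the logical table, so switchover, traffic splitting, and blocking can change without touching client code.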

Image for post

Exploration and Innovation of Other Tools

Image for post

Cainiao is currently developing a series of features for real-time data warehouses based on Flink. In the future, we plan to move in the direction of hybrid batch and stream operations and artificial intelligence (AI).

With the batch processing function provided by Flink, Cainiao no longer imports many small- and medium-size table analysis jobs to HBase. Instead, we read offline MaxCompute dimension tables directly into memory during source definition and perform the association there. This way, data synchronization is removed from many operations.
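The in-memory association can be pictured with a small plain-Python sketch. It is not Flink or MaxCompute code: the table contents, field names, and helper functions are hypothetical, and the dimension table is simply a list loaded into a dictionary.

```python
def load_dimension(rows, key):
    """Load an offline dimension table into memory, indexed by its key column."""
    return {row[key]: row for row in rows}

def enrich(stream, dim, key):
    """Join each streaming event with its dimension row, if one exists."""
    for event in stream:
        yield {**event, **dim.get(event[key], {})}

# Hypothetical dimension table and event stream.
warehouse_dim = load_dimension(
    [{"warehouse_id": "W1", "city": "Hangzhou"}],
    key="warehouse_id",
)
events = [
    {"order_id": "LP1", "warehouse_id": "W1"},
    {"order_id": "LP2", "warehouse_id": "W9"},  # no matching dimension row
]
enriched = list(enrich(events, warehouse_dim, key="warehouse_id"))
```

Because the lookup happens in memory, no separate synchronization of the dimension table into a serving store is needed for this join.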

In some logistics scenarios, the process is relatively long, and some orders, especially those placed during Double 11, may not yet be signed for. Even worse, if an error is found in a job and the job is restarted, the state of the job is lost, and upstream source data is retained in TT for at most three days. This makes such problems more difficult to solve. Cainiao found that the batch function provided by Flink is an ideal solution. Specifically, the TT source is defined to serve real-time scenarios covering the last three days, and TT data is also written to the offline database as a historical backup. If a restart occurs, the offline data is read and aggregated. Even if the Flink state is lost, the offline data allows a new state to be generated. This means that, if a restart occurs, you do not have to worry about being unable to obtain information for Double 11 orders signed for before the 17th. Cainiao did encounter many difficulties in solving such problems. One example is data disorder when aggregating real-time data and offline data. Cainiao implemented a series of user-defined functions (UDFs) to address this issue, for example by setting read priorities for real-time and offline data.
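One way to picture rebuilding state from both sources with a read priority is the sketch below. It is an assumption-laden illustration, not Cainiao's UDFs: the row layout, the `ts` field, and the rule that real-time data wins only when event times tie are all hypothetical.

```python
PRIORITY = {"offline": 0, "realtime": 1}  # hypothetical read priorities

def rebuild_state(offline_rows, realtime_rows):
    """Keep the latest record per order; on equal event time, real-time wins."""
    state = {}
    for source, rows in (("offline", offline_rows), ("realtime", realtime_rows)):
        for row in rows:
            rank = (row["ts"], PRIORITY[source])
            current = state.get(row["order_id"])
            if current is None or rank > current[0]:
                state[row["order_id"]] = (rank, row)
    return {order_id: row for order_id, (rank, row) in state.items()}

# Hypothetical backup rows and real-time rows, including an out-of-order case.
offline = [
    {"order_id": "LP1", "ts": 1, "status": "created"},
    {"order_id": "LP2", "ts": 2, "status": "created"},
    {"order_id": "LP3", "ts": 9, "status": "signed"},
]
realtime = [
    {"order_id": "LP1", "ts": 5, "status": "signed"},
    {"order_id": "LP2", "ts": 2, "status": "shipped"},  # same event time as offline
    {"order_id": "LP3", "ts": 4, "status": "shipped"},  # older than the offline row
]
state = rebuild_state(offline, realtime)
statuses = {order_id: row["status"] for order_id, row in state.items()}
```

Ordering by event time first and source priority second means a stale real-time record cannot overwrite a newer offline one, which is the kind of disorder the paragraph describes.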

  • For log-based business scenarios, such as exposure and website traffic, after a log is generated, it will not change. Cainiao is currently considering handing over all the parsing work to Flink and then writing data into batch. This way, we can avoid batch operations in MaxCompute.
  • In terms of intelligence, the intelligent features provided by Flink are used to avoid data skew and optimize resources. Cainiao is also considering using Flink’s intelligent solutions to optimize some procedures during the real-time ETL process, such as deduplication. In addition, for data service assurance, features such as active/standby switchover still require manual database monitoring. In the future, Cainiao hopes that Flink will provide an end-to-end real-time assurance strategy. Finally, we need to consider intelligent capabilities in business scenarios. Therefore, we are also looking into Alibaba Alink’s support for business intelligence.
Image for post
