Architecture Evolution and Application Scenarios of Real-time Warehouses in the Cainiao Supply Chain

  • Real-time computing: In practice, we used JStorm (developed by Alibaba) and Spark Streaming for real-time computing. Both could meet the real-time computing requirements of most scenarios. However, for complex scenarios such as logistics and supply chains, the implementation was very complicated and development costs were high. It was also difficult to balance functionality, performance, stability, and fast fault recovery.
  • Data services: Data is mainly stored in different types of databases, such as HBase, MySQL, and AnalyticDB (ADB). However, many operations staff rarely query databases directly, so the cost of using databases is high for them, especially NoSQL databases. Moreover, data usage is not governed, and necessary capabilities such as hot spot blocking, permission control, and end-to-end monitoring are missing.

Data Model Upgrade

The data model upgrade mainly consisted of layering the models so that common intermediate models can be fully reused. In the previous model, data was extracted from the TT message source (similar to Kafka) and processed directly into a single-layer table structure. By contrast, the new data model is layered. The first layer is the data collection layer, which collects data from various databases and feeds it into message-oriented middleware. The second layer is the fact detail layer, where TT-based real-time messages are turned into detailed fact tables that are written back to TT. These tables are then aggregated into the third and fourth layers (the light summary layer and the high summary layer) through publish/subscribe. The light summary layer suits scenarios with many dimensions and metrics, such as statistical analysis for a major promotion. Data at this layer is generally stored in Alibaba's proprietary ADB databases, where you can filter and aggregate the target metrics you need. In contrast, the high summary layer provides metrics at common granularities and writes them to HBase. This layer backs dashboards for real-time data display scenarios, such as media and logistics data visualization.
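The layering can be sketched in a few lines of Python. This is a minimal, hypothetical illustration: in-memory lists and `Counter` objects stand in for TT topics, ADB, and HBase, and the field names (`order_id`, `carrier`, `province`, `status`) are assumptions, not Cainiao's schema.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True)
class FactRow:
    """One row of the fact detail layer (hypothetical fields)."""
    order_id: str
    carrier: str
    province: str
    status: str


def collect(raw_events: Iterable[Dict[str, str]]) -> List[Dict[str, str]]:
    """Layer 1 (collection): copy database change events into 'messages'."""
    return [dict(e) for e in raw_events]


def build_fact_detail(messages: Iterable[Dict[str, str]]) -> List[FactRow]:
    """Layer 2 (fact detail): turn messages into detailed fact rows."""
    return [FactRow(m["order_id"], m["carrier"], m["province"], m["status"])
            for m in messages]


def light_summary(facts: Iterable[FactRow]) -> Counter:
    """Layer 3 (light summary): keep many dimensions for ad hoc slicing (ADB-like)."""
    return Counter((f.carrier, f.province, f.status) for f in facts)


def high_summary(light: Counter) -> Counter:
    """Layer 4 (high summary): roll up to one common granularity (HBase-like)."""
    per_carrier: Counter = Counter()
    for (carrier, _province, _status), n in light.items():
        per_carrier[carrier] += n
    return per_carrier


if __name__ == "__main__":
    events = [
        {"order_id": "o1", "carrier": "A", "province": "ZJ", "status": "created"},
        {"order_id": "o2", "carrier": "A", "province": "JS", "status": "created"},
        {"order_id": "o3", "carrier": "B", "province": "ZJ", "status": "created"},
    ]
    light = light_summary(build_fact_detail(collect(events)))
    print(high_summary(light))  # Counter({'A': 2, 'B': 1})
```

Because layer 3 keeps the province and status dimensions, other rollups can be derived from it without re-reading the detail layer, which is the reuse the layering aims for.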

Computing Engine Upgrade

Cainiao originally used JStorm (developed by Alibaba) and Spark Streaming as its computing engines. These engines meet the requirements of most scenarios, but they cause problems in some complex scenarios, such as supply chains and logistics. Therefore, Cainiao fully upgraded to a Flink-based real-time computing engine in 2017. The main reasons for choosing Flink were:

  • The state-based retraction mechanism in Flink supports order cancellation and configuration modification in supply chain scenarios.
  • The CEP function introduced later simplifies the implementation of real-time timeout statistics in logistics and supply chain scenarios.
  • Automatic optimization solutions, such as AutoScaling, reduce the complexity and cost of resource allocation for Cainiao.
  • Capabilities such as hybrid batch and stream processing better meet the actual needs of Cainiao's businesses.

Case 1: State-based Retraction

The left side of the following figure shows a logistics order table with four columns: the logistics order number, creation time, whether the order was canceled, and the assigned delivery company. Counting the number of valid orders assigned to each delivery company seems simple, but the actual implementation needs care: an order may later be canceled, or its assigned delivery company may change, so a count that has already been emitted must be retracted and corrected.
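A minimal Python sketch of state-based retraction, assuming each incoming row carries the order number, the currently assigned delivery company, and a canceled flag. It keeps per-order state and emits a retraction (-1) for the previous contribution before emitting the new one (+1). The class and method names are hypothetical and this is not Flink's API, although Flink's retract streams follow the same idea.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# A change message: (+1 accumulate | -1 retract, delivery company)
Change = Tuple[int, str]


class ValidOrderCounter:
    """Counts valid (non-canceled) orders per delivery company (hypothetical)."""

    def __init__(self) -> None:
        # State: order number -> company it currently counts toward (None if not counted)
        self._contrib: Dict[str, Optional[str]] = {}
        # Downstream aggregate: company -> number of valid orders
        self.counts: Dict[str, int] = defaultdict(int)

    def on_event(self, order_id: str, carrier: str, canceled: bool) -> List[Change]:
        changes: List[Change] = []
        previous = self._contrib.get(order_id)
        if previous is not None:
            changes.append((-1, previous))   # retract the earlier contribution
        current = None if canceled else carrier
        if current is not None:
            changes.append((+1, current))    # accumulate the new contribution
        self._contrib[order_id] = current
        for delta, company in changes:       # apply changes to the aggregate
            self.counts[company] += delta
        return changes


if __name__ == "__main__":
    counter = ValidOrderCounter()
    counter.on_event("LP1", "carrier_A", False)
    counter.on_event("LP2", "carrier_A", False)
    counter.on_event("LP1", "carrier_B", False)  # reassigned: -1 A, +1 B
    counter.on_event("LP2", "carrier_A", True)   # canceled: -1 A
    print(dict(counter.counts))  # {'carrier_A': 0, 'carrier_B': 1}
```

Without the per-order state, the reassignment and the cancellation would each leave a stale +1 on carrier_A, overstating its valid orders.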

Case 2: Timeout Statistics

Logistics is a common business scenario at Cainiao, and logistics businesses often require real-time timeout statistics, such as the number of orders that have not been collected more than six hours after leaving the warehouse. The data table used in this example is shown on the left of the following figure. It includes the log time, logistics order number, warehouse exit time, and collection time. This metric is easy to compute from offline hourly or daily tables. In real-time scenarios, however, there is a challenge: if an order is not collected after leaving the warehouse, no new message arrives for it, so nothing triggers the timeout calculation. Cainiao began exploring solutions to this in 2017. During the exploration, we found that some message-oriented middleware and Flink CEP provide timeout message delivery. However, introducing additional message middleware carries a relatively high maintenance cost, whereas Flink CEP would cause accuracy problems.
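The text does not say which approach Cainiao finally adopted. One common alternative is a per-order event-time timer, which Flink exposes through `KeyedProcessFunction` and its `TimerService`. The Python sketch below only models that idea; the watermark handling, the 6-hour constant, and the class and method names are assumptions for illustration.

```python
import heapq
from typing import Dict, List, Tuple

SIX_HOURS_MS = 6 * 60 * 60 * 1000  # timeout threshold used in the example


class UncollectedTimeoutTracker:
    """Finds orders not collected within a timeout after leaving the warehouse.

    Hypothetical sketch of the event-time timer pattern: each warehouse-exit
    event schedules a timer, and when the watermark passes the timer the order
    is checked, without needing any new message for that order.
    """

    def __init__(self, timeout_ms: int = SIX_HOURS_MS) -> None:
        self.timeout_ms = timeout_ms
        self._timers: List[Tuple[int, str]] = []  # min-heap of (fire_time, order_id)
        self._collected_at: Dict[str, int] = {}   # order_id -> collection time
        self.timed_out: List[str] = []

    def on_exit(self, order_id: str, exit_time_ms: int) -> None:
        heapq.heappush(self._timers, (exit_time_ms + self.timeout_ms, order_id))

    def on_collect(self, order_id: str, collect_time_ms: int) -> None:
        self._collected_at[order_id] = collect_time_ms

    def advance_watermark(self, watermark_ms: int) -> None:
        # Fire every timer whose time is at or before the watermark.
        while self._timers and self._timers[0][0] <= watermark_ms:
            fire_time, order_id = heapq.heappop(self._timers)
            collected = self._collected_at.get(order_id)
            # Timed out if never collected, or collected only after the deadline.
            if collected is None or collected > fire_time:
                self.timed_out.append(order_id)


if __name__ == "__main__":
    tracker = UncollectedTimeoutTracker()
    for order in ("LP1", "LP2"):
        tracker.on_exit(order, exit_time_ms=0)
    tracker.on_collect("LP1", collect_time_ms=60 * 60 * 1000)  # collected after 1 h
    tracker.advance_watermark(SIX_HOURS_MS)
    print(tracker.timed_out)  # ['LP2']
```

The timer fires on watermark progress alone, which is what lets an order with no further messages still be counted.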

Case 3: From Manual Optimization to Intelligent Optimization

Data hot spots and data skew are common problems in real-time data warehouses. Data hot spots are shown on the left of the following figure: after data is shuffled in the Map stage (highlighted in blue), hot spots appear when the data is passed to Agg (highlighted in red). The following figure shows the pseudo-code of Cainiao's initial solution to this problem. To scatter lg_order_code, we first hash it and then perform a secondary aggregation on the hashed results. This reduces data skew to some extent, at the cost of an additional Agg operation.
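The figure's pseudo-code is not reproduced here, so the following Python sketch only illustrates the general hash-then-re-aggregate technique, assuming the goal is a distinct count of `lg_order_code` per delivery company. `NUM_BUCKETS` and `bucket_of` are hypothetical names.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

NUM_BUCKETS = 4  # hypothetical fan-out for a hot key


def bucket_of(lg_order_code: str, num_buckets: int = NUM_BUCKETS) -> int:
    # crc32 is deterministic across runs (Python's built-in hash() of str is not),
    # so the same order code always lands in the same bucket.
    return zlib.crc32(lg_order_code.encode("utf-8")) % num_buckets


def two_stage_distinct_count(rows: Iterable[Tuple[str, str]],
                             num_buckets: int = NUM_BUCKETS) -> Dict[str, int]:
    """rows are (carrier, lg_order_code); returns distinct order codes per carrier."""
    # Stage 1: aggregate on (carrier, bucket), splitting one hot carrier
    # across up to num_buckets parallel groups.
    partial: Dict[Tuple[str, int], Set[str]] = defaultdict(set)
    for carrier, code in rows:
        partial[(carrier, bucket_of(code, num_buckets))].add(code)
    # Stage 2 (the extra Agg): sum partial distinct counts. Buckets are disjoint
    # because the bucket depends only on the order code, so the sum is exact.
    final: Dict[str, int] = defaultdict(int)
    for (carrier, _bucket), codes in partial.items():
        final[carrier] += len(codes)
    return dict(final)


if __name__ == "__main__":
    rows = [("A", "LP1"), ("A", "LP2"), ("A", "LP1"), ("B", "LP3")]
    print(two_stage_distinct_count(rows))  # {'A': 2, 'B': 1}
```

Hashing the counted field rather than using a random salt is what keeps the second stage a plain sum for distinct counts.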

  • LocalGlobal: Similar to map-side aggregation in Hive, this parameter pre-aggregates data locally before the shuffle, which makes it easy to cope with hot spots in count aggregations.
  • PartialFinal: In more complex scenarios, such as count_distinct hot spots, this parameter splits the aggregation into two stages, similar to running two Reduce stages in Hive.
  • Routine scenarios: The peak QPS in routine scenarios may be much lower than in promotion scenarios, but configuring QPS settings job by job is still tedious. To address this, Flink provides the AutoScaling intelligent tuning function. It allows you to preset the QPS and obtain the required resources from stress testing, and it can also estimate the required resources automatically from upstream QPS data. This greatly simplifies resource configuration and lets developers focus on business logic.
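To see why LocalGlobal helps with count hot spots, the following Python sketch compares shipping every record to the global aggregator with pre-aggregating inside each upstream partition first. It models the idea and is not Flink code. In open-source Flink SQL the two-phase strategy is controlled by `table.optimizer.agg-phase-strategy` (with mini-batch enabled) and PartialFinal by `table.optimizer.distinct-agg.split.enabled`; Cainiao's internal version may name these differently.

```python
from collections import Counter
from typing import List, Tuple


def naive_shuffle(partitions: List[List[str]]) -> Tuple[Counter, int]:
    """Send every record to the global aggregator; return counts and records shuffled."""
    total: Counter = Counter()
    shuffled = 0
    for part in partitions:
        for key in part:
            total[key] += 1
            shuffled += 1
    return total, shuffled


def local_global(partitions: List[List[str]]) -> Tuple[Counter, int]:
    """Pre-aggregate inside each upstream partition, then merge the partial counts."""
    total: Counter = Counter()
    shuffled = 0
    for part in partitions:
        local = Counter(part)   # local phase: one partial count per key per partition
        shuffled += len(local)  # only the partials cross the shuffle
        total.update(local)     # global phase: add partial counts together
    return total, shuffled


if __name__ == "__main__":
    partitions = [["hot"] * 1000 + ["cold"], ["hot"] * 1000]
    print(naive_shuffle(partitions)[1], local_global(partitions)[1])  # 2001 3
```

With one hot key, the local phase collapses 2,000 "hot" records into two partial counts, so only three records reach the global aggregator instead of 2,001, while the final counts are identical.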

Data Service Upgrade

Cainiao also provides a series of data products that offer data services during data warehouse development. Originally, these products connected to databases through Java web applications, with a different access method for each database. In practice, the most common database types are HBase, MySQL, and OpenSearch. Therefore, Cainiao worked with the data service team to develop a unified data service middleware called Tiangong Data Service. It centralizes database access, permission management, data assurance, and end-to-end monitoring. The service is SQL-first: it uses SQL as the domain-specific language (DSL) for data services and exposes them through a standardized service access mode (HSF, Alibaba's RPC framework).
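A minimal sketch of the centralization idea, assuming a gateway that owns every published SQL API. `DataServiceGateway`, `ServiceApi`, and the backend callables are hypothetical names, not Tiangong's interface; real backends would wrap HBase, MySQL, or OpenSearch clients, and calls would arrive over HSF rather than as local method calls.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Set, Tuple

# Hypothetical backend interface: takes SQL text, returns rows.
Backend = Callable[[str], List[dict]]


@dataclass
class ServiceApi:
    """A published data service: one SQL statement bound to one backend."""
    name: str
    backend: str
    sql: str
    allowed_apps: Set[str]


@dataclass
class DataServiceGateway:
    """Hypothetical single entry point for access, permissions, and monitoring."""
    backends: Dict[str, Backend]
    apis: Dict[str, ServiceApi] = field(default_factory=dict)
    call_log: List[Tuple[str, str, int, float]] = field(default_factory=list)

    def register(self, api: ServiceApi) -> None:
        if api.backend not in self.backends:
            raise ValueError(f"unknown backend: {api.backend}")
        self.apis[api.name] = api

    def call(self, app: str, api_name: str) -> List[dict]:
        api = self.apis[api_name]
        if app not in api.allowed_apps:             # centralized permission check
            raise PermissionError(f"{app} may not call {api_name}")
        start = time.perf_counter()
        rows = self.backends[api.backend](api.sql)  # centralized database access
        elapsed_ms = (time.perf_counter() - start) * 1000.0
        # Centralized monitoring record: caller, API, row count, latency.
        self.call_log.append((app, api_name, len(rows), elapsed_ms))
        return rows


if __name__ == "__main__":
    def fake_mysql(sql: str) -> List[dict]:
        return [{"carrier": "A", "orders": 2}]

    gateway = DataServiceGateway(backends={"mysql": fake_mysql})
    gateway.register(ServiceApi("carrier_orders", "mysql",
                                "SELECT carrier, orders FROM t", {"dashboard"}))
    print(gateway.call("dashboard", "carrier_orders"))
```

Because every call passes through one object, permission rules and monitoring live in one place instead of being reimplemented in each Java web application.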

Case 1: NoSQL to TgSQL

For NoSQL databases such as HBase, it is difficult for operations personnel to write query code, so a standard syntax is needed. To solve this problem, Tiangong provides TgSQL, which standardizes access to NoSQL data. The following figure shows the conversion process: the Employee table is mapped to a two-dimensional table. The mapping is logical rather than physical, so no data is moved. When data is queried, the Tiangong middleware parses the SQL and automatically converts it to the backend's native query in the background.
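The logical mapping can be illustrated with a toy translator. In this Python sketch a dictionary stands in for an HBase-style wide-column table, and a hypothetical `COLUMN_MAP` maps each SQL column of the two-dimensional Employee view to a `family:qualifier` cell. Only `SELECT ... FROM employee [WHERE id = '...']` is supported; this is an assumption-laden illustration, not TgSQL's grammar.

```python
import re
from typing import Dict, List, Optional

# Stand-in for a wide-column table: row key -> {"family:qualifier": value}
EMPLOYEE_STORE: Dict[str, Dict[str, str]] = {
    "e001": {"info:name": "Li", "info:dept": "ops", "stat:orders": "12"},
    "e002": {"info:name": "Wang", "info:dept": "dev"},
}

# Hypothetical logical view: SQL column -> physical cell (None means the row key)
COLUMN_MAP: Dict[str, Optional[str]] = {
    "id": None, "name": "info:name", "dept": "info:dept", "orders": "stat:orders",
}

QUERY_RE = re.compile(
    r"^\s*SELECT\s+(?P<cols>[\w\s,]+?)\s+FROM\s+employee"
    r"(?:\s+WHERE\s+id\s*=\s*'(?P<key>[^']*)')?\s*$",
    re.IGNORECASE,
)


def run_sql(sql: str) -> List[Dict[str, Optional[str]]]:
    m = QUERY_RE.match(sql)
    if m is None:
        raise ValueError(f"unsupported query: {sql}")
    cols = [c.strip().lower() for c in m.group("cols").split(",")]
    unknown = [c for c in cols if c not in COLUMN_MAP]
    if unknown:
        raise ValueError(f"unknown columns: {unknown}")
    key = m.group("key")
    # WHERE id = '...' becomes a point lookup (like an HBase Get);
    # no WHERE clause becomes a full scan in row-key order.
    keys = [key] if key is not None else sorted(EMPLOYEE_STORE)
    rows = []
    for k in keys:
        cells = EMPLOYEE_STORE.get(k)
        if cells is None:
            continue
        rows.append({c: k if COLUMN_MAP[c] is None else cells.get(COLUMN_MAP[c])
                     for c in cols})
    return rows


if __name__ == "__main__":
    print(run_sql("SELECT name, dept FROM employee WHERE id = 'e001'"))
```

Missing cells come back as `None`, which is how a sparse wide-column row surfaces in a rectangular view.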

Case 2: Cross-source Data Query

When developing data products, Cainiao often encounters queries that cannot be cleanly split into real-time and offline cases. For example, Cainiao tracks the completion rate of its annual KPIs in real time as the ratio of completed orders to planned orders. This computation relies on two data sources: an offline table of planned KPIs and a real-time table of computed results written to HBase. In our original solution, two APIs were called through Java and the arithmetic was then performed on the front end, which was cumbersome. To address this problem, Cainiao supports standard SQL queries across different data sources, such as offline MySQL tables and real-time HBase tables. You only need to write the query in standard SQL; the upgraded data service parses it and queries the corresponding databases.
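The following sketch shows the shape of such a query, assuming the service can address both sources as named schemas. SQLite's `ATTACH DATABASE` stands in for the MySQL plan table and the HBase real-time table, and the schema, table, and column names (`offline.kpi_plan`, `realtime.completed_orders`, `region`, `planned`, `completed`) are hypothetical.

```python
import sqlite3
from typing import List, Tuple

COMPLETION_RATE_SQL = """
SELECT p.region,
       COALESCE(r.completed, 0) * 1.0 / p.planned AS completion_rate
FROM offline.kpi_plan AS p
LEFT JOIN realtime.completed_orders AS r ON r.region = p.region
ORDER BY p.region
"""


def build_demo() -> sqlite3.Connection:
    # Two attached in-memory databases play the offline and real-time sources.
    conn = sqlite3.connect(":memory:")
    conn.execute("ATTACH DATABASE ':memory:' AS offline")
    conn.execute("ATTACH DATABASE ':memory:' AS realtime")
    conn.execute("CREATE TABLE offline.kpi_plan (region TEXT PRIMARY KEY, planned INTEGER)")
    conn.execute("CREATE TABLE realtime.completed_orders "
                 "(region TEXT PRIMARY KEY, completed INTEGER)")
    conn.executemany("INSERT INTO offline.kpi_plan VALUES (?, ?)",
                     [("east", 200), ("north", 100)])
    conn.executemany("INSERT INTO realtime.completed_orders VALUES (?, ?)",
                     [("east", 150)])
    return conn


def completion_rates(conn: sqlite3.Connection) -> List[Tuple[str, float]]:
    # One statement spans both sources; a real data service would route each
    # table to its own backend instead of one SQLite connection.
    return conn.execute(COMPLETION_RATE_SQL).fetchall()


if __name__ == "__main__":
    print(completion_rates(build_demo()))  # [('east', 0.75), ('north', 0.0)]
```

The `LEFT JOIN` with `COALESCE` keeps regions that have a plan but no real-time completions yet, reporting them at 0% instead of dropping them.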

Case 3: Service Assurance Upgrade

Cainiao initially lacked service assurance capabilities. After a task was released, we could not be sure it was problem-free, and some problems were not discovered until users reported them. In addition, when concurrency was high, there was no way to promptly take appropriate measures, such as throttling and active/standby switchover.
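Throttling and active/standby switchover can be sketched as follows. This is a minimal illustration under stated assumptions: a token bucket is one common throttling technique (the text does not say which one Cainiao uses), backends are plain callables, and `ConnectionError` is assumed to signal that the active database is unavailable. `TokenBucket` and `GuardedQuery` are hypothetical names.

```python
import time
from typing import Callable, List, Optional

Backend = Callable[[str], list]  # hypothetical: SQL text in, rows out


class TokenBucket:
    """Allows `rate` requests per second with bursts of up to `capacity`."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = float(capacity)
        self._last = clock()

    def try_acquire(self) -> bool:
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False


class GuardedQuery:
    """Throttles callers and fails over from the active to the standby backend."""

    def __init__(self, active: Backend, standby: Backend, limiter: TokenBucket) -> None:
        self.backends: List[Backend] = [active, standby]
        self.limiter = limiter

    def query(self, sql: str) -> list:
        if not self.limiter.try_acquire():
            raise RuntimeError("throttled")  # shed load instead of overloading the DB
        last_error: Optional[Exception] = None
        for backend in self.backends:        # try active first, then standby
            try:
                return backend(sql)
            except ConnectionError as exc:
                last_error = exc
        raise RuntimeError("all backends failed") from last_error


if __name__ == "__main__":
    def active_down(sql: str) -> list:
        raise ConnectionError("active database unreachable")

    guarded = GuardedQuery(active_down, lambda sql: ["row from standby"],
                           TokenBucket(rate=100.0, capacity=10))
    print(guarded.query("SELECT 1"))
```

Passing the clock in as a parameter keeps the limiter deterministic under test; production code would simply use the default monotonic clock.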

Exploration and Innovation of Other Tools

In addition to data models, computing engines, and data services, Cainiao has also explored and innovated in other areas, including real-time stress testing, process monitoring, resource management, and data quality assurance. Real-time stress testing is commonly used for major promotions: it simulates promotion traffic and tests whether tasks can run successfully at a specific QPS. Originally, we would restart the jobs on the standby link, switch their source to the stress-test source, and redirect their sink to the stress-test sink. This was very complicated when many tasks had to be tested together. To solve this problem, the Alibaba Cloud team developed a real-time stress testing tool. The tool can start all required stress testing jobs at once, automatically generate their stress-test sources and sinks, run the stress test, and generate stress testing reports. Flink also provides job monitoring features, including latency monitoring and alert monitoring. For example, an alert is sent if a job does not respond within a specified period, and alerts can also be triggered on TPS or resource usage.
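A sketch of what such tooling automates, assuming each job is described by a name, a source, a sink, and a parallelism. The `stress_` naming convention, the `JobConfig` fields, the pass rule in `stress_report`, and the silence-based alert in `latency_alerts` are all hypothetical; the text does not describe the tool's actual interface.

```python
from dataclasses import dataclass, replace
from typing import Dict, List

STRESS_PREFIX = "stress_"  # hypothetical naming convention for shadow resources


@dataclass(frozen=True)
class JobConfig:
    name: str
    source: str       # e.g. a TT topic
    sink: str         # e.g. an HBase table
    parallelism: int


def make_stress_jobs(jobs: List[JobConfig]) -> List[JobConfig]:
    """Clone every job with its source and sink redirected to stress-test copies,
    so production sources and sinks are never touched."""
    return [
        replace(j, name=STRESS_PREFIX + j.name,
                source=STRESS_PREFIX + j.source,
                sink=STRESS_PREFIX + j.sink)
        for j in jobs
    ]


def stress_report(target_qps: float, observed_qps: Dict[str, float]) -> Dict[str, bool]:
    """Mark each stress job as passing if it sustained at least target_qps."""
    return {name: qps >= target_qps for name, qps in observed_qps.items()}


def latency_alerts(last_output_ts: Dict[str, float], now: float,
                   max_silence_s: float) -> List[str]:
    """Names of jobs that have produced no output for longer than max_silence_s."""
    return sorted(name for name, ts in last_output_ts.items()
                  if now - ts > max_silence_s)


if __name__ == "__main__":
    prod = [JobConfig("order_agg", "tt_orders", "hbase_kpi", 8)]
    print(make_stress_jobs(prod))
    print(stress_report(1000.0, {"stress_order_agg": 1200.0}))
```

Generating the shadow configurations from the production list is what removes the per-job manual rewiring described above.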

  • In terms of intelligence, the intelligent features provided by Flink are used to avoid data skew and optimize resources. Cainiao is also considering using Flink’s intelligent solutions to optimize some procedures during the real-time ETL process, such as deduplication. In addition, for data service assurance, features such as active/standby switchover still require manual database monitoring. In the future, Cainiao hopes that Flink will provide an end-to-end real-time assurance strategy. Finally, we need to consider intelligent capabilities in business scenarios. Therefore, we are also looking into Alibaba Alink’s support for business intelligence.
