Real-Time Marketing Data Analysis Based on Hologres + Flink
During the 2020 Double 11 Global Shopping Festival, Alibaba’s cloud-native real-time data warehouse was first implemented in core data scenarios and reached a new record for the big data platform. This data warehouse is built based on Hologres and Realtime Compute for Apache Flink, and has set a new record for the big data platform. This article focuses on the best practices of Hologres in Taobao’s marketing analysis scenarios. The article also discusses the technical challenges behind the first-time implementation of Apache Flink + Hologres stream-batch unification on the marketing analysis screen during Double 11.
1. Real-Time Marketing Data Analysis
Big promotions are crucial for business operations and user number growth for Taobao business operations. Marketing analysis services are the core data services used for decision-making and operation instructions during big promotions. They cover the analysis works of the whole process before, during, and after the promotions. Different requirements for data timeliness and flexibility must be met in different phases.
The traditional marketing analysis is based on conventional real-time offline data systems and FW product architectures. Many problems have been exposed in large and small activities in the past, and the core problems can be classified into three types:
- Real-Time and Offline Data Inconsistency: Real-time and offline data with the same standards are inconsistent, which is shown in non-unified data logical standards and data interfaces. The separation of real-time and offline data development (developers and interfaces) not only increases the overall data O&M cost but also increases the burden on product construction.
- High Maintenance Costs: As the business volume increases, the original database system cannot support complex and changing application scenarios quickly and flexibly. Conventional Apache HBase, MySQL, and AnalyticDB databases cannot meet all requirements for supporting massive-data, high-concurrency point queries, and OLAP queries. Therefore, when dealing with extremely complex services, multiple databases are needed. As a result, the costs of overall maintenance and dependency become very high.
- Poor Extensibility: Products with FW framework are constructed with high logical complexity and poor extensibility, and the maintenance cost during operation is high.
Therefore, it is increasingly important to respond to the changing business requirements quickly and to process data efficiently during activities. The upgraded next-generation architecture for marketing analysis should have the following advantages:
- Unified models (unified real-time and offline logics) and unified interfaces (unified data storage and retrieval interfaces) of real-time and offline data warehouses should be achieved for stream-batch unification.
- A more powerful data warehouse is required to support highly-concurrent writes, and query large amounts of data with quick response time.
- The existing product construction logic needs to be simplified, and the product implementation complexity needs to be reduced.
Based on the background information listed above, it is necessary to reconstruct the current architecture and find alternatives to solve the business pain points. After a lot of attempts, the Apache Flink + Hologres + FBI (a visual analysis tool of Alibaba) solution was chosen to re-architect the architecture for the Tmall marketing analysis service.
2. Stream-Batch Unification Solution
After deeply analyzing business requirements for data and exploring data models multi-dimensionally, the overall technical framework for marketing analysis reconstruction is finally determined, as shown in the figure below. The core points of the framework include:
- The stream-batch unification architecture has been upgraded by implementing stream-batch unification in SQL logic and computing.
- Data storage-query unification is achieved through Hologres.
- With the capabilities of FBI, the construction cost is reduced, and the high flexibility of the business, as well as the needs for reports of different roles, are all met.
The following section will introduce the core technologies of the technical solution: stream-batch unification, Hologres, and FBI.
1) Stream-Batch Unified Technical Framework
The following figure shows the traditional architecture of data warehouses. The core issues of the traditional data warehouse architecture are listed below:
- Storage layers of streaming processing and batch processing are separated by clusters, tables, and fields. As a result, different data retrieval logics must be written when the application layer is connected.
- Streaming and batch processing logics cannot be reused. Different SQL standards and computing engines lead to separated development for real-time and offline processing. The logics are similar in many cases, but cannot be flexibly converted, resulting in duplicated work.
- Compute-layer clusters are separated, and there are different peak use durations of real-time and offline resources. This causes low resource utilization, clear peaks and troughs.
The following figure shows the stream-batch unified architecture of data warehouses. The upgraded architecture has the following core features:
- Firstly, although the Data Warehouse Details (DWD) layer has a different storage medium, the equivalent in a data model must be guaranteed. Then, logical tables are encapsulated. One logical table maps two physical tables that are real-time DWD and offline DWD. Data computing codes are written based on the logical table.
- Secondly, a development platform (Dataphin stream-batch unified development platform) is needed to support logical table-based code development, the personalized configuration of streaming and batch computing modes, and different scheduling policies. Thus, a convenient development-O&M unified system can be constructed.
- Finally, the storage layer unification based on data modeling specifications is achieved not only in model specifications but also in the storage media. They are seamlessly connected.
The stream-batch unification data service based on Apache Flink has made a breakthrough in marketing analysis scenarios and has withstood severe stability, performance, and efficiency tests. During 2020 Double 11, the peak value of records processed by Flink reached 4 billion per second, and the data volume reached 7 TB per second. High stability was achieved in the entire Flink streaming and batch processing tasks during that time. The entire procedure was free of problems in terms of capacity, single points of servers, and network bandwidth.
2) Implementation of Hologres Stream-Batch Unification
The entire data layer is unified through the stream-batch unification data architecture. Another product is also needed to unify the entire storage layer. This product needs to support high-concurrency writes, real-time queries, and OLAP analysis.
In the architecture of the earlier versions, each page module involved the data query of one or more databases, such as MySQL, Apache HBase, and AnalyticDB 3.0 (the original HybridDB). Most real-time data was stored in HBase because HBase supported high-concurrency writing and high-performance point query. With the easy management and simple query of MySQL tables, dimension table data and offline data is usually stored in MySQL. In addition, the data used by some product modules have a small volume and multiple dimensions, such as marketing method data. AnalyticDB is selected as the database for OLAP multi-dimensional analysis. As a result, we had two pain points: the separation of real-time data from offline data and the disordered management of multiple databases and instances.
The new marketing analysis service is constructed to achieve unified storage, reduce O&M costs, and improve R&D efficiency. It also aims to realize high performance, high stability, and low cost.
After comparing multiple products, Hologres was selected for the marketing analysis service. Hologres is an all-in-one real-time data warehouse and is compatible with the PostgreSQL 11 protocol. It seamlessly connects to the big data ecosystem and supports the high-concurrency and low-latency analysis and has the capability to process PB-level data efficiently. It can easily and economically use existing BI tools to perform multi-dimensional analysis and business exploration of data. Hologres is extremely advantageous in complex business scenarios.
Based on the in-depth analysis of the modules in the marketing analysis service and the business-side requirements for data timeliness, specific real-time procedure solutions are made for these modules.
- The real-time point query of Hologres is used for core modules, such as live broadcasting, pre-sales, extra purchase, and traffic monitoring.
- The OLAP real-time query of Hologres is used to handle complex and changing marketing methods.
Based on the point query and OLAP analysis required for marketing analysis,
dt-camp-olap libraries are created for Tmall marketing analysis. Some historical data during the marketing activity needs to be stored in the
dt_camp library for a long time to make comparisons of different activities. The overall data volume is nearly 40 TB. What is stored in the OLAP library is the detailed data of the marketing methods, and the overall data volume is nearly 100 TB. As marketing methods require very high accuracy of the overall data, the inaccurate query method is not used. This puts forward higher requirements on the overall query performance of the database.
To improve the overall performance of Hologres, the following optimization strategies have been implemented for its database based on marketing analysis:
- Set the Distribution Key: In the case of the
count(distinct user_id)statement, the user_id is set as the distribution key. This way, the count distinct is performed parallelly in each shard of Hologres. This prevents large amounts of data from being shuffled and significantly improves query performance.
- Minimize the Count Distinct Times: The SQL is converted through the multi-layer
GROUP BYoperation to reduce the cost of COUNT DISTINCT.
- Shard Prunning: In some scenarios, some of the primary keys of a table will be targeted during the query. If the key combination in these scenarios is set as the distribution key, shards hit by this query can be determined when the query is processed. Thus, the number of RPC requests is reduced, which is crucial for scenarios with high QPS.
- Generate the Optimal Plan: Marketing analysis includes point query or range query based on summarized data, OLAP query based on raw data, and TopN query after single-table aggregation. For different query types, Hologres can generate optimal execution plan based on the collected statistics to ensure the QPS and latency of queries.
- Optimize the Writing: The writing operations of marketing analysis are based on the
UPDATEoperations on column storage tables. In Hologres, the
UPDATEoperations find the corresponding
uniqueidbased on the specified primary keys (PK). Then, they find the corresponding record mark for deletion based on the
uniqueidand finally insert a new record. In this case, if an incremental segment key can be set, the file can be located quickly based on the segment key during querying. This improves the record locating speed based on the PK, as well as writing performance. The peak writing traffic in the system stress testing of marketing analysis can reach 8 million records per second based on
- Compact Small Files: Some tables are written infrequently because the key updates during a fixed period of time is relatively constant. In such a case, the memory table would flush small files sometime. However, the default compaction strategy of Hologres does not always work on these files, resulting in a relatively large number of small files. Through in-depth optimization of the compaction parameters and increasing the compaction frequency, small file number is reduced, improving the query performance significantly.
Hologres performed stably during 2020 Double 11. The peak writing speed of point query in Hologres reached several hundreds of thousands of records per second, while millions of records were served per second. For OLAP, the peak writing speed reached four million records per second, and five million records were served per second. Meanwhile, the execution time of more than 99.7% of single point queries and OLAP queries was less than one millisecond. Therefore, the overall system performance was stable throughout Double 11 and supported fast point queries and OLAP analyses simultaneously.
3) FBI Analysis Big Screen
FBI is the preferred data visualization platform in the Alibaba ecosystem. It can quickly create reports for data analysis and support fast access and expansion of multiple data sets. It also supports the construction of various analytical data products through the product construction feature.
During the core process of product construction in FBI, four core functions can be used to reduce the construction cost:
- The “real-time hour-minute model” with online-offline unification enables accurate comparison and trend analysis of real-time data.
For the underlying data after stream-batch unified processing, users demand the flexible analysis of real-time data, real-time comparison, and hourly comparison. To meet users’ demands, FBI abstracts a standard data model featuring online-offline unification. This model allows accurate comparison of real-time data. For trend analysis, minute-based trends are automatically routed to minute-based tables, and hour-based trends are directly routed to hour-based tables.
- FBI’s original FAX functions are used to define and output various complex metrics in an extremely simplified way.
Complex metrics in FBI, such as channel proportion, category proportion, year-on-year contribution, and cumulative volume of activities, were all defined with complex SQL statements in the previous version. As a result, the SQL statements were too long, and the stability and maintainability of the product were also reduced. To solve this problem, FBI built a set of analytical DSL that is easy to learn and understand, called FAX functions. FAX functions include more than 20 analysis functions, such as year-on-year balance, contribution rate, and activity accumulation. Simple statements can be used to define a variety of complex metrics used in marketing analysis.
- The configurable analysis capabilities and plug-in exclusive logic can reduce page construction time.
Product page construction is a crucial process. FBI’s method to simplify user configuration is listed below:
- General Configurable Analysis Capabilities: Most commonly used analysis capabilities, such as cross tables, activity comparison, and date variable parameter passing, are upgraded to simple configuration items. Then, the year-on-year comparison and year-on-year balance analyses can be completed.
- Plug-In Exclusive Logic: Some customized capabilities, such as activity parameters, result displaying and hiding, and result sorting, are used for blocks. These capabilities can be transformed into data plug-ins.
- FBI’s high security system has been developed and tested. The release control, monitoring and alert, and change notification features have been upgraded with support for 1–5–10.
3. Test-Side Ensuring
To further ensure the quality of the marketing analysis service, the test side strictly compares and verifies data at the DWD layer, the DWS layer, and the product layer. A full range of monitoring is also carried out on the core data of the big promotion.
During 2020 Double 11, the test and inspection feature improved the ability to actively identify data problems and core issues in a timely manner. The quality and stability of the whole data product improved significantly.
4. Business Feedback and Value
During 2020 Double 11, the stream-batch unified marketing analysis service based on Apache Flink + Hologres supported an average of hundreds of PV access per capita for thousands of Tmall listings. The service also achieved zero P1/P2 failure. At the same time, several advantages of the service were shown during the activity compared with previous years:
- Comprehensiveness: Real-time data was widely used in the marketing analysis service. The core dimensions were involved in multiple dimensions, such as promotional products and merchant tag hierarchy. Real-time data in merchant and product dimensions were added in the extra purchase and pre-sale modules. By doing so, the business side’s BD of merchants was supported in a more user-friendly way.
- Stability: Based on the contiguous high-stability capability of Hologres, both real-time data writing and data reading were highly stable during Double 11. The engineering side also monitored user access and data response efficiency to analyze and resolve business problems in real-time. Moreover, product inspection covered the core data of the service to further ensure stability.
- High Efficiency: By applying the streaming and batch processing, and the unified connection of Hologres, the service greatly improved demand access efficiency during Double 11. The overall demand capacity during 2020 Double 11 was three times higher than 2019. In addition, the overall timeliness of problem feedback and resolution was improved three to four times overall.
5. Future Prospects
Even though it was tested by 2020 Double 11, it is still necessary to continuously improve the technologies to deal with more complex business scenarios:
- Dataphin’s stream-batch unification products need further improvement to reduce manual intervention costs and further ensure data quality.
- The Hologres resource separation, and isolation of read and write resources will be improved to better ensure query SLA. Hologres and MaxCompute interconnection can support metadata interconnection to have high guarantee on the product metadata. Dynamic scaling can be applied to flexibly respond to peak and daily business needs.
- FBI tools can improve the version management function of products. The same page supports multi-person editing and enables more efficient product construction.