Meituan-Dianping’s Use of Flink-based Real-time Data Warehouse Platforms
By Lu Hao (Senior Technical Expert at Meituan-Dianping)
Data warehouse construction is an essential part of data intelligence and an inevitable challenge in large-scale data application. Flink-based real-time data warehouses assume an important role in the data process. In this article, Lu Hao, a senior technical expert from Meituan-Dianping, shares Meituan-Dianping’s practices using the Flink-based real-time data warehouse platform.
This article covers the following content:
- Meituan-Dianping’s Real-time Computing evolution and business practices
- Flink-based real-time data warehouse platform
- Future development and considerations
1) Meituan-Dianping’s Real-time Computing Evolution and Business Practices
Evolution of Real-time Computing at Meituan-Dianping
In 2016, Meituan-Dianping already had an initial platform based on Apache Storm. In early 2017, Meituan-Dianping introduced Spark Streaming for specific scenarios, mainly for data synchronization. At the end of 2017, Meituan-Dianping introduced Flink on its real-time computing platform. Compared with Apache Storm and Spark Streaming, Flink has many advantages. At this stage, Meituan-Dianping carried out an in-depth platform-oriented restructuring of its system, focusing on security, stability, and ease of use. Since 2019, Meituan-Dianping has been committed to providing real-time data warehouse and machine learning solutions to better support its businesses.
Real-time Computing Platform
At present, Meituan-Dianping has thousands of machines and tens of thousands of active jobs on its real-time computing platform every day, with 150 million messages processed per second during peak hours and thousands of users using the real-time computing service.
Real-time Computing Platform Architecture
The following figure shows the real-time computing architecture of Meituan-Dianping.
The underlying layer collects real-time user data, including binary logs, backend service logs, and IoT data, which are processed by the log collection team and the database collection team and then collected to Apache Kafka. The data is used in both real-time computing and offline computing.
Above the collection layer is the storage layer. In addition to using Apache Kafka as a message channel, this layer stores state data in HDFS and dimension data in HBase.
Above the storage layer is the engine layer, which includes Apache Storm and Flink. The real-time computing platform encapsulates frameworks and supports public packages and components for users at the engine layer.
Above the engine layer is the platform layer, which manages data, tasks, and resources.
The top layer of the architecture is the application layer, which includes real-time data warehouses, machine learning, data synchronization, and event-driven applications.
Meituan-Dianping’s real-time computing platform mainly provides job and resource management functions.
Job management includes job configuration, release, and status functions.
- Job configuration includes job settings, runtime settings, and topology settings.
- Job release includes version management, compilation, release, and rollback.
- Job status includes the runtime status, custom metrics and alerts, and command and runtime logs.
Resource management includes multi-tenant resource isolation and resource delivery and deployment.
Business Data Warehouse Practices
As mentioned earlier, Meituan-Dianping’s real-time computing platform focuses on security, ease of use, and stability and it assumes an important role for business data warehouses. We will share some examples of business data warehouses.
Our first example looks at traffic. The traffic data warehouse is a basic service for traffic businesses. For business channels, there are tracking points for different channels and tracking data on different pages. After passing through the log collection channel, traffic is distributed to different business channels at the basic details layer, such as the Meituan channel and take-out channel.
Finer-grain splitting is performed based on business channels, such as exposure logs, preferences, and recommendations. In this case, data can be provided as streams to downstream service providers, and this approach can also be used to analyze traffic in real time.
The following figure shows an architectural diagram of the traffic data warehouse on the right, which has four layers. From the bottom up, the SDK layer includes the frontend, mini program, and app SDKs. The collection layer stores tracking logs in Nginx and sends the logs to Apache Kafka at the storage layer through the log collection channel. At the computing layer, the traffic team encapsulates upper-layer SQL statements based on Apache Storm and dynamically updates the SQL statements. Jobs do not have to be restarted in the event of SQL changes.
Real-time ad performance
Here, we will give another example of a traffic data warehouse: one used to verify real-time ad performance. The following figure shows the comparison of real-time ad performance on the left. Ad hits are divided into page view (PV) hits, server PV (SPV) hits, client PV (CPV) exposure hits, and CPV click hits. Each hit contains a traffic request ID and a hit experiment path. You can join all logs based on the request ID and the hit experiment path to obtain all the data required for a request and store the data in Druid to verify the performance, including the actual click-through rate (CTR) and estimated CTR.
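The join described above can be sketched in a few lines of Python. This is only an illustration, not the production job: the event field names (`type`, `request_id`, `experiment_path`, `estimated_ctr`) are hypothetical, and the sketch assumes that the estimated CTR is carried on the SPV log, which the article does not state.

```python
from collections import defaultdict


def join_ad_logs(events):
    """Join ad log events on (request_id, experiment_path)."""
    joined = defaultdict(
        lambda: {"exposures": 0, "clicks": 0, "estimated_ctr": None}
    )
    for e in events:
        row = joined[(e["request_id"], e["experiment_path"])]
        if e["type"] == "spv":
            # Assumption: the model's estimated CTR is logged on the SPV hit.
            row["estimated_ctr"] = e["estimated_ctr"]
        elif e["type"] == "cpv_exposure":
            row["exposures"] += 1
        elif e["type"] == "cpv_click":
            row["clicks"] += 1
    return dict(joined)


def ctr_by_experiment(joined):
    """Compare actual CTR with the mean estimated CTR per experiment path."""
    totals = defaultdict(
        lambda: {"exposures": 0, "clicks": 0, "est_sum": 0.0, "est_n": 0}
    )
    for (_, path), row in joined.items():
        t = totals[path]
        t["exposures"] += row["exposures"]
        t["clicks"] += row["clicks"]
        if row["estimated_ctr"] is not None:
            t["est_sum"] += row["estimated_ctr"]
            t["est_n"] += 1
    report = {}
    for path, t in totals.items():
        report[path] = {
            "actual_ctr": t["clicks"] / t["exposures"] if t["exposures"] else None,
            "estimated_ctr": t["est_sum"] / t["est_n"] if t["est_n"] else None,
        }
    return report
```

In a streaming job the same join would be keyed on the request ID and experiment path and held in state until all hits for a request have arrived; the batch-style loop here only shows the shape of the result that is stored for verification.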
Another business data warehouse practice is instant delivery. Real-time data assumes an important role in instant delivery marketing strategies. Take delivery time estimation as an example. The delivery time measures the delivery difficulty for delivery staff, which is divided into multiple periods. The delivery data warehouse cleans and extracts feature data based on Apache Storm so that the algorithm team can train and obtain the time estimation result. This process involves the seller, delivery staff, and user, with many data features and a large data volume.
Real-time business data warehouses are divided into traffic, business, and feature data warehouses.
- In terms of data models, the traffic data warehouse is a flat wide table, the business data warehouse is modeled in normal form, and the feature data warehouse uses KVStore.
- In terms of data sources, traffic data warehouses obtain data from logs, business data warehouses obtain data from business binary logs, and feature data warehouses obtain data from various sources.
- In terms of data volume, the data volume of traffic and feature data warehouses is massive, with tens of billions of data records every day, while the data volume of business data warehouses is millions to tens of millions every day.
- In terms of data update frequency, traffic data is rarely updated while business and feature data is updated more often. We focus on time series and trends of traffic data, and status changes of business data and feature data.
- In terms of data accuracy, traffic data can be less accurate while business data and feature data must be highly accurate.
- In terms of model adjustment frequency, business data models are adjusted frequently while traffic data and feature data models are adjusted rarely.
2) Flink-based Real-time Data Warehouse Platform
This section describes the evolution of our real-time data warehouses and the development ideas behind Meituan-Dianping’s real-time data warehouse platform.
Offline Data Warehouse Model
To organize and manage data more effectively, the offline data warehouse model is divided, from the bottom up, into the operational data store (ODS) layer, data warehouse detail (DWD) layer, data warehouse service (DWS) layer, and application layer. Ad hoc queries are implemented through Presto, Hive, and Spark.
Real-time Data Warehouse Model
The real-time data warehouse model is also divided into the ODS layer, DWD layer, DWS layer, and application layer. However, the real-time data warehouse model differs from the offline data warehouse model in processing methods. For example, data at the DWD and DWS layers is stored in Apache Kafka, dimension data is stored in KVStores, such as HBase and Tair, to improve performance, and ad hoc queries can be performed by using Flink.
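As a rough illustration of this layering, the following Python sketch enriches a stream of fact records with dimension data and then maintains a running summary. It is a toy model under stated assumptions: a plain `dict` stands in for a KVStore such as HBase or Tair, Python generators stand in for Kafka topics between layers, and the field names (`merchant_id`, `city`, `amount`) are hypothetical.

```python
from collections import defaultdict


def to_dwd(ods_events, dim_store):
    """ODS -> DWD: enrich each fact record with dimension attributes."""
    for event in ods_events:
        # Point lookup by key, as one would do against a KVStore.
        dim = dim_store.get(event["merchant_id"], {})
        yield {**event, "city": dim.get("city", "unknown")}


def to_dws(dwd_events):
    """DWD -> DWS: keep a running order amount per city."""
    totals = defaultdict(int)
    for event in dwd_events:
        totals[event["city"]] += event["amount"]
        # Emit the updated summary as soon as each record arrives.
        yield event["city"], totals[event["city"]]
```

For example, `list(to_dws(to_dwd(events, dim_store)))` yields one updated `(city, total)` pair per input record, which mirrors how each layer consumes the layer below it as a stream rather than as a finished table.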
Quasi-real-time Data Warehouse Model
Service providers can also use a quasi-real-time data warehouse model, which is not completely based on streams. Instead, data at its DWD layer is imported to Online Analytical Processing (OLAP) and then summarized and further processed based on the OLAP computing capability.
Comparison between Real-time Data Warehouses and Offline Data Warehouses
Real-time data warehouses and offline data warehouses can be compared in the following aspects:
- In terms of layering methods, offline data warehouses trade space for time and are divided into many layers for efficiency, while real-time data warehouses are divided into fewer layers for timeliness, which also reduces the probability of intermediate process errors.
- In terms of fact data storage, offline data warehouses store fact data based on HDFS, while real-time data warehouses store fact data based on message queues, such as Apache Kafka.
- In terms of dimension data storage, real-time data warehouses store data in KVStore.
- In terms of data processing, offline data warehouses use batch processing components such as Hive and Spark, while real-time data warehouses use real-time computing engines, such as Apache Storm and Flink, for stream processing.
Comparison between Real-time Data Warehouse Construction Solutions
The following figure compares the construction solutions for quasi-real-time data warehouses and real-time data warehouses. These solutions are implemented based on OLAP engines or stream computing engines and update data within minutes or seconds respectively.
- In terms of scheduling overhead, quasi-real-time data warehouse data is processed in batches, which requires a scheduling system. Scheduling overhead still exists in quasi-real-time data warehouses though it is less than that of offline data warehouses. Real-time data warehouses do not have scheduling overhead.
- In terms of business flexibility, quasi-real-time data warehouses use OLAP engines, which are more flexible than the stream computing of real-time data warehouses.
- In terms of data latency tolerance, quasi-real-time data warehouses can perform full computing on data in a given period, while real-time data warehouses perform incremental computing on data. Therefore, the former has a higher tolerance for data latency.
- In terms of scalability, quasi-real-time data warehouses integrate data computing and storage capabilities, making them less scalable than real-time data warehouses.
- In terms of application scenarios, quasi-real-time data warehouses are used in scenarios with moderate timeliness requirements, small data volumes, complex multi-table association, and frequent business changes, such as real-time analysis of transaction data. Real-time data warehouses are more suitable for scenarios with high timeliness requirements and large data volumes, such as real-time features, traffic distribution, and real-time analysis of traffic data.
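The difference in data latency tolerance can be made concrete with a simplified Python model. It is a sketch under assumptions, not a description of any engine's exact behavior: events are bare integer timestamps, windows are fixed-size tumbling windows, and the incremental counter drops any event whose window closed before the highest timestamp seen so far (plus an optional `allowed_lateness`, a hypothetical parameter of this sketch).

```python
from collections import Counter


def window_start(ts, size):
    """Start of the tumbling window that contains timestamp ts."""
    return ts - ts % size


def full_recompute(stored_events, size):
    """Quasi-real-time style: recount everything stored for the period."""
    return dict(Counter(window_start(ts, size) for ts in stored_events))


class IncrementalWindowCounter:
    """Real-time style: update counts per event, in arrival order."""

    def __init__(self, size, allowed_lateness=0):
        self.size = size
        self.allowed_lateness = allowed_lateness
        self.watermark = float("-inf")  # highest event time seen so far
        self.counts = Counter()
        self.dropped = 0

    def process(self, ts):
        start = window_start(ts, self.size)
        # The window is considered closed once the watermark has passed
        # its end plus the allowed lateness; late events are dropped.
        if start + self.size + self.allowed_lateness <= self.watermark:
            self.dropped += 1
            return
        self.counts[start] += 1
        self.watermark = max(self.watermark, ts)
```

With arrivals `[1, 5, 12, 3, 14]` and a window size of 10, the full recomputation counts the late event `3` in window 0, while the incremental counter with no allowed lateness drops it. This is the sense in which full computing over a period tolerates late data better.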
In summary, the OLAP engine-based construction method is a compromise to improve timeliness and development efficiency when dealing with relatively small data volumes and low business traffic. Real-time data warehouses that use stream computing engines are more aligned with future development trends.
Through business practices, we discovered that the metadata of different businesses was separated, and business development personnel tended to use SQL statements to develop both offline data warehouses and real-time warehouses, which required more O&M tools. Therefore, we planned an all-in-one solution to streamline the entire process.
This all-in-one solution provides users with a data development platform and a metadata management platform. We provide the OLAP production platform to solve OLAP production problems in modeling methods, production task management, and resources. The following figure shows the data security system, resource system, and data governance on the left, which can be shared by offline data warehouses and real-time warehouses.
Why Use Flink?
Flink is used to construct the real-time data warehouse platform that implements the following functions, which are also the main concerns of the real-time data warehouse platform:
- State management: Many aggregation and computing operations are performed in real-time data warehouses, which requires state access and management. Flink provides sophisticated state management.
- Expression: Flink provides a wide range of multi-layer APIs, including the DataStream API, Table API, and Flink SQL.
- Ecosystem: Real-time data warehouses are widely used, and users need to access multiple types of storage. Flink also provides complete ecosystem support.
- Batch and stream processing: Flink supports both batch and stream processing in a single engine.
Real-time Data Warehouse Platform
The construction of the real-time data warehouse platform is divided into four layers from the outside in. The platform needs to provide users with abstract expression capabilities, including message expression, data expression, computing expression, and stream and batch processing.
Real-time data warehouse platform architecture
The following figure shows the architecture of Meituan-Dianping’s real-time data warehouse platform. From the bottom up, the resource layer and storage layer reuse the capabilities of the real-time computing platform, and the engine layer implements some extended capabilities based on Flink Streaming, including user-defined function (UDF) integration and Connector integration. The SQL layer is extracted based on Flink SQL and is mainly responsible for parsing, verification, and optimization. The platform layer includes the development workbench, metadata, UDF platform, and OLAP platform. The top layer is the application layer, including real-time reports, real-time OLAP, real-time dashboards, and real-time features.
Message expression — data access
The data formats of binary logs, tracking logs, backend logs, and IoT data are inconsistent. Therefore, Meituan-Dianping’s real-time data warehouse platform provides a data access process to help users synchronize data to the ODS layer by unifying the messaging protocol and shielding processing details.
The following figure shows an example of the access process on the left. For binary logs, the real-time data warehouse platform also supports database sharding and table sharding so that it can collect the data of different database shards and table shards of the same business in the same ODS table according to business rules.
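The shard-merging step can be pictured as a routing table from physical shard names to one logical ODS table. The following Python sketch is illustrative only: the naming patterns (`order_db_<n>`, `orders_<n>`), the ODS table name, and the binlog event fields are all hypothetical, since the article does not describe the actual rule format.

```python
import re

# Hypothetical business rules: (database pattern, table pattern, ODS table).
SHARD_RULES = [
    (re.compile(r"^order_db_\d+$"), re.compile(r"^orders_\d+$"), "ods_orders"),
]


def route_to_ods(database, table):
    """Return the logical ODS table for a physical shard, or None."""
    for db_pattern, table_pattern, ods_table in SHARD_RULES:
        if db_pattern.match(database) and table_pattern.match(table):
            return ods_table
    return None


def to_ods_record(binlog_event):
    """Wrap a binlog event in a unified record addressed to its ODS table."""
    ods_table = route_to_ods(binlog_event["database"], binlog_event["table"])
    if ods_table is None:
        return None  # not covered by any rule, so not ingested
    return {
        "ods_table": ods_table,
        # Keep the physical origin so rows remain traceable to their shard.
        "source": f'{binlog_event["database"]}.{binlog_event["table"]}',
        "op": binlog_event["op"],
        "data": binlog_event["data"],
    }
```

Events from `order_db_03.orders_0012` and `order_db_07.orders_0001` would both land in `ods_orders`, while a table that matches no rule is skipped.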
Computing expression — extended DDL
Meituan-Dianping’s real-time data warehouse platform extends the data definition language (DDL) of Flink SQL, which aims to build a metadata system and integrate internal mainstream real-time storage, including KVStore data and OLAP data. The development workbench and the metadata system are interconnected. Therefore, many data details do not need to be declared in the DDL. Users only need to declare the data name and some runtime settings, such as whether to consume from the latest message, the earliest message, or a given timestamp in the message queue (MQ). Other data can be accessed in the same way.
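The idea that a short declaration is expanded from metadata can be sketched as follows. This is a hedged illustration rather than the platform's implementation: the `METADATA` catalog, its keys, and the `startup_mode` and `startup_timestamp_ms` settings are hypothetical names chosen for this example.

```python
# Hypothetical metadata catalog maintained by the platform.
METADATA = {
    "ods_orders": {
        "storage": "kafka",
        "topic": "ods.orders",
        "schema": {"order_id": "BIGINT", "amount": "DECIMAL(10, 2)"},
    },
}

STARTUP_MODES = {"latest", "earliest", "timestamp"}


def resolve_table(declaration, metadata=METADATA):
    """Expand a minimal user declaration into a full table definition."""
    name = declaration["name"]
    if name not in metadata:
        raise KeyError(f"unknown table: {name}")
    mode = declaration.get("startup_mode", "latest")
    if mode not in STARTUP_MODES:
        raise ValueError(f"unsupported startup mode: {mode}")
    if mode == "timestamp" and "startup_timestamp_ms" not in declaration:
        raise ValueError("startup_timestamp_ms is required for timestamp mode")
    # Storage type, topic, and schema come from metadata, not from the user.
    resolved = dict(metadata[name])
    resolved["name"] = name
    resolved["startup_mode"] = mode
    if mode == "timestamp":
        resolved["startup_timestamp_ms"] = declaration["startup_timestamp_ms"]
    return resolved
```

A user would then write only something like `{"name": "ods_orders", "startup_mode": "earliest"}`; the storage type, topic, and schema are filled in from the catalog, and invalid settings are rejected before the job is submitted.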
Computing expression — UDF platform
A UDF platform involves the following aspects:
- Data security: In the data warehouse construction process described above, users can upload JAR packages to directly reference UDFs, which is risky given the unknown data flow direction. In terms of data security, the platform audits code and analyzes data lineage. It can also restrict components with known historical risks or problems.
- UDF quality: The platform manages templates, cases, and tests for users, shields users from the compiling, packaging, and JAR package management processes, and performs metric log tracking and exception handling in the UDF template.
- UDF reuse capabilities: The UDFs developed by a service provider may be used by other service providers, but incompatibility may occur in the upgrade process. Therefore, the platform provides project, function, and version management for businesses.
UDFs are widely used. The UDF platform supports not only real-time data warehouses but also other application scenarios, such as offline data warehouses, machine learning, and query services. The following figure shows UDF use cases on the right. The left shows the UDF development process, in which users only need to pay attention to the registration process, while the platform completes the compiling, packaging, testing, and uploading processes. The right part shows the UDF use process, in which users only need to declare the UDF, while the platform performs parsing and verification, obtains the path, and performs integration upon job submission.
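The project, function, and version management mentioned above can be sketched as a small registry. The class below is a hypothetical illustration, not the platform's API: the method names, the dotted version format, and the artifact paths are assumptions made for the example.

```python
class UdfRegistry:
    """Toy registry keyed by project, function name, and version."""

    def __init__(self):
        # (project, function) -> {version tuple: artifact path}
        self._udfs = {}

    @staticmethod
    def _parse(version):
        # "1.10.0" -> (1, 10, 0), so versions compare numerically.
        return tuple(int(part) for part in version.split("."))

    def register(self, project, function, version, artifact):
        versions = self._udfs.setdefault((project, function), {})
        key = self._parse(version)
        if key in versions:
            # Published versions are immutable so existing jobs stay stable.
            raise ValueError(f"{function} {version} is already registered")
        versions[key] = artifact

    def resolve(self, project, function, version=None):
        """Return the artifact for a pinned version, or the latest one."""
        versions = self._udfs[(project, function)]
        key = max(versions) if version is None else self._parse(version)
        return versions[key]
```

Pinning a version at job submission is what lets one team upgrade a shared UDF without breaking another team's job, which is the incompatibility risk that version management addresses.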
Real-time data warehouse platform — web IDE
Finally, let’s take a look at the development workbench of the real-time data warehouse platform, which integrates the management of models, jobs, and UDFs in the web integrated development environment (IDE). In the web IDE, users can develop data by using SQL statements. The platform manages some SQL versions and allows users to roll back to the deployed version.
3) Future Development and Considerations
Automatic Resource Optimization
In terms of real-time computing, Meituan-Dianping’s real-time computing platform has thousands of nodes, which may be expanded to tens of thousands of nodes in the future. This poses new needs for resource optimization. A real-time task may require many resources during peak hours but fewer resources during off-peak hours.
In addition, the peaks can also change, so the allocated resources may be insufficient if the business grows. Therefore, automatic resource optimization is required to adjust the maximum value based on the peak traffic and allow jobs to quickly scale down to automatically adapt to traffic reduction. We can obtain the relational functions of operators, traffic, and resources according to the historical operations of each task and operator to adjust the resources in response to traffic changes.
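One very simple way to picture such a relational function is a linear fit between observed traffic and the parallelism a job needed, followed by a clamped recommendation. The Python sketch below rests on strong assumptions that the article does not make: resource demand is taken to be proportional to traffic, and the function names, the `headroom` factor, and the slot limits are hypothetical.

```python
import math


def fit_slots_per_record(history):
    """Least-squares slope through the origin: slots ~= slope * traffic.

    history is a list of (records_per_second, slots_in_use) observations.
    """
    sxy = sum(traffic * slots for traffic, slots in history)
    sxx = sum(traffic * traffic for traffic, _ in history)
    if sxx == 0:
        raise ValueError("history contains no traffic")
    return sxy / sxx


def recommend_parallelism(slope, traffic, headroom=1.5, min_slots=1, max_slots=64):
    """Scale with traffic, keep spare capacity, and clamp to quota limits."""
    wanted = math.ceil(slope * traffic * headroom)
    return max(min_slots, min(max_slots, wanted))
```

Calling `recommend_parallelism` with the forecast peak gives the upper bound to provision, and calling it with current traffic during off-peak hours gives a target to scale down to. A real system would fit per-operator curves and account for state size and rescaling cost, which this sketch ignores.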
We also need to consider how to use resources after optimization. To ensure availability, real-time tasks and offline tasks are generally deployed separately. Otherwise, the bandwidth and I/O may be fully occupied by offline computing, causing a delay in real-time tasks. To improve resource usage, real-time tasks and offline tasks can be deployed together, or some tasks with moderate timeliness requirements can be processed in streams. This requires finer-grained resource isolation and faster resource release.
Upgrading the Real-time Data Warehouse Construction Process
The construction of a real-time data warehouse includes the following steps.
- Propose business requirements and then perform modeling, develop business logic, and implement underlying technologies. In its real-time data warehouse construction, Meituan-Dianping aims to achieve unified technology expression so that the business side can focus on developing logic, which then can be implemented automatically based on configurations.
- Implement intelligent modeling based on business requirements to design an automatic modeling solution.
At present, Meituan-Dianping’s approach to real-time data warehouse platform construction still focuses on unified expression. We still have a long way to go to reach this goal.