Implementing a Real-Time Data Warehouse with Flink
By Guo Hua
A data warehouse is a basic service that a company needs once its data grows to a certain size, and data warehouse construction is an essential part of “data intelligence.” This article covers six aspects of data warehouses: overview, development path, construction methods, architecture evolution, application, and a comparison of real-time and offline data warehouses.
A data warehouse is a subject-oriented, integrated, non-volatile, and time-variant data collection. It is primarily used to support decision-making.
Data warehouses evolve along with enterprise informatization. During this process, existing information tools are quickly upgraded and new tools are put into use. Data arrives in increasingly diverse formats, and decision-making requirements become more stringent. All of these factors have driven the development of data warehouse technologies:
- Real-time data warehouses meet real-time and automatic decision-making requirements.
- Big data and data lake technologies support large volumes of complex data types (text, images, videos, and audio).
Data warehouse development involves two parts: construction and application.
- Early data warehouse construction mainly involved modeling enterprise business databases, such as ERP, CRM, and SCM, and aggregating their data in the data warehouse engine according to decision-making and analysis requirements. Data warehouses were mainly used to prepare reports that helped management and marketing personnel make mid- and long-term strategic decisions.
With the development of business and of the technical environment, both construction and application are undergoing significant changes.
- Information technologies are now widely used on the Internet and mobile networks, bringing diverse new data sources. In addition to existing service data, unstructured data types such as website logs, IoT device data, and app tracking data have emerged. Their data volumes are several orders of magnitude larger than those of structured data, which places higher demands on the extract-transform-load (ETL) process and on storage.
- Online services increasingly have strict real-time requirements. Adjusting business strategies in real time based on customer behavior is common practice in the industry. These strategies include mid- and long-term strategies as well as short-term operating procedures, such as inventory management and marketing management for big promotions. After shifting their business focus to the Internet, many companies must handle dramatically increasing numbers of concurrent customer requests, which can exceed what people can handle manually. In this case, we need help from machines and may even want machines to make decisions automatically, for example, to detect fraud or review user information.
Simply put, we need data warehouses to:
A. Generate results quickly to support real-time and automated decision-making.
B. Process and store large amounts of heterogeneous data in real time.
Note: Data lake technology is beyond the scope of this article.
Data warehouses can be constructed based on a company’s actual business to serve analytic requirements on various topics, such as suppliers, products, customers, and warehouses.
This type of warehouse mainly focuses on data reports and analytic functions, such as data cubes, roll-up, drill-down, slicing, and pivoting (rotation).
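To make these OLAP operations concrete, here is a minimal Python sketch over a tiny in-memory fact set. The `Sale` record, its fields, and the sample rows are hypothetical. A real warehouse would express the same operations in SQL (GROUP BY, WHERE, and pivot-style queries) on an OLAP engine.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, NamedTuple


class Sale(NamedTuple):
    """One row of a hypothetical sales fact table."""
    region: str
    city: str
    product: str
    month: str
    amount: float


SALES: List[Sale] = [
    Sale("East", "Hangzhou", "phone", "2020-10", 120.0),
    Sale("East", "Hangzhou", "laptop", "2020-11", 300.0),
    Sale("East", "Shanghai", "phone", "2020-11", 80.0),
    Sale("North", "Beijing", "phone", "2020-11", 150.0),
]


def aggregate(rows: List[Sale], key_fn: Callable[[Sale], Hashable]) -> Dict[Hashable, float]:
    """Sum amounts grouped by key_fn: a coarser key is a roll-up, a finer key a drill-down."""
    totals: Dict[Hashable, float] = defaultdict(float)
    for row in rows:
        totals[key_fn(row)] += row.amount
    return dict(totals)


def slice_by_month(rows: List[Sale], month: str) -> List[Sale]:
    """Slice: fix the month dimension to a single value."""
    return [row for row in rows if row.month == month]


def pivot_region_by_month(rows: List[Sale]) -> Dict[str, Dict[str, float]]:
    """Pivot (rotation): regions become rows and months become columns."""
    table: Dict[str, Dict[str, float]] = defaultdict(lambda: defaultdict(float))
    for row in rows:
        table[row.region][row.month] += row.amount
    return {region: dict(cols) for region, cols in table.items()}


by_region = aggregate(SALES, lambda r: r.region)            # roll-up to region
by_city = aggregate(SALES, lambda r: (r.region, r.city))    # drill-down to city
november_by_product = aggregate(slice_by_month(SALES, "2020-11"), lambda r: r.product)
```

Rolling up and drilling down use the same grouping operation. They differ only in how many dimensions the grouping key includes.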
Denormalized Data Model-Oriented
This type of warehouse uses star schemas made up of fact tables and dimension tables.
Note: The image is from 51 CTO.
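A star schema can be pictured as a central fact table that holds foreign keys and measures, surrounded by dimension tables. The Python sketch below uses hypothetical tables and column names. It joins each fact row to its dimensions to produce the wide, denormalized rows that reports and OLAP queries usually read.

```python
from collections import defaultdict
from typing import Dict, Iterator, List

# Dimension tables keyed by surrogate key (hypothetical data).
DIM_PRODUCT: Dict[int, Dict[str, str]] = {
    1: {"name": "phone", "category": "electronics"},
    2: {"name": "desk", "category": "furniture"},
}
DIM_CUSTOMER: Dict[int, Dict[str, str]] = {
    10: {"name": "Alice", "city": "Hangzhou"},
    11: {"name": "Bob", "city": "Beijing"},
}

# Fact table: foreign keys into each dimension plus numeric measures.
FACT_ORDERS: List[dict] = [
    {"product_id": 1, "customer_id": 10, "quantity": 2, "amount": 200.0},
    {"product_id": 2, "customer_id": 11, "quantity": 1, "amount": 150.0},
    {"product_id": 1, "customer_id": 11, "quantity": 1, "amount": 100.0},
]


def star_join(facts: List[dict]) -> Iterator[dict]:
    """Look up each fact's dimension rows and emit one wide, denormalized row."""
    for fact in facts:
        product = DIM_PRODUCT[fact["product_id"]]
        customer = DIM_CUSTOMER[fact["customer_id"]]
        yield {
            "product": product["name"],
            "category": product["category"],
            "customer_city": customer["city"],
            "quantity": fact["quantity"],
            "amount": fact["amount"],
        }


def revenue_by(facts: List[dict], attribute: str) -> Dict[str, float]:
    """Group the wide rows by any dimension attribute and sum the amount measure."""
    totals: Dict[str, float] = defaultdict(float)
    for row in star_join(facts):
        totals[row[attribute]] += row["amount"]
    return dict(totals)
```

For example, `revenue_by(FACT_ORDERS, "category")` groups by a product attribute, and `revenue_by(FACT_ORDERS, "customer_city")` groups by a customer attribute, without changing the fact table.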
W.H. Inmon proposed the concept of a data warehouse in 1990 and also provided a complete construction approach. With the advent of the Internet, data volumes increased explosively, and data warehouse architecture evolved through the following stages:
- Offline Big Data Architecture: Big data tools were used to replace conventional tools in classic data warehouses. At this time, only tools were replaced, while the architecture basically remained the same.
- Lambda Architecture: As real-time requirements grew, an acceleration layer (the speed layer) was added on top of the offline big data architecture, and stream processing technologies were used to directly compute metrics with strict real-time requirements.
- Real-time Processing-based Kappa Architecture: As more real-time services and event-based data sources emerged, real-time processing began to play the leading role, and the architecture was adjusted accordingly.
Offline Big Data Architecture
Data sources are imported into the offline data warehouse. Downstream applications can read the data mart directly or use additional data services, such as MySQL or Redis, depending on business needs. A data warehouse is typically divided into three layers:
1 Operational Data Store (ODS): stores raw data.
2 Data Warehouse Detail (DWD): defines fact and dimension tables by topic and stores the most fine-grained fact data.
3 Data Mart (DM): also known as the data warehouse summary layer; summarizes the detail data of the DWD layer according to different business needs.
Hive on HDFS is a typical storage service for this kind of data warehouse, and the ETL can be implemented with MapReduce scripts or Hive SQL.
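The three layers can be sketched in a few lines of Python. This only illustrates how data moves from ODS to DWD to DM. The order log format, the field names, and the daily GMV metric are hypothetical. In the architecture described above, the same steps would typically run as Hive SQL or MapReduce jobs over HDFS rather than as in-process Python.

```python
import json
from typing import Dict, List

# ODS: raw records stored exactly as ingested (hypothetical JSON log lines).
ODS_ORDER_LOG: List[str] = [
    '{"order_id": "o1", "status": "paid", "amount": "99.5", "dt": "2020-11-11"}',
    '{"order_id": "o2", "status": "paid", "amount": "20", "dt": "2020-11-11"}',
    '{"order_id": "o2", "status": "paid", "amount": "20", "dt": "2020-11-11"}',  # duplicate
    'not-json garbage',                                                             # malformed
    '{"order_id": "o3", "status": "cancelled", "amount": "5", "dt": "2020-11-12"}',
]


def build_dwd(ods_lines: List[str]) -> List[dict]:
    """DWD: parse, cleanse, deduplicate, and type-cast raw records into fine-grained facts."""
    seen = set()
    facts = []
    for line in ods_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # drop malformed records
        if record["order_id"] in seen:
            continue  # drop duplicate deliveries of the same order
        seen.add(record["order_id"])
        facts.append({
            "order_id": record["order_id"],
            "status": record["status"],
            "amount": float(record["amount"]),
            "dt": record["dt"],
        })
    return facts


def build_dm_daily_gmv(dwd_facts: List[dict]) -> Dict[str, float]:
    """DM: summarize DWD facts for one business need (daily GMV of paid orders)."""
    gmv: Dict[str, float] = {}
    for fact in dwd_facts:
        if fact["status"] == "paid":
            gmv[fact["dt"]] = gmv.get(fact["dt"], 0.0) + fact["amount"]
    return gmv


dwd = build_dwd(ODS_ORDER_LOG)
dm = build_dm_daily_gmv(dwd)
```

The ODS layer is kept untouched so that DWD and DM can always be rebuilt from it if the cleansing or summary logic changes.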
Lambda Architecture
As big data applications developed, systems gradually needed to meet some real-time requirements. To compute real-time metrics, we added a real-time computing chain to the offline data warehouse and restructured the data sources as data streams (data is sent to message queues). A real-time computing service subscribes to the message queues, directly computes incremental metric data, and pushes the results to a downstream data service, which merges the offline and real-time results.
Note: Metrics computed by stream processing are also computed by batch processing, and the batch processing results prevail. In other words, batch processing results eventually override stream processing results. This compromise was necessary because stream processing engines were not yet mature.
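The merge rule in the note can be expressed as a small serving layer. The Python sketch below is hypothetical, and its class and method names do not come from any particular product. The speed layer updates a per-day metric incrementally. The batch layer later delivers an authoritative value for a finished day. Queries return the batch value whenever one exists.

```python
from collections import defaultdict
from typing import Dict


class LambdaServingLayer:
    """Merges a batch view and a real-time (speed layer) view of a per-day metric."""

    def __init__(self) -> None:
        self.batch_view: Dict[str, float] = {}                     # recomputed by batch jobs
        self.realtime_view: Dict[str, float] = defaultdict(float)  # updated per event

    def on_event(self, day: str, amount: float) -> None:
        # Speed layer: incrementally update the metric as each event arrives.
        self.realtime_view[day] += amount

    def on_batch_result(self, day: str, value: float) -> None:
        # Batch layer: store the complete result for a finished day.
        self.batch_view[day] = value

    def query(self, day: str) -> float:
        # Batch results prevail; use the stream result only where batch has not run yet.
        if day in self.batch_view:
            return self.batch_view[day]
        return self.realtime_view.get(day, 0.0)


serving = LambdaServingLayer()
serving.on_event("2020-11-10", 10.0)
serving.on_event("2020-11-10", 5.0)
serving.on_event("2020-11-11", 3.0)
before = serving.query("2020-11-10")          # 15.0, from the stream
serving.on_batch_result("2020-11-10", 16.0)   # e.g. the batch job also counted a late event
after = serving.query("2020-11-10")           # 16.0, the batch result overrides
```

Notice that the metric logic lives in two places here: the incremental update in `on_event` and whatever batch job produced the `16.0`. That duplication is exactly what the next section criticizes.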
Problems with Lambda Architecture
- Need to develop two sets of code to deal with the same requirement: This is the biggest problem of the Lambda architecture. Two sets of code make development difficult (the same requirement must be implemented on both the batch processing and stream processing engines, as well as undergo separate data testing procedures to ensure consistent results), and subsequent maintenance harder. For example, if the requirement changes, we must modify two sets of code, test the results separately, and launch these two jobs simultaneously.
- High resource usage: Because the same logic is computed twice, overall resource usage increases by the resources spent on real-time computing.
Although the Lambda architecture meets real-time requirements, it requires more development and maintenance effort. It arose because stream processing engines were not yet mature: stream processing results were inaccurate and used for reference only. Later, stream processing engines such as Flink emerged and improved the accuracy of stream processing results. To eliminate the need for two sets of code, Jay Kreps of LinkedIn proposed the Kappa architecture.
Kappa Architecture
- Kappa can be considered a simplified version of Lambda, obtained by removing Lambda's batch processing part.
- In Kappa, requirement changes and historical data reprocessing are handled by replaying data from the upstream message queue.
- The biggest problem with the Kappa architecture is that stream processing has lower throughput than batch processing when reprocessing historical data. However, this can be offset by adding computing resources.
Reprocessing Procedure of Kappa Architecture
Reprocessing is the biggest concern with the Kappa architecture, but it is not complex:
- Select a message queue service that supports replay, historical data storage, multiple consumers, and a configurable retention period for historical data. For example, Kafka can retain all historical data.
- When one or more metrics need to be reprocessed, write a new job with the new logic that consumes the upstream message queue from the earliest offset and writes its results to a new downstream table.
- When the new job catches up with the old job's progress, switch the application's data source to the new result table generated in the previous step.
- Stop the old job, and delete the old result table.
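The four steps can be simulated in memory. In the Python sketch below, `ReplayableLog` stands in for a message queue topic that retains all history (the role Kafka plays in the text). `StreamJob` stands in for a streaming job with its own offset and result table. Both classes, the `reprocess` helper, and the metric logic are hypothetical. They model only the cut-over sequence, not a real Kafka or Flink API.

```python
from typing import Callable, Dict, List, Tuple

Record = dict
Logic = Callable[[Record], Tuple[str, int]]


class ReplayableLog:
    """Stand-in for a message queue topic that retains every record (step 1)."""

    def __init__(self) -> None:
        self.records: List[Record] = []

    def append(self, record: Record) -> None:
        self.records.append(record)

    def read_from(self, offset: int) -> Tuple[List[Record], int]:
        # Each consumer passes its own offset, so many consumers can read independently.
        return self.records[offset:], len(self.records)


class StreamJob:
    """A streaming job with its own consumer offset and its own result table."""

    def __init__(self, log: ReplayableLog, logic: Logic, start_offset: int = 0) -> None:
        self.log = log
        self.logic = logic
        self.offset = start_offset
        self.result_table: Dict[str, int] = {}

    def poll(self) -> None:
        records, self.offset = self.log.read_from(self.offset)
        for record in records:
            key, value = self.logic(record)
            self.result_table[key] = self.result_table.get(key, 0) + value


def reprocess(log: ReplayableLog, old_job: StreamJob, new_logic: Logic) -> StreamJob:
    # Step 2: a new job with the new logic consumes from the earliest offset into a new table.
    new_job = StreamJob(log, new_logic, start_offset=0)
    # Step 3: keep both jobs running until the new job has caught up with the old one.
    while True:
        old_job.poll()
        new_job.poll()
        if new_job.offset >= old_job.offset:
            break
    # Step 4: stop the old job and delete its result table; the application
    # now reads new_job.result_table instead.
    old_job.result_table.clear()
    return new_job


log = ReplayableLog()
for user, amount in [("u1", 10), ("u2", 5), ("u1", 7)]:
    log.append({"user": user, "amount": amount})

old_job = StreamJob(log, lambda r: (r["user"], r["amount"]))  # v1: total amount per user
old_job.poll()
old_totals = dict(old_job.result_table)
new_job = reprocess(log, old_job, lambda r: (r["user"], 1))    # v2: order count per user
```

The key design point is that the old and new jobs never share a result table. The application can keep reading consistent (old) results until the switch.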
Comparison: Lambda vs. Kappa
1 In real-life scenarios, we rarely use a pure Lambda or Kappa architecture; in most cases, we use a hybrid architecture. For example, we use Kappa to compute most real-time metrics and use Lambda to recompute a few key metrics (for example, money-related metrics) to verify the stream computing results.
2 Many big data systems now use the intermediate results of the Kappa architecture for machine learning (offline training), so these intermediate real-time results need to be stored in suitable storage engines. Sometimes we also need to query DWD data, in which case the DWD data must be written to a corresponding engine. For details, see the examples later in this article.
3 In addition, as data becomes more diverse, data warehouses that define a schema in advance find it increasingly difficult to meet flexible exploration and analytic requirements. Data lake technology attempts to solve this problem. In a data lake architecture, all raw data is stored in a big data storage system and parsed only as needed for later analysis. Simply put, a data warehouse works in schema-on-write mode, and a data lake works in schema-on-read mode.
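In item 1's hybrid approach, a batch job recomputes a few key metrics to check the stream results, which amounts to a reconciliation step. The Python sketch below is one assumption about how such a check might look. The metric names, values, and relative tolerance are hypothetical.

```python
from typing import Dict, Optional, Tuple

Mismatch = Tuple[Optional[float], float]


def reconcile(
    stream_results: Dict[str, float],
    batch_results: Dict[str, float],
    rel_tolerance: float = 0.0,
) -> Tuple[Dict[str, Mismatch], Dict[str, float]]:
    """Compare stream metrics with batch recomputations of the key metrics.

    Returns (mismatches, corrected). mismatches maps each key metric whose stream value
    is missing or deviates by more than rel_tolerance to (stream_value, batch_value).
    corrected is the stream view with every batch-verified value written over it.
    """
    mismatches: Dict[str, Mismatch] = {}
    for key, batch_value in batch_results.items():
        stream_value = stream_results.get(key)
        if stream_value is None:
            mismatches[key] = (None, batch_value)
            continue
        scale = abs(batch_value) or 1.0  # avoid dividing by zero for zero-valued metrics
        if abs(stream_value - batch_value) / scale > rel_tolerance:
            mismatches[key] = (stream_value, batch_value)
    corrected = {**stream_results, **batch_results}
    return mismatches, corrected


stream = {"gmv": 1000.0, "refund_amount": 48.0, "page_views": 52000.0}
batch = {"gmv": 1000.0, "refund_amount": 50.0}  # only money-related metrics are recomputed
mismatches, corrected = reconcile(stream, batch)
```

Metrics that are not recomputed in batch, such as `page_views` here, keep their stream values. The batch job only has to cover the small set of metrics where accuracy matters most.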
This section describes the Cainiao real-time data warehouse, as shared by the Cainiao warehousing and distribution team. It covers three aspects: overall design, data model, and data assurance.
Note: Thanks to Yuan Qiao for his generous contribution.
The following diagram shows the overall design of the Cainiao data warehouse. Based on business system data, Cainiao adopts the middle-layer concept in data model design to build a real-time data warehouse for product warehousing and distribution. Cainiao uses Flink, an easy-to-use real-time computing engine with excellent performance, as its primary computing engine. For data services, Cainiao uses the Tiangong data service middleware to avoid direct connections to the database. Tiangong also supports flexible configuration and switchover between primary and secondary databases within seconds. For data applications, Cainiao builds a comprehensive data system for product warehousing and distribution during big promotions, covering five dimensions: planning, inventory preparation, real-time tracking, after-sales service, and event reproduction.
To build the real-time data warehouse for product warehousing and distribution, Cainiao uses the middle-layer approach and avoids IT silos, which saves computation costs, simplifies usage, improves reusability, and ensures consistency. The real-time middle layer can be divided into two layers.
- Layer 1: DWD
The real-time computing system subscribes to the message queues of business data. Through data cleansing, multi-source joins, streaming computation, and offline dimension processing, it joins business data with dimension attributes from business systems and dimension tables at the same granularity. This simplifies data usage and improves data reusability. The result is the final real-time detail data, which is split into two branches: the first goes directly to ADS for real-time detail queries, and the second goes to message queues for downstream computation.
- Layer 2: DWS
A real-time summary layer is built on the concept of data domain + business domain. Unlike in the offline data warehouse, the summary layer consists of a light summary layer and a deep summary layer, which produce output at the same time. Output of the light summary layer is written to ADS for complex OLAP queries by front-end products, such as self-service analytic tools and report generation tools. Output of the deep summary layer is written to HBase for simple KV queries by front-end products such as the real-time dashboard, which improves query performance.
- ADS is an engine that provides OLAP analytic services. Open-source options such as Elasticsearch, Kylin, and Druid provide similar functions.
- In this case, data is written to HBase for KV queries, but you can choose other engines based on your situation. For example, if your data volume and query pressure are low, you can use MySQL.
- Topic-oriented modeling is closely tied to the specific business and is not described in detail here.
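The two-layer flow of this middle layer can be illustrated with a small in-memory Python sketch. Everything in it is hypothetical: the event fields, the `WAREHOUSE_DIM` dimension table, and the chosen metrics. Plain lists and dicts stand in for the message queues, ADS, and HBase. The sketch therefore shows only the shape of the data flow: cleanse and join into detail rows, then produce a light and a deep summary from the same rows. It does not reproduce Cainiao's actual jobs.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

LightKey = Tuple[str, str, str]

# Hypothetical dimension table, e.g. maintained offline and loaded into the streaming job.
WAREHOUSE_DIM: Dict[str, Dict[str, str]] = {
    "wh1": {"city": "Hangzhou", "region": "East"},
    "wh2": {"city": "Wuhan", "region": "Central"},
}


def to_dwd(events: Iterable[dict]) -> Iterator[dict]:
    """DWD: cleanse business events and join dimension attributes into detail rows."""
    for event in events:
        if event.get("order_id") is None or event.get("warehouse_id") not in WAREHOUSE_DIM:
            continue  # cleansing: drop incomplete or unjoinable events
        yield {**event, **WAREHOUSE_DIM[event["warehouse_id"]]}


def run_pipeline(
    events: Iterable[dict],
) -> Tuple[List[dict], Dict[LightKey, int], Dict[str, int]]:
    detail_rows: List[dict] = []                           # branch 1: real-time detail queries
    light_summary: Dict[LightKey, int] = defaultdict(int)  # DWS light: multi-dimensional, for OLAP
    deep_summary: Dict[str, int] = defaultdict(int)        # DWS deep: one KV entry per dashboard metric
    for row in to_dwd(events):
        detail_rows.append(row)
        # Branch 2: the same detail rows feed the DWS layer (via a message queue in practice).
        light_summary[(row["region"], row["city"], row["status"])] += 1
        if row["status"] == "shipped":
            deep_summary[f"shipped:{row['region']}"] += 1
    return detail_rows, dict(light_summary), dict(deep_summary)


events = [
    {"order_id": "o1", "warehouse_id": "wh1", "status": "shipped"},
    {"order_id": "o2", "warehouse_id": "wh1", "status": "pending"},
    {"order_id": "o3", "warehouse_id": "wh2", "status": "shipped"},
    {"order_id": None, "warehouse_id": "wh2", "status": "shipped"},  # incomplete
    {"order_id": "o5", "warehouse_id": "wh9", "status": "shipped"},  # unknown warehouse
]
detail, light, deep = run_pipeline(events)
```

The light summary keeps several dimensions so an OLAP engine can still slice it. The deep summary collapses each metric to a single key, which suits a simple KV lookup from a dashboard.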
Alibaba runs a variety of big promotions, such as the 11.11 Global Shopping Festival, during which traffic and data volume increase dramatically. Compared with offline systems, a real-time data warehouse is more sensitive to data volume and has higher stability requirements. To meet these requirements, two preparations are needed:
- System stress testing before big promotions.
- Primary/secondary service chain guarantee during big promotions.
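A primary/secondary chain guarantee generally means running two independent processing chains that produce the same results, with the data service switching to the secondary chain when the primary one falls behind. The Python sketch below models only that switching rule. The class names, the heartbeat-based staleness check, and the lag threshold are assumptions for illustration. They do not describe how Tiangong or any specific middleware implements switchover.

```python
import time
from typing import Dict, Optional, Tuple


class ResultChain:
    """The result store of one processing chain, plus the time of its latest write."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.values: Dict[str, float] = {}
        self.last_update: Optional[float] = None

    def write(self, key: str, value: float, now: float) -> None:
        self.values[key] = value
        self.last_update = now


class FailoverReader:
    """Data-service facade: serve from the primary chain unless it has gone stale."""

    def __init__(self, primary: ResultChain, secondary: ResultChain, max_lag_seconds: float) -> None:
        self.primary = primary
        self.secondary = secondary
        self.max_lag = max_lag_seconds

    def _is_fresh(self, chain: ResultChain, now: float) -> bool:
        return chain.last_update is not None and now - chain.last_update <= self.max_lag

    def read(self, key: str, now: Optional[float] = None) -> Tuple[str, Optional[float]]:
        now = time.time() if now is None else now
        # Prefer the primary; if it is stale, fall back to the secondary chain.
        chain = self.primary if self._is_fresh(self.primary, now) else self.secondary
        return chain.name, chain.values.get(key)


primary, secondary = ResultChain("primary"), ResultChain("secondary")
reader = FailoverReader(primary, secondary, max_lag_seconds=10)
for chain in (primary, secondary):
    chain.write("gmv", 100.0, now=0)
normal = reader.read("gmv", now=5)       # primary is fresh
secondary.write("gmv", 130.0, now=20)    # the primary chain has stopped updating
degraded = reader.read("gmv", now=25)    # primary lags 25 s > 10 s, so read the secondary
```

Because applications only talk to the facade, the switch is invisible to them. This is one reason the text routes queries through a data service instead of connecting to databases directly.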
Comparison: Real-Time vs. Offline Data Warehouses
After reading the previous sections and taking a closer look at the Cainiao data warehouse, you may want to know the differences and similarities between a real-time and offline data warehouse.
- First, architecture. A real-time data warehouse is built on the Kappa architecture, which makes it distinctly different from an offline data warehouse that mainly uses a conventional big data architecture. The Lambda architecture can be considered an intermediate stage between the two.
- Second, construction method. Both the real-time and offline data warehouses follow the conventional data warehouse topic modeling theory and generate wide fact tables.
Note: JOIN operations on real-time streams in real-time data warehouses carry implicit time semantics. Each record can only join with the data that is available when it is processed, so results can differ from those of an equivalent offline JOIN over complete data.
- Finally, data assurance. Because they must process data in real time, real-time data warehouses are very sensitive to changes in data volume. We must run stress tests and arrange primary/secondary chain guarantees for real-time data warehouses before big promotions. This is another major difference between real-time and offline data warehouses.
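The note on JOINs can be illustrated by contrasting an offline join with a time-bounded streaming join. In the Python sketch below, the event data and the one-hour bound are hypothetical. The bounded join mirrors the interval-join idea used by stream processors: a right-side record matches only if its timestamp falls within a window relative to the left-side record's timestamp. It is a plain nested loop, not Flink's implementation, which buffers records in keyed state.

```python
from typing import List, NamedTuple, Tuple


class Event(NamedTuple):
    key: str
    ts: int       # event time in seconds
    payload: str


def batch_join(left: List[Event], right: List[Event]) -> List[Tuple[str, str, str]]:
    """Offline join: all data is complete, so matching depends only on the key."""
    return [(a.key, a.payload, b.payload) for a in left for b in right if a.key == b.key]


def interval_join(
    left: List[Event], right: List[Event], lower: int, upper: int
) -> List[Tuple[str, str, str]]:
    """Streaming-style join: match only if a.ts + lower <= b.ts <= a.ts + upper.

    The time bound is what lets a streaming engine discard old state; records that
    fall outside the bound never join, unlike in the offline join.
    """
    return [
        (a.key, a.payload, b.payload)
        for a in left
        for b in right
        if a.key == b.key and a.ts + lower <= b.ts <= a.ts + upper
    ]


orders = [Event("o1", 0, "created"), Event("o2", 100, "created")]
shipments = [Event("o1", 30, "shipped"), Event("o2", 5000, "shipped")]
offline = batch_join(orders, shipments)                             # both orders match
realtime = interval_join(orders, shipments, lower=0, upper=3600)    # o2 shipped too late to match
```

The same data yields two results in the offline join but only one in the bounded join. When a real-time metric is modeled on an offline one, this difference has to be accounted for explicitly.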