OPPO’s Use of Flink-based Real-time Data Warehouses

1) Evolution of the OPPO Real-time Data Warehouse

1.1 OPPO’s Businesses and Data Scale

OPPO is well known as a major smartphone manufacturer. What is less well known is the extent of OPPO's involvement in the Internet and big data fields. The following figure shows OPPO's businesses and data scale.

1.2 OPPO Data Mid-End

  • The data warehouse is built on top of the tool system and is divided into a raw layer, a detail layer, a summary layer, and an application layer, which is a typical data warehouse architecture.
  • Above it sits the panoramic data system, which integrates all business data into unified data assets, such as ID-Mapping and user tags.
  • At the top, scenario-driven data products and services apply the data to the business.

1.3 Construction of OPPO Offline Data Warehouses

1.4 Demand for Real-time Data Warehouses

1.5 Smooth Migration from Offline Data Warehouses to Real-time Data Warehouses

1.6 Construction of OPPO Real-time Data Warehouses

2) Flink SQL-based Extension

2.1 Why Use Flink SQL?

The following figure shows the basic structure of the Flink framework. The underlying layer is the runtime, which has four core strengths: low latency with high throughput, end-to-end exactly-once semantics, fault-tolerant state management, and support for windowing and event time. Three layers of APIs are abstracted on top of the runtime, with the SQL API at the top.
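To make the layering concrete, below is a minimal sketch (not OPPO's code) of a job written purely at the SQL layer: a TableEnvironment built on the runtime, a table declared with the built-in datagen connector, and a query executed without touching the lower-level APIs. Table and field names are illustrative.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlLayerSketch {
    public static void main(String[] args) {
        // The TableEnvironment sits on top of the Flink runtime, which provides
        // low latency, exactly-once semantics, fault-tolerant state, and event time.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Declare a source table; 'datagen' is a built-in test connector.
        tEnv.executeSql(
                "CREATE TABLE click_events (uid STRING, ad_id STRING) " +
                "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");

        // A complete streaming job expressed only in SQL.
        tEnv.executeSql("SELECT ad_id, COUNT(*) AS clicks FROM click_events GROUP BY ad_id")
            .print();
    }
}
```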

2.2 Web-based Development IDE

2.3 AthenaX: RESTful SQL Manager

  • The core of metadata management is injecting external databases and tables into Flink so that they can be referenced in SQL. Flink leaves room for connecting to external metadata by providing the ExternalCatalog and ExternalCatalogTable abstractions. AthenaX encapsulates these as a TableCatalog and extends it at the API layer. When an SQL job is submitted, AthenaX automatically registers the TableCatalog with Flink, calls the Flink SQL API to compile the SQL into a JobGraph (Flink's executable unit), and submits it to YARN to launch a new application, as sketched below.
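The article predates Flink's current catalog API, but the same idea can be sketched with it: register a catalog so externally managed databases and tables become visible to SQL, then let Flink compile the submitted statement. This is a hedged illustration rather than AthenaX's actual TableCatalog code, and the catalog name is a placeholder.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class CatalogInjectionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Make externally managed databases and tables visible to SQL.
        // AthenaX performs the equivalent registration automatically at job submission.
        tEnv.registerCatalog("metadata_center", new GenericInMemoryCatalog("metadata_center"));
        tEnv.useCatalog("metadata_center");

        // From here, executing the user's INSERT statement compiles it into a JobGraph
        // internally and hands it to the cluster (YARN in the setup described above).
        // tEnv.executeSql("INSERT INTO dst_table SELECT ... FROM src_table");
    }
}
```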

2.4 Registering Databases and Tables in Flink SQL

First, we need to understand how databases and tables are registered in Flink SQL. The process involves three basic abstractions: TableDescriptor, TableFactory, and TableEnvironment.
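The relationship between the three abstractions can be sketched with a plain DDL statement: the schema plus the WITH clause play the role of the TableDescriptor, Flink resolves a TableFactory (a DynamicTableFactory in recent releases) from the 'connector' key, and the resulting table is registered in the TableEnvironment. The connector options below are placeholders, not OPPO's configuration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableRegistrationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 1) Schema + WITH options describe the external table (the descriptor).
        // 2) Flink matches a factory on 'connector' = 'kafka' to build the source.
        // 3) The table is registered under 'ad_clicks' in the TableEnvironment.
        tEnv.executeSql(
                "CREATE TABLE ad_clicks (uid STRING, ad_id STRING) WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'ad_clicks'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json')");

        // Once registered, the table can be referenced from plain SQL.
        // tEnv.executeSql("SELECT uid, ad_id FROM ad_clicks").print();
    }
}
```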

2.5 Connection Between Flink SQL and External Data Sources

From the registration process described above, it follows that if a table defined in external metadata can be converted into a table that a TableFactory recognizes, it can be registered seamlessly with the TableEnvironment. Based on this idea, we connected Flink SQL to our existing Metadata Center, as shown in the following figure.
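A hedged sketch of that bridge is shown below: a table definition pulled from the metadata center is rendered as a CREATE TABLE statement that Flink's factory machinery can resolve. TableMeta is a hypothetical stand-in for the record returned by OPPO's internal metadata API, not a class from any library.

```java
import org.apache.flink.table.api.TableEnvironment;

import java.util.Map;
import java.util.stream.Collectors;

public class MetaCenterBridgeSketch {

    /** Hypothetical metadata record: a table name, a schema, and connector properties. */
    public static final class TableMeta {
        final String name;
        final String schemaDdl;            // e.g. "uid STRING, ad_id STRING"
        final Map<String, String> props;   // e.g. connector / topic / format options

        TableMeta(String name, String schemaDdl, Map<String, String> props) {
            this.name = name;
            this.schemaDdl = schemaDdl;
            this.props = props;
        }
    }

    /** Renders the metadata as a CREATE TABLE statement that a TableFactory can resolve. */
    public static void register(TableEnvironment tEnv, TableMeta meta) {
        String withClause = meta.props.entrySet().stream()
                .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
                .collect(Collectors.joining(", "));
        tEnv.executeSql(String.format(
                "CREATE TABLE %s (%s) WITH (%s)", meta.name, meta.schemaDdl, withClause));
    }
}
```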

2.6 Real-time Tables — Dimension Table Association

The platform can now manage metadata and SQL jobs. However, some basic capabilities are still missing and must be provided before the platform can be opened to users. A star schema is required to build a data warehouse: for example, the fact table in the middle records the ad click stream, surrounded by dimension tables for users, ads, products, and channels.

2.7 UDF-based Dimension Table Association

For the UDF-based implementation, we need to rewrite the original SQL statement into one that calls a UDF, for example UserDimFunc. The right side of the preceding figure shows its implementation: UserDimFunc inherits Flink SQL's TableFunction and performs the dimension lookup for each incoming record.
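The figure is not reproduced here, but the general shape of such a UDF can be sketched as follows, with an in-memory map standing in for whatever store (HBase, Redis, etc.) actually holds the dimension data. The class, field, and SQL names are illustrative rather than OPPO's actual code.

```java
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

import java.util.HashMap;
import java.util.Map;

// Emits the user's age for each uid; rows without a match are simply dropped here.
public class UserDimFuncSketch extends TableFunction<String> {

    private transient Map<String, String> ageByUid;

    @Override
    public void open(FunctionContext context) {
        // A real implementation would load or query the dimension store here.
        ageByUid = new HashMap<>();
        ageByUid.put("u001", "25");
    }

    // Called once per input row.
    public void eval(String uid) {
        String age = ageByUid.get(uid);
        if (age != null) {
            collect(age);
        }
    }
}

// Registration and the rewritten SQL (illustrative names):
//   tEnv.createTemporarySystemFunction("user_dim", UserDimFuncSketch.class);
//   SELECT c.uid, c.ad_id, d.age
//   FROM ad_clicks AS c, LATERAL TABLE(user_dim(c.uid)) AS d(age)
```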

2.8 SQL Conversion-based Dimension Table Association

We need to solve the problems of the UDF-based implementation, so that users do not have to rewrite their original SQL statements and the platform does not have to develop a large number of UDFs. The idea is to add an SQL parsing and rewriting step before the statement is handed to Flink for compilation, so that dimension tables are associated automatically. Technical research and a proof of concept (POC) have shown that this SQL conversion-based approach works. The following explains the idea in more detail.
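As a rough illustration (not the platform's actual parser, which would operate on the SQL's abstract syntax tree rather than on strings), the rewrite can be pictured as turning the user's ordinary dimension join into a form Flink can execute against the dimension store. The rewritten form below uses Flink's lookup-join syntax; proc_time is assumed to be a processing-time attribute of the fact table, and all table and column names are illustrative.

```java
public class DimJoinRewriteSketch {

    // What the user writes: a plain star-schema join against the dimension table.
    static final String USER_SQL =
            "SELECT c.uid, c.ad_id, u.age "
            + "FROM ad_clicks AS c "
            + "JOIN user_dim AS u ON c.uid = u.uid";

    // What the platform submits after rewriting: the dimension access expressed as a
    // lookup join against the external store.
    static final String REWRITTEN_SQL =
            "SELECT c.uid, c.ad_id, u.age "
            + "FROM ad_clicks AS c "
            + "JOIN user_dim FOR SYSTEM_TIME AS OF c.proc_time AS u ON c.uid = u.uid";

    public static void main(String[] args) {
        System.out.println("before: " + USER_SQL);
        System.out.println("after : " + REWRITTEN_SQL);
    }
}
```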

3) Cases of Real-time Data Warehouse Creation

The following are several typical cases implemented with Flink SQL on the platform.

3.1 Real-time ETL Splitting

This is a typical real-time ETL process: a large table is split into smaller tables, one for each business, as sketched below.
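The following is a hedged sketch of such a splitting job: one wide source table fanned out to several per-business sink tables within a single Flink SQL job. The datagen and blackhole connectors only make the example self-contained; in practice the source would be the raw Kafka table and the sinks the per-business Kafka tables, with names taken from the metadata center.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class EtlSplitSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-ins for the real source and sink tables (illustrative schemas).
        tEnv.executeSql("CREATE TABLE raw_events (business STRING, uid STRING, payload STRING) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");
        tEnv.executeSql("CREATE TABLE ad_events (uid STRING, payload STRING) "
                + "WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TABLE push_events (uid STRING, payload STRING) "
                + "WITH ('connector' = 'blackhole')");

        // Each business gets its own INSERT; bundling them in one StatementSet
        // runs the whole split as a single Flink job.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO ad_events SELECT uid, payload FROM raw_events "
                + "WHERE business = 'ad'");
        set.addInsertSql("INSERT INTO push_events SELECT uid, payload FROM raw_events "
                + "WHERE business = 'push'");
        set.execute();
    }
}
```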

3.2 Real-time Metric Statistics

The following case describes how to calculate the click-through rate (CTR) of information flows. The number of clicks is divided by the number of exposures to obtain the CTR, which is then written to a MySQL database and visualized in the internal report system. The SQL statement uses tumbling windows and a subquery.
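A hedged sketch of the query shape is shown below: an inner subquery counts clicks and exposures per tumbling window, and the outer query derives the rate that would then be inserted into a MySQL (JDBC) sink table. Event names, fields, and the one-minute window are illustrative, and the datagen source only stands in for the real Kafka table.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CtrSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-in for the exposure/click event stream.
        tEnv.executeSql("CREATE TABLE feed_events ("
                + "  event_type STRING,"
                + "  event_time TIMESTAMP(3),"
                + "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND"
                + ") WITH ('connector' = 'datagen', 'rows-per-second' = '5')");

        // Subquery: per-window click and exposure counts. Outer query: the CTR.
        tEnv.executeSql(
                "SELECT window_start, clicks * 1.0 / NULLIF(exposures, 0) AS ctr FROM ("
                + "  SELECT TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,"
                + "         SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS clicks,"
                + "         SUM(CASE WHEN event_type = 'exposure' THEN 1 ELSE 0 END) AS exposures"
                + "  FROM feed_events"
                + "  GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)"
                + ") t").print();
        // In the real job the outer SELECT is wrapped in an INSERT INTO a JDBC (MySQL) table.
    }
}
```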

3.3 Real-time Tag Import

The following case describes how to import tags in real time. The mobile phone reports the user's longitude and latitude in real time; these coordinates are converted into a specific POI and imported into Elasticsearch. Finally, the tag system uses the POI for user targeting.
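A hedged sketch of the job shape follows: a scalar UDF converts each reported coordinate into a POI tag, and the result is written to an Elasticsearch sink table for the tag system. LatLngToPoi is hypothetical and merely stands in for whatever geo/POI lookup OPPO actually uses; the source and sink tables are assumed to be registered already (for example via the metadata center).

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class PoiTagSketch {

    /** Hypothetical geo lookup; a real version would call a POI / geo-fencing service. */
    public static class LatLngToPoi extends ScalarFunction {
        public String eval(Double lat, Double lng) {
            if (lat == null || lng == null) {
                return "unknown";
            }
            return "poi_" + Math.round(lat) + "_" + Math.round(lng);
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.createTemporarySystemFunction("lat_lng_to_poi", LatLngToPoi.class);

        // The coordinate source table and the Elasticsearch sink table are assumed to
        // be registered elsewhere; only the enrichment statement is sketched here.
        // tEnv.executeSql(
        //     "INSERT INTO user_poi_tags "
        //     + "SELECT uid, lat_lng_to_poi(latitude, longitude) AS poi, report_time "
        //     + "FROM location_reports");
    }
}
```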

4) Thoughts and Prospects for the Future

This section shares some of our thoughts and plans for future work.

4.1 End-to-end Real-time Stream Processing

What does end-to-end mean here? One end is the collected raw data; the other end is the presentation and application of that data as reports, tags, and APIs, with real-time streams connecting the two. Currently, we implement real-time stream processing with SQL statements, where both the source and target tables are Apache Kafka tables; from Kafka the results are then imported into Druid, Elasticsearch, or HBase. This design improves the stability and availability of the overall pipeline: Kafka acts as a buffer in front of downstream systems, so a downstream failure does not affect the real-time stream computation, and it is easier to keep one system stable than several. For Kafka-to-Kafka real-time streams, the mature exactly-once semantics ensures consistency.
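For the Kafka-to-Kafka leg, the consistency guarantee can be sketched as a sink table configured for exactly-once delivery. The option names below follow recent Flink Kafka SQL connector releases ('sink.delivery-guarantee'; older releases used 'sink.semantic'), the broker and topic are placeholders, and checkpointing must be enabled for the transactional writes to commit.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExactlyOnceKafkaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Transactional Kafka writes only become visible when a checkpoint completes.
        tEnv.getConfig().getConfiguration()
                .setString("execution.checkpointing.interval", "30s");

        // Kafka sink table of the real-time warehouse layer, written exactly-once.
        tEnv.executeSql("CREATE TABLE dwd_events (uid STRING, event STRING) WITH ("
                + "  'connector' = 'kafka',"
                + "  'topic' = 'dwd_events',"
                + "  'properties.bootstrap.servers' = 'kafka:9092',"
                + "  'format' = 'json',"
                + "  'sink.delivery-guarantee' = 'exactly-once',"
                + "  'sink.transactional-id-prefix' = 'dwd_events_sink')");

        // The upstream Kafka source table and the INSERT INTO statement are omitted here.
    }
}
```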

4.2 Lineage Analysis for Real-time Streams

Anyone who has worked with offline data warehouses understands the importance of lineage analysis: it plays an indispensable role in data governance, and the same holds for real-time data warehouses. We want to build end-to-end lineage, covering the access channels of the collection system, the intermediate real-time tables and jobs, and the products that consume the data. Based on this lineage, we can evaluate the application value of the data and calculate its computing cost.

4.3 Integration of Offline and Real-time Data Warehouses

Finally, we are looking into the integration of offline and real-time data warehouses. In the short term, real-time data warehouses cannot replace offline data warehouses, so the two will coexist. How can we adapt the tools and systems built for offline data warehouses to real-time data warehouses and manage both together? In theory, their data sources are the same and their upper-layer abstractions are tables and SQL statements, but they also differ, for example in time granularity and computing mode. We are exploring ways to transform our data tools and products to achieve complete integration.
