Architecture Evolution and Practices of the Xiaomi Streaming Platform
By Xia Jun, Head of Xiaomi Streaming Platform, Senior R&D Engineer
Xiaomi’s business lines cover a wide range of fields, from information streams to e-commerce, advertising, finance, and more. The Xiaomi Streaming Platform provides an integrated streaming data solution for all the businesses of Xiaomi Group. This platform consists of three modules: data collection, data integration, and stream computing. Currently, the platform processes 1.2 trillion data records per day, runs 15,000 real-time synchronization tasks, and handles 1 trillion data records per day in real-time computing.
As Xiaomi’s business has grown, the Xiaomi Streaming Platform has undergone three major upgrades to meet various requirements for a large number of services. In particular, in the latest iteration, the internal modules of the Streaming Platform were completely restructured based on Apache Flink. In addition, Xiaomi’s businesses are being gradually migrated from Spark Streaming to Flink.
- Development History of the Xiaomi Streaming Platform
- Flink-based Real-time Data Warehouse
- Future Plans
The vision of the Xiaomi Streaming Platform is to provide an integrated and platform-based solution for streaming data in all Xiaomi business lines. Specifically, this solution provides the following major features:
- Streaming data storage: Streaming data storage refers to message queues. Xiaomi has developed its own message queue, which is similar to Apache Kafka but has its own characteristics. The Xiaomi Streaming Platform provides storage for message queues.
- Streaming data access and dumping: After providing message queues to cache streaming data, we need to provide access to and dump the streaming data.
- Streaming data processing: The platform processes streaming data based on computing engines such as Flink, Spark Streaming, and Storm.
The following figure shows the overall architecture of the Xiaomi Streaming Platform. The orange items in the leftmost column are data sources, which can be further divided into data sources from users (User for short) and data sources from databases (Database for short).
- Data sources from users include all kinds of event data collected from users, such as users’ application logs and WebServer logs. Data sources from databases include the data from various databases such as MySQL, HBase, and other RDS databases.
- The blue items in the middle show the components of the Xiaomi Streaming Platform. Talos is a message queue implemented by Xiaomi, and consumer SDKs and producer SDKs are provided at the upper layer of the message queue.
- A complete set of Talos Source is provided at the lower layer of the message queue. Talos Source is mainly used to collect the preceding user and database data in all scenarios.
Talos Sink and Talos Source are combined into a data streaming service that dumps the data of Talos to other systems at very low latency. Sink is a standardized service, but it offers little room for customization. In the future, we will rebuild the Talos Sink module based on Flink SQL.
The following figure shows the business scale of Xiaomi. Xiaomi stores approximately 1.2 trillion messages per day, with traffic peaks reaching 43 million messages per second. Talos Sink alone dumps 1.6 petabytes (PB) of data per day and currently contains nearly 15,000 dumping jobs. More than 800 stream computing jobs and more than 200 Flink jobs are processed per day. Flink can process up to 700 billion messages with over 1 PB of data per day.
Development History of the Xiaomi Streaming Platform
The development history of the Xiaomi Streaming Platform can be divided into three stages:
- Streaming Platform 1.0: Xiaomi Streaming Platform 1.0 was built in 2010 initially based on Scribe, Kafka, and Storm. Scribe is a set of solutions for data collection and dumping.
- Streaming Platform 2.0: Due to various problems in version 1.0, we developed Xiaomi’s proprietary message queue Talos, including Talos Source and Talos Sink, and connected the platform to Spark Streaming.
- Streaming Platform 3.0: This version adds support for schemas and introduces Flink and Stream SQL.
Overall, Streaming Platform 1.0 is a cascaded service. It provides multi-level cascaded Scribe Agents and Scribe Servers at the front to collect data. This allows it to meet the requirements for offline computing and real-time computing. HDFS and Hive are used for offline computing, while Kafka and Storm are used for real-time computing. Such a combination of offline computing and real-time computing could meet the business needs of Xiaomi at that time. However, there were also many problems.
Too many Scribe Agents were deployed, while the platform lacked configuration and packet management mechanisms. As a result, maintenance costs were high. In the push-based architecture used by Scribe, data cannot be effectively cached when an exception occurs, and HDFS data and Kafka data interfere with each other. When the cascaded data pipeline is long, the data flowing through it is a black box. Even worse, the platform lacked monitoring and data validation mechanisms.
To resolve the problems with Streaming Platform 1.0, Xiaomi released Streaming Platform 2.0. In this version, Talos was introduced and used as a data cache to store streaming data. The original cascaded architecture was changed to a star architecture, with various data sources on the left and various sinks on the right. This architecture features flexible scaling.
The architecture still contains a large number of agents that manage tens of thousands of data streams. What’s new is that this version implemented a configuration and packet management system to allow the agents to automatically update or restart after configuration. This version implemented a decentralized configuration service. After being configured, configuration files can be automatically distributed to distributed nodes. This version implemented end-to-end data monitoring. Through event tracking, the platform can monitor data loss and latency throughout the entire pipeline.
Streaming Platform 2.0 has the following advantages:
- Multi-Source and Multi-Sink: Previously, to transfer data between two systems, you had to directly connect the two systems. The current architecture reduces the complexity of system integration from O(M * N) to O(M + N), where M is the number of sources and N is the number of sinks.
- Configuration and packet management mechanisms: This feature significantly reduces the pressure of operations such as system upgrades, modifications, and releases, on the O&M team.
- End-to-end data monitoring mechanism: The new version implements full-pipeline data monitoring and quantifies the data quality throughout the pipeline.
- Product-based solutions: This feature avoids redundant building and resolves O&M issues.
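The O(M * N) to O(M + N) reduction mentioned in the first advantage can be made concrete with a small calculation. The sketch below is only illustrative: the function names and the sample counts are assumptions, not figures from the platform.

```python
def point_to_point_links(num_sources: int, num_sinks: int) -> int:
    """Integrations needed when every source is wired directly to every sink."""
    return num_sources * num_sinks


def star_links(num_sources: int, num_sinks: int) -> int:
    """Integrations needed when each source and each sink connects once
    to a central message queue (the star architecture)."""
    return num_sources + num_sinks


if __name__ == "__main__":
    # Hypothetical sizes, chosen only to show how the gap widens.
    for m, n in [(5, 4), (20, 10)]:
        print(f"M={m}, N={n}: direct={point_to_point_links(m, n)}, "
              f"star={star_links(m, n)}")
```

With a central queue, adding one more sink costs one new integration, not one per existing source.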
The following figure shows how to synchronize data from a MySQL table to Talos by using the preceding mechanisms. The specific process is as follows: The Binlog service serves as a secondary node in MySQL and sends a dump binlog request to MySQL. After receiving the dump binlog request, MySQL starts to push binary logs to the Binlog service. The Binlog service dumps the binary logs to Talos in a strictly ordered manner. Then, a Spark Streaming job is connected to Talos to parse the binary logs and write the parsed results to a Kudu table. Currently, the platform can write more than 3000 tables into Kudu.
The following figure shows the functional modules of Agent Source. Agent Source supports the RPC and HTTP protocols. It can listen to local files through File and cache data both in the memory and disk to ensure high data reliability. The platform implemented Logger Appender and RPC SDKs based on the RPC protocol, implemented HttpClient based on the HTTP protocol, implemented File Watcher to automatically discover and scan local files, and implemented Offset Manager to automatically record offsets. The Agent’s mechanisms are deeply integrated with the Kubernetes environment, and therefore can be easily integrated with background stream computing.
The following figure shows the logical flowchart of Talos Sink, which implements a series of processes based on Spark Streaming. On the left side is a set of partitions of a Talos topic. The common logic is obtained through abstraction based on each batch, such as startProcessBatch() and stopProcessBatch(). Different Talos Sinks only need to implement their write logic and are independent as different jobs to avoid mutual interference. Talos Sinks are optimized based on Spark Streaming to dynamically schedule resources based on topic traffic. This minimizes resource usage while ensuring low system latency.
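The per-batch abstraction described above resembles a template method: the shared steps live in a base class, and each sink supplies only its write logic. The following is a minimal plain-Python sketch of that idea, not the platform's Spark Streaming code. The class names and the behavior of the two hooks are assumptions; only the hook names echo startProcessBatch() and stopProcessBatch().

```python
from abc import ABC, abstractmethod
from typing import Iterable, List, Optional


class BatchSink(ABC):
    """Shared per-batch skeleton; subclasses implement only write()."""

    def __init__(self) -> None:
        self.current_batch: Optional[int] = None

    def start_process_batch(self, batch_id: int) -> None:
        # Assumed hook: a real sink might open connections here.
        self.current_batch = batch_id

    def stop_process_batch(self, batch_id: int) -> None:
        # Assumed hook: a real sink might commit offsets here.
        self.current_batch = None

    @abstractmethod
    def write(self, messages: Iterable[bytes]) -> None:
        """Sink-specific write logic."""

    def process_batch(self, batch_id: int, messages: Iterable[bytes]) -> None:
        self.start_process_batch(batch_id)
        try:
            self.write(messages)
        finally:
            # Always run the closing hook, even if the write fails.
            self.stop_process_batch(batch_id)


class InMemorySink(BatchSink):
    """Hypothetical sink that collects messages in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.rows: List[bytes] = []

    def write(self, messages: Iterable[bytes]) -> None:
        self.rows.extend(messages)
```

Running each concrete sink as its own job, as the text describes, keeps a failure in one write path from affecting the others.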
The following figure shows the end-to-end data monitoring mechanism implemented on the platform. The specific implementation is as follows: Each message has one timestamp named EventTime, which indicates the time when the message is actually generated. The timeline is divided into time windows by EventTime, and each time window is 1 minute long. The number of messages received in the current time window is counted at each hop of data transfer. Lastly, the integrity of the messages is measured. Latency is the difference between the ProcessTime and EventTime of a specific hop.
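The window counting and latency calculation described above can be sketched in a few lines. This is a simplified illustration: the function names, the millisecond timestamps, and the use of a count ratio as the integrity measure are assumptions.

```python
from collections import Counter
from typing import Dict, Iterable

WINDOW_MS = 60_000  # one-minute windows keyed by EventTime


def window_start(event_time_ms: int) -> int:
    """Start of the one-minute window that contains the event."""
    return event_time_ms - event_time_ms % WINDOW_MS


def count_by_window(event_times_ms: Iterable[int]) -> Counter:
    """Number of messages a hop received in each EventTime window."""
    return Counter(window_start(t) for t in event_times_ms)


def completeness(upstream: Counter, downstream: Counter) -> Dict[int, float]:
    """Per window, the fraction of upstream messages seen downstream."""
    return {w: downstream.get(w, 0) / n for w, n in upstream.items() if n}


def latency_ms(process_time_ms: int, event_time_ms: int) -> int:
    """Latency at a hop: ProcessTime minus EventTime."""
    return process_time_ms - event_time_ms
```

A ratio below 1.0 for a window suggests that messages were lost, or have not yet arrived, between the two hops.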
Streaming Platform 2.0 has the following major problems:
- Talos data is not managed by any schema. As a result, Talos does not understand the imported data, so the data cannot be consumed through SQL.
- The Talos Sink module does not support custom requirements. For example, when a piece of data containing 10 fields in Talos needs to be transferred to Kudu, which only needs 5 of the fields, Talos Sink cannot meet the requirement.
- Spark Streaming does not support EventTime or end-to-end Exactly Once semantics.
Flink-based Real-time Data Warehouse
To solve the preceding problems in Streaming Platform 2.0, Xiaomi conducted a lot of research and discussed and exchanged ideas with the Alibaba Real-time Compute team. Ultimately, Xiaomi decided to apply Flink to optimize the current process on the platform. The following describes the practices of the Flink-based Xiaomi Stream Computing Platform.
The design concepts that provide guidance on applying Flink to optimize the platform are as follows:
- Support schemas throughout the entire pipeline. The pipeline covers not only the Talos to Flink phase, but the entire process from initial data collection to final computing. In this case, data consistency checks must be implemented to avoid data corruption. In addition, field change and compatibility checks are also required. Schemas change frequently in big data scenarios, so compatibility checks are essential. Based on our experience with Kafka, we introduced forward, backward, and full compatibility check mechanisms to the schema.
- Comprehensively advance the application of Flink in Xiaomi with the help of the Flink community. We are gradually migrating real-time computing jobs for streaming data from Spark and Storm to Flink, while preserving the original latency and saving resources. At present, Xiaomi runs more than 200 Flink jobs. In addition, we want to use Flink to optimize the sinking process, so as to support extract, transform, and load (ETL) while improving the runtime efficiency. This provides a foundation from which we can promote Streaming SQL.
- Implement streaming as a product. To do so, we introduced streaming jobs and streaming SQL statements to manage the platform.
- Improve Talos Sink based on Flink SQL to support custom business logic.
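The forward, backward, and full compatibility checks mentioned in the first design concept can be illustrated with a deliberately simplified schema model. The sketch assumes the definitions commonly used by schema registries in the Kafka ecosystem: backward means that the new schema can read data written with the old one, and forward means the reverse. The `Field` type and the rules are assumptions, not the platform's implementation.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Field:
    type: str
    has_default: bool = False


Schema = Dict[str, Field]  # field name -> field definition


def can_read(reader: Schema, writer: Schema) -> bool:
    """True if data written with `writer` can be decoded with `reader`."""
    for name, field in reader.items():
        written = writer.get(name)
        if written is None:
            # The reader expects a field the writer never produced.
            if not field.has_default:
                return False
        elif written.type != field.type:
            return False
    return True


def is_backward_compatible(new: Schema, old: Schema) -> bool:
    return can_read(reader=new, writer=old)


def is_forward_compatible(new: Schema, old: Schema) -> bool:
    return can_read(reader=old, writer=new)


def is_fully_compatible(new: Schema, old: Schema) -> bool:
    return is_backward_compatible(new, old) and is_forward_compatible(new, old)
```

Under these rules, adding a field with a default value is fully compatible, while removing a field that has no default is backward compatible only.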
The following figure shows the architecture of Streaming Platform 3.0. This architecture is similar to that of Streaming Platform 2.0, but is expressed from a different perspective. Specifically, it includes the following modules:
- Abstract table: In this version, all types of storage systems, such as MySQL and Hive, are abstracted into tables for convenient conversion to SQL.
- Job management: This version supports the common management of streaming jobs, including support for multiple versions, configuration and JAR separation, compilation, deployment, and job status management.
- SQL management: SQL statements ultimately need to be converted into data streaming jobs. This management module includes support for web-based integrated development (IDE), schema exploration, user-defined function (UDF) and dimension table joins, SQL compilation, automatic DDL creation, and SQL storage.
- Talos Sink: This module reconstructs the sink in Streaming Platform 2.0 based on SQL management. It mainly includes quick table creation, automatic update of sink formats, field mapping, job merging, simple SQL, and configuration management. In the preceding scenario, a message is read from Talos based on Spark Streaming and transparently forwarded to HDFS for offline data warehousing analysis. At this point, the message can be directly expressed by using an SQL statement. In the future, we want to integrate this module with other Xiaomi proprietary systems, such as Elasticsearch and Kudu. In a specific scenario, assuming that a Talos schema is available, Talos Sink automatically creates Kudu tables for users based on the Talos schema.
- Platform: This version provides users with integrated platform solutions, including debugging and development, monitoring, and O&M.
The job management module manages the lifecycle, permissions, and tags of a job, displays the running history of a job to facilitate tracing, monitors the status and latency of a job, and automatically recovers failed jobs.
The SQL management module performs the following processes:
- Converts an external table to an SQL DDL statement, which corresponds to the standard DDL statement in Flink 1.9 and mainly includes the table schema, table format, and connector properties.
- Adds SQL statements based on a fully defined external SQL table to allow users to express their requirements. That means SQL Config indicates the complete expected expressions of users. It consists of Source Table DDL, Sink Table DDL, and SQL DML statements.
- Converts SQL Config to Job Config, which indicates a streaming job.
- Converts Job Config to Job Graph to submit a Flink job.
The following figure shows the procedure for converting an external table to an SQL DDL statement.
- Obtains the table schema and table format from the external table. The table format is used to parse data, such as to deserialize the Hive data.
- Generates the default connector configurations in the background. These configurations are classified into the following categories: configurations with fixed values, configurations with default values, and configurations to be specified by users.
Assume that a Talos component is to be consumed. In this case, the connector type must be Talos, so this configuration does not need to be modified: it is a configuration with a fixed value. By default, consumption starts from the head of a topic, but you can change the value to start consumption from the tail of a topic. This is a configuration with a default value. Permission-related information must be specified by users. These settings belong to the third type of configuration.
The three-level configuration management aims to minimize the complexity of configurations. The table schema, table format, and other configuration information of the connector constitute the SQL DDL statement. After SQL Config is returned to the user, the user needs to specify the required configurations so that the external table can be converted to an SQL DDL statement. The content in red is information to be modified by the user.
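The three-level configuration can be sketched as a merge of fixed values, overridable defaults, and user-supplied values, followed by rendering a DDL statement in the style of Flink 1.9. All property keys and values below (`connector.type`, `connector.startup-mode`, the credential keys) are hypothetical placeholders; the actual Talos connector properties are not given in this article.

```python
from typing import Dict, List, Tuple

# Hypothetical keys, for illustration only.
FIXED = {"connector.type": "talos"}                  # users cannot change
DEFAULTS = {"connector.startup-mode": "earliest"}    # users may override
REQUIRED = ("connector.access-key", "connector.secret-key")  # users must set


def resolve_properties(user: Dict[str, str]) -> Dict[str, str]:
    """Merge the three configuration levels into one string map."""
    locked = sorted(set(FIXED) & set(user))
    if locked:
        raise ValueError(f"fixed properties cannot be overridden: {locked}")
    missing = [key for key in REQUIRED if key not in user]
    if missing:
        raise ValueError(f"missing required properties: {missing}")
    return {**DEFAULTS, **user, **FIXED}


def render_ddl(table: str, columns: List[Tuple[str, str]],
               props: Dict[str, str]) -> str:
    """Render a CREATE TABLE statement from a schema and connector properties."""
    cols = ",\n".join(f"  {name} {sql_type}" for name, sql_type in columns)
    with_clause = ",\n".join(
        f"  '{key}' = '{value}'" for key, value in sorted(props.items())
    )
    return f"CREATE TABLE {table} (\n{cols}\n) WITH (\n{with_clause}\n)"
```

Because only the required keys have to be supplied, the user-facing form stays small while the generated statement remains complete.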
The SQL management module introduces the External Table feature. For example, when you choose to consume a topic of messages on the platform, this feature automatically obtains the schema and format information of the table described above and removes the logic used for registering Flink Table. When obtaining the schema, the feature automatically converts the field types for External Table to those for Flink Table and automatically registers the table as a Flink table. In addition, parameters for connector properties are divided into three categories. Default values are provided for these parameters, and only required parameters must be specified by users. All these parameters are expressed in the form of Map<String, String>, which can be easily converted to Table Descriptor in Flink subsequently.
The preceding section describes how to create an SQL DDL statement. Based on the created SQL DDL statements, such as a Source SQL DDL or Sink SQL DDL statement, you enter an SQL query and submit it to the backend. The backend verifies the SQL query and then generates an SQL Config, which is a complete expression of an SQL statement.
The following figure shows the procedure for converting an SQL Config to a Job Config.
- Add the resources required for the job and the configurations (such as the state parameter in Flink) of the job based on the SQL Config.
- Compile the SQL Config into a Job Descriptor, which is the description of Job Config. For example, the Job Descriptor may include the JAR package address, MainClass, and MainArgs of the job.
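The two steps above can be pictured as a small data model. This is a hedged sketch: the class and field names, the `--sql-config` argument, and the JSON encoding are assumptions. The article states only that a Job Descriptor may include the JAR package address, MainClass, and MainArgs.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, List


@dataclass
class SqlConfig:
    source_ddl: str
    sink_ddl: str
    dml: str


@dataclass
class JobDescriptor:
    jar_path: str
    main_class: str
    main_args: List[str]


@dataclass
class JobConfig:
    resources: Dict[str, str]      # e.g. parallelism, memory
    job_settings: Dict[str, str]   # e.g. state-related parameters
    descriptor: JobDescriptor


def compile_job_config(sql: SqlConfig, resources: Dict[str, str],
                       job_settings: Dict[str, str], template_jar: str,
                       template_main_class: str) -> JobConfig:
    """Attach resources and settings, then wrap the SQL Config as the
    arguments of a template job (argument format is an assumption)."""
    main_args = ["--sql-config", json.dumps(asdict(sql), sort_keys=True)]
    descriptor = JobDescriptor(template_jar, template_main_class, main_args)
    return JobConfig(dict(resources), dict(job_settings), descriptor)
```

Passing the whole SQL Config as an argument lets a single template JAR serve every SQL job.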
The following figure shows the procedure for converting a Job Config to a Job Graph. The table schema, table format, and connector properties in the SQL DDL statement map to Table Descriptors in Flink. In this case, you only need to call relevant built-in API operations in Flink to convert the information to Table Descriptors, for example, createTableSource() and registerTableSource(). In this way, the SQL DDL statement can be registered with the Flink system for direct use. To convert an SQL statement, you can use the sqlUpdate() method of TableEnvironment.
The following table shows the procedure for converting an SQL Config to a template job. The previously entered JAR package address is the address of the JAR package for the template, and MainClass is the template job. Assume that you have created an SQL DDL statement. You can directly convert the SQL DDL statement to a Table Descriptor and then obtain a Table Source by using the findAndCreateTableSource() method of TableFactoryUtil. A Table Sink is obtained in a similar way. After completing the first two steps, you can call sqlUpdate() to convert the SQL job to the final executable Job Graph, which can then be submitted to the cluster to be run.
Talos Sink stores data in three modes, as shown in the following figure:
- Row: The Talos data is ingested into the target system as is. This mode features high efficiency because the data does not need to be serialized or deserialized during read and write operations.
- ID mapping: The fields on the left map to the fields on the right. For example, the name field corresponds to the field_name field, the timestamp field corresponds to the timestamp field, and the region field is discarded.
- SQL: SQL expressions are used to indicate the logical processing of the fields.
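The Row and ID mapping modes can be illustrated with plain dictionaries. The sketch below uses the field names from the ID mapping example above; the function names and the sample record are assumptions.

```python
from typing import Any, Dict


def apply_row_mode(payload: bytes) -> bytes:
    """Row mode: forward the payload untouched, with no (de)serialization."""
    return payload


def apply_id_mapping(record: Dict[str, Any],
                     mapping: Dict[str, str]) -> Dict[str, Any]:
    """ID mapping mode: rename mapped fields and drop unmapped ones."""
    return {
        target: record[source]
        for source, target in mapping.items()
        if source in record
    }


# name -> field_name, timestamp -> timestamp, region is discarded.
MAPPING = {"name": "field_name", "timestamp": "timestamp"}

if __name__ == "__main__":
    sample = {"name": "alice", "timestamp": 1577836800, "region": "cn"}
    print(apply_id_mapping(sample, MAPPING))
```

SQL mode generalizes this further: the target fields are computed by expressions over the source fields rather than simply renamed.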
Future Plans
In the future, we plan to pursue the following goals for the Xiaomi Streaming Platform:
- Continue to promote streaming jobs and optimize the platform when implementing Flink.
- Use Flink SQL to unify offline data warehouses and real-time warehouses.
- Analyze and display data lineages based on the schema during data governance.
- Participate in building the Flink community.