By Zheng Zhisheng, Director of the Real-time Big Data Platform at Bilibili
This article introduces the architecture and practices of Bilibili’s Saber real-time computing platform, starting from the pain points of real-time computing. Built on the Flink engine, Saber provides unified support for managing metadata, lineage, and permissions and for maintaining jobs. The upper layer of the platform is the in-house Saber-BSQL layer, which greatly streamlines the construction of instance streams and resolves three major data pre-processing issues: stream-stream joins (SJoin), stream-dimension-table joins (DJoin), and real-time streaming features.
This article is divided into four parts:
- Pain Points of Real-time Computing
- Evolution of the Saber Platform
- Case Studies with AI
- Future Development and Thinking
1) Background — Pain Points of Real-time Computing
1.1 Pain Points
Each business department needs to perform real-time computing during business R&D. In the early days, it was difficult to develop services without the support of a platform system. Due to the different language types and systems used in different business departments, it was difficult to manage and maintain services. In addition, Bilibili has many business intelligence (BI) analysis tasks, such as user growth and channel distribution analysis. The real-time data in the real-time data warehouses also needs to be cleansed. As a content-oriented video website, Bilibili requires real-time computing in AI recommendation scenarios.
1.2 Common Features of the Pain Points
- High development threshold: Environment configuration and language programming must be taken into account during the development of a real-time engine based on the underlying layer. Data reliability and code quality must be considered during the coding process. It is also difficult to choose among the diverse real-time engines in the market.
- High site reliability engineering (SRE) costs: The high SRE costs have two main causes. One is poor job stability. In the early days, we used Spark clusters and YARN clusters, which had low job stability and made it difficult to manage fault tolerance. The other is the lack of a unified monitoring and alert system. As a result, each business team had to repeat the same development work, such as monitoring for latency, traffic interruption, data fluctuation, and failover.
- Difficult real-time AI engineering: The recommendation section on the homepage of the Bilibili application depends on the support of the AI system. In the early stage, we encountered many problems with AI machine learning. Machine learning is a system that involves both algorithms and engineering. Engineering focuses on efficiency and code reuse, while algorithms focus more on feature extraction and model output. In practice, the AI team had to undertake a lot of engineering work, which restricted its ability to carry out experiments. In addition, the languages and frameworks used by the AI team differ considerably from those used in engineering, so engineering capabilities need to be provided as infrastructure. To speed up the AI process and reduce the engineering work of algorithm engineers, we must improve this infrastructure.
1.3 Apache Flink-based Stream Computing Platform
To solve the preceding problems, Bilibili hopes to build an Apache Flink-based stream computing platform to meet the following requirements:
- SQL-based programming. Bilibili extends SQL to create BSQL. BSQL is a syntax extension layer that sits on top of the underlying Flink SQL.
- Directed acyclic graph (DAG) drag-and-drop programming. Users can build their own pipelines through the dashboard, or write code and submit it as native Java archives (JARs).
- Integrated and managed job SRE.
Bilibili’s stream computing platform mainly covers the following scenarios:
- AI engineering. The platform provides stream joiners and dimension table joiners for advertising, searches, and recommendations.
- Support for real-time computing features. The platform can monitor the quality of players and content delivery networks (CDNs). Monitoring metrics include the quality of live streaming, the number of peak concurrent users (PCUs), the lag rate, and the CDN quality.
- User growth. Based on real-time computing, the platform can analyze channels and adjust channel delivery performance.
- Real-time ETL. The platform provides the real-time boss view, real-time big screen, and dashboard.
2) Evolution of the Saber Platform
2.1 Platform Architecture
The real-time platform consists of real-time transfer and real-time computing. The platform manages metadata, lineages, and permissions and maintains jobs in a unified manner at the underlying layer. Real-time transfer refers to the transfer of data to the big data system. Based on BSQL, real-time computing provides support for various scenarios.
As shown in the following figure, application logs, database binary logs, server logs, and system logs are transferred in real time. Bilibili’s internal Lancer system stores data in Kafka or HDFS. The computing system mainly builds a BSQL set based on Saber. It schedules and manages resources based on YARN at the underlying layer, and builds a runtime pool based on Flink at the core of the upper layer. At the layer above the runtime pool, the computing system meets the needs of multiple dimension table scenarios by supporting MySQL, Redis, and HBase. For state storage, the computing system adds MapDB and Redis in addition to RocksDB. I/O-intensive jobs are a particular pain point in Flink. Flink’s resource scheduling system covers memory and CPU resources, but I/O resources are not managed in a unified manner. When a job has a high demand for I/O, the system has to allocate many resources measured in units of CPU or memory, and even then the allocation may not meet the I/O requirements. Therefore, Bilibili currently moves the state of I/O-intensive jobs to Redis to alleviate the problem. After being computed with BSQL, the data is transferred to real-time data warehouses and stores, such as Kafka, HBase, ES, MySQL, and TiDB. Lastly, the data is delivered to AI, BI, reports, and the log center.
2.2 Development Architecture Design
2.2.1 Development Architecture
The following figure shows the development architecture on the left side. The uppermost layer is Saber-Streamer, which commits jobs and manages APIs. The layer below that is the BSQL layer, which mainly extends and parses SQL by using custom operators and common operators. The next layer is runtimes, followed by the engine layer. Saber-Runtimes mainly manages the upper and lower layers of jobs at the engine layer. At first, Bilibili used the Spark Streaming engine. In later stages, Bilibili extended Flink and reserved an extension to the engine layer in the development architecture. The bottom-most layer is the state storage layer. On the right is the metrics monitoring module.
2.2.2 Platform Design Standards
During the design of the Saber platform system, the team focused on its boundaries, specifications, and standards, covering the following key points: abstraction of streaming workflows, data standardization for ensuring schema integrity, general-purpose BSQL parser layer, and engineering efficiency.
- Streaming workflows: The following figure shows a stream computing abstraction model. By its nature, a big data computing engine applies a function to the input data and then outputs the result. The function is essentially a Transform that can be expressed as a DAG. The abstraction of stream computing on the Saber platform therefore provides a Source, a Transform that is expressed as a DAG in the computing process, and a Sink that outputs the results. This abstraction also fixes the semantics, which means that standards regulate the input and output. The underlying layer submits a job expressed in JSON format, so a job can be submitted even if no job submission UI is available.
- Data abstraction: Make the data speak. The data that enters the computing process is reported through data integration. The data integration system reports data through a unified platform portal. You first need to build an input data source on the platform. After you select a data source, the platform can distribute the data source to Kafka, HBase, and Hive and you must define schemas as instructed during distribution. Therefore, you can easily manage schemas in the input language during data integration. During computing, you can select an input source, such as an HBase table or a Kafka table. In this case, the schema is strongly constrained. You can output result tables or metrics by using BSQL or DAGs provided by the platform.
- General design of BSQL: BSQL complies with the design concept of streaming workflows. The core work of BSQL involves Source, Transform, and Sink. The Transform mainly depends on Flink SQL, so BSQL tends to package data on the Source and the Sink. In addition, BSQL supports DDL packaging. The DDL has been extended according to external documentation provided by Alibaba Cloud. In addition, we optimized BSQL for the computing process. For example, to resolve data skew in operator computing, we use bucket- and hash-based policies to cleanse the data, and use Redis HyperLogLog in non-precision computing of distinct counts.
- BSQL parser model: The following figure shows the topology of the BSQL parser model. When you submit an SQL statement, the model converts the SQL statement into an SqlTree. Then, you can obtain SqlNodes, which contain a lot of metadata. Lastly, TableParsers are implemented based on the SqlTree, and different SqlNodes are converted into the corresponding Flink Streamers to map the data.
- BSQL execution process: After you submit SQL statements, the BSQL first verifies the SQL statements and builds an SqlTree. Specifically, the BSQL extracts table names and field information and then obtains the schemas from the metadatabase to verify the compliance, integrity, and validity of the SQL statements. If the verification is successful, the BSQL registers the input table and the result table to the Flink Runtimes, completes watermark information, and loads the UDF. In addition, the platform provides some extensions to SQL. The third part is the core of the extension. The BSQL converts the subtrees extended from the SqlTree into new nodes, and commits the DAGs of the SQL statements to Flink for running.
- Result display — DAG: As shown in the following figure, the results are shown in a DAG, including parallelism design, logs, monitoring metrics, and alerts.
- Result display — BSQL: You can write SQL statements based on the schema from the input source of the selected table. Lastly, you can select the corresponding UDF to submit the SQL statement to the corresponding cluster.
- Result display — job debugging: The following figure shows the job debugging process supported by the platform. It would not be user friendly if SQL development was provided without job debugging. Therefore, the platform allows you to run job debugging SQL statements to debug jobs by uploading files and performing online sampling.
- Result display — job SRE: The platform provides some monitoring metrics. Users can define extended metrics and custom metrics for some special SQL statements implemented by Bilibili. The following figure shows the running status of some queues.
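The bucket- and hash-based handling of data skew mentioned in the BSQL design item can be illustrated with a two-stage aggregation. The following is a minimal Python sketch, not BSQL's actual implementation: the bucket count, the function names, and the sample events are all assumptions made for illustration. A hot key is first counted per (key, bucket) sub-key so that no single operator instance receives all of its records, and the partial counts are then merged per key.

```python
import zlib
from collections import Counter

NUM_BUCKETS = 8  # hypothetical bucket count, chosen only for this example


def bucket_of(record_id: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a record to a bucket with a deterministic hash (crc32)."""
    return zlib.crc32(record_id.encode("utf-8")) % num_buckets


def partial_counts(events, num_buckets: int = NUM_BUCKETS) -> Counter:
    """Stage 1: count per (key, bucket), spreading a hot key over several sub-keys."""
    partial = Counter()
    for key, record_id in events:
        partial[(key, bucket_of(record_id, num_buckets))] += 1
    return partial


def merge_counts(partial: Counter) -> Counter:
    """Stage 2: drop the bucket and sum the partial counts per original key."""
    total = Counter()
    for (key, _bucket), count in partial.items():
        total[key] += count
    return total


# One heavily skewed key and one rare key (made-up sample data).
events = [("hot_video", f"r{i}") for i in range(1000)] + [("cold_video", "r-x")]
partial = partial_counts(events)
totals = merge_counts(partial)
print(totals["hot_video"], totals["cold_video"])
```

The second stage sees at most `NUM_BUCKETS` partial rows per key, so the work for a hot key is bounded by the bucket count rather than by the raw record count.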
3) Case Studies with AI
3.1 Current Status of AI-based Machine Learning
The AI system includes offline and online processes. In the online training process, A/B experiments are conducted based on traffic, and then recommendations are provided based on the results of different experiments. In addition, each experiment must be pushed online through a corresponding model. The pain points of AI are concentrated in offline training, which is performed in streaming mode. The following figure shows the initial approach to streaming-based offline training. To generate a real-time label stream, you need to create a real-time stream-stream join. In addition, to generate a real-time instance stream, you need to create a join between a stream and a dimension table and then join them with feature information. However, in the early days, related engineering services had single points of failure, and the SRE costs necessary to ensure service quality and stability were also high. As a result, the AI investment during early pipeline building was high.
3.2 Disadvantages and Pain Points
- Data timeliness: The data timeliness cannot be guaranteed. A great deal of data is computed offline, while many features require high timeliness.
- Engineering quality: Single-point engineering does not facilitate service extension and stability assurance.
- Engineering efficiency: In each experiment, you must produce labels, compute features, and splice instance streams, which is a high barrier to entry. The algorithm team undertakes the engineering work to provide recommendations for different business lines in different scenarios. However, the languages they are familiar with are different from the engineering languages, which leads to language chaos in engineering. In addition, streams and batches are inconsistent, and, as a result, model training in the real-time streaming environment differs greatly from that in the offline batching environment. This causes a double increase in labor investment because the logic of online model training is actually similar to that of offline model training.
3.3 Engineering-based Model Training
We build a data computing pipeline based on the Saber-BSQL and the Flink engine to greatly streamline the creation of instance streams. The key to this pipeline is to resolve the following problems: SJoin, DJoin, and real-time features.
- SJoin — engineering background: First, the traffic volume is high, such as the traffic generated for recommendations on the Bilibili homepage. AI-based display and click joins come from the number of clicks and the number of displays of the whole site. Second, in addition to two-stream joins, three or more streams can also be joined, such as ad display streams, click streams, and query streams. Third, different join operations use different ETL cleansing methods. If users are unable to use SQL statements, you need to provide general extensions for users, so that they can customize ETL cleansing methods for different services before join operations. Fourth, the atypical A Left Join B on time-based window model is used. After primary stream A is joined successfully in the window time, it needs to wait until the window time ends before outputting the data, which lengthens the stay time of primary stream A in the window. This scenario is crucial. In Bilibili, similar scenarios are required for advertising, AI, search, and live streaming. AI-based machine learning requires even positive and negative samples to ensure the training results. Therefore, the fourth problem urgently needs to be resolved.
- SJoin — engineering scale: We provide joiners for real-time online recommendation. Originally, the QPS of feed streams and click streams were 150,000 and 20,000 respectively at peak times. After joiners are used, the QPS of the output streams reaches 100,000 at peak times, and the output throughput reaches 200 MB/s. The number of Keyed State queries per second stays at about 600,000 at peak times, counting read, write, and exists operations. In a one-hour window, the number of Keyed State entries registered with the Timer reaches 150,000 x 3,600 = 540 million, and the RocksDB state size reaches 200 MB x 3,600 = 720,000 MB, or about 700 GB. In practice, native Flink can cause many performance problems at this scale. For example, the early Flink 1.3.x versions had poor stability at this scale.
- SJoin — technical pain point: The following figure shows the internal topology of Flink when WindowOperator is used. Each record opens its own window in WindowOperator. The first problem is that the number of windows is huge: it is basically the same as the QPS. The second problem is that every record also registers with the Timer Service, which was backed by an in-memory queue in the early native Flink. This memory queue had many problems of its own. In the early days, the underlying queue used a single-thread mechanism, so the data cached in memory caused many issues.
Briefly, the technical pain points are as follows: First, the Timer has poor performance and consumes a large amount of memory. Second, traffic jitter occurs when the RocksDB ValueState compacts data. Similar to the scenario in HBase, multi-level compaction can cause performance jitter and write amplification. Third, when the restart traffic is too high, the recovery cycles of a window and the Keyed State are uncontrollable because Timer had only memory queues in the early stage. It takes a long time to load a large amount of data from the disk. Therefore, service recovery is time-consuming.
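The engineering-scale figures for SJoin follow from multiplying the peak per-second rates by the window length. The short Python calculation below reproduces that arithmetic. It is an upper-bound estimate that assumes the peak rate is sustained for the whole one-hour window, and the variable names are illustrative only.

```python
PEAK_FEED_QPS = 150_000      # feed-stream records per second at peak (from the article)
OUTPUT_MB_PER_SEC = 200      # output throughput in MB/s at peak (from the article)
WINDOW_SECONDS = 3_600       # one-hour join window

# Every feed record held for the window needs one keyed state entry and one timer.
timer_entries = PEAK_FEED_QPS * WINDOW_SECONDS

# State volume accumulated over the window, in decimal and binary units.
state_mb = OUTPUT_MB_PER_SEC * WINDOW_SECONDS
state_gb_decimal = state_mb / 1000
state_gib = state_mb / 1024

print(f"{timer_entries:,} entries")            # 540,000,000 entries
print(f"{state_gb_decimal:.0f} GB (decimal)")  # 720 GB (decimal)
print(f"{state_gib:.1f} GiB (binary)")         # 703.1 GiB (binary)
```

Either unit convention lands at roughly 700 GB of state for a single one-hour window, which is the scale the optimizations in this section are designed for.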
- SJoin — optimization ideas: First, optimize and upgrade the Timer. No better solution was available in the community. At that time, Bilibili tried to independently develop PersistentTimerManager. Later, Bilibili upgraded Flink and used the RocksDB-based Timer. Second, enable Redis as the ValueState to improve the stability of the state. Third, extend the SQL syntax to support the atypical A Left Join B on time-based window scenario.
- SJoin optimization — in-house Timer: The memory data is spilled to the disk when the maximum memory is exceeded. The underlying layer uses MapDB for the spill to disk. The spill to disk theory is based on the LSM model, which also has data jitter. When the window is 1 hour, the state manages the data of each 1 hour. As shown on the right side of the following figure, in the hour from 0:00 to 1:00, data is only written because the data is only output 1 hour later. In the hour from 1:00 to 2:00, the data will be written into the new state. The state for the period from 0:00 to 1:00 has reached the window time and therefore spills the data. The in-house Timer can resolve data read and write and jitter problems. However, the in-house Timer lacks the checkpoint mechanism. If a disk failure occurs on a node, the data in the state will be lost.
- SJoin optimization — RocksDBTimer: We upgraded Flink to introduce the RocksDB-based Timer. The following figure shows the upgraded architecture. The data obtained from Topic-Feed and Topic-Click in Kafka is first cleansed and then output to the custom Joiner Operator. The Joiner Operator first spills the primary stream data to Redis, which serves as the state, and then stores and registers the to-be-windowed keys to the TimerService. Next, the native checkpoints of the TimerService start the incremental checkpoint process. When OnTimer reaches the time, the TimerService spills the data. This solution meets the SJoin requirements for high-throughput jobs.
- SJoin optimization — introduction of KVStore: The native state of Flink cannot meet our requirements because severe jitter occurs when the values and I/O requirements are high. Actually, the RocksDB state also has jitter problems. To address this, Bilibili has tried many improvement solutions. For example, the data volume of a 1-hour window is about 700 GB, and the total traffic of a double-stream 1-hour window can reach the terabyte range. If the distributed KVStore is used for storage, the data volume is about 700 GB after the data is compacted.
- SJoin optimization — extended SQL syntax: The purpose of extending the SQL syntax is to provide the 1-hour wait window for display streams. When a click stream arrives, the window does not immediately spill the joined data, but waits until the window ends. Although the extended SQL syntax is not currently a widely used approach, it can meet the AI needs of many departments. The supported SQL syntax is as follows: Select * from A left(global)$time window and $time delay join B on A.xx=B.xx where A.xx=xx. This brings great benefits to users.
SQL syntax extension focuses on two key points. At the top definition layer of the SQL syntax, Calcite is used to extend JoinType. First, the SQL statement is expanded into an SqlTree. A node in the SqlTree is as follows: left(global)$time window and $time delay join. This subtree is extracted and a logical conversion rule is customized for it. In this example, StreamingJoinRute is defined to convert the subtree into a new node. The async I/O capability provided by Flink converts the asynchronous subtree into a Streaming Table and registers it with the Flink environment. In this way, SQL syntax can be supported.
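The semantics of the delayed window join can be illustrated outside Flink. The following is a minimal Python simulation under stated assumptions, not the BSQL or Flink implementation: the class and method names are hypothetical, keys are assumed to be unique per primary record (for example, an impression ID), and records from stream B that arrive before their A record are ignored. Each primary-stream record is held in state, a timer is registered for the end of its window, and the joined row is emitted only when that timer fires, with `None` on the right side if no match arrived.

```python
import heapq
from dataclasses import dataclass, field


@dataclass
class DelayedLeftJoin:
    """Hold each primary (A) record for `window` seconds, then emit it with its B match or None."""

    window: int
    _state: dict = field(default_factory=dict)   # key -> [a_value, b_value or None]
    _timers: list = field(default_factory=list)  # min-heap of (fire_time, key)

    def on_a(self, ts: int, key: str, value: str) -> None:
        if key in self._state:  # assumption: keys are unique, so duplicates are dropped
            return
        self._state[key] = [value, None]
        heapq.heappush(self._timers, (ts + self.window, key))

    def on_b(self, ts: int, key: str, value: str) -> None:
        entry = self._state.get(key)
        if entry is not None and entry[1] is None:
            entry[1] = value  # remember the match, but do not emit yet

    def advance(self, now: int) -> list:
        """Fire all timers due at `now` and return the emitted (key, a, b) rows."""
        out = []
        while self._timers and self._timers[0][0] <= now:
            _, key = heapq.heappop(self._timers)
            a_value, b_value = self._state.pop(key)
            out.append((key, a_value, b_value))
        return out


join = DelayedLeftJoin(window=3600)
join.on_a(0, "imp-1", "shown")
join.on_a(10, "imp-2", "shown")
join.on_b(120, "imp-1", "clicked")  # click arrives early in the window
early = join.advance(1800)          # nothing is emitted before the window ends
first = join.advance(3600)          # imp-1 is emitted with its click
second = join.advance(3610)         # imp-2 is emitted as a negative sample
print(early, first, second)
```

Because matched and unmatched rows leave the operator at the same point in time, positive and negative samples are produced with the same delay, which is the property the training pipeline needs.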
- DJoin — engineering background: Bilibili has different requirements for dimension table data. For example, some dimension tables contain terabytes of data. It is a waste to use Redis to store this data. However, some dimension tables contain a small volume of data, such as real-time features. In addition, data in a dimension table can be updated on a daily basis, on an hourly basis, or every minute. Furthermore, dimension tables have high performance requirements. Many experiments are conducted for AI scenarios. For example, if a feature is good, many models are developed and the parameters are adjusted to different values to conduct experiments. A job containing more experimental groups results in higher QPS and higher RT requirements. Different dimension tables are stored in different storage media, which has a significant impact on stability. This survey involves two scenarios: When the data volume is relatively small, Redis can be used for storage, and the stability is good. When the data volume is very large, Redis is costly, and the CP architecture of HBase cannot guarantee stability.
- DJoin — engineering optimization: The SQL syntax for dimension table joins must be supported, and the caching performance must be optimized as well. When a user writes multiple SQL statements that join dimension tables, the keys of those dimension tables are extracted and the lookup requests are merged. This improves I/O performance and balances the traffic. In addition, different storage types are supported for different scenarios, such as JDBC and KV scenarios. In KV scenarios, Redis is used for the real-time update and query of hundreds of gigabytes of data, and multiple HBase clusters are used for the real-time update and query of terabytes of data. For example, two HBase clusters are used in Failover + LoadBalance mode to keep the 99th percentile RT below 20 ms and improve stability.
- DJoin — syntax extension: DJoin syntax extension is similar to SJoin syntax extension. The SQL subtree is converted and then extended through the async I/O capability to obtain dimension tables.
- DJoin — high availability with HBase: When the volume of dimension tables reaches the terabyte range, HBase is used for data storage. High availability is achieved by using dual HBase clusters in A/B failover mode. In this case, two issues need to be considered. The first issue is the data update mechanism. Data can be updated on an hourly or daily basis and is imported serially, at intervals, as HFiles in BulkLoad mode. Then, the imported data is synchronized and preloaded to ensure the stability of the two HBase clusters. The other issue is the data query mechanism. Hystrix is introduced to implement circuit breaking and service degradation policies. When the availability of cluster A decreases, a certain share of the traffic is dynamically switched from cluster A to cluster B according to the RT of clusters A and B. This keeps the traffic balanced. The following figure shows the dual-HBase cluster architecture. On the right side, data is imported offline by day, and a DAG is triggered through the scheduling framework to compute the data. The output of the DAG goes through two layers of serial HBase Sinks. The serial mode ensures that the data is written first to cluster A and then to cluster B. At runtime, lookups are issued through async I/O in Flink and pass through two layers of HystrixClients. The first-layer HystrixClient mainly collects statistics on the RT of the second-layer HystrixClients, which access HBase, and dynamically distributes traffic to the two HBase clusters based on the RT. When cluster A is stable, all traffic goes to cluster A. When jitter occurs in cluster A, a certain proportion of traffic is dynamically switched from cluster A to cluster B based on the failure rate.
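The traffic-switching behavior described for the dual HBase clusters can be sketched as a small router. This is an illustrative Python sketch and not Hystrix or Bilibili's implementation: the class names, the sliding-window size, and the `max_shift` cap are assumptions, and only the failure rate is used as the health signal, leaving RT aside. The router sends all lookups to cluster A while it is healthy and moves a share of lookups to cluster B in proportion to A's recent failure rate.

```python
import random
from collections import deque


class FailureRateWindow:
    """Failure rate over the most recent `size` call results."""

    def __init__(self, size: int = 100):
        self._results = deque(maxlen=size)

    def record(self, ok: bool) -> None:
        self._results.append(ok)

    def failure_rate(self) -> float:
        if not self._results:
            return 0.0
        return self._results.count(False) / len(self._results)


class DualClusterRouter:
    """Route lookups to cluster A, shifting a share to B as A's failure rate rises."""

    def __init__(self, window_size: int = 100, max_shift: float = 0.9, rng=None):
        self.primary_health = FailureRateWindow(window_size)
        # Cap the shift so cluster A keeps receiving some probe traffic and can recover.
        self.max_shift = max_shift
        self._rng = rng or random.Random()

    def report_primary(self, ok: bool) -> None:
        self.primary_health.record(ok)

    def shift_fraction(self) -> float:
        return min(self.primary_health.failure_rate(), self.max_shift)

    def choose(self) -> str:
        return "B" if self._rng.random() < self.shift_fraction() else "A"


router = DualClusterRouter(window_size=10, rng=random.Random(0))
print(router.shift_fraction())  # 0.0 while no failures have been observed
for ok in [True] * 5 + [False] * 5:
    router.report_primary(ok)
print(router.shift_fraction())  # 0.5 after half of the recent calls failed
```

Capping the shifted share below 100% is a design choice in this sketch: without it, a fully failed window would starve cluster A of traffic and its health signal could never improve.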
3.4 Real-Time Pipeline for Model Training
The entire system implements a pipeline that runs from data pre-generation for AI model training through to the output model. BSQL is used to implement the joiner that joins display streams and click streams. Real-time feature data is calculated using BSQL, and offline data is processed through offline scheduling. Dimension table joins are also expressed in BSQL. Together, these steps form a pipeline that produces instance streams for the machine learning team, which are then fed into model training to output a model.
4) Future Development and Thinking
4.1 Saber — Improve Basic Features
As more people use the platform, basic SRE becomes the most critical aspect. The Saber platform will improve the SQL IDE experience, for example, by providing richer version management, online and offline management, task debugging, resource management, and basic operations. The Saber platform will also enrich job SRE, including SLAs, launch approval, priorities, various system monitoring metrics, custom metric alerts, and job operations.
4.2 Saber — Improve Application Capabilities
Saber’s application capabilities will continuously evolve toward AI. For example, experimental concepts will be introduced to the engineering process during model training to pull an SQL pipeline through experiments. In addition, we will unify the SQL used for real-time computing and offline computing to increase SQL reuse for the model training team. We will carry out experiments on the models to evaluate the results and provide advance warnings. The engineering process during real-time feature computing will support multi-feature computing, covering many scenarios such as feature computing, storage, and queries.