Lyft’s Large-scale Flink-based Near Real-time Data Analytics Platform
By Xu Ying, Technical Lead for the Real-time Data Platform at Lyft, and Gao Li, Technical Lead for the Computing Data Platform at Lyft.
At Flink Forward Asia 2019, Dr. Xu Ying from Lyft’s Real-time Data Platform Team and Dr. Gao Li from the Computing Data Platform Team explained how Lyft built a large-scale near real-time data analytics platform based on Apache Flink.
This article is divided into four main parts:
- Streaming Data Scenarios at Lyft
- Near Real-time Data Analytics Platform and Architecture
- In-depth Analysis of Platform Performance and Fault Tolerance
- Summary and Future Outlook
1) Streaming Data Scenarios at Lyft
Lyft is a shared transportation platform in North America. Like Uber and Didi, Lyft provides ride-sharing services to the public. Lyft’s mission is to improve people’s lives with the world’s best transportation.
Streaming Data Scenarios at Lyft
Lyft’s streaming data scenarios can be roughly divided into three categories by latency requirement: second level, minute level, and under 5 minutes. Minute-level scenarios most commonly include the adaptive pricing system and the fraud and anomaly detection system, along with a machine learning feature engineering service that Lyft developed recently. Scenarios with a latency of under 5 minutes cover systems for interactive queries on near real-time data.
Lyft’s Previous Data Analytics Platform and Architecture
The following figure shows the architecture of Lyft’s previous data analytics platform. Most of Lyft’s streaming data comes from events, which are mainly generated by mobile applications and backend services. For example, passengers, drivers, payment, and insurance services generate various events. These events require real-time responses.
On the analytics platform, events flow to Amazon Web Services (AWS) Kinesis, a dedicated PubSub service provided by AWS that is similar to Apache Kafka. All data streams are persisted as files in AWS S3. In addition, many batch processing jobs generate data subsets. For analytics, Lyft uses the Presto query engine, which has an active open-source community. Lyft’s data analytics platform has four types of users: data engineers, data analysts, machine learning experts, and deep learning experts. They often use analysis engines to interact with data.
Problems with the Previous Platform
Problems with the previous platform drove Lyft to build the large-scale near real-time data analytics platform based on Apache Flink. Because of high latency, the persisted data could not meet the requirements for near real-time queries. Persisting streaming data with the Kinesis Client Library (KCL) provided limited performance. Too many small files were persisted, which led to poor performance in downstream operations on AWS S3. Most extract, transform, and load (ETL) pipelines had high latency and involved multiple steps that each took days. In addition, the previous platform provided limited support for nested data.
2) Near Real-time Data Analytics Platform and Architecture
Architecture of the Near Real-time Platform
On the new near real-time platform, Lyft uses Apache Flink to persist streaming data. Lyft stores data on the cloud and uses Flink to write data directly to the cloud in Parquet format. Parquet is a columnar storage format that effectively supports interactive data queries. Lyft builds a real-time data warehouse on the raw Parquet data. The warehouse is organized as Hive tables, and the metadata of the Hive tables is stored in a Hive metastore.
The platform performs multi-level, non-blocking ETL processing on the raw data. Compaction and deduplication are performed at each level to obtain data of higher quality. The platform uses Apache Airflow to schedule ETL operations. All the raw data in Parquet format can be queried through Presto, and the interactive query results can be displayed to users in BI tools.
Lyft’s large-scale near real-time data analytics platform implemented based on Apache Flink has the following features:
- The platform uses Flink to persist streaming data efficiently and at high speed. This reduces the number of clusters on the cloud by a factor of 10, greatly reducing operations and maintenance (O&M) costs.
- Data persisted in Parquet format supports interactive queries. A query that needs only certain columns can skip unnecessary data by reading only the relevant partitions and selecting only the desired columns. This improves query performance.
- AWS-based cloud storage eliminates the need to convert the format of the data to be stored on the cloud.
- The multi-level ETL process enhances the performance and data quality.
- Performance, fault tolerance, and evolvability are all taken into account.
Platform Features and Applications
Lyft’s near real-time data analytics platform needs to process hundreds of billions of events per day, with a data latency of less than 5 minutes. The components in the pipeline ensure data integrity. In addition, ETL-based deduplication ensures data uniqueness.
Data scientists and data engineers perform ad-hoc interactive queries during modeling. In addition, the platform provides real-time correctness alerts for the machine learning model and provides a real-time dashboard to monitor the health status of market supply and demand.
Flink-based Near Real-time Data Persistence
As shown in the following figure, events that arrive at Kinesis are stored as an EventBatch. The Flink Kinesis connector extracts the events and sends them to FlatMap and Record Counter. FlatMap distributes the events to Global Record Aggregator and Tagging Partitioning. The sink is based on StreamingFileSink: when a checkpoint is performed, the in-progress file is closed and persisted. For this reason, the platform is set to perform a checkpoint every three minutes, which ensures that events are persisted within three minutes after they enter the Kinesis connector.
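The checkpoint-driven persistence described above can be illustrated with a toy model. This is a minimal sketch in plain Python, not Flink's StreamingFileSink API; the class name `CheckpointedSink` and its methods are hypothetical and exist only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class CheckpointedSink:
    """Toy sink: buffers events in an in-progress file, finalizes it on checkpoint."""

    in_progress: list = field(default_factory=list)
    committed_files: list = field(default_factory=list)

    def write(self, event):
        # Events are only buffered here; they are not yet visible to readers.
        self.in_progress.append(event)

    def on_checkpoint(self):
        # Close the in-progress file and persist it; skip empty files.
        if self.in_progress:
            self.committed_files.append(list(self.in_progress))
            self.in_progress = []


sink = CheckpointedSink()
sink.write({"id": 1})
sink.write({"id": 2})
sink.on_checkpoint()   # first file is persisted with two events
sink.write({"id": 3})  # stays in the in-progress file until the next checkpoint
```

In this model the checkpoint interval is an upper bound on how long an event can wait before it becomes durable, which is why a three-minute interval yields a three-minute persistence bound.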
This method may produce a large number of small files, and the pipeline already handles tens of thousands of files. To keep the file count under control, each subtask records the weight of its local events, and Global Record Aggregator calculates the global weight of the events and broadcasts it to downstream nodes. After receiving the event weight, the operator distributes the events to the sinks accordingly.
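One way to read the weighting scheme is sketched below. It assumes that a "weight" is an event type's share of total traffic and that the number of sink subtasks assigned to a type is proportional to that share; both assumptions, and the function names, are illustrative and not confirmed by the talk.

```python
from collections import Counter


def aggregate_global_weights(local_counts):
    """Combine per-subtask counters into a global share per event type."""
    total = Counter()
    for counts in local_counts:
        total.update(counts)
    grand_total = sum(total.values())
    if grand_total == 0:
        return {}
    return {name: n / grand_total for name, n in total.items()}


def sinks_per_event_type(weights, num_sinks):
    """Assumed policy: sinks proportional to weight, at least one per type."""
    return {name: max(1, round(w * num_sinks)) for name, w in weights.items()}


# Two subtasks report their local record counts (hypothetical event names).
local_counts = [
    Counter({"ride_requested": 900, "insurance_updated": 10}),
    Counter({"ride_requested": 880, "insurance_updated": 10}),
]
weights = aggregate_global_weights(local_counts)
allocation = sinks_per_event_type(weights, num_sinks=10)
```

Under this policy a rare event type is written by a single sink, so it produces one file per checkpoint instead of one file per sink subtask.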
Multi-tier Compaction and Deduplication During ETL
The preceding pipeline also performs multi-tier compaction and deduplication during ETL. The raw data in Parquet format undergoes smart compaction and deduplication every hour, resulting in larger Parquet files. Similarly, files that were not sufficiently compacted and deduplicated by the hourly run are compacted and deduplicated again every day. Then, the new data undergoes an atomic partition swap: after the new data is generated, an ETL job repoints the table partition in the Hive metastore to the new data. In this process, a heuristic algorithm analyzes the events to determine which events must be compacted and deduplicated and how often compaction and deduplication should run. In addition, some ETL data is stored for several years to meet confidentiality and compliance requirements.
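The compaction, deduplication, and partition swap steps can be sketched as follows. This is a simplified in-memory model: the `event_id` key, the `ToyMetastore` class, and the partition paths are hypothetical, and a real job would read and write Parquet files and call the Hive metastore instead.

```python
def compact_and_dedup(small_files, key="event_id"):
    """Merge many small files into one row list, keeping the first row per key."""
    seen = set()
    merged = []
    for rows in small_files:
        for row in rows:
            if row[key] not in seen:
                seen.add(row[key])
                merged.append(row)
    return merged


class ToyMetastore:
    """Hypothetical stand-in for a metastore: maps a partition to a location."""

    def __init__(self):
        self.locations = {}

    def swap_partition(self, partition, new_location):
        # One assignment: readers see the old location or the new one, never a mix.
        self.locations[partition] = new_location


raw_files = [
    [{"event_id": "a", "v": 1}, {"event_id": "b", "v": 2}],
    [{"event_id": "b", "v": 2}, {"event_id": "c", "v": 3}],
]
compacted = compact_and_dedup(raw_files)

metastore = ToyMetastore()
metastore.swap_partition("ds=2019-11-28/hr=13", "raw/ds=2019-11-28/hr=13/")
# After compaction finishes, repoint the partition to the new, larger file.
metastore.swap_partition("ds=2019-11-28/hr=13", "compacted/ds=2019-11-28/hr=13/")
```

Because the new data is fully written before the pointer changes, queries keep reading the old files until the swap, which is what makes the ETL level non-blocking.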
3) In-depth Analysis of Platform Performance and Fault Tolerance
EventTime-driven Partition Sensing
Flink and ETL synchronize data through EventTime-driven partition sensing. AWS S3 partitions data by using a common partition scheme. The final partition is determined by the timestamp, while the timestamp is based on EventTime. The advantage of this method is that the time sources for Flink and ETL are the same, which facilitates synchronous operations. In addition, some backfilling operations and master operations based on EventTime can achieve results similar to this. After processing events of the current hour, Flink writes a success file to the event partition. This file indicates that events of that hour have been processed and ETL operations can be performed on the files of that hour.
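As a rough illustration of EventTime-driven partitioning, the sketch below derives an hourly partition path from an event timestamp and builds the path of a success file for that partition. The `ds=`/`hr=` layout, the `_SUCCESS` file name, and the event name are assumptions made for the example.

```python
from datetime import datetime, timezone


def partition_for(event_name, event_time_ms):
    """Map an event to its hourly partition using the event's own timestamp."""
    ts = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
    return f"{event_name}/ds={ts:%Y-%m-%d}/hr={ts:%H}"


def success_marker(partition):
    """Path of the marker written once all events of that hour are processed."""
    return f"{partition}/_SUCCESS"


# 1574947800000 ms is 2019-11-28 13:30:00 UTC.
partition = partition_for("ride_requested", 1574947800000)
marker = success_marker(partition)
```

Since the partition depends only on the event's timestamp, a backfill job that replays the same event writes to the same partition as the original run.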
The watermark used in Flink cannot be directly applied to Lyft’s applications because the data for a timestamp may not yet have been persisted to storage when Flink finishes processing that timestamp. For this reason, we introduce the concept of a bucket watermark so that each Sink and Source knows the partition to which data is currently written and maintains the same partition ID. In addition, we use Global State Aggregator to aggregate the information of each partition. Each subtask can then learn the global information and define the watermark as the smallest partition timestamp.
The main features of ETL are timeliness and deduplication. In particular, ETL mainly performs deduplication and compaction and, most importantly, performs deduplication without blocking. The “smart” in the smart ETL mentioned earlier refers to awareness of the state of the data. Two types of information are required to provide guidance for Global State Aggregator: success files and statistics (stats for short). A success file indicates that a partition is complete. The statistics in each partition tell the downstream ETL process how, how often, and over what scope to perform deduplication and compaction.
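The bucket watermark can be sketched as a minimum over the partitions that the subtasks are currently writing. In this toy example partitions are identified by an hour number, and the subtask names and function names are hypothetical.

```python
def bucket_watermark(current_partitions):
    """Global bucket watermark: the smallest partition timestamp across subtasks."""
    return min(current_partitions.values())


def closed_partitions(all_partitions, watermark):
    """Partitions strictly older than the watermark receive no more writes."""
    return sorted(p for p in all_partitions if p < watermark)


# Each sink subtask reports the hourly partition it is currently writing to.
current = {"sink-0": 14, "sink-1": 13, "sink-2": 14}
watermark = bucket_watermark(current)
closed = closed_partitions({11, 12, 13, 14}, watermark)
```

Hour 13 is not closed yet because one subtask is still writing to it, so a success file for that hour would be premature.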
Challenges in Schema Evolution
In addition to the challenges of deduplication and compaction, ETL also often encounters challenges in schema evolution. The challenges involved in schema evolution can be divided into three parts: the evolution of data types of different engines, the evolution of nested structures, and the impact of data type evolution on the deduplication logic.
Deep Dive into AWS S3
Lyft’s data storage system can be thought of as a data lake, and Lyft is also considering how to further optimize performance in some aspects of AWS S3. AWS S3 has internal partitions as well. To ensure parallel read and write performance across these partitions, we add entropy prefixes provided by AWS S3 and marker files to the partitions. These two methods can greatly reduce the impact of AWS S3 I/O performance limits. Markers determine whether ETL operations can be triggered. In addition, markers can be integrated with Presto, allowing Presto to determine the number of files to be scanned under specific conditions.
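The idea behind an entropy prefix can be illustrated with a hash-derived prefix on the object key. This is only a sketch of the general technique: the hash function, the prefix length, and the key layout are assumptions, not the scheme Lyft uses.

```python
import hashlib


def with_entropy_prefix(key, length=4):
    """Prepend a short, deterministic hash so keys spread across S3 partitions."""
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return f"{digest[:length]}/{key}"


# Keys that share a long common prefix now start with different characters.
key_a = with_entropy_prefix("ride_requested/ds=2019-11-28/hr=13/part-0.parquet")
key_b = with_entropy_prefix("ride_requested/ds=2019-11-28/hr=13/part-1.parquet")
```

The prefix is derived from the key itself, so a reader that knows the logical path can recompute the physical one without a lookup table.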
Optimization Solution for Parquet
Lyft’s near real-time data analytics platform has made many optimizations in Parquet. For example, we optimize the statistics on file data value ranges and the statistics on file systems. In addition, based on the primary key data values, we speed up Presto queries and the generation of secondary indexes.
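Value-range statistics help a query engine skip files that cannot contain matching rows. The sketch below shows the principle on a hypothetical table of per-file minimum and maximum values; the file names and the `files_to_scan` helper are illustrative, and a real engine would read these ranges from Parquet metadata.

```python
def files_to_scan(file_stats, lo, hi):
    """Return files whose [min, max] range overlaps the query range [lo, hi]."""
    return [
        name
        for name, (file_min, file_max) in file_stats.items()
        if file_max >= lo and file_min <= hi
    ]


# Per-file (min, max) of the filtered column.
stats = {
    "part-0.parquet": (0, 99),
    "part-1.parquet": (100, 199),
    "part-2.parquet": (200, 299),
}
selected = files_to_scan(stats, lo=150, hi=210)
```

Pruning works best when files have narrow, non-overlapping value ranges, which is one reason compaction and sorting by key matter for query speed.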
Data Backfilling-based Fault Tolerance for the Platform
The following two figures show the data backfilling-based fault tolerance mechanism in Lyft’s near real-time data analytics platform. For Flink, the platform must stay near real-time. However, when a Flink job fails, the outage may last longer than the required latency window. In that case, two data streams are started after the Flink job restarts. The primary data stream always runs from the latest data, while the additional data stream backfills data from the start of the interruption to the end of the interruption. The primary data stream preserves near real-time performance, and the backfilled data stream ensures data integrity.
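The restart decision can be sketched as a small planning function. The threshold `max_lag_s`, the behavior for short outages (resume from the failure point), and the function name are assumptions for illustration.

```python
def restart_plan(outage_start_s, outage_end_s, max_lag_s):
    """Decide where the primary stream resumes and whether a backfill is needed."""
    if outage_end_s - outage_start_s <= max_lag_s:
        # Short outage: a single stream can catch up within the latency budget.
        return {"primary_from": outage_start_s, "backfill": None}
    # Long outage: the primary stream jumps to the latest data and a second
    # stream replays the gap.
    return {
        "primary_from": outage_end_s,
        "backfill": (outage_start_s, outage_end_s),
    }


short_outage = restart_plan(0, 60, max_lag_s=300)
long_outage = restart_plan(0, 1800, max_lag_s=300)
```

Because partitions are derived from EventTime, the backfill stream writes into the partitions of the outage window rather than into the current hour.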
For ETL, the data backfilling-based fault tolerance mechanism focuses on the idempotent scheduling system of Apache Airflow, atomic compaction, Hive Metastore (HMS) swap operations, partition self-check and self-healing systems, and integration with a schema.
4) Summary and Future Outlook
Experience and Lessons Learned
Flink allows you to ingest Parquet data in near real time, making interactive queries possible. However, Flink needs to be improved in many Lyft scenarios. Although the latency of Flink can be guaranteed in most cases, minute-level latency may still occur during restart and deployment. This adversely affects the service level objective (SLO).
One of Lyft’s current tasks is to improve the deployment system so that it can support Kubernetes and achieve near-zero downtime. Lyft’s near real-time data analytics platform runs on the cloud, so intermittent network problems occur when data is uploaded to AWS S3. These problems stall Sink subtasks and ultimately stall the entire Flink job. Time-out mechanisms can detect stalled Sink subtasks so that the entire Flink job keeps running smoothly.
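A time-out around an upload can be sketched with the standard library. This is a generic illustration, not Flink or AWS SDK code: `upload` stands for any callable that performs the upload, and the time-out values are arbitrary.

```python
import concurrent.futures
import time


def upload_with_timeout(upload, timeout_s):
    """Run upload() in a worker thread; return False if it exceeds timeout_s."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(upload)
    try:
        future.result(timeout=timeout_s)
        return True
    except concurrent.futures.TimeoutError:
        # The upload is considered stalled. The worker thread itself is not
        # interrupted here; a real sink would also abort the request.
        return False
    finally:
        pool.shutdown(wait=False)


fast_ok = upload_with_timeout(lambda: None, timeout_s=1.0)
stalled_ok = upload_with_timeout(lambda: time.sleep(0.3), timeout_s=0.05)
```

A caller that receives `False` can retry the upload or fail the subtask quickly, instead of letting one stalled request hold back the whole job.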
ETL partition sensing can reduce costs and latency, while success files indicate when processing has been completed. In addition, the file layout in AWS S3 has a great impact on performance. Lyft is still learning how best to introduce entropy values and will summarize, analyze, and publish its findings in the future. Moreover, Parquet data has strict schema compatibility requirements. As a result, if an incompatible event is persisted, the downstream ETL pipeline may crash. Lyft has addressed this problem by checking schema compatibility upstream of the pipeline and rejecting incompatible schemas committed by users.
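An upstream compatibility check can be sketched as a comparison between the current schema and a proposed one. The rule used here (existing fields must keep their type, new fields may be added) is a simplifying assumption, and the field names are hypothetical.

```python
def is_compatible(current_schema, proposed_schema):
    """Every existing field must survive with the same type; additions are fine."""
    return all(
        proposed_schema.get(name) == field_type
        for name, field_type in current_schema.items()
    )


current = {"ride_id": "string", "fare_cents": "int64"}

adds_field = {"ride_id": "string", "fare_cents": "int64", "tip_cents": "int64"}
changes_type = {"ride_id": "string", "fare_cents": "double"}
drops_field = {"ride_id": "string"}

accepted = [
    is_compatible(current, s) for s in (adds_field, changes_type, drops_field)
]
```

Rejecting a schema at commit time is much cheaper than repairing partitions after an incompatible file has already been written.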
Lyft also has some more ideas for the near real-time data analytics platform, including:
- Lyft wants to deploy and run Flink in a Kubernetes cluster so that Kubernetes can manage these Flink jobs and we can leverage the high scalability of the Kubernetes cluster.
- Lyft wants to implement a general-purpose framework to persist streaming data, so that the near real-time data analytics platform supports not only events but also data such as database logs and service logs.
- Lyft hopes that the platform can implement smart compaction and event-driven ETL. In this way, events such as backfilling events can automatically trigger the corresponding ETL processes and be merged with the previous data. In addition, delayed data can be persisted to update the ETL process.
- Lyft hopes that the storage process and queries on the near-real-time data analytics platform can be optimized, the query performance of Presto can be improved by using statistical data in Parquet format, and the storage management performance can be improved by using open-source table management software.