How Log Service Has Evolved into a Data Pipeline Over the Past 5 Years
By Zhang Cheng, nicknamed Yuanyi at Alibaba.
Independently developed by the Apsara team, Alibaba Cloud Log Service serves more than 30,000 customers both on and off the cloud and also provides the computing infrastructure needed for processing log data on several of Alibaba Group’s e-commerce platforms including Taobao. Log Service withstood the huge traffic spikes seen during the Double 11 Shopping Festival, the world’s largest online shopping event, as well as the smaller Double 12 Shopping Festival, and several online promotions during Chinese New Year over the past several years.
During 2019’s Double 11 Shopping Festival, Log Service served more than 30,000 applications on Alibaba’s e-commerce platforms and various other online systems, serving more than 15,000 unique customers. The peak rate during the event was 30 TB/min, with the peak rate per cluster being 11 TB/min, the peak rate per log being 600 GB/min, and the peak rate per business line being 1.2 TB/min. Beyond these figures, Log Service also supported the full migration to the cloud of logs from Alibaba’s core e-commerce business operations, as well as from Alibaba’s marketing platform Alimama, financial services hub Ant Financial, smart logistics network Cainiao, smart groceries brand Hema Fresh, community video platform Youku, mobile map and navigation platform Amap, and other teams. Log Service seamlessly interoperated with more than 30 data sources and more than 20 data processing and computing systems, as shown below.
Operating at this business volume and user scale, Log Service has proven its robust functionality, superior user experience, and high stability and reliability. Thanks to the unique environment and challenges of Alibaba’s various services and e-commerce platforms, our engineers and developers have continuously tested and refined Log Service and related technologies over the past five years.
Data Pipeline as the Infrastructure of Enterprises
What is a Data Pipeline?
The concept of a data pipeline was proposed by Jay Kreps in 2009. Jay Kreps, formerly an engineer at LinkedIn, is the CEO of Confluent and one of the co-creators of Apache Kafka. In his article “The Log: What every software engineer should know about real-time data’s unifying abstraction,” published in 2013, Jay Kreps stated that pipeline design is intended to:
- Decouple producers and consumers to reduce the complexity of system interconnection.
- Define a unified format and unified operation semantics.
With the emergence of real-time systems, Apache Kafka and similar products were developed over several years to address these two pain points, which led to the widespread use of Apache Kafka. As enterprises increasingly rely on data analysis systems, vendors have started to offer data pipelines as services on the Internet. Typical data pipeline services include AWS Kinesis, Azure EventHub, and IBM BlueMix Event Pipeline.
A data pipeline is the carrier that moves data between systems. It covers data collection, transmission, storage queues, consumption, and dumping to other systems. In Log Service, LogHub provides all data pipeline functions: more than 30 data access methods, real-time data pipelines, and interconnection with downstream systems.
Data pipelines operate at the underlying layer and serve the important businesses of an enterprise during the digitization process. This requires data pipelines to be reliable, stable, and able to ensure smooth data communication and flexibly deal with traffic changes.
Now, let’s take a look at the challenges we have encountered over the past five years.
Challenges of Data Pipelines
The pipeline concept is simple. A pipeline prototype with the following properties can be written in just 20 lines of code:
- The queue is append-only: data can be written but not modified.
- Results are returned to producers after data is written, so the write order is preserved.
- Consumers can consume data starting from any position (cursor).
- Data cannot be modified and can only be deleted when its time-to-live (TTL) expires, that is, in write order.
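The four properties above can be illustrated with a minimal in-memory sketch. It is not Log Service's implementation: the class and method names (`Pipeline`, `append`, `read`, `expire`) are hypothetical, and TTL is checked against a caller-supplied clock so the behavior is easy to follow.

```python
import time
from collections import deque
from itertools import islice


class Pipeline:
    """Append-only queue: ordered writes, cursor-based reads, TTL-only deletion."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.entries = deque()  # (cursor, write_time, payload) in write order
        self.next_cursor = 0    # cursor assigned to the next write

    def append(self, payload, now=None):
        # Data is only ever appended; the returned cursor reflects the write order.
        now = time.time() if now is None else now
        cursor = self.next_cursor
        self.entries.append((cursor, now, payload))
        self.next_cursor += 1
        return cursor

    def read(self, cursor, max_items=100):
        # Consumers pull from any retained cursor; reading never removes data.
        if not self.entries:
            return [], cursor
        # Cursors are contiguous; a cursor older than retained data starts at the oldest entry.
        start = max(cursor - self.entries[0][0], 0)
        batch = [(c, p) for c, _, p in islice(self.entries, start, start + max_items)]
        next_cursor = batch[-1][0] + 1 if batch else cursor
        return batch, next_cursor

    def expire(self, now=None):
        # No updates or random deletes: only the oldest entries leave, once their TTL passes.
        now = time.time() if now is None else now
        while self.entries and now - self.entries[0][1] >= self.ttl:
            self.entries.popleft()
```

Several consumers can each keep their own cursor against the same `Pipeline` without interfering, which is the decoupling of producers and consumers described earlier.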
In reality, it is challenging to maintain a pipeline that serves tens of thousands of users with tens of billions of reads and writes and processes dozens of petabytes of data every day. For example:
- Producer: A faulty producer program can flood the pipeline endpoint with traffic and congest it. During promotional activities, traffic from some data sources can increase to dozens or hundreds of times the normal volume within an hour.
- Consumer: More than 20 subscribers simultaneously consume data from the same source. Extensive adaptation is required when hundreds of data sources access the pipeline in different modes every day.
The following sections describe how we developed the pipeline to deal with these situations.
Challenge 1: Producer Adaptation
The first version of Log Service supported data sources that were log files in the Apsara format. Support has been added for more data sources over the past five years, such as SDKs in various languages, mobile terminals, embedded chips, Internet of Things (IoT) devices, and cloud-native environments.
Log Service originated from the Apsara project of Alibaba Cloud. At that time, Apsara provided a basic log module, which was used by many systems to print logs. Therefore, we developed Logtail to collect Apsara logs. Logtail was initially an internal tool of the Apsara system.
To accommodate the habits of teams outside Alibaba Cloud, we extended Logtail to support general log formats, such as logs parsed with regular expressions, JSON logs, and delimiter-separated logs. For applications that do not want to deploy Logtail, we provide SDKs in various languages so that log uploading can be built directly into the application code.
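The three general formats differ mainly in how a raw line is split into fields. The sketch below is illustrative only: Logtail's actual configuration options are not modeled, and the function name `parse_line`, the mode names, and the sample field names are hypothetical.

```python
import json
import re


def parse_line(line, mode, pattern=None, keys=None, delimiter=","):
    """Turn one raw log line into a dict of fields (hypothetical helper)."""
    if mode == "regex":
        # Named groups in the pattern become field names.
        match = re.match(pattern, line)
        return match.groupdict() if match else None
    if mode == "json":
        # The line is already structured; keys become field names.
        return json.loads(line)
    if mode == "delimiter":
        # Values are mapped to the configured keys by position.
        return dict(zip(keys, line.split(delimiter)))
    raise ValueError(f"unknown mode: {mode}")
```

For example, `parse_line("2019-11-11|INFO|done", "delimiter", keys=["time", "level", "msg"], delimiter="|")` yields a dict with `time`, `level`, and `msg` fields.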
With the rise of smartphones and the mobile Internet, we developed SDKs for Android and iOS mobile terminals to enable fast log access. At the same time, Alibaba began its transformation to microservices and launched Pouch, its container engine, which is compatible with Logtail. For Java microservices, we provide Log4J and LogBack appenders, as well as a direct data transmission service.
We also adapted log access modes to clients that use the Advanced RISC Machine (ARM) platform, embedded systems, and systems developed in China.
In early 2018, we added a plugin mechanism to Logtail to meet the diverse requirements of our users. Users can extend Logtail by developing plugins that implement custom functions. Logtail also supports data collection in emerging fields, such as cloud native, smart devices, and IoT.
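The general idea of extending an agent through plugins can be sketched as a chain of processors, each transforming a batch of logs before it is sent. This is a generic illustration, not Logtail's plugin API: `Processor`, `AddFields`, `DropByLevel`, and `run_chain` are hypothetical names.

```python
from abc import ABC, abstractmethod


class Processor(ABC):
    """Hypothetical plugin contract: take a batch of log dicts, return a new batch."""

    @abstractmethod
    def process(self, logs):
        ...


class AddFields(Processor):
    """Attach fixed fields (for example, host or environment tags) to every log."""

    def __init__(self, **fields):
        self.fields = fields

    def process(self, logs):
        return [{**log, **self.fields} for log in logs]


class DropByLevel(Processor):
    """Discard logs with a given level before they leave the host."""

    def __init__(self, level):
        self.level = level

    def process(self, logs):
        return [log for log in logs if log.get("level") != self.level]


def run_chain(logs, processors):
    # Each configured plugin runs in order on the output of the previous one.
    for processor in processors:
        logs = processor.process(logs)
    return logs
```

A user-defined function then only has to implement `process`; the agent decides when and in what order to call it.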
Cloud Native Support
As cloud native technologies have been adopted, Logtail has fully supported data collection in Kubernetes since the beginning of 2018. It provides a CustomResourceDefinition (CRD) that integrates log collection with Kubernetes systems. Currently, this solution is applied to thousands of clusters in Alibaba Group and on the public cloud.
Diskless Logs Based on Cloud Native
In Alibaba’s highly virtualized scenarios, a physical machine may run hundreds of containers. In conventional log collection, logs are first written to disk. This intensifies contention for the physical machine’s disk resources, degrades log write performance, and indirectly increases application response time. In addition, each physical machine must reserve disk space to store the logs of each container, causing serious resource redundancy.
Therefore, we cooperated with the systems department of Ant Financial to develop a diskless log project, which virtualizes a log disk for applications based on a user-space file system. The log disk is connected directly to Logtail through the user-space file system, and logs are transferred directly to Log Service. This also speeds up log queries.
Challenge 2: Multi-protocol Support
The Log Service servers support HTTP-based writing and provide many SDKs and agents. However, there is still a huge gap between the server and data sources in many scenarios. For example:
- Customers who build their systems on open-source software want to access Log Service only by modifying configuration files, without changing their systems.
- Many devices such as switches and routers use fixed protocols and cannot send data over HTTP.
- Open-source agents already support the monitoring data and binary formats of a wide range of software.
We have launched a general protocol adaptation plan to enable compatibility between Log Service and the open-source ecosystem, including the Syslog, Apache Kafka, Prometheus, and JDBC protocols, in addition to HTTP. As such, a user’s existing system can access Log Service simply by modifying the write source. Existing routers and switches can be configured to directly write data to Log Service, without proxy forwarding. Log Service supports many open source collection components, such as Logstash, Fluentd, and Telegraf.
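To illustrate what protocol adaptation involves, the sketch below converts an RFC 3164-style syslog line, such as a router might send, into the flat key-value form a log store works with. The priority decoding (PRI = facility * 8 + severity) follows RFC 3164; the function name `syslog_to_log` and the output field names are hypothetical, and the regex is simplified, so many real-world syslog variants would not match.

```python
import re

# RFC 3164 layout: <PRI>TIMESTAMP HOSTNAME TAG: MESSAGE (simplified)
SYSLOG_RE = re.compile(
    r"^<(?P<pri>\d{1,3})>"
    r"(?P<timestamp>[A-Z][a-z]{2} [ \d]\d \d{2}:\d{2}:\d{2}) "
    r"(?P<host>\S+) "
    r"(?P<tag>[^:\s]+): "
    r"(?P<message>.*)$"
)


def syslog_to_log(line):
    """Convert one syslog line into a flat dict of fields (hypothetical adapter)."""
    match = SYSLOG_RE.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    pri = int(fields.pop("pri"))
    # PRI encodes facility * 8 + severity.
    fields["facility"], fields["severity"] = divmod(pri, 8)
    return fields
```

For the RFC 3164 sample line `<34>Oct 11 22:14:15 mymachine su: ...`, PRI 34 decodes to facility 4 (security/authorization) and severity 2 (critical).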
Challenge 3: Agent Throttling
In 2017, we had to deal with multitenancy throttling on single-host agents. For example:
- A host has more than 20 types of logs, including operation logs requiring reconciliation and Info-level program output logs.
- A large number of program output logs may be generated in a period of time due to uncontrollable log producers.
- These program output logs may crash the collection agent in a short time, causing a failure to collect important data or a delay in collection.
We optimized the Logtail agent to improve multitenancy isolation:
- Logtail schedules data collection based on time slices to guarantee the isolation and fairness of configuration data endpoints.
- Logtail supports multi-level feedback queues for resource usage to guarantee the isolation and fairness of processing flows and configurations with extremely low resource consumption.
- Logtail supports non-blocking event processing to guarantee high reliability even if log files are rotated when a configuration is blocked or data collection is stopped.
- Logtail supports different throttling mechanisms for various configurations, collection stop policies, and dynamic configuration updates to guarantee a high level of control over data collection.
Through these continuous optimizations, Logtail guarantees fair resource allocation among multiple data sources (tenants) on a single host.
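The multi-level feedback queue idea can be sketched as follows. This is a generic MLFQ, not Logtail's code; the number of levels, the per-level quantum, and the names `FeedbackScheduler`, `add`, `run_round`, and `boost` are assumptions. A configuration that uses its whole time slice (for example, a flood of Info-level output) is demoted, so lighter configurations such as reconciliation logs stay at higher levels and are served first.

```python
from collections import deque


class FeedbackScheduler:
    """Generic multi-level feedback queue over collection configs (illustrative only)."""

    def __init__(self, levels=3, base_quantum=1):
        self.queues = [deque() for _ in range(levels)]
        self.base_quantum = base_quantum
        self.pending = {}  # config name -> units of data waiting to be processed

    def add(self, name, units):
        # New or newly active configs start at the highest priority level.
        if self.pending.get(name, 0) == 0:
            self.queues[0].append(name)
        self.pending[name] = self.pending.get(name, 0) + units

    def run_round(self):
        """Serve one slice from the highest non-empty level; return (name, units_processed)."""
        for level, queue in enumerate(self.queues):
            if queue:
                name = queue.popleft()
                # Lower levels get longer slices but are only served when higher levels are empty.
                quantum = self.base_quantum * 2 ** level
                used = min(quantum, self.pending[name])
                self.pending[name] -= used
                if self.pending[name] > 0:
                    # Work left after a full slice signals a heavy producer: demote it one level.
                    self.queues[min(level + 1, len(self.queues) - 1)].append(name)
                return name, used
        return None, 0

    def boost(self):
        # Periodically move everything back to the top level so demoted configs are not starved.
        for queue in self.queues[1:]:
            self.queues[0].extend(queue)
            queue.clear()
```

In this sketch, a config that suddenly emits 10 units cannot delay a config that emits 1 unit by more than a single short slice.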
Challenge 4: Server Throttling
The Log Service servers support back-pressure-based throttling at the project and shard levels to prevent a single abnormal instance from affecting other tenants at the access layer or the backend service layer. We developed the QuotaServer module to provide project-level global throttling and shard-level throttling. This ensures tenant isolation and second-level synchronous throttling among millions of tenants, and prevents clusters from becoming unavailable when traffic breaks through to the backend.
Coarse-grained Throttling: Project Level
- Every second, each of thousands of NGINX frontends aggregates the traffic and request count it has received for each project and sends these statistics to QuotaServer, which uses a distributed architecture and is partitioned by project.
- QuotaServer aggregates the statistics for each project from all NGINX frontends, checks whether the project’s traffic and queries per second (QPS) exceed its predefined quota, and decides whether to disable operations on the project and for how long.
- QuotaServer notifies all NGINX frontends of the projects that have exceeded their quotas within seconds.
- After receiving the list of disabled projects, the NGINX frontends deny requests for these projects.
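The per-second loop above can be sketched in a single process. The real QuotaServer is distributed and partitioned by project; the quota values, the fixed disable period, and the names `QuotaServer`, `report`, `evaluate`, `Frontend`, `apply`, and `admit` are assumptions for illustration.

```python
from collections import defaultdict


class QuotaServer:
    """Aggregates per-project counters from all frontends and decides which projects to disable."""

    def __init__(self, quotas, ban_seconds=5):
        self.quotas = quotas            # project -> (max_bytes_per_sec, max_requests_per_sec)
        self.ban_seconds = ban_seconds  # assumed fixed disable period
        self.window = defaultdict(lambda: [0, 0])  # project -> [bytes, requests] this second

    def report(self, frontend_stats):
        # Each frontend sends {project: (bytes, requests)} once per second.
        for project, (nbytes, nreqs) in frontend_stats.items():
            self.window[project][0] += nbytes
            self.window[project][1] += nreqs

    def evaluate(self, now):
        """Close the current second; return {project: disabled_until} for projects over quota."""
        disabled = {}
        for project, (nbytes, nreqs) in self.window.items():
            max_bytes, max_reqs = self.quotas[project]
            if nbytes > max_bytes or nreqs > max_reqs:
                disabled[project] = now + self.ban_seconds
        self.window.clear()
        return disabled


class Frontend:
    """Holds the disabled-project list pushed by QuotaServer."""

    def __init__(self):
        self.disabled_until = {}

    def apply(self, disabled):
        self.disabled_until.update(disabled)

    def admit(self, project, now):
        # Deny requests for a disabled project until its disable period ends.
        return now >= self.disabled_until.get(project, 0)
```

The point of the design is that no single frontend sees a project's full traffic; only the aggregated view in QuotaServer can tell whether the global quota is exceeded.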
Project-level global throttling is intended to limit a user’s overall resource usage. Because frontends deny excess requests before they reach the backend, a surge in one user’s traffic cannot crash the cluster. Shard-level throttling is more fine-grained, with clearer semantics and better control.
Fine-grained Throttling: Shard Level
- Each shard clearly defines its processing capability, such as 5 MB/s of write and 10 MB/s of read.
- A shard processes traffic on a best-effort basis as long as the instance hosting it has available resources, but its resource usage is capped.
- When the shard queue is blocked, the system returns HTTP status code 403 (throttling) or 500 (a system error) and notifies QuotaServer that the shard is being throttled.
- After receiving a throttling message, QuotaServer immediately synchronizes it to all NGINX frontends through the long-polling channel between the NGINX frontends and QuotaServer.
- NGINX frontends perform precise throttling on shards after receiving the throttling message.
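One common way to enforce a fixed per-shard capacity such as 5 MB/s of writes is a token bucket. The document does not say which algorithm Log Service uses, so the token bucket here is a swapped-in, widely used technique; `ShardLimiter`, `try_write`, and the burst size are hypothetical. Returning 403 mirrors the throttling status code described above.

```python
class ShardLimiter:
    """Token bucket for one shard's write capacity (illustrative; not Log Service's algorithm)."""

    def __init__(self, bytes_per_sec=5 * 1024 * 1024, burst_bytes=None):
        self.rate = bytes_per_sec
        # By default, allow a burst of up to one second's worth of traffic.
        self.capacity = burst_bytes if burst_bytes is not None else bytes_per_sec
        self.tokens = self.capacity
        self.last = 0.0

    def try_write(self, nbytes, now):
        """Return an HTTP-style status: 200 if admitted, 403 if the shard is throttled."""
        # Refill tokens for the time elapsed since the last call, capped at the bucket size.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if nbytes <= self.tokens:
            self.tokens -= nbytes
            return 200
        return 403
```

With a 5 MB/s shard, a 4 MB write followed immediately by a 2 MB write is throttled, but the same 2 MB write succeeds half a second later once enough capacity has been refilled.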
Shard-level throttling provides the following benefits:
- The traffic that can be received by each shard is limited. Excessive traffic is directly denied by NGINX frontends to prevent the traffic from reaching the backend.
- Project-level throttling is not the primary throttling mechanism but is used to prevent a sharp increase in traffic caused by code errors and other reasons.
- Error codes have clear meanings: HTTP status code 403 indicates throttling, and HTTP status code 500 indicates a Log Service exception at the backend.
- When HTTP Error Code 403 is returned, users can increase throughput through shard partitioning, or buy more resources to process excessive traffic.
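Because 403 and 500 mean different things, a client can treat them differently. The sketch below retries both with exponential backoff but counts only 403 responses as a capacity signal, a hint that shards should be partitioned or more resources bought. The `send` callable, the retry limits, and the delays are assumptions; this is not a Log Service SDK API.

```python
import time


def write_with_retry(send, payload, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Call send(payload) -> HTTP status; retry on 403 and 500 with exponential backoff.

    Returns (final_status, throttled_count). A high throttled_count suggests the
    shards are at capacity rather than that the service is failing.
    """
    throttled = 0
    status = None
    for attempt in range(max_attempts):
        status = send(payload)
        if status == 403:
            throttled += 1  # throttling: a capacity problem on the user's side
        elif status != 500:
            break           # success (200) or an error this sketch does not retry
        if attempt < max_attempts - 1:
            sleep(base_delay * 2 ** attempt)
    return status, throttled
```

Passing `sleep` as a parameter keeps the backoff testable without real waiting.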
Challenge 5: Consumer (High Concurrency)
To solve the log consumption problem, we need to consider specific scenarios. Log Service serves as a real-time pipeline, and most consumption is real-time. Log Service provided a cache layer for consumption, but its cache policy was simple. As the number of consumers and the data volume grew, the cache hit ratio decreased and consumption latency increased. We redesigned the cache module as follows:
- Global cache management is implemented. A consumption weight is calculated for each shard based on its consumption, and cache space is preferentially allocated to shards with higher weights.
- Cache management is more fine-grained and intuitive. The cache size is dynamically adjusted based on users’ recent consumption.
- A certain amount of cache space is reserved for users with high-availability guarantees so that they are not affected by other users.
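The weighted allocation described above can be sketched as a proportional split of a global cache budget after reserved space is granted. Using a shard's recent consumption volume as its weight and the reserved-space mechanism shown here are assumptions for illustration; `allocate_cache` and its parameters are hypothetical.

```python
def allocate_cache(total_bytes, recent_reads, reserved=None):
    """Split a global cache budget across shards.

    recent_reads: shard -> bytes consumed recently (used as the shard's weight).
    reserved: shard -> bytes set aside for high-assurance users, granted first.
    """
    reserved = reserved or {}
    allocation = dict(reserved)
    remaining = total_bytes - sum(reserved.values())
    total_weight = sum(recent_reads.values())
    if remaining <= 0 or total_weight == 0:
        return allocation
    for shard, weight in recent_reads.items():
        # Shards that were read more recently get a proportionally larger share.
        share = remaining * weight // total_weight
        allocation[shard] = allocation.get(shard, 0) + share
    return allocation
```

Recomputing the allocation periodically from fresh `recent_reads` is what makes the cache size follow users' recent consumption.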
After these optimizations, the average log consumption latency of clusters dropped from 5 milliseconds to less than 1 millisecond, which effectively relieved data consumption pressure during the Double 11 Shopping Festival.
Challenge 6: Consumer (Multiple Instances and Concurrency)
With the emergence of microservices and cloud native, applications have become increasingly fine-grained, end-to-end processes have become more complex, and more logs of diverse types are generated. Logs have also become increasingly important: the same log may be consumed by several or even a dozen businesses.
With traditional collection methods, the same log may be collected dozens of times, which wastes client, network, and server resources.
Log Service avoids collecting the same file repeatedly. After logs are collected to Log Service, users can consume them in real time through ConsumerGroup. However, as more log types are used in a wider variety of scenarios, this consumption mode runs into the following two problems:
- When logs are classified into specific types, ConsumerGroup does not allow consumers to simultaneously consume logs from multiple Logstores. Logs may come from multiple projects and different accounts. This complicates resource mapping and permission ownership management.
- The minimum unit assigned by ConsumerGroup is a shard. A shard in Log Service supports writing at a rate of dozens of megabytes per second. A single host at the consumer end cannot process data at this rate. This causes a serious imbalance between production and consumption.
View Consumption Mode
In response to resource mapping and permission ownership management in scenarios with specific log types, we cooperated with the log platform team at Ant Financial to develop the view consumption mode based on the view concept of databases. The resources of different users and Logstores can be virtualized into a large Logstore. Therefore, users only need to consume logs from a virtual Logstore, whose implementation and maintenance are transparent to users. The view consumption mode has been officially launched in Ant Financial clusters. Currently, thousands of instances run in view consumption mode.
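Conceptually, a view presents several Logstores, possibly in different projects or accounts, as one virtual Logstore. The sketch below keeps one cursor per member store and, on each pull, visits the members in a fixed order, tagging every record with its source. The merge order and the names `ListStore`, `LogstoreView`, and `pull` are assumptions; the document does not describe how the view is implemented.

```python
class ListStore:
    """Stand-in for a real Logstore: read(cursor, n) -> (records, next_cursor)."""

    def __init__(self, records):
        self.records = records

    def read(self, cursor, max_items):
        batch = self.records[cursor:cursor + max_items]
        return batch, cursor + len(batch)


class LogstoreView:
    """Virtual Logstore over several member stores (hypothetical)."""

    def __init__(self, members):
        self.members = members  # name -> store (members may span projects or accounts)
        self.cursors = {name: 0 for name in members}

    def pull(self, per_member=2):
        """Read up to per_member records from each member; tag each with its source."""
        merged = []
        for name, store in self.members.items():
            batch, self.cursors[name] = store.read(self.cursors[name], per_member)
            merged.extend((name, record) for record in batch)
        return merged
```

The consumer only ever calls `pull` on the view; which stores sit behind it, and where each cursor is, stays internal.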
Fanout Consumption Mode
To improve the capabilities of single consumers, we enhanced ConsumerGroup and developed the Fanout consumption mode. In this mode, data in a shard can be processed by multiple consumers. Shards are decoupled from consumers to solve the capability mismatch between producers and consumers. Consumers do not need to concern themselves with checkpoint management, failover, and other details, which are managed by the Fanout consumer group.
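The main difficulty in letting several consumers share one shard is checkpointing: records finish out of order, but the committed checkpoint must never skip an unfinished record. One common approach, sketched here as an assumption rather than a description of the Fanout consumer group, is to commit only the highest offset below which every record has been acknowledged. `FanoutGroup`, `dispatch`, and `ack` are hypothetical names.

```python
import heapq


class FanoutGroup:
    """Dispatch one shard's records to many workers and track a safe checkpoint."""

    def __init__(self, workers):
        self.workers = workers
        self.next_worker = 0
        self.checkpoint = 0  # every offset below this has been fully processed
        self.done = []       # min-heap of acknowledged offsets at or above the checkpoint

    def dispatch(self, offsets):
        """Assign offsets to workers round-robin; return {worker: [offsets]}."""
        plan = {worker: [] for worker in self.workers}
        for offset in offsets:
            plan[self.workers[self.next_worker]].append(offset)
            self.next_worker = (self.next_worker + 1) % len(self.workers)
        return plan

    def ack(self, offset):
        """Record that one offset finished; advance the checkpoint over any contiguous run."""
        heapq.heappush(self.done, offset)
        while self.done and self.done[0] == self.checkpoint:
            heapq.heappop(self.done)
            self.checkpoint += 1
        return self.checkpoint
```

If the group fails over, it resumes from `checkpoint`; records acknowledged beyond a gap may be processed again, which keeps the At-Least-Once guarantee.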
Challenge 7: Automatic O&M
The external service-level agreement (SLA) of Log Service promises 99.9% service availability and actually provides more than 99.95%. At the beginning, it was difficult for us to reach this level of service availability. We received many alerts every day and were often woken up by phone calls at night. We were tired of handling various problems. There were two main reasons for our difficulties.
1. Hotspots: Log Service distributes shards evenly across worker nodes. However, the actual load of each shard varies and changes dynamically over time. When several hot shards are located on the same instance, request processing slows down, and requests may accumulate until they exceed the service capacity.
2. Time-consuming problem locating: Online problems are inevitable. To achieve 99.9% availability, we must locate problems in the shortest possible time and fix them promptly. Even with extensive monitoring and logs, manually locating problems takes a long time.
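For a sense of scale, the promised and achieved availability levels can be expressed as monthly downtime budgets, assuming a 30-day month (43,200 minutes):

```latex
\begin{aligned}
(1 - 0.999)  \times 43{,}200\ \text{min} &= 43.2\ \text{min of allowed downtime per month (SLA)} \\
(1 - 0.9995) \times 43{,}200\ \text{min} &= 21.6\ \text{min of downtime per month (achieved level)}
\end{aligned}
```

Providing more than 99.95% therefore means staying under roughly half of the downtime the SLA allows, which leaves little room for slow manual troubleshooting.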
Automatic Hotspot Elimination
To address the hotspot problem, we added a scheduler role to the system. It collects load statistics in real time and automatically makes adjustments to eliminate hotspots. The following two methods are used:
- Automatic load balancing: The system collects real-time statistics on the load of each node and on the resource consumption (CPU, memory, and network) of each data partition on the node. These statistics are reported to the scheduler, which automatically checks whether any node is overloaded. When a node is overloaded, the data causing the high load is partitioned and automatically migrated to nodes with lower loads, balancing the load across the cluster.
- Automatic partitioning: The system monitors the load of each shard in real time. When a shard exceeds the per-shard upper limit, it is automatically split. The original shard becomes read-only, and two new shards are created and migrated to other nodes.
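The two mechanisms can be sketched together. The rebalancing step moves one shard from the most loaded node to the least loaded node, choosing the shard that brings the two closest to equal; the split step divides an overloaded shard's key range at its midpoint. A single scalar load per shard (instead of separate CPU, memory, and network figures), small integer key ranges, the threshold, and all function names are simplifying assumptions, not the scheduler's real policy.

```python
def rebalance_once(nodes):
    """nodes: node -> {shard: load}. Move one shard from the hottest to the coolest node if it helps."""
    load = {node: sum(shards.values()) for node, shards in nodes.items()}
    hot = max(load, key=load.get)
    cold = min(load, key=load.get)
    gap = load[hot] - load[cold]
    # Moving a shard with load x leaves a gap of |gap - 2x|, which shrinks only if 0 < x < gap.
    candidates = [(shard_load, shard) for shard, shard_load in nodes[hot].items()
                  if 0 < shard_load < gap]
    if not candidates:
        return None
    # Prefer the shard that brings the two nodes closest to equal.
    _, shard = min(candidates, key=lambda c: abs(gap - 2 * c[0]))
    nodes[cold][shard] = nodes[hot].pop(shard)
    return shard, hot, cold


def split_shard(begin, end):
    """Split the key range [begin, end) at its midpoint into two new ranges."""
    mid = (begin + end) // 2
    return (begin, mid), (mid, end)


def maybe_split(shard_ranges, shard_load, limit):
    """Split every shard whose load exceeds limit.

    Returns (read_only_shards, new_shards). Each original shard becomes read-only,
    and two new shards take over its key range.
    """
    read_only, created = [], {}
    for shard, load in shard_load.items():
        if load > limit:
            low, high = split_shard(*shard_ranges[shard])
            read_only.append(shard)
            created[f"{shard}-0"], created[f"{shard}-1"] = low, high
    return read_only, created
```

Running `rebalance_once` repeatedly until it returns `None` converges because every move strictly reduces the load gap between the two nodes involved.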
We also consider special scenarios such as the negative impacts of traffic spikes, heterogeneous models, concurrent scheduling, and migration, which are not described here.
Traffic Analysis in Seconds (Root Cause Analysis)
Currently, Log Service collects thousands of real-time metrics online and hundreds of billions of access logs per day, which makes manual investigation difficult when problems occur. We developed root cause analysis algorithms that use frequent patterns and differential patterns to quickly locate the dataset most relevant to an exception.
In this example, access logs with an HTTP status code of 500 or above (the 5xx range) are defined as abnormal Dataset A. In this dataset, 90% of the requests come from ID 1002, which suggests that the current error is related to ID 1002. To reduce misjudgment, we check the proportion of requests from ID 1002 in normal Dataset B, which contains requests with HTTP status codes below 500, and find that this proportion is low. This further supports the judgment that the current error is highly related to ID 1002.
This method greatly reduces the time required for problem investigation. The root cause analysis results are automatically included when an alert is triggered. This allows us to identify the specific user, instance, or module that causes the problem right away.
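The reasoning in the example above, comparing how often an attribute value appears in abnormal Dataset A versus normal Dataset B, can be sketched as a simple contrast score. This is a minimal single-attribute illustration of differential pattern analysis, not the production algorithm, which also mines frequent multi-attribute patterns; the function name, the record format, and the thresholds are hypothetical.

```python
from collections import Counter


def suspicious_values(abnormal, normal, key, min_support=0.5, min_ratio=5.0):
    """Rank values of `key` that are frequent in abnormal records but rare in normal ones.

    Returns [(value, share_in_abnormal, share_in_normal)], highest abnormal share first.
    """
    a_counts = Counter(record[key] for record in abnormal)
    b_counts = Counter(record[key] for record in normal)
    results = []
    for value, count in a_counts.items():
        share_a = count / len(abnormal)
        share_b = b_counts[value] / len(normal) if normal else 0.0
        # Frequent pattern in A, and much more frequent in A than in B.
        if share_a >= min_support and (share_b == 0 or share_a / share_b >= min_ratio):
            results.append((value, share_a, share_b))
    return sorted(results, key=lambda item: item[1], reverse=True)
```

Checking Dataset B is what filters out values that are common everywhere, such as a very large customer who dominates both normal and failing traffic.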
Other Challenges: Problems Being Solved
1. Making Shards Transparent
To facilitate horizontal scaling, we introduced the concept of shards, similar to Kafka partitions, allowing users to scale resources by partitioning and merging shards. However, this requires users to understand the concept of shards, estimate the number of shards required for traffic distribution, and manually partition shards to meet quota limits.
Excellent products expose as few concepts as possible to users. In the future, we will weaken or even remove the concept of shards. From the user’s perspective, a user will only need to declare a quota for the data pipeline, and Log Service will provide service based on that quota. The internal shard logic will be completely transparent to users, making the pipeline capability truly elastic.
2. From “At Least Once” to “Exactly Once”
Like Apache Kafka, Log Service currently supports At-Least-Once writing and consumption. However, many core scenarios such as transactions, settlement, reconciliation, and core events require Exactly-Once writing and consumption. For many services, we have to encapsulate the deduplication logic at the upper layer to implement the Exactly-Once model. This is expensive and consumes a great deal of resources.
Soon we will support the semantics of Exactly-Once writing and consumption, together with the capabilities of ultra-high traffic processing and high concurrency.
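The upper-layer deduplication mentioned above is commonly built like this: the producer attaches a unique (producer ID, sequence number) pair to each record, and the consumer skips any sequence number it has already applied. The sketch assumes sequence numbers increase per producer and that the applied state is stored atomically with the results; `DedupConsumer` and the field names are hypothetical.

```python
class DedupConsumer:
    """Turn at-least-once delivery into effectively-once application of records."""

    def __init__(self):
        self.last_applied = {}  # producer id -> highest sequence number already applied
        self.balance = 0        # example side effect: a running settlement total

    def handle(self, record):
        producer, seq = record["producer"], record["seq"]
        if seq <= self.last_applied.get(producer, -1):
            return False  # duplicate from a retry or redelivery; ignore it
        # In a real system, the effect and last_applied must be committed atomically.
        self.balance += record["amount"]
        self.last_applied[producer] = seq
        return True
```

The cost the document mentions is visible even here: every consumer must store and consult per-producer state, which native Exactly-Once support would remove.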
3. LogHub Select Function (Push Down Filter Conditions)
Similar to Apache Kafka, Log Service supports full consumption at the Logstore level. Even if a business needs only a portion of the data, it must consume all the data in the time range. All data must be transmitted from the server to the compute node for processing, which wastes a lot of resources.
In the future, we will push down computing to the queue, where invalid data can be directly filtered out. This will greatly reduce the network transmission of invalid data and upper-layer computing costs.
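The benefit of pushing a filter down to the queue can be shown with a toy comparison: shipping everything and filtering on the compute node versus filtering on the server before transfer. The predicate form, approximating transferred bytes by each record's JSON length, and the function names are assumptions for illustration; the document does not specify the LogHub Select syntax.

```python
import json


def consume_all_then_filter(records, predicate):
    """Current model: every record crosses the network, then the client filters."""
    transferred = sum(len(json.dumps(r)) for r in records)
    return [r for r in records if predicate(r)], transferred


def consume_with_pushdown(records, predicate):
    """Pushed-down model: the queue applies the predicate and sends only matches."""
    kept = [r for r in records if predicate(r)]
    transferred = sum(len(json.dumps(r)) for r in kept)
    return kept, transferred
```

Both functions return the same records; only the amount of data that must travel to the compute node differs, and the saving grows as the predicate becomes more selective.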