How Log Service Has Evolved into a Data Pipeline Over the Past 5 Years

Data Pipeline as the Infrastructure of Enterprises

What is a Data Pipeline?

  • Decouple producers and consumers to reduce the complexity of system interconnection.
  • Define unified format and operation semantics.

Challenges of Data Pipelines

  • Immutable queues can only be appended to and cannot be modified.
  • Results are returned only after data is durably written, so the write order is preserved.
  • Consumers consume data from explicit points (cursors).
  • Data cannot be modified and is deleted only when its time-to-live (TTL) expires, that is, in write order.
  • Producer: A faulty producer program can congest the pipeline endpoint with heavy traffic. During promotional activities, some data sources see traffic surge to dozens or even hundreds of times the normal volume within an hour.
  • Consumer: More than 20 subscribers may consume data from the same source simultaneously, and extensive adaptation is required when hundreds of data sources access the pipeline in different modes every day.
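The queue semantics above can be condensed into a small sketch: an append-only log with cursor-based reads and TTL deletion in write order. The class and field names are illustrative, not Log Service's actual implementation.

```python
import time

class AppendOnlyLog:
    """Minimal sketch of immutable-queue semantics: append-only writes,
    cursor-based reads, and TTL-based deletion in write order."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.entries = []   # (offset, timestamp, payload); never modified
        self.base = 0       # offset of the oldest retained entry

    def append(self, payload, now=None):
        now = time.time() if now is None else now
        offset = self.base + len(self.entries)
        self.entries.append((offset, now, payload))  # written once, never updated
        return offset                                 # ack only after the write

    def read(self, cursor, max_count=100):
        """Consumers read from an explicit cursor (point); nothing is removed."""
        start = max(cursor - self.base, 0)
        batch = self.entries[start:start + max_count]
        next_cursor = batch[-1][0] + 1 if batch else cursor
        return [p for _, _, p in batch], next_cursor

    def expire(self, now=None):
        """TTL deletion strictly follows write order: only the oldest entries go."""
        now = time.time() if now is None else now
        while self.entries and now - self.entries[0][1] > self.ttl:
            self.entries.pop(0)
            self.base += 1
```

Because consumption is driven by cursors rather than destructive reads, any number of subscribers can replay the same range independently.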

Challenge 1: Producer Adaptation

Apsara Logs

SDK Extensibility

Multi-Platform Access

Platform-Based Logtail

Cloud Native Support

Diskless Logs Based on Cloud Native

Challenge 2: Multi-protocol Support

  • Customers who build systems in open source mode want to access Log Service simply by modifying configuration files, without transforming their systems.
  • Many devices such as switches and routers provide fixed protocols and cannot use HTTP.
  • Open-source agents already support the monitoring information and binary formats of various software.
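Multi-protocol access usually means a gateway that normalizes each wire format into the pipeline's unified record before forwarding. The sketch below shows the idea for a loose syslog-style line and a JSON agent body; the field names and the regex are assumptions for illustration, not Log Service's actual protocol handling.

```python
import json
import re

def parse_syslog(line):
    """Very loose RFC 3164-style parse (hypothetical format, for illustration)."""
    m = re.match(r"<(\d+)>(\w{3} +\d+ [\d:]+) (\S+) (.*)", line)
    pri, ts, host, msg = m.groups()  # pri is ignored in this sketch
    return {"source": host, "time": ts, "content": msg, "protocol": "syslog"}

def parse_json_agent(payload):
    """Open-source agents often ship JSON bodies; map fields to the unified record."""
    doc = json.loads(payload)
    return {"source": doc.get("host", "unknown"), "time": doc.get("@timestamp"),
            "content": doc.get("message", ""), "protocol": "json"}

def ingest(raw, kind):
    """Dispatch by protocol and emit one unified record."""
    parsers = {"syslog": parse_syslog, "json": parse_json_agent}
    record = parsers[kind](raw)
    # In a real gateway this record would now be forwarded to the pipeline's
    # write API; open-source users only change their agent's output endpoint.
    return record
```

This is why a fixed-protocol device such as a router can be ingested without modification: only the adapter on the gateway side changes.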

Challenge 3: Agent Throttling

  • A host may have more than 20 types of logs, including operation logs that require reconciliation and Info-level program output logs.
  • A large number of program output logs may be generated in a short period because log producers are not always controllable.
  • Such bursts can overwhelm the collection agent in a short time, causing important data to be missed or collected late.
  • Logtail schedules data collection based on time slices to guarantee isolation and fairness across configurations and data endpoints.
  • Logtail uses multi-level feedback queues for resource usage to guarantee the isolation and fairness of processing flows and configurations at extremely low resource cost.
  • Logtail processes events in a non-blocking manner, so reliability remains high even if log files are rotated while a configuration is blocked or collection is stopped.
  • Logtail supports per-configuration throttling mechanisms, collection stop policies, and dynamic configuration updates to keep data collection highly controllable.
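The per-configuration throttling idea can be sketched with one token bucket per collection configuration, so a chatty Info-level source cannot starve the reconciliation logs that must never be lost. This is a sketch of the concept, not Logtail's actual implementation; the rates are made up.

```python
import time

class TokenBucket:
    """Per-configuration throttle: each collection config gets its own budget,
    so one noisy log source cannot starve the others."""

    def __init__(self, rate_bytes_per_s, burst_bytes, now=None):
        self.rate = rate_bytes_per_s
        self.capacity = burst_bytes
        self.tokens = burst_bytes
        self.last = time.monotonic() if now is None else now

    def allow(self, nbytes, now=None):
        now = time.monotonic() if now is None else now
        # Refill tokens for the elapsed interval, capped at the burst size.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= nbytes:
            self.tokens -= nbytes
            return True   # collect this chunk now
        return False      # leave the data on disk; retry in the next time slice

# One bucket per config: the important source gets a larger budget than the
# noisy one (illustrative numbers).
buckets = {"ops-log": TokenBucket(5 << 20, 10 << 20),   # 5 MB/s, 10 MB burst
           "info-log": TokenBucket(1 << 20, 2 << 20)}   # 1 MB/s, 2 MB burst
```

Because a denied chunk simply stays in the log file until a later time slice, throttling degrades latency for the noisy config instead of dropping data for the important one.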

Challenge 4: Server Throttling

Coarse-grained Throttling: Project Level

  • Thousands of NGINX frontends aggregate, every second, the traffic and request counts of the projects they receive and send the statistics to QuotaServer, which uses a distributed architecture partitioned by project.
  • QuotaServer summarizes all the project statistics from each NGINX frontend, calculates whether the traffic and queries per second of each project exceed the predefined quota, and determines whether to disable operations on the project and the disabled time period.
  • QuotaServer can notify all NGINX frontends of the projects that have exceeded their quotas within seconds.
  • After receiving the list of disabled projects, the NGINX frontends deny requests from these projects.
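The report-aggregate-disable loop above can be sketched in a few lines: frontends report per-project byte counts each second, the quota server sums them across all frontends, and the resulting banned list is pushed back so frontends deny those projects locally. Class and method names are illustrative.

```python
from collections import defaultdict

class QuotaServer:
    """Sketch of project-level throttling: sums per-second reports from all
    frontends, compares against the quota, and publishes projects to disable."""

    def __init__(self, quotas_bytes_per_s):
        self.quotas = quotas_bytes_per_s
        self.window = defaultdict(int)

    def report(self, frontend_id, project_bytes):
        for project, nbytes in project_bytes.items():
            self.window[project] += nbytes   # aggregate across all frontends

    def disabled_projects(self):
        banned = {p for p, total in self.window.items()
                  if total > self.quotas.get(p, float("inf"))}
        self.window.clear()                  # start the next one-second window
        return banned

class Frontend:
    """A frontend denies requests for any project on the banned list."""

    def __init__(self):
        self.banned = set()

    def sync(self, banned):
        self.banned = banned                 # pushed within seconds

    def handle(self, project):
        return 403 if project in self.banned else 200
```

The key property is that enforcement is local and cheap: once the banned list is synchronized, each frontend rejects over-quota projects without contacting QuotaServer per request.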

Fine-grained Throttling: Shard Level

  • Each shard clearly defines its processing capability, such as 5 MB/s of write and 10 MB/s of read.
  • A shard processes traffic on a best-effort basis as long as the instance hosting it has available resources, but a resource usage limit is still imposed.
  • When the shard queue is blocked, the system returns HTTP Error 403 (which indicates throttling) or 500 (a system error) and notifies QuotaServer of shard throttling.
  • After receiving a throttling message, QuotaServer instantly synchronizes the message to all NGINX frontends through the long pull channel between the NGINX frontends and QuotaServer.
  • NGINX frontends perform precise throttling on shards after receiving the throttling message.
  • The traffic that can be received by each shard is limited. Excessive traffic is directly denied by NGINX frontends to prevent the traffic from reaching the backend.
  • Project-level throttling is not the primary throttling mechanism but is used to prevent a sharp increase in traffic caused by code errors and other reasons.
  • Two error codes are returned to clients: HTTP 403 indicates throttling, and HTTP 500 indicates a Log Service exception at the backend.
  • When HTTP 403 is returned, users can increase throughput through shard partitioning or buy more resources to handle the excess traffic.
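Shard-level enforcement at the frontend can be sketched as a per-shard byte budget per accounting window: traffic within the shard's advertised capacity is admitted, anything beyond it is rejected with 403 before it reaches the backend. The 5 MB/s figure follows the text; the class itself is illustrative.

```python
class ShardLimiter:
    """Sketch of shard-level throttling at an NGINX frontend: each shard
    advertises a fixed write capacity, and excess traffic is denied with
    HTTP 403 so it never reaches the backend."""

    WRITE_LIMIT = 5 * 1024 * 1024   # 5 MB/s write capacity per shard

    def __init__(self):
        self.used = {}              # shard_id -> bytes admitted this second

    def admit(self, shard_id, nbytes):
        used = self.used.get(shard_id, 0)
        if used + nbytes > self.WRITE_LIMIT:
            return 403              # throttled at the frontend
        self.used[shard_id] = used + nbytes
        return 200

    def tick(self):
        """Called once per second to open a new accounting window."""
        self.used.clear()
```

Each shard's budget is independent, which is exactly why splitting a hot shard into two raises the total throughput a user can write.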

Challenge 5: Consumer (High Concurrency)

  • Global cache management is implemented. A consumption weight is calculated for each shard based on its consumption, and cache space is preferentially allocated to shards with higher weights.
  • Cache management is fine-grained: the cache size is dynamically adjusted based on each user’s recent consumption.
  • A certain amount of cache space is allocated to users with high availability assurance so that they are not affected by other users.
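A minimal sketch of this weight-based allocation: each shard's weight is its share of recent consumption, the weighted pool is divided accordingly, and a reserved slice is pinned for high-availability users so other tenants cannot squeeze them out. All names and numbers are illustrative.

```python
def allocate_cache(total_bytes, consumption, reserved=None):
    """Split a cache budget across shards in proportion to recent consumption,
    after setting aside fixed slices for high-availability users."""
    reserved = reserved or {}
    pinned = sum(reserved.values())
    pool = max(total_bytes - pinned, 0)         # what the weighted shards share
    total = sum(consumption.values()) or 1
    alloc = {shard: pool * used / total          # higher weight -> more cache
             for shard, used in consumption.items()}
    alloc.update(reserved)                       # guaranteed slices are fixed
    return alloc
```

Recomputing this periodically from fresh consumption statistics is what makes the allocation track hot shards instead of staying static.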

Challenge 6: Consumer (Multiple Instances and Concurrency)

  1. When logs are classified into specific types, a ConsumerGroup cannot consume logs from multiple Logstores simultaneously. Because the logs may come from multiple projects under different accounts, resource mapping and permission management become complicated.
  2. The minimum unit that a ConsumerGroup assigns is a shard. A single shard in Log Service supports writes of dozens of megabytes per second, which a single consumer host cannot process. This causes a serious imbalance between production and consumption.

View Consumption Mode

Fanout Consumption Mode

Challenge 7: Automatic O&M

Automatic Hotspot Elimination

  • Automatic load balancing: The system collects real-time statistics on the load of each node and on the CPU, memory, and network consumption of each data partition on the node. The statistics are reported to the scheduler, which checks whether any node is overloaded. When a node is overloaded, the partitions causing the high load are automatically migrated to lightly loaded nodes, restoring balance.
  • Automatic partitioning: The system monitors the load of each shard in real time. When a shard exceeds its per-shard upper limit, it is automatically split: the old shard becomes read-only, and two new shards are created and migrated to other nodes.

Traffic Analysis in Seconds (Root Cause Analysis)

Other Challenges: Problems Being Solved

1. Shardless

2. From “At Least Once” to “Exactly Once”

3. LogHub Select Function (Push Down Filter Conditions)

Original Source: Alibaba Cloud