Alibaba Cloud LOG Java Producer — A Powerful Tool to Migrate Logs to the Cloud

By Bruce Wu


Logs are ubiquitous. As a carrier that records changes in the world, logs are widely used in many fields, such as marketing, research and development, operation, security, BI, and auditing.


Alibaba Log Service is an all-in-one service platform for log data. Its core component LogHub has become an infrastructure of big data processing, especially real-time data processing, by virtue of outstanding features such as high throughput, low latency, and auto-scaling. Jobs running on big data computing engines such as Flink, Spark, and Storm write data processing results or intermediate results to LogHub in real time. With data from LogHub, downstream systems are able to provide many services such as query analysis, monitoring alarms, machine learning, and iterative calculation. The LogHub big data processing architecture is provided in the following figure.

Image for post

To ensure that the system runs properly, you must use convenient and highly efficient data writing methods. Directly using APIs or SDKs is insufficient to meet data writing capability requirements in the big data scenario. In this context, Alibaba Cloud LOG Java Producer was developed.


Alibaba Cloud LOG Java Producer is an easy-to-use and highly configurable Java class library. It has the following features:

  1. Thread safe: All methods exposed by Alibaba Cloud LOG Java Producer (the “Producer”) are thread-safe.


Using Producer to write data to LogHub has the following advantages in comparison with using APIs or SDKs:

High Performance

With large amounts of data and limited resources, to achieve the desired throughput, you need to implement complex logic, such as multi-threading, cache policy, batching, and retries in case of failures. Producer implements the preceding logic to improve your application performance and to simplify your application development process.

Asynchronous and Non-Blocking Task Execution

With sufficient cache memory, Producer caches data to be sent to LogHub. As a result, when you call the send method, it returns immediately without blocking your thread. This separates the computing logic from the I/O logic. Later, you can obtain the data sending result from the returned future object or the registered callback.
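The future-and-callback pattern described above can be illustrated with a minimal sketch built only on the JDK. It is not the Producer API: the class `AsyncSenderSketch`, its `send` signature, and the `"sent:"` result string are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for an asynchronous sender; not the real Producer API.
final class AsyncSenderSketch implements AutoCloseable {
    // A single background thread plays the role of the I/O side.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    // Hands the record to the background thread and returns at once.
    // The result is available from the returned future and, if a callback
    // was registered, is also passed to that callback.
    CompletableFuture<String> send(String record, Consumer<String> callback) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "sent:" + record, ioThread);
        if (callback != null) {
            future.thenAccept(callback);
        }
        return future;
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}
```

The caller's thread only enqueues work; it decides later whether to wait on the future or to rely on the callback.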

Controllable Resources Usage

You can use parameters to control both the size of the memory that Producer uses to cache data to be sent and the number of threads that perform data sending tasks. This prevents unrestricted resource consumption by Producer. In addition, it allows you to balance resource consumption and write throughput according to the actual situation.


To sum up, Producer provides many advantages by automatically handling the complex underlying details and exposing simple interfaces. In addition, it does not affect normal operations of upper layer services, significantly reducing the data access threshold.

Mechanism Explanation

To help you better understand the performance of Producer, this section describes how it works, including its data writing logic, implementation of core components, and graceful shutdown. The overall architecture of Producer is presented in the following figure.

Image for post

Data Write

The data write logic of Producer:

  1. After you call the producer.send() method to send data to your specified logstore, the data is loaded into a producer batch in LogAccumulator. Generally, the send method returns immediately. However, when your Producer instance does not have sufficient room to store the target data, the send method is blocked until any one of the following conditions is met:

The previously cached data has been processed by the batch handler, and the memory occupied by such data is released. As a result, Producer has sufficient room to store the target data.

The specified blocking time is exceeded, in which case an exception is thrown.

2. When you call producer.send(), the number of logs in the target batch may exceed maxBatchCount, or the target batch may not have sufficient room for the target data. In this case, Producer first sends the target batch to IOThreadPool, and then creates a new batch to store the target data. To avoid blocking your threads, IOThreadPool uses an unbounded blocking queue. The number of logs that can be cached in a Producer instance is limited, so the length of the queue does not grow infinitely.

3. Mover traverses each producer batch of LogAccumulator, and sends batches that have exceeded the maximum cache time to expiredBatches. It also records the earliest expiring time (t) of the unexpired batches.

4. Then it sends expired batches from LogAccumulator to IOThreadPool.

5. After that, Mover retrieves producer batches that meet the sending conditions from RetryQueue. If no batches meet the conditions, Mover waits for a period of t.

6. Then it sends expired batches from RetryQueue to IOThreadPool. After completing step 6, Mover repeats steps 3 to 6.

7. A worker thread of IOThreadPool sends batches from the blocking queue to the target logstore.

8. After a batch is sent to the logstore, it goes to the success queue.

9. If it fails to be sent and meets any of the following conditions, it goes to the failure queue:

The failed batch cannot be retried.

RetryQueue is closed.

The specified number of retries has been reached, and the number of batches in the failure queue does not exceed 1/2 of the total number of batches to be sent.

10. Otherwise, the worker thread calculates the next sending time of the failed batch, and sends it to RetryQueue.

11. The SuccessBatchHandler thread pulls a batch from the success queue, and executes all callbacks registered for this batch.

12. The FailureBatchHandler thread pulls a batch from the failure queue, and executes all callbacks registered for this batch.
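Steps 3 to 5 can be pictured as a single scan over the cached batches. The sketch below is a simplified model, not Producer's source code: the `Batch` and `Result` types are hypothetical, and it assumes that a batch expires once it has been cached for `lingerMs` and that Mover waits a full `lingerMs` when nothing is pending.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one Mover pass over the accumulator.
final class MoverPassSketch {
    static final class Batch {
        final String name;
        final long createdMs;

        Batch(String name, long createdMs) {
            this.name = name;
            this.createdMs = createdMs;
        }
    }

    static final class Result {
        final List<Batch> expired = new ArrayList<>();
        long waitMs; // time until the earliest unexpired batch expires (t)
    }

    // Collects batches whose cache time has run out and records how long
    // the caller may wait before the next batch expires.
    static Result scan(List<Batch> batches, long nowMs, long lingerMs) {
        Result result = new Result();
        result.waitMs = lingerMs; // assumption: wait one linger period if nothing is pending
        for (Batch batch : batches) {
            long remainingMs = batch.createdMs + lingerMs - nowMs;
            if (remainingMs <= 0) {
                result.expired.add(batch);
            } else {
                result.waitMs = Math.min(result.waitMs, remainingMs);
            }
        }
        return result;
    }
}
```

For example, with `lingerMs = 600` and `nowMs = 1000`, a batch created at 0 is expired, while batches created at 500 and 900 leave a wait time of 100 ms.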

Core Components

Core components of Producer include LogAccumulator, RetryQueue, Mover, IOThreadPool, SendProducerBatchTask, and BatchHandler.


To improve the throughput, a common practice is to accumulate data into larger batches, and send data in batches. The main role of LogAccumulator described in this section is to merge data into batches. To merge different data into a big batch, the data must have the same project, logstore, topic, source, and shardHash properties. LogAccumulator caches these data to different positions of the internal map based on these properties. The key of the map is the quintuple of the above five properties, and the value is ProducerBatch. To ensure thread safety and high concurrency, ConcurrentMap is used.
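A minimal sketch of this grouping idea follows. It is an illustration under simplifying assumptions rather than the library's implementation: `AccumulatorSketch` is a hypothetical class, the quintuple key is modeled as a five-element list, and a batch is a plain list of strings instead of a ProducerBatch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical accumulator that groups logs by the five routing properties.
final class AccumulatorSketch {
    // Key: (project, logstore, topic, source, shardHash). Value: the open batch.
    private final ConcurrentMap<List<String>, List<String>> batches = new ConcurrentHashMap<>();

    // Appends a log to the batch for its key and returns that batch's size.
    int append(String project, String logstore, String topic,
               String source, String shardHash, String log) {
        // List equality is element-wise, so equal quintuples map to the same batch.
        List<String> key = Arrays.asList(project, logstore, topic, source, shardHash);
        List<String> batch = batches.computeIfAbsent(key, k -> new ArrayList<>());
        synchronized (batch) { // ArrayList is not thread-safe on its own
            batch.add(log);
            return batch.size();
        }
    }

    int batchCount() {
        return batches.size();
    }
}
```

Two logs that differ only in topic land in different batches, because the topic is part of the key.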

Another function of LogAccumulator is to control the total size of cached data. Semaphore is used to implement the control logic. Semaphore is a high-performance synchronization tool based on AbstractQueuedSynchronizer (AQS). It first tries to obtain shared resources through spinning, which reduces the context switch overhead.
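The following sketch shows how a Semaphore can cap cached bytes, with one permit standing for one byte. The class `MemoryBudgetSketch` and its method names are hypothetical, and the blocking timeout parameter is an assumption modeled on the blocking time mentioned earlier in this article.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical memory budget: one semaphore permit represents one cached byte.
final class MemoryBudgetSketch {
    private final Semaphore bytes;

    MemoryBudgetSketch(int totalBytes) {
        this.bytes = new Semaphore(totalBytes);
    }

    // Tries to reserve room for `size` bytes, waiting at most `maxBlockMs`.
    // Returns false if the room did not become available in time.
    boolean tryReserve(int size, long maxBlockMs) {
        try {
            return bytes.tryAcquire(size, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Called after a batch has been handled, to free its memory for new data.
    void release(int size) {
        bytes.release(size);
    }

    int available() {
        return bytes.availablePermits();
    }
}
```

A caller that receives `false` can treat it as the timeout case and raise an exception, which mirrors the blocking behavior of the send method.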


RetryQueue is used to store batches that have failed to be sent and are waiting to be retried. Each of these batches has a field that indicates when it should be sent next. To efficiently pull expired batches, Producer uses a DelayQueue to store these batches. DelayQueue is a time-based priority queue, and the earliest expiring batch is processed first. This queue is thread-safe.
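As a rough illustration of how `java.util.concurrent.DelayQueue` behaves, the sketch below stores hypothetical `RetryBatch` elements ordered by their next send time. The class and field names are assumptions for this example only.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical retry queue backed by the JDK's DelayQueue.
final class RetryQueueSketch {
    static final class RetryBatch implements Delayed {
        final String name;
        final long sendAtMs; // wall-clock time at which the batch may be retried

        RetryBatch(String name, long sendAtMs) {
            this.name = name;
            this.sendAtMs = sendAtMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long remainingMs = sendAtMs - System.currentTimeMillis();
            return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof RetryBatch) {
                return Long.compare(sendAtMs, ((RetryBatch) other).sendAtMs);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                    other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    final DelayQueue<RetryBatch> queue = new DelayQueue<>();

    void put(String name, long sendAtMs) {
        queue.put(new RetryBatch(name, sendAtMs));
    }

    // Returns the earliest batch whose send time has passed, or null if none has.
    RetryBatch pollExpired() {
        return queue.poll();
    }
}
```

`poll()` never returns a batch whose delay has not elapsed, so the consumer does not need to check timestamps itself.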


Mover is an independent thread. It periodically sends expired batches from LogAccumulator and RetryQueue to IOThreadPool. If Mover polled continuously, it would occupy CPU resources even when there is nothing to send. To avoid wasting CPU resources, when Mover cannot find any batch that qualifies for sending in LogAccumulator or RetryQueue, it waits on RetryQueue for expired batches for a period of time. This period is the maximum cache time, lingerMs, that you have configured.


The worker thread in IOThreadPool sends data to the logstore. The size of IOThreadPool can be specified by the ioThreadCount parameter, and the default value is twice the number of processors.
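A fixed-size pool backed by an unbounded blocking queue can be built with the JDK's `ThreadPoolExecutor`, as sketched below. `IoPoolSketch` is a hypothetical helper, and `defaultThreadCount` simply restates the default described above; it has not been checked against the library's source.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that builds an I/O pool with an unbounded work queue.
final class IoPoolSketch {
    // Fixed number of workers; submitting work never blocks the caller
    // because LinkedBlockingQueue without a capacity is effectively unbounded.
    static ThreadPoolExecutor create(int ioThreadCount) {
        return new ThreadPoolExecutor(
                ioThreadCount, ioThreadCount,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Default stated in this article: twice the number of processors.
    static int defaultThreadCount() {
        return Runtime.getRuntime().availableProcessors() * 2;
    }
}
```

An unbounded queue is only safe here because the amount of cached data is capped elsewhere, as described in the data write steps.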


SendProducerBatchTask encapsulates the batch sending logic. To avoid blocking I/O threads, SendProducerBatchTask sends the target batch to a separate queue for callback execution, regardless of whether the batch was sent successfully. In addition, if a failed batch satisfies the retry conditions, it is not immediately resent in the current I/O thread, because an immediate resend usually fails again. Instead, SendProducerBatchTask sends it to RetryQueue according to the exponential backoff policy.
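One common form of exponential backoff doubles the wait after each retry and caps it at a maximum. The sketch below shows that form; the exact formula Producer uses is not given in this article, and the names `baseMs` and `maxMs` are assumptions for illustration.

```java
// Hypothetical backoff calculator: base * 2^retries, capped at a maximum.
final class BackoffSketch {
    static long backoffMs(int retries, long baseMs, long maxMs) {
        long backoff = Math.min(baseMs, maxMs);
        for (int i = 0; i < retries; i++) {
            // Stop before doubling would exceed the cap (this also avoids overflow).
            if (backoff > maxMs / 2) {
                return maxMs;
            }
            backoff *= 2;
        }
        return backoff;
    }

    // The time at which the failed batch becomes eligible for resending.
    static long nextSendTimeMs(long nowMs, int retries, long baseMs, long maxMs) {
        return nowMs + backoffMs(retries, baseMs, maxMs);
    }
}
```

With a base of 100 ms and a cap of 50,000 ms, the waits grow as 100, 200, 400, and so on, reaching 25,600 ms after eight retries and staying at the cap from the ninth retry onward.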


Producer starts a SuccessBatchHandler and a FailureBatchHandler to handle successfully sent and failed batches. After a handler executes the callbacks or sets the future of a batch, it releases the memory occupied by this batch for use by new data. Separate handling ensures that successfully sent and failed batches are isolated from each other, which keeps Producer running smoothly.
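The handler's job, running callbacks and then freeing the batch's memory, can be sketched as follows. This is a simplified single-step model: `BatchHandlerSketch` and its `Batch` type are hypothetical, a real handler thread would loop on a blocking `take()`, and swallowing callback exceptions is an assumption made here so that one faulty callback cannot prevent the memory from being released.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical handler for one result queue (success or failure).
final class BatchHandlerSketch {
    static final class Batch {
        final int sizeInBytes;
        final List<Runnable> callbacks;

        Batch(int sizeInBytes, List<Runnable> callbacks) {
            this.sizeInBytes = sizeInBytes;
            this.callbacks = callbacks;
        }
    }

    private final BlockingQueue<Batch> queue = new LinkedBlockingQueue<>();
    private final Semaphore memory; // shared budget: one permit per cached byte

    BatchHandlerSketch(Semaphore memory) {
        this.memory = memory;
    }

    void submit(Batch batch) {
        queue.add(batch);
    }

    // Handles one queued batch if present and reports whether it did any work.
    boolean handleOne() {
        Batch batch = queue.poll();
        if (batch == null) {
            return false;
        }
        try {
            for (Runnable callback : batch.callbacks) {
                try {
                    callback.run();
                } catch (RuntimeException e) {
                    // Assumption: a failing callback must not stop the remaining ones.
                }
            }
        } finally {
            memory.release(batch.sizeInBytes); // make room for new data
        }
        return true;
    }
}
```

Running one instance per queue keeps slow failure callbacks from delaying success callbacks, which is the isolation described above.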


To implement graceful shutdown, the following requirements must be met:

  1. When the close method returns the result to you, all threads in Producer must have been terminated. In addition, the cached data must have been properly processed, all callbacks registered by you have been executed, and all futures to be returned to you have been set.

To meet the preceding requirements, the close logic of Producer is designed as follows:

  1. Close LogAccumulator. If you continue to write data to LogAccumulator, an exception will be thrown.

As you can see, the graceful shutdown and safe exit are achieved by closing queues and threads one by one based on the data flow direction.


Alibaba Cloud LOG Java Producer is a comprehensive upgrade of the earlier version of Producer. It solves many problems with the earlier version, including high CPU usage in the case of network exceptions and slight data loss upon closing Producer. In addition, the fault tolerance mechanism is enhanced. Producer can ensure proper resource usage, high throughput, and strict isolation even if you make operational mistakes.
