Alibaba Cloud LOG Java Producer — A Powerful Tool to Migrate Logs to the Cloud
By Bruce Wu
Logs are ubiquitous. As a carrier that records changes in the world, logs are widely used in many fields, such as marketing, research and development, operation, security, BI, and auditing.
Alibaba Log Service is an all-in-one service platform for log data. Its core component LogHub has become an infrastructure of big data processing, especially real-time data processing, by virtue of outstanding features such as high throughput, low latency, and auto-scaling. Jobs running on big data computing engines such as Flink, Spark, and Storm write data processing results or intermediate results to LogHub in real time. With data from LogHub, downstream systems are able to provide many services such as query analysis, monitoring alarms, machine learning, and iterative calculation. The LogHub big data processing architecture is provided in the following figure.
To ensure that the system runs properly, you must use convenient and highly efficient data writing methods. Directly using APIs or SDKs is insufficient to meet the data writing requirements of big data scenarios. In this context, Alibaba Cloud LOG Java Producer was developed.
Alibaba Cloud LOG Java Producer is an easy-to-use and highly configurable Java class library. It has the following features:
- Thread safe: All methods exposed by Alibaba Cloud LOG Java Producer (the “Producer”) are thread-safe.
- Asynchronous sending: A call to the send method of Producer usually returns immediately, without waiting for the data to be sent or for a response from the server. Producer has an internal caching mechanism (LogAccumulator) that accumulates the data to be sent and sends it in batches to improve throughput.
- Automatic retry: Producer provides an automatic, configurable retry mechanism (RetryQueue) for retriable exceptions. You can set the maximum number of retries and the backoff period for RetryQueue.
- Traceability: You can use a callback or a future to learn whether the target data was sent successfully and which attempts were made to send it. This lets you trace problems and decide how to resolve them.
- Context restore: Logs generated by the same Producer are in the same context, and the relevant logs before and after a certain log can be viewed on the server side.
- Graceful shutdown: When the close method returns, all data cached by Producer has been processed, and you have been notified of the results.
Using Producer to write data to LogHub has the following advantages in comparison with using APIs or SDKs:
With large amounts of data and limited resources, to achieve the desired throughput, you need to implement complex logic, such as multi-threading, cache policy, batching, and retries in case of failures. Producer implements the preceding logic to improve your application performance and to simplify your application development process.
Asynchronous and Non-Blocking Task Execution
With sufficient cache memory, Producer caches data to be sent to LogHub. When you call the send method, the call returns immediately without blocking your thread, and the data is sent in the background. This separates computing from I/O logic. Later, you can obtain the sending result from the returned future object or the registered callback.
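The future and callback styles described above can be illustrated with plain JDK classes. This is a minimal sketch, not the Producer API: the class `AsyncSendSketch`, its `send` method, and the "sent:" result string are hypothetical stand-ins, and the network call is simulated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a producer; not the library's actual API.
class AsyncSendSketch {
    private final ExecutorService ioPool = Executors.newFixedThreadPool(2);

    // Returns at once; the simulated I/O runs on the pool, not on the caller's thread.
    CompletableFuture<String> send(String log) {
        return CompletableFuture.supplyAsync(() -> "sent:" + log, ioPool);
    }

    void close() {
        ioPool.shutdown();
    }

    public static void main(String[] args) {
        AsyncSendSketch producer = new AsyncSendSketch();
        CompletableFuture<String> future = producer.send("hello");
        // Callback style: invoked once the result is available.
        future.thenAccept(result -> System.out.println("callback got " + result));
        // Future style: block only at the point where the result is needed.
        System.out.println("future got " + future.join());
        producer.close();
    }
}
```

The caller's thread does its own work between `send` and `join`, which is the separation of computing and I/O that the paragraph describes.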
Controllable Resources Usage
Parameters control both the amount of memory that Producer uses to cache data to be sent and the number of threads that perform the sending tasks. This prevents unrestricted resource consumption by Producer and lets you balance resource consumption against write throughput according to your actual situation.
To sum up, Producer provides many advantages by automatically handling the complex underlying details and exposing simple interfaces. In addition, it does not affect normal operations of upper layer services, significantly reducing the data access threshold.
To help you better understand the performance of Producer, this section describes how it works, including its data writing logic, implementation of core components, and graceful shutdown. The overall architecture of Producer is presented in the following figure.
The data write logic of Producer:
1. After you call the producer.send() method to send data to your specified logstore, the data is loaded into a producer batch in LogAccumulator. Generally, the send method returns immediately. However, when your Producer instance does not have sufficient room to store the target data, the send method blocks until either of the following conditions is met:
   - The previously cached data has been processed by the batch handler, and the memory occupied by that data has been released. As a result, Producer has sufficient room to store the target data.
   - The specified maximum blocking time is exceeded, in which case an exception is thrown.
2. When you call producer.send(), the number of logs in the target batch may exceed maxBatchCount, or the target batch may not have sufficient room for the target data. In this case, Producer first sends the target batch to IOThreadPool, and then creates a new batch to store the target data. To avoid blocking your threads, IOThreadPool uses an unbounded blocking queue. Because the number of logs that can be cached in a Producer instance is limited, the length of the queue does not grow infinitely.
3. Mover traverses each producer batch of LogAccumulator, and sends batches that have exceeded the maximum cache time to expiredBatches. It also records the earliest expiring time (t) of the unexpired batches.
4. Then it sends expired batches from LogAccumulator to IOThreadPool.
5. After that, Mover retrieves producer batches that meet the sending conditions from RetryQueue. If no batches meet the conditions, Mover waits for a period of t.
6. Then it sends expired batches from RetryQueue to IOThreadPool. After completing step 6, Mover repeats steps 3 to 6.
7. A worker thread of IOThreadPool sends batches from the blocking queue to the target logstore.
8. After a batch is sent to the logstore, it goes to the success queue.
9. If a batch fails to be sent and meets any of the following conditions, it goes to the failure queue:
   - The failed batch cannot be retried.
   - RetryQueue is closed.
   - The specified number of retries has been reached, and the number of batches in the failure queue does not exceed 1/2 of the total number of batches to be sent.
10. Otherwise, the worker thread calculates the next sending time of the failed batch, and sends it to RetryQueue.
11. The SuccessBatchHandler thread pulls a batch from the success queue, and executes all callbacks registered for this batch.
12. The FailureBatchHandler thread pulls a batch from the failure queue, and executes all callbacks registered for this batch.
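The routing decision in steps 9 and 10 can be written as a single predicate. The following is a sketch under assumptions: the class name, method name, and parameters are hypothetical and simply restate the three listed conditions, not the library's own code.

```java
// Hypothetical helper that restates the conditions of step 9.
class FailureRouting {
    /**
     * Returns true if a failed batch goes to the failure queue,
     * false if it should be rescheduled in the retry queue (step 10).
     */
    static boolean goesToFailureQueue(boolean retriable,
                                      boolean retryQueueClosed,
                                      int attemptsMade,
                                      int maxRetries,
                                      int failureQueueSize,
                                      int totalBatches) {
        if (!retriable) {
            return true;            // the failed batch cannot be retried
        }
        if (retryQueueClosed) {
            return true;            // RetryQueue is closed
        }
        // Retries are used up and the failure queue holds at most half of all batches.
        return attemptsMade >= maxRetries && 2L * failureQueueSize <= totalBatches;
    }
}
```

Written this way, a batch that has used up its retries while the failure queue is already more than half full is sent back to the retry queue rather than to the failure queue.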
To improve throughput, a common practice is to accumulate data into larger batches and send the data batch by batch. The main role of LogAccumulator, described in this section, is to merge data into batches. Data can be merged into the same batch only if it has the same project, logstore, topic, source, and shardHash properties. LogAccumulator stores data under different keys of an internal map based on these properties. The key of the map is the quintuple of these five properties, and the value is a ProducerBatch. To ensure thread safety and high concurrency, ConcurrentMap is used.
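A minimal sketch of this grouping idea follows. It assumes a plain list of strings as the batch and a five-element list as the key; the class `AccumulatorSketch` and its methods are hypothetical, and the real ProducerBatch carries much more state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical accumulator: groups logs by (project, logstore, topic, source, shardHash).
class AccumulatorSketch {
    // A List is used as the key because it has value-based equals() and hashCode().
    private final ConcurrentMap<List<String>, List<String>> batches = new ConcurrentHashMap<>();

    void append(String project, String logstore, String topic,
                String source, String shardHash, String log) {
        List<String> key = Arrays.asList(project, logstore, topic, source, shardHash);
        // ConcurrentHashMap.compute runs atomically per key, so concurrent
        // appends to the same batch do not interleave.
        batches.compute(key, (k, existing) -> {
            List<String> batch = (existing == null) ? new ArrayList<String>() : existing;
            batch.add(log);
            return batch;
        });
    }

    int batchCount() {
        return batches.size();
    }

    // Number of logs in the batch for the given five-part key (0 if none).
    int batchSize(String... key) {
        List<String> batch = batches.get(Arrays.asList(key));
        return batch == null ? 0 : batch.size();
    }
}
```

Two logs that differ in any one of the five properties, for example the topic, land in different batches.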
Another function of LogAccumulator is to control the total size of cached data. Semaphore is used to implement the control logic. Semaphore is a high-performance synchronization tool based on AbstractQueuedSynchronizer (AQS). It first tries to obtain the shared resource by spinning, which reduces context-switch overhead.
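The size control can be sketched by treating each permit of a `java.util.concurrent.Semaphore` as one byte of cache. The class `MemoryBudget` and its method names are hypothetical; only the `Semaphore` calls are standard JDK API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical memory budget: one semaphore permit stands for one byte of cache.
class MemoryBudget {
    private final Semaphore permits;

    MemoryBudget(int totalSizeInBytes) {
        this.permits = new Semaphore(totalSizeInBytes);
    }

    // Tries to reserve room for a log, waiting at most maxBlockMs.
    // Returns false if the room did not become available in time.
    boolean reserve(int sizeInBytes, long maxBlockMs) {
        try {
            return permits.tryAcquire(sizeInBytes, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Called after a batch has been handled, so new data can use the space.
    void release(int sizeInBytes) {
        permits.release(sizeInBytes);
    }

    int availableBytes() {
        return permits.availablePermits();
    }
}
```

In this sketch a `false` return plays the role of the exception that the send method throws when the blocking time is exceeded.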
RetryQueue stores batches that have failed to be sent and are waiting to be retried. Each of these batches has a field that indicates when it should be sent next. To efficiently pull expired batches, Producer uses a DelayQueue to store them. DelayQueue is a time-based priority queue in which the earliest expiring batch is processed first. This queue is thread-safe.
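The behavior of `java.util.concurrent.DelayQueue` described here can be sketched as follows. The wrapper class `RetryQueueSketch`, the `RetryBatch` element type, and their fields are hypothetical; a real batch would carry the log data and retry count as well.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical retry queue built on the JDK DelayQueue.
class RetryQueueSketch {
    static final class RetryBatch implements Delayed {
        final String name;
        final long nextRetryNanos; // absolute time at which the batch may be resent

        RetryBatch(String name, long delayMs) {
            this.name = name;
            this.nextRetryNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(nextRetryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // The queue only ever holds RetryBatch elements in this sketch.
            return Long.compare(nextRetryNanos, ((RetryBatch) other).nextRetryNanos);
        }
    }

    private final DelayQueue<RetryBatch> queue = new DelayQueue<>();

    void put(RetryBatch batch) {
        queue.put(batch);
    }

    // Waits up to timeoutMs for the earliest batch to expire; returns null on timeout.
    RetryBatch pollExpired(long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A batch becomes visible to `pollExpired` only after its delay has elapsed, and batches come out in order of their next retry time regardless of insertion order.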
Mover is an independent thread. It periodically sends expired batches from LogAccumulator and RetryQueue to IOThreadPool. If Mover looped continuously, it would occupy CPU resources even when it had nothing to move. To avoid wasting CPU resources, Mover waits on RetryQueue for a period when it cannot find any batches that qualify for sending in LogAccumulator or RetryQueue. This period is the maximum cache time lingerMs that you have configured.
The worker thread in IOThreadPool sends data to the logstore. The size of IOThreadPool can be specified by the ioThreadCount parameter, and the default value is twice the number of processors.
SendProducerBatchTask encapsulates the batch sending logic. To avoid blocking I/O threads, SendProducerBatchTask hands the target batch to a separate queue for callback execution, regardless of whether the batch was sent successfully. In addition, if a failed batch satisfies the retry conditions, it is not immediately resent in the current I/O thread, because an immediate resend usually fails again. Instead, SendProducerBatchTask sends it to RetryQueue with a delay determined by the exponential backoff policy.
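One common form of exponential backoff doubles the wait after each failed attempt and caps it at a maximum. The sketch below assumes that form; the class name, the method name, and the parameters `baseMs` and `maxMs` are hypothetical, and the exact formula used by Producer may differ.

```java
// Hypothetical backoff calculator: base * 2^(attempt - 1), capped at maxMs.
class BackoffSketch {
    /**
     * @param attempt number of sends already tried for this batch (1 for the first failure)
     * @param baseMs  wait before the first retry
     * @param maxMs   upper bound for any single wait
     */
    static long retryBackoffMs(int attempt, long baseMs, long maxMs) {
        long backoff = baseMs;
        for (int i = 1; i < attempt; i++) {
            if (backoff > maxMs / 2) {
                return maxMs;       // doubling would pass the cap (also avoids overflow)
            }
            backoff *= 2;
        }
        return Math.min(backoff, maxMs);
    }
}
```

With `baseMs = 100` and `maxMs = 1000`, the waits for attempts 1 through 5 are 100, 200, 400, 800, and 1000 milliseconds. The next sending time of a batch is then the current time plus this wait.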
Producer starts a SuccessBatchHandler and a FailureBatchHandler to handle successfully sent and failed batches. After a handler has executed the callbacks and set the futures of a batch, it releases the memory occupied by this batch for use by new data. Handling the two kinds of batches separately keeps them isolated from each other, which ensures the smooth operation of Producer.
To implement graceful shutdown, the following requirements must be met:
- When the close method returns, all threads in Producer must have been terminated. In addition, the cached data must have been properly processed, all callbacks that you registered must have been executed, and all futures returned to you must have been set.
- You should be able to set the maximum waiting period for the close method. The method must immediately return the result to you after this period is exceeded, regardless of whether the threads have been terminated and whether the cached data has been processed.
- The close method can be called multiple times and work properly, even in a multi-threading environment.
- Calling the close method in callbacks is safe and will not cause deadlock to the application.
To meet the preceding requirements, the close logic of Producer is designed as follows:
- Close LogAccumulator. If you continue to write data to LogAccumulator, an exception will be thrown.
- Close RetryQueue. If you continue to send batches to RetryQueue, an exception will be thrown.
- Close Mover and wait for it to completely exit. After detecting the close signal, Mover will send all remaining batches from LogAccumulator and RetryQueue to IOThreadPool, regardless of whether they meet the sending conditions. To avoid data loss, Mover constantly pulls batches from LogAccumulator and RetryQueue, until no other threads are writing.
- Close IOThreadPool and wait for all submitted tasks to complete. When RetryQueue has already been closed, failed batches will be directly sent to the failure queue.
- Close SuccessBatchHandler and wait for it to completely exit. If the close method is called from within a callback, the waiting process is skipped. After detecting the close signal, SuccessBatchHandler pulls all batches from the success queue and processes them one by one.
- Close FailureBatchHandler and wait for it to completely exit. If the close method is called from within a callback, the waiting process is skipped. After detecting the close signal, FailureBatchHandler pulls all batches from the failure queue and processes them one by one.
As you can see, the graceful shutdown and safe exit are achieved by closing queues and threads one by one based on the data flow direction.
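The idea of closing stages in data flow order, with an overall time limit, can be sketched with two JDK executors. This is a simplified illustration under stated assumptions, not the Producer implementation: the class `GracefulCloseSketch` is hypothetical, it has only an I/O stage and a callback stage, and it ignores both the close-from-callback case and races between concurrent send and close calls.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical two-stage pipeline: I/O stage, then callback stage.
class GracefulCloseSketch {
    private final ExecutorService ioPool = Executors.newFixedThreadPool(2);
    private final ExecutorService callbackPool = Executors.newSingleThreadExecutor();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    final AtomicInteger callbacksRun = new AtomicInteger();

    void send(String log) {
        if (closed.get()) {
            throw new IllegalStateException("producer is closed");
        }
        // Simulated I/O that hands its result to the callback stage.
        ioPool.execute(() -> callbackPool.execute(callbacksRun::incrementAndGet));
    }

    // Closes upstream first, then downstream. Returns true if both stages
    // drained within timeoutMs. Calling it again is harmless.
    boolean close(long timeoutMs) {
        closed.set(true); // stop accepting new data
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            ioPool.shutdown(); // already submitted tasks still run
            boolean ioDone = ioPool.awaitTermination(
                    deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            // Only now close the callback stage, so every I/O result can be handed over.
            // On timeout, late I/O tasks would be rejected here; a real design needs more care.
            callbackPool.shutdown();
            boolean callbacksDone = callbackPool.awaitTermination(
                    deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            return ioDone && callbacksDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the callback stage is shut down only after the I/O stage has drained, no result produced by an I/O task is lost on the normal path, which mirrors the ordering argument above.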
Alibaba Cloud LOG Java Producer is a comprehensive upgrade of the earlier version of Producer. It solves many problems of the earlier version, including high CPU usage during network exceptions and minor data loss when Producer is closed. In addition, the fault tolerance mechanism is enhanced. Producer can ensure proper resource usage, high throughput, and strict isolation even if you make operational mistakes.