Integrating Alibaba Cloud Log Service with Splunk

This article describes how to integrate Alibaba Cloud Log Service with Splunk so that all compliance, auditing, and other related logs can be ingested into your Security Operations Center (for example, Splunk).

Related Terms

  1. Log Service: Alibaba Cloud Log Service, also known as Simple Log Service (SLS).

Audit Related Logs

The table below describes the logs available in Log Service that may be relevant to the Security Operations team.

Note: Regions are updated regularly. Please refer to the product documentation for the latest status.

Alibaba Cloud Log Service

As a one-stop service for log data, Log Service has been proven in the massive big data scenarios of Alibaba Group. It lets you quickly collect, consume, ship, query, and analyze log data without additional development, which improves operations and maintenance (O&M) efficiency and builds the capacity to process massive volumes of logs in the DT (data technology) era.

Alibaba Cloud Log Service has the following architecture:

Integration Proposal



A project is the resource management unit of Log Service, used to isolate and control resources.


The Logstore is a unit in Log Service for the collection, storage, and query of log data. Each Logstore belongs to a project, and each project can create multiple Logstores.


Logs that are read from or written to a Logstore are stored in a shard. Each Logstore is divided into several shards, and each shard covers a left-closed, right-open interval of the MD5 value range. The intervals do not overlap, and together they cover the entire MD5 value range.
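The interval layout described above can be illustrated with a short sketch. It assumes equal-width intervals and a lookup by the MD5 of a key; the function names are hypothetical, and the real shard ranges are defined and managed by Log Service, not by client code.

```python
import hashlib

MD5_SPACE = 1 << 128  # MD5 digests are 128-bit values


def split_md5_range(shard_count):
    """Split the whole MD5 range into contiguous [begin, end) intervals."""
    step = MD5_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        begin = i * step
        # The last interval absorbs any remainder so the whole space is covered.
        end = MD5_SPACE if i == shard_count - 1 else (i + 1) * step
        ranges.append((begin, end))
    return ranges


def shard_for_key(key, ranges):
    """Return the index of the interval whose [begin, end) contains MD5(key)."""
    value = int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)
    for index, (begin, end) in enumerate(ranges):
        if begin <= value < end:
            return index
    raise ValueError("ranges do not cover the MD5 space")


ranges = split_md5_range(4)
print(shard_for_key("192.168.0.1", ranges))
```

Because every interval is left-closed and right-open, a value that sits exactly on a boundary belongs to one shard only.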


The Log Service endpoint is a URL used to access a project and logs within the project, and is associated with the Alibaba Cloud region where the project resides and the project name.


Alibaba Cloud AccessKey is a “secure password” designed for you to access your cloud resources by using APIs (not the console). You can use the AccessKey to sign API request content to pass the security authentication in Log Service.
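As a rough illustration of what signing request content with a secret key means, the sketch below computes and checks a keyed hash (HMAC). The choice of HMAC-SHA1, the sample string, and the function names are assumptions made for this example. The exact string that Log Service signs is defined by its API documentation and is handled for you by the SDK.

```python
import base64
import hashlib
import hmac


def sign(access_key_secret, string_to_sign):
    """Return a Base64-encoded HMAC-SHA1 signature of string_to_sign."""
    digest = hmac.new(access_key_secret.encode("utf-8"),
                      string_to_sign.encode("utf-8"),
                      hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


def verify(access_key_secret, string_to_sign, signature):
    """Recompute the signature and compare it in constant time."""
    return hmac.compare_digest(sign(access_key_secret, string_to_sign), signature)


# Hypothetical secret and request summary, for illustration only.
signature = sign("example-secret", "GET\n/logstores\n2018-12-26")
print(verify("example-secret", "GET\n/logstores\n2018-12-26", signature))
```

The secret itself never travels with the request; only the signature does, which is why the AccessKey secret must be kept private.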


The user's SIEM (e.g. Splunk) is located in an on-premises environment rather than in the cloud. For security reasons, no port on the SIEM is accessible from external environments.


It is recommended to build a program with an SLS consumer group that consumes logs from Log Service in real time and then sends them to Splunk through the Splunk HTTP Event Collector (HEC) API.
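The forwarding half of such a program comes down to turning each log into a HEC event. The sketch below is one possible mapping, not the article's implementation: the helper names are hypothetical, and placing the remaining log fields under the `event` key is a choice made for this example.

```python
import json


def to_hec_event(log, defaults=None):
    """Convert one flattened log (a dict of strings) into a HEC event JSON string."""
    log = dict(log)  # copy so the caller's dict is not modified
    payload = dict(defaults or {})  # e.g. sourcetype, source, index
    timestamp = log.pop("__time__", None)
    if timestamp is not None:
        payload["time"] = timestamp
    payload["event"] = log  # the remaining fields become the event body
    return json.dumps(payload, sort_keys=True)


def hec_headers(token):
    """HEC authenticates requests with an 'Authorization: Splunk <token>' header."""
    return {"Authorization": "Splunk {}".format(token)}


sample = {"__time__": "1545782400", "__topic__": "audit_log", "user": "alice"}
print(to_hec_event(sample, {"sourcetype": "sls"}))
```

The resulting string would be sent as the body of an HTTP POST to the collector endpoint, together with the headers from `hec_headers`.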

Program with Consumer Group

The consumer library is an advanced mode of log consumption in Log Service. It provides the consumer group concept to abstract and manage the consuming side. Compared with reading data directly through the SDKs, the consumer library lets you focus only on the business logic, without worrying about the implementation details of Log Service or about load balancing and failover between consumers.

The Spark Streaming, Storm, and Flink connectors use the consumer library as their base implementation.

Basic Concepts

Consumer Group: A consumer group is composed of multiple consumers. Consumers in the same consumer group consume data from the same Logstore, and each consumer consumes different data.

Consumer: A consumer is the unit that makes up a consumer group and performs the actual consumption. The names of consumers in the same consumer group must be unique.

In Log Service, a Logstore can have multiple shards. The consumer library is used to allocate a shard to the consumers in a consumer group. The allocation rules are as follows:

Each shard can only be allocated to one consumer.

One consumer can have multiple shards at the same time.

After a new consumer is added to a consumer group, the assignment of shards within the group is adjusted to balance the consumption load. The preceding allocation rules still apply, and the allocation process is transparent to users.
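The allocation rules above can be mimicked with a small simulation. The real assignment is performed by Log Service and its algorithm is not described here, so the round-robin strategy and the `allocate` function below are assumptions used only to show the two rules and the effect of adding a consumer.

```python
def allocate(shards, consumers):
    """Assign every shard to exactly one consumer, spreading shards evenly."""
    assignment = {name: [] for name in consumers}
    if not consumers:
        return assignment
    ordered = sorted(consumers)
    for position, shard in enumerate(sorted(shards)):
        owner = ordered[position % len(ordered)]
        assignment[owner].append(shard)
    return assignment


shards = [0, 1, 2, 3]
# One consumer owns all shards.
print(allocate(shards, ["consumer-a"]))
# After a second consumer joins, the shards are rebalanced between the two.
print(allocate(shards, ["consumer-a", "consumer-b"]))
```

If there are more consumers than shards, the extra consumers simply receive no shard until one becomes available.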

The consumer library can also save checkpoints, which allows consumers to resume from the breakpoint after a program fault is resolved and makes sure that the data is consumed only once.
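The checkpoint idea can be sketched with an in-memory stand-in. `CheckpointStore` and `consume` are hypothetical names, and positions are plain list indexes rather than real Log Service cursors.

```python
class CheckpointStore:
    """In-memory stand-in for the checkpoints that Log Service keeps for a consumer group."""

    def __init__(self):
        self._positions = {}

    def load(self, shard_id):
        return self._positions.get(shard_id, 0)

    def save(self, shard_id, position):
        self._positions[shard_id] = position


def consume(shard_id, logs, store, handle, batch_size=2):
    """Process logs in batches from the saved position; checkpoint after each batch."""
    position = store.load(shard_id)
    while position < len(logs):
        batch = logs[position:position + batch_size]
        for log in batch:
            handle(log)
        position += len(batch)
        store.save(shard_id, position)  # saved only after the whole batch was handled


store = CheckpointStore()
store.save(0, 2)  # pretend an earlier run already handled the first two logs
seen = []
consume(0, ["a", "b", "c", "d"], store, seen.append)
print(seen)
```

In this sketch, any work done after the last saved checkpoint would be repeated after a restart, which is why the checkpoint is written only once a batch has been fully handled.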

Deployment Proposal

Hardware Proposal

Host Spec:

A host is needed to run your program. A Linux host (e.g. Ubuntu x64) with the following hardware spec is recommended:

  1. 2.0+ GHz x 8 cores, 32 GB of memory

Network Spec:

The bandwidth from your on-premises environment to Alibaba Cloud should be large enough that data is consumed no slower than it is generated.
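A back-of-the-envelope check of the bandwidth requirement could look like the sketch below. The 1 TB/day volume is a hypothetical figure, decimal units are assumed, and compression and protocol overhead are ignored.

```python
def required_mbps(bytes_per_day, headroom=1.0):
    """Average bandwidth in megabits per second needed to keep up with bytes_per_day."""
    seconds_per_day = 24 * 60 * 60
    bits_per_second = bytes_per_day * 8 / seconds_per_day
    return bits_per_second * headroom / 1_000_000


one_tb = 10 ** 12  # decimal terabyte, hypothetical daily log volume
print(round(required_mbps(one_tb), 1))
# A headroom factor above 1.0 leaves room for traffic peaks.
print(round(required_mbps(one_tb, headroom=2.0), 1))
```

Log generation is rarely uniform over a day, so sizing for the peak rate rather than the average is the safer choice.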

Usage (Python)

Here we describe a program that uses a consumer group in Python. For Java usage, refer to this documentation.

Note: All the examples are combined into one file on GitHub; you can get the latest version of the example here.



  1. PyPy3 is highly recommended for running the program rather than CPython. Install the SDK with:
  • pypy3 -m pip install aliyun-log-python-sdk -U
  2. For more guidance on the SLS Python SDK, refer to the guide.

Program Configuration

The following code shows you how to configure the program:

  1. A local rotating log file (sync_data_to_splunk.log) for debugging and diagnostic purposes.

Please read the comments carefully and make adjustments whenever necessary.

import os
import logging
from logging.handlers import RotatingFileHandler
from multiprocessing import current_process
from aliyun.log.consumer import *  # provides LogHubConfig and CursorPosition

# Local rotating log file for diagnostics: 100 MB per file, 5 backups
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


def get_option():
    # Basic options
    # Load the connection info and the consumer group name from environment variables
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    # Some advanced options
    # DON'T hard-code the consumer name, especially when you need to run this program in parallel
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # This option is used for initialization only. It is ignored once the consumer group
    # has been created and every shard has started to be consumed.
    # Could be "begin", "end", or a specific time in ISO format; it refers to the log receiving time.
    cursor_start_time = "2018-12-26 0:0:0"

    # If a client does not report to the server within heartbeat_interval * 2, the server
    # considers it offline and re-assigns its tasks to another consumer.
    # So don't set the heartbeat interval too small when the network bandwidth or
    # the consumption performance is not so good.
    heartbeat_interval = 20

    # If the incoming data is not so frequent, please don't configure this too small (<1s)
    data_fetch_interval = 1

    # Create one consumer in the consumer group
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Splunk options
    settings = {
        "host": "",
        "port": 80,
        "token": "a023nsdu123123123",
        'https': False,       # optional, bool
        'timeout': 120,       # optional, int
        'ssl_verify': True,   # optional, bool
        "sourcetype": "",     # optional, sourcetype
        "index": "",          # optional, index
        "source": "",         # optional, source
    }

    return option, settings
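Since the configuration is read from environment variables and falls back to empty strings, a missing variable only shows up later as a connection error. A small pre-flight check, sketched below, can catch that earlier. `missing_settings` is a hypothetical helper and not part of the SDK; the variable names are the ones read by the configuration code.

```python
import os

REQUIRED_VARS = ["SLS_ENDPOINT", "SLS_AK_ID", "SLS_AK_KEY",
                 "SLS_PROJECT", "SLS_LOGSTORE", "SLS_CG"]


def missing_settings(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_settings()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```

Passing a plain dict instead of `os.environ` makes the check easy to exercise in tests.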

Data Consumption and Forwarding

The code below shows how to process the data fetched from SLS and forward it to Splunk.

from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import logging
import time
import json
import socket
import requests

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    this consumer will keep sync data from SLS to Splunk
    """

    def __init__(self, splunk_setting):
        """init the processor and verify connections with Splunk"""
        super(SyncData, self).__init__()  # remember to call base's init

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and credentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # Testing connectivity
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))
        s.close()

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector/event".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        # Optional fields attached to every event
        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        for log in logs:
            # Put your sync code here to send to remote.
            # the format of log is just a dict with example as below (Note, all strings are unicode):
            #    Python2: {u"__time__": u"12312312", u"__topic__": u"topic", u"field1": u"value1", u"field2": u"value2"}
            #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            if log.get(u"__topic__") == 'audit_log':
                # suppose we only care about audit log
                event['time'] = log[u'__time__']
                event['fields'] = {}
                del log['__time__']
                event['fields'].update(log)

                data = json.dumps(event, sort_keys=True)

                try:
                    req = self.r.post(self.url, data=data, timeout=self.timeout)
                    req.raise_for_status()
                except Exception as err:
                    logger.error("Failed to connect to remote Splunk server (%s). Exception: %s", self.url, err)
                    # TODO: add some error handling here or retry etc.

        logger.info("Complete send data to remote")
        self.save_checkpoint(check_point_tracker)
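The TODO in the code above leaves error handling open. One common option is to retry with exponential backoff, as sketched here. `send_with_retry` is a hypothetical helper, and `send` stands for whatever callable performs the HTTP POST to Splunk.

```python
import time


def send_with_retry(send, data, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call send(data); on failure wait base_delay * 2**n, then retry, up to `attempts` tries."""
    for attempt in range(attempts):
        try:
            return send(data)
        except Exception:
            if attempt == attempts - 1:
                raise  # give up so the caller can decide not to save the checkpoint
            sleep(base_delay * (2 ** attempt))


# Usage with a stand-in sender that fails twice before succeeding.
calls = []


def flaky_send(data):
    calls.append(data)
    if len(calls) < 3:
        raise IOError("temporary failure")
    return "ok"


print(send_with_retry(flaky_send, '{"event": "demo"}', sleep=lambda seconds: None))
```

Re-raising after the last attempt matters here: if the failure were swallowed and the checkpoint saved anyway, the affected logs would never be delivered.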

Main Control

The following code shows how to start the program.

def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,))
    worker.start(join=True)


if __name__ == '__main__':
    main()

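Suppose the program is saved as a single Python file. It can then be launched as follows:

export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<Your AccessKey ID>
export SLS_AK_KEY=<Your AccessKey secret>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<Consumer Group Name, could just be "sync_data">

pypy3 <program file>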

Limitation and restriction

Each Logstore can create at most 10 consumer groups. The error ConsumerGroupQuotaExceed is reported when the number exceeds the limit.


You can monitor your logs using Log Service. You can view the consumer group status and create a monitoring alarm for the consumer group.

Performance Consideration

Running Multiple Consumers

The program can be launched multiple times so that multiple consumers process data in parallel.

nohup pypy3 <program file> &
nohup pypy3 <program file> &
nohup pypy3 <program file> &

Note: All consumers should use the same consumer group name with different consumer names. Since one shard can be consumed by only one consumer, if you have 10 shards, you can have up to 10 consumers consuming in parallel.


If the endpoint is configured with the https:// prefix, the connection is automatically encrypted with HTTPS.

The server certificate is issued by GlobalSign, and most Linux and Windows machines already trust it by default. If your machine does not trust it, you can download the certificate, then install and trust it on the machine running the program.

Refer to this guide for details.


Based on testing, with no bottleneck in network bandwidth or on the receiver (Splunk) side, and with the example above running under pypy3 on the recommended hardware spec, each consumer can consume up to 5 MB/s of raw logs while using less than 10% of one CPU core. Theoretically, that is 50 MB/s per CPU core, or about 4 TB/day per CPU core.

Note: This depends heavily on your bandwidth, your hardware spec, how fast your SIEM (e.g. Splunk) can receive the data, and the number of shards.
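The arithmetic behind the estimate can be reproduced in a few lines. Decimal units (1 TB = 1,000,000 MB) and linear scaling with CPU usage are assumptions of this sketch.

```python
def per_core_throughput(mb_per_second, cpu_fraction):
    """Extrapolate the throughput of one fully used core from a partial-load measurement."""
    return mb_per_second / cpu_fraction


def tb_per_day(mb_per_second):
    """Convert a sustained MB/s rate into decimal terabytes per day."""
    return mb_per_second * 86400 / 1_000_000


per_core = per_core_throughput(5, 0.10)  # 5 MB/s measured at 10% of one core
print(per_core, tb_per_day(per_core))
```

With these inputs the extrapolation gives 50 MB/s per core and roughly 4.3 TB per day, which matches the "about 4 TB/day" figure quoted above.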

High Availability

Because the consumer group saves checkpoints on the server side, when one consumer stops, another consumer takes over its shards and continues consuming.

You can also start consumers on different machines, so that when one machine is shut down or broken, consumers on another machine take over the tasks automatically.

You can start more consumers than the number of shards for backup purposes.

Further Reading

  1. Log Service Product Page: the official product page of Log Service.

