Alibaba Cloud

Mar 5, 2019


Setting Up Log Service to Analyze Windows Event Logs

By Steve Chen, Solutions Architect

Solution Design and Implementation

Alibaba Cloud Log Service is a cost-effective, easy-to-manage product that collects logs from many different sources, centralizes log storage on the cloud, and analyzes the logs in various ways.

The following sections outline a solution that collects logs from a Windows Active Directory (AD) server, stores them in Alibaba Cloud Log Service, and defines dashboard metrics to analyze AD activity.

The solution architecture is as follows. Logtail is the log collection agent provided by Alibaba Cloud Log Service. You can use Logtail to collect logs in real time from servers such as Alibaba Cloud Elastic Compute Service (ECS) instances; however, it cannot work directly with the Windows Event Log. We therefore introduce Winlogbeat, an open source agent that connects to the AD Event Log and writes all event logs to a local file in JSON format. Logtail then monitors the JSON file for changes and uploads the incremental logs to Log Service. Once the Windows logs arrive in Log Service on the cloud, customers can analyze them there by using SQL scripts and dashboard metrics, ship them to Object Storage Service (OSS) for archiving, or move them to MaxCompute or EMR, Alibaba Cloud's big data products, for more complex processing that might involve machine learning and AI.
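The hand-off between the two agents can be pictured with a small sketch. This is not how Logtail is implemented; it only illustrates the idea of picking up the JSON lines that were appended to a file since the last read. The function name `read_new_events` and the one-event-per-line layout are assumptions made for the illustration.

```python
import json


def read_new_events(path, offset):
    """Return (events, new_offset) for complete JSON lines found after `offset`.

    Hypothetical helper: assumes the file holds one JSON event per line.
    """
    events = []
    with open(path, "rb") as f:
        f.seek(offset)
        while True:
            line = f.readline()
            # Stop at end of file, or at a line that is still being written.
            if not line or not line.endswith(b"\n"):
                break
            offset += len(line)
            text = line.decode("utf-8").strip()
            if text:
                events.append(json.loads(text))
    return events, offset
```

Calling the function repeatedly with the offset it returned last time yields only the events written in between, which is the "incremental" behavior described above.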

Installing the Winlogbeat and Logtail Agents Locally on the AD Server

Configure Winlogbeat to connect to the Windows Event Log and save the logs to a local JSON file. The following is just an example of the configuration.

winlogbeat.event_logs:
  - name: Security
    ignore_older: 168h
output.elasticsearch:
  hosts: ["elasticsearch.elastic.local:9200"]
  template.name: "winlogbeat"
  template.path: "winlogbeat.template.json"
  template.overwrite: false

Download and install the Alibaba Cloud Logtail agent locally on the AD server as well. Details of the installation can be found in the Log Service documentation. In the Logtail configuration, make sure Logtail monitors the Winlogbeat JSON file.

Configuring Logtail and Logstore

After proper configuration, you should be able to see the Windows event logs from the AD server in the Logstore console.
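The queries in the next section address fields by dotted names such as `event_data.TargetUserName`. As a rough illustration of how such a name maps onto a nested JSON event, the sketch below flattens a nested dictionary into dotted keys. The sample event and its values are hypothetical (only the field names come from the queries in this article), and the sketch does not claim to reproduce how Logtail or Log Service store the fields.

```python
def flatten(event, prefix=""):
    """Flatten nested dictionaries into a single level with dotted key names."""
    flat = {}
    for key, value in event.items():
        name = prefix + key
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=name + "."))
        else:
            flat[name] = value
    return flat


# Hypothetical event; the host and user are made up for the illustration.
sample_event = {
    "event_id": 4625,
    "computer_name": "ad01.example.local",
    "record_number": "1001",
    "event_data": {
        "TargetUserName": "alice",
        "TargetDomainName": "LOGSRCHDEMO",
        "SubStatus": "0xc0000072",
    },
}

flat_event = flatten(sample_event)
print(flat_event["event_data.TargetUserName"])
```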

Log Analysis Cases by Using SQL Scripts

Logon Failure Statistics

* and event_id: 4625 and event_data.TargetDomainName: LOGSRCHDEMO

Logon Attempt against Disabled Account

* and event_id: 4625 and event_data.TargetDomainName: LOGSRCHDEMO and event_data.SubStatus: 0xc0000072
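The SubStatus value in the query above, 0xc0000072, is the code Windows reports when the target account is disabled. A minimal Python sketch of the same filter, plus a small lookup of other common 4625 SubStatus codes, may help when labeling results. The function names are hypothetical, and the event layout (a nested `event_data` object) is an assumption.

```python
# Common SubStatus codes reported with event 4625 (logon failure).
SUBSTATUS_MEANINGS = {
    "0xc0000064": "user name does not exist",
    "0xc000006a": "wrong password",
    "0xc0000072": "account is disabled",
    "0xc0000234": "account is locked out",
}


def describe_substatus(code):
    """Return a readable meaning for a SubStatus code, or 'unknown'."""
    return SUBSTATUS_MEANINGS.get(code.strip().lower(), "unknown")


def is_disabled_account_failure(event, domain):
    """Mirror the search: event 4625, given domain, SubStatus 0xc0000072."""
    data = event.get("event_data", {})
    return (
        str(event.get("event_id")) == "4625"
        and data.get("TargetDomainName") == domain
        and data.get("SubStatus", "").lower() == "0xc0000072"
    )
```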

User Added to Admin Group

* and event_id: 4732 | select date_format(date_trunc('minute', __time__), '%m-%d %H:%i')  as time, count(1) as event_count group by time order by time limit 1000
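The analytic part of this query truncates each event time to the minute, formats it as month-day hour:minute, and counts the events in each bucket. The following sketch does the same thing in Python for a list of Unix timestamps. It assumes UTC for display, which may differ from the time zone the console uses, and the function name is hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone


def count_per_minute(timestamps):
    """Count events per minute bucket, labeled like '%m-%d %H:%i' in the query."""
    counts = Counter()
    for ts in timestamps:
        minute_start = ts - ts % 60  # same idea as date_trunc('minute', __time__)
        label = datetime.fromtimestamp(minute_start, tz=timezone.utc).strftime("%m-%d %H:%M")
        counts[label] += 1
    # Sort by label, as the query orders by the formatted time column.
    return dict(sorted(counts.items()))
```

For example, three events at 00:00:00, 00:00:30, and 00:01:00 UTC on March 5 fall into two buckets, with counts of 2 and 1.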

Consecutive Logon Failures

(event_id: 4625 or event_id: 4624) |
select date_trunc('minute', __time__) as time,
       computer_name to_system,
       "event_data.TargetUserName" failed_user,
       count(*) NoOfFailures
from (
  select __time__, computer_name, "event_data.TargetUserName",
         "event_data.TargetDomainName" TargetDomainName_prev, event_id,
         lag(event_id, 1, '4624') over (PARTITION BY computer_name, "event_data.TargetUserName" order by __time__, record_number) as pre_event_id
  from log
  limit 10000
)
where event_id = '4625' and pre_event_id = '4625' and TargetDomainName_prev = 'LOGSRCHDEMO' and computer_name is not null
group by date_trunc('minute', __time__), computer_name, "event_data.TargetUserName"
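The key step in this query is the lag() window function: for each computer and user, events are ordered by time and record number, and each event is compared with the one before it. A failure (4625) that directly follows another failure counts as consecutive; the default '4624' means the first event in a partition is treated as if a success preceded it. The sketch below reproduces that logic in Python under simplifying assumptions: the event dictionaries use a hypothetical flat layout with a `user` key, and the domain and null filters of the query are left out.

```python
from collections import Counter, defaultdict


def consecutive_failures(events):
    """Count failures that directly follow another failure, per minute/computer/user."""
    partitions = defaultdict(list)
    for event in events:
        partitions[(event["computer_name"], event["user"])].append(event)

    counts = Counter()
    for (computer, user), rows in partitions.items():
        rows.sort(key=lambda e: (e["time"], e["record_number"]))
        previous_id = "4624"  # lag() default: assume a success came first
        for event in rows:
            if event["event_id"] == "4625" and previous_id == "4625":
                minute = event["time"] - event["time"] % 60
                counts[(minute, computer, user)] += 1
            previous_id = event["event_id"]
    return counts
```

Note that a single failed logon never appears in the result; only the second and later failures of an uninterrupted run are counted.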

Create Custom Tables and Use Custom OSS Backed Tables in Dashboards

* | create table spk_demo_assets (computer_name varchar, severity varchar, owner varchar) with (bucket='spk-logsrchdemo', endpoint='', accessid='<your-access-key-id>', accesskey='<your-access-key-secret>', objects=ARRAY['high_value_assets_unix.csv'], type='oss')

Consecutive Logon Failure against High Value Assets

(event_id: 4625 or event_id: 4624) |
select date_format(date_trunc('minute', __time__), '%m-%d %H:%i') as time,
       computer_name,
       severity AssetValue,
       user,
       count(1) as consecutive_failures
from (
  select l.__time__, l.computer_name, l."event_data.TargetUserName" as user, l.event_id,
         "event_data.TargetDomainName" TargetDomainName,
         lag(l.event_id, 1, '4624') over (PARTITION BY l.computer_name, l."event_data.TargetUserName" order by l.__time__, l.record_number) as pre_event_id,
         r.severity
  from log l left join spk_demo_assets r on l.computer_name = r.computer_name
  limit 10000
)
where event_id = '4625' and pre_event_id = '4625' and severity = 'high' and TargetDomainName = 'LOGSRCHDEMO'
group by date_format(date_trunc('minute', __time__), '%m-%d %H:%i'), computer_name, severity, user
order by time desc
limit 100
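What this query adds to the previous one is the left join against the OSS-backed asset table and the filter on severity. The sketch below shows that enrichment step in Python. The CSV layout is an assumption (no header row, with columns in the order of the table definition: computer_name, severity, owner), and the host and owner names in the usage note are made up.

```python
import csv
import io


def load_assets(csv_text):
    """Parse asset rows (computer_name, severity, owner) into a lookup by computer."""
    assets = {}
    for row in csv.reader(io.StringIO(csv_text)):
        if len(row) == 3:
            computer_name, severity, owner = (field.strip() for field in row)
            assets[computer_name] = {"severity": severity, "owner": owner}
    return assets


def high_value_events(events, assets):
    """Left-join events with the asset lookup, then keep only severity 'high'."""
    selected = []
    for event in events:
        asset = assets.get(event["computer_name"])  # None when there is no match
        severity = asset["severity"] if asset else None
        if severity == "high":
            selected.append({**event, "severity": severity})
    return selected
```

With a hypothetical file containing the rows `ad01,high,alice` and `ws07,low,bob`, only events from `ad01` pass the filter; events from computers missing from the file get no severity and are dropped, just as rows with a null severity fail the `severity='high'` condition in the query.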

Data Presentation on Dashboard