Alibaba Cloud Logstash JDBC Implements Data Synchronization between Elasticsearch and Relational Databases

Overview

Prerequisites

RDS Data Preparation

-- Source table for the Logstash JDBC -> Elasticsearch synchronization demo.
-- update_time is refreshed by MySQL on every UPDATE and is the column the
-- incremental query filters and orders on.
CREATE TABLE student (
    id          BIGINT(20) UNSIGNED NOT NULL,
    stuName     NVARCHAR(5) NOT NULL,
    -- DEFAULT precedes CHECK, following the documented column-attribute order.
    stuSex      NCHAR(1) DEFAULT '男' CHECK (stuSex IN ('男', '女')),
    stuAge      INT CHECK (stuAge > 1),
    stuDept     NVARCHAR(20),
    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- The primary key already enforces uniqueness of id through its own index,
    -- so no additional UNIQUE KEY on the same column is declared.
    PRIMARY KEY (id)
);

Logstash Synchronization Configuration

# Polls the MySQL `student` table and emits one event per row changed since
# the previous run.
input {
  jdbc {
    # Connector/J 8.x driver class; the legacy com.mysql.jdbc.Driver name is
    # deprecated in the 8.0 connector referenced below. Declared once only.
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => "/ssd/1/share/ls-cn-4591f1y6i003/logstash/current/config/custom/mysql-connector-java-8.0.17.jar"
    # The URL parameters must follow "?" directly, with no whitespace.
    jdbc_connection_string => "jdbc:mysql://rm-bp1p4vl011t9w9ksf.mysql.rds.aliyuncs.com:3306/terms?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    jdbc_user => "zl_manager"
    # NOTE(review): plaintext credential; consider the Logstash keystore.
    jdbc_password => "Elastic@123"

    # Fetch the result set in pages of 50 rows.
    jdbc_paging_enabled => true
    jdbc_page_size => 50

    # Select rows modified after the last tracked timestamp. The
    # `update_time < NOW()` bound leaves out rows of the still-running second
    # so they are picked up completely by a later poll.
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"

    # :sql_last_value is taken from the numeric unix_ts_in_secs column of the
    # last row read and persisted to last_run_metadata_path.
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/ssd/1/ls-cn-4591f1y6i003/logstash/data/student"
    # NOTE(review): clean_run => true discards the saved state when the
    # pipeline starts, so the first poll after a restart rescans the table.
    # Set it to false if restarts should resume from the stored value.
    clean_run => true

    # Six-field cron expression (seconds first): poll every 5 seconds.
    schedule => "*/5 * * * * *"
  }
}
# Keeps the row's primary key available for the output stage without storing
# it in the indexed document, and drops bookkeeping fields.
filter {
  mutate {
    # [@metadata] fields travel with the event but are not written by outputs.
    copy => {
      "id" => "[@metadata][_id]"
    }
    # Strip the original id, the Logstash version tag, and the helper column
    # that only exists to track sync progress.
    remove_field => [
      "id",
      "@version",
      "unix_ts_in_secs"
    ]
  }
}
# Writes each event to the `student` index, keyed by the source row's id.
output {
  elasticsearch {
    # Connection and credentials.
    hosts => "es-cn-mp91kzb8m0009pjzh.elasticsearch.aliyuncs.com:9200"
    user => "elastic"
    password => "Elastic@123"

    # Target index and document identity (id copied into metadata by the filter).
    index => "student"
    document_id => "%{[@metadata][_id]}"

    # Update an existing document, or create it from the event when missing.
    action => "update"
    doc_as_upsert => true
  }
}

Analysis of SELECT Statement Correctness

Intuitive Method 1

# Naive variant: strictly-greater comparison with no upper bound on update_time.
# Rows written later in the same second as the stored :sql_last_value are never
# selected, because their timestamp is not greater than the tracked value.
statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value) ORDER BY update_time ASC"

Intuitive Method 2

# Naive variant: greater-or-equal comparison with no upper bound on update_time.
# No row is missed, but every poll re-reads and re-sends the rows whose
# timestamp equals the stored :sql_last_value.
statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) >= :sql_last_value) ORDER BY update_time ASC"

Solutions to the Problems Caused by the Intuitive Methods

(UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW())
update_time < NOW()

Test System

GET student/_search
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 5,
"max_score" : 1.0,
"hits" : [
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205002",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:43:19.000Z",
"stuage" : 25,
"@timestamp" : "2020-05-19T09:08:02.173Z",
"stuname" : "张雨",
"studept" : "大个子",
"create_time" : "2020-05-19T08:43:19.000Z",
"stusex" : "男"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205005",
"_score" : 1.0,
"_source" : {
"stuage" : 28,
"stuname" : "大禹",
"create_time" : "2020-05-19T12:12:22.000Z",
"stusex" : "男",
"update_time" : "2020-05-19T12:12:22.000Z",
"studept" : "咖啡",
"@timestamp" : "2020-05-19T12:13:00.160Z"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205001",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:42:39.000Z",
"stuage" : 27,
"@timestamp" : "2020-05-19T09:08:02.140Z",
"stuname" : "赵弘景",
"studept" : "健身",
"create_time" : "2020-05-19T08:42:39.000Z",
"stusex" : "男"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205004",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:44:54.000Z",
"stuage" : 23,
"@timestamp" : "2020-05-19T09:08:02.191Z",
"stuname" : "潘多",
"studept" : "跳舞",
"create_time" : "2020-05-19T08:44:54.000Z",
"stusex" : "女"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205003",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:44:11.000Z",
"stuage" : 26,
"@timestamp" : "2020-05-19T09:08:02.175Z",
"stuname" : "黄磊",
"studept" : "烹饪",
"create_time" : "2020-05-19T08:44:11.000Z",
"stusex" : "男"
}
}
]
}
}
-- Modify one row so that its update_time changes and the next poll syncs it.
-- id is a BIGINT column, so it is compared against a numeric literal.
UPDATE `student`
SET `stuDept` = '好男人,烹饪大厨'
WHERE `id` = 20205003;
GET student/_doc/20205003
{
"_index" : "student",
"_type" : "_doc",
"_id" : "20205003",
"_version" : 3,
"_seq_no" : 2,
"_primary_term" : 1,
"found" : true,
"_source" : {
"update_time" : "2020-05-19T12:27:42.000Z",
"stuage" : 26,
"@timestamp" : "2020-05-19T12:28:00.125Z",
"stuname" : "黄磊",
"studept" : "好男人,烹饪大厨",
"create_time" : "2020-05-19T08:44:11.000Z",
"stusex" : "男"
}
}
-- Add a new row; create_time and update_time take their column defaults.
-- id is a BIGINT column, so it is supplied as a numeric literal.
INSERT INTO `student` (`id`, `stuName`, `stuSex`, `stuAge`, `stuDept`)
VALUES (20205008, '悦来', '女', 28, '跳跳');

Original Source:

--

--

--

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Krunal Thakkar CHARUSAT UNIVERSITY

Docker and Kubernetes for developers

APPlying to Explore

Event Driven Microservices with Spring Cloud Stream

How to Install and Use Composer on Ubuntu 16.04

Tutorial: Run your Chainlink node and add External Adapters — Get your Ethereum prices

Animated Background with Pure CSS and Html

Xipology (⅔) — A library for turning DNS caching into a carrier

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com

More from Medium

Blending Efficient Ingestion and Querying

Streaming MongoExport to Blob Store (S3)

Query prioritization in Apache Druid

Serverless Data Integration with AWS Glue