Alibaba Cloud Logstash JDBC Implements Data Synchronization between Elasticsearch and Relational Databases

Overview

Prerequisites

RDS Data Preparation

-- Source table that the Logstash JDBC input polls.
--   id          : primary key; the pipeline copies it into the Elasticsearch document _id.
--   create_time : set once when the row is inserted.
--   update_time : refreshed automatically on every UPDATE; the sync query filters on it.
CREATE TABLE student (
    id          BIGINT UNSIGNED NOT NULL,
    stuName     NVARCHAR(5)     NOT NULL,
    -- CHECK is kept as the last column attribute (after DEFAULT), matching the
    -- documented MySQL column-definition grammar.
    stuSex      NCHAR(1)        DEFAULT '男' CHECK (stuSex IN ('男', '女')),
    stuAge      INT             CHECK (stuAge > 1),
    stuDept     NVARCHAR(20),
    create_time TIMESTAMP       NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP       NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- The primary key already enforces uniqueness on id, so no additional
    -- UNIQUE KEY on the same column is declared (it would only duplicate the index).
    PRIMARY KEY (id)
);

Logstash Synchronization Configuration

input {
  # Polls the `student` table over JDBC and emits one event per new or changed row.
  jdbc {
    # --- Driver -----------------------------------------------------------
    jdbc_driver_library => "/ssd/1/share/ls-cn-4591f1y6i003/logstash/current/config/custom/mysql-connector-java-8.0.17.jar"
    # Connector/J 8.x registers its driver as com.mysql.cj.jdbc.Driver; the
    # legacy com.mysql.jdbc.Driver name is deprecated in that release line.
    # The class is declared once only.
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

    # --- Connection -------------------------------------------------------
    # No whitespace is allowed between "?" and the first URL parameter.
    jdbc_connection_string => "jdbc:mysql://rm-bp1p4vl011t9w9ksf.mysql.rds.aliyuncs.com:3306/terms?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    jdbc_user => "zl_manager"
    # NOTE(review): the password is stored inline; consider moving it to the
    # Logstash keystore and referencing it as a ${VARIABLE}.
    jdbc_password => "Elastic@123"

    # --- Paging -----------------------------------------------------------
    jdbc_paging_enabled => true
    jdbc_page_size => 50

    # --- Incremental query ------------------------------------------------
    # Selects rows modified after the last tracked value but strictly before
    # the current second. unix_ts_in_secs is the tracking column.
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"

    # --- Tracking state ---------------------------------------------------
    # :sql_last_value is taken from the unix_ts_in_secs column of the last
    # row read and persisted to last_run_metadata_path after each run.
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/ssd/1/ls-cn-4591f1y6i003/logstash/data/student"
    # clean_run => true would discard the persisted value on every pipeline
    # start and re-read the whole table; keep the saved state instead.
    clean_run => false

    # --- Schedule ---------------------------------------------------------
    # Six-field cron expression: run every 5 seconds.
    schedule => "*/5 * * * * *"
  }
}
filter {
  # Keep the row's primary key in @metadata so the output stage can use it as
  # the document id without indexing it as a regular field.
  mutate {
    copy => { "id" => "[@metadata][_id]" }
  }

  # Drop fields that should not be stored in the Elasticsearch document.
  mutate {
    remove_field => [
      "id",
      "@version",
      "unix_ts_in_secs"
    ]
  }
}
output {
  # Writes each event to the `student` index, keyed by the source row's id.
  elasticsearch {
    # Cluster endpoint and credentials.
    # NOTE(review): the password is stored inline; consider the Logstash keystore.
    hosts => ["es-cn-mp91kzb8m0009pjzh.elasticsearch.aliyuncs.com:9200"]
    user => "elastic"
    password => "Elastic@123"

    # Target index and document id (the primary key stashed by the filter stage).
    index => "student"
    document_id => "%{[@metadata][_id]}"

    # Update the existing document, or create it from the event when it does
    # not exist yet.
    action => "update"
    doc_as_upsert => true
  }
}

Analysis of SELECT Statement Correctness

Intuitive Method 1

# Intentionally flawed variant: only a strict ">" on the tracking value, with no
# upper bound. Rows written later in the same second as the last poll are skipped.
statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value) ORDER BY update_time ASC"

Intuitive Method 2

# Intentionally flawed variant: ">=" on the tracking value, with no upper bound.
# Rows from the last polled second are read again on every run, producing
# redundant writes.
statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) >= :sql_last_value) ORDER BY update_time ASC"

Solutions to the Problems Caused by the Intuitive Methods

(UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW())
update_time < NOW()

Test System

GET student/_search
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 5,
"max_score" : 1.0,
"hits" : [
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205002",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:43:19.000Z",
"stuage" : 25,
"@timestamp" : "2020-05-19T09:08:02.173Z",
"stuname" : "张雨",
"studept" : "大个子",
"create_time" : "2020-05-19T08:43:19.000Z",
"stusex" : "男"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205005",
"_score" : 1.0,
"_source" : {
"stuage" : 28,
"stuname" : "大禹",
"create_time" : "2020-05-19T12:12:22.000Z",
"stusex" : "男",
"update_time" : "2020-05-19T12:12:22.000Z",
"studept" : "咖啡",
"@timestamp" : "2020-05-19T12:13:00.160Z"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205001",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:42:39.000Z",
"stuage" : 27,
"@timestamp" : "2020-05-19T09:08:02.140Z",
"stuname" : "赵弘景",
"studept" : "健身",
"create_time" : "2020-05-19T08:42:39.000Z",
"stusex" : "男"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205004",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:44:54.000Z",
"stuage" : 23,
"@timestamp" : "2020-05-19T09:08:02.191Z",
"stuname" : "潘多",
"studept" : "跳舞",
"create_time" : "2020-05-19T08:44:54.000Z",
"stusex" : "女"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205003",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:44:11.000Z",
"stuage" : 26,
"@timestamp" : "2020-05-19T09:08:02.175Z",
"stuname" : "黄磊",
"studept" : "烹饪",
"create_time" : "2020-05-19T08:44:11.000Z",
"stusex" : "男"
}
}
]
}
}
-- Modify one row to verify that the change is propagated to Elasticsearch.
-- id is a BIGINT column, so it is compared with a numeric literal rather than
-- a quoted string (avoids implicit type conversion).
UPDATE `student`
SET `stuDept` = '好男人,烹饪大厨'
WHERE `id` = 20205003;
GET student/_doc/20205003
{
"_index" : "student",
"_type" : "_doc",
"_id" : "20205003",
"_version" : 3,
"_seq_no" : 2,
"_primary_term" : 1,
"found" : true,
"_source" : {
"update_time" : "2020-05-19T12:27:42.000Z",
"stuage" : 26,
"@timestamp" : "2020-05-19T12:28:00.125Z",
"stuname" : "黄磊",
"studept" : "好男人,烹饪大厨",
"create_time" : "2020-05-19T08:44:11.000Z",
"stusex" : "男"
}
}
-- Add a new row to verify that inserts are propagated to Elasticsearch.
-- id is a BIGINT column, so the value is given as a numeric literal.
INSERT INTO `student` (`id`, `stuName`, `stuSex`, `stuAge`, `stuDept`)
VALUES (20205008, '悦来', '女', 28, '跳跳');

Original Source:

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

4.97K Followers

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com