Alibaba Cloud Logstash JDBC Implements Data Synchronization between Elasticsearch and Relational Databases

Overview

Prerequisites

RDS Data Preparation

-- Source table for the Logstash JDBC synchronization demo (MySQL / RDS).
-- Column names are kept as-is because the Elasticsearch documents shown later
-- in this article use their lower-cased forms as field names.
CREATE TABLE student (
    -- Primary key; the pipeline reuses it as the Elasticsearch document _id.
    id          BIGINT UNSIGNED NOT NULL,
    stuName     NVARCHAR(5)     NOT NULL,
    stuSex      NCHAR(1)        DEFAULT '男' CHECK (stuSex IN ('男', '女')),
    stuAge      INT             CHECK (stuAge > 1),
    stuDept     NVARCHAR(20),
    -- Set once when the row is inserted.
    create_time TIMESTAMP       NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- Refreshed on every UPDATE; the pipeline's statement tracks this column
    -- to find rows that changed since the previous poll.
    update_time TIMESTAMP       NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- PRIMARY KEY already enforces uniqueness and creates an index on id,
    -- so no additional UNIQUE KEY on the same column is declared.
    PRIMARY KEY (id)
);

Logstash Synchronization Configuration

input {
  # Polls the `student` table over JDBC on a schedule and emits one event per
  # row whose update_time is newer than the last tracked value.
  jdbc {
    # Driver class name for MySQL Connector/J 8.x (the jar referenced below is
    # 8.0.17); the legacy com.mysql.jdbc.Driver name is deprecated there.
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => "/ssd/1/share/ls-cn-4591f1y6i003/logstash/current/config/custom/mysql-connector-java-8.0.17.jar"
    # No whitespace may follow "?", otherwise the first URL parameter is malformed.
    jdbc_connection_string => "jdbc:mysql://rm-bp1p4vl011t9w9ksf.mysql.rds.aliyuncs.com:3306/terms?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    jdbc_user => "zl_manager"
    # NOTE(review): plaintext credential; consider a Logstash keystore entry
    # or environment variable reference instead.
    jdbc_password => "Elastic@123"

    # Fetch the result set in pages of 50 rows.
    jdbc_paging_enabled => true
    jdbc_page_size => 50

    # Select rows changed after the last tracked value but before the current
    # time, oldest first. unix_ts_in_secs is the value tracked between runs.
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"

    # Track the numeric unix_ts_in_secs column as :sql_last_value and persist
    # it to last_run_metadata_path after each run.
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/ssd/1/ls-cn-4591f1y6i003/logstash/data/student"
    # NOTE(review): clean_run => true discards the saved state when the
    # pipeline starts, so every restart re-reads the whole table. Set it to
    # false if restarts should resume from the persisted value.
    clean_run => true

    # Six-field cron expression (leading field is seconds): run every 5 seconds.
    schedule => "*/5 * * * * *"
  }
}
filter {
  mutate {
    # Keep the row's primary key under @metadata so the output section can use
    # it as the document id.
    copy => { "id" => "[@metadata][_id]" }
    # Drop the original id, the Logstash version tag, and the helper column
    # that only exists for change tracking.
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  # Writes each event into the `student` index, keyed by the id that the
  # filter section stored under @metadata.
  elasticsearch {
    # Connection and credentials.
    hosts => "es-cn-mp91kzb8m0009pjzh.elasticsearch.aliyuncs.com:9200"
    user => "elastic"
    password => "Elastic@123"

    # Target index and document identity.
    index => "student"
    document_id => "%{[@metadata][_id]}"

    # Update the existing document, or create it when it does not exist yet.
    action => "update"
    doc_as_upsert => true
  }
}

Analysis of SELECT Statement Correctness

Intuitive Method 1

# Flawed variant: strict ">" with no upper bound. Rows written later in the same second as the last tracked value are never selected.
statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value) ORDER BY update_time ASC"

Intuitive Method 2

# Flawed variant: ">=" selects the rows of the last tracked second again on every poll, so they are re-sent to Elasticsearch repeatedly.
statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) >= :sql_last_value) ORDER BY update_time ASC"

Solutions to the Problems Caused by the Intuitive Methods

-- Full predicate: update_time (as epoch seconds) is strictly greater than the
-- last tracked value AND update_time is strictly earlier than the current time.
(UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW())
-- The upper-bound term on its own: excludes rows stamped at or after the
-- current time.
update_time < NOW()

Test System

GET student/_search
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 5,
"max_score" : 1.0,
"hits" : [
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205002",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:43:19.000Z",
"stuage" : 25,
"@timestamp" : "2020-05-19T09:08:02.173Z",
"stuname" : "张雨",
"studept" : "大个子",
"create_time" : "2020-05-19T08:43:19.000Z",
"stusex" : "男"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205005",
"_score" : 1.0,
"_source" : {
"stuage" : 28,
"stuname" : "大禹",
"create_time" : "2020-05-19T12:12:22.000Z",
"stusex" : "男",
"update_time" : "2020-05-19T12:12:22.000Z",
"studept" : "咖啡",
"@timestamp" : "2020-05-19T12:13:00.160Z"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205001",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:42:39.000Z",
"stuage" : 27,
"@timestamp" : "2020-05-19T09:08:02.140Z",
"stuname" : "赵弘景",
"studept" : "健身",
"create_time" : "2020-05-19T08:42:39.000Z",
"stusex" : "男"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205004",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:44:54.000Z",
"stuage" : 23,
"@timestamp" : "2020-05-19T09:08:02.191Z",
"stuname" : "潘多",
"studept" : "跳舞",
"create_time" : "2020-05-19T08:44:54.000Z",
"stusex" : "女"
}
},
{
"_index" : "student",
"_type" : "doc",
"_id" : "20205003",
"_score" : 1.0,
"_source" : {
"update_time" : "2020-05-19T08:44:11.000Z",
"stuage" : 26,
"@timestamp" : "2020-05-19T09:08:02.175Z",
"stuname" : "黄磊",
"studept" : "烹饪",
"create_time" : "2020-05-19T08:44:11.000Z",
"stusex" : "男"
}
}
]
}
}
-- Modify one existing row to exercise the update path of the sync.
-- id is BIGINT, so it is compared with a numeric literal rather than a string.
UPDATE `student` SET `stuDept` = '好男人,烹饪大厨' WHERE `id` = 20205003;
GET student/_doc/20205003
{
"_index" : "student",
"_type" : "_doc",
"_id" : "20205003",
"_version" : 3,
"_seq_no" : 2,
"_primary_term" : 1,
"found" : true,
"_source" : {
"update_time" : "2020-05-19T12:27:42.000Z",
"stuage" : 26,
"@timestamp" : "2020-05-19T12:28:00.125Z",
"stuname" : "黄磊",
"studept" : "好男人,烹饪大厨",
"create_time" : "2020-05-19T08:44:11.000Z",
"stusex" : "男"
}
}
-- Add a new row to exercise the insert path of the sync.
-- create_time and update_time are omitted and take their column defaults.
INSERT INTO `student` (`id`, `stuName`, `stuSex`, `stuAge`, `stuDept`)
VALUES (20205008, '悦来', '女', 28, '跳跳');

Original Source:

--

--

--

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

What’s New in Redis 4.0?

Starting with Ansible

Web+: Alibaba Cloud’s Web App Service (Part 2)

Your First Flutter App: The Flutter Widgets Explorer

Operationalizing the EDM Council’s New Cloud Data Management Capabilities (CDMC) Framework

Distributing Objects in Cluster of Cache Nodes

DRDS Read-only Instance for Complex SQL Queries

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com

More from Medium

CockroachDB — Postgres for the cloud

How to Connect Elastic Sink Connector with Kafka

Setup HTTPs Authentication for Presto / Trino

Export JMX metrics from Confluent KSQL to Datadog