How to Implement Data Replication in Tablestore Using Tunnel Service

Preface

Using DataX for Data Replication in Tablestore

Using Tunnel Service for Data Replication in Tablestore

1. Configure the Replication Scope

{
    "ots-reader": {
        "endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com",
        "instanceName": "zhuoran-high",
        "tableName": "testSrcTable",
        "accessId": "",
        "accessKey": "",
        "tunnelName": "testTunnel",
        "endTime": "2019-06-19 17:00:00"
    },
    "ots-writer": {
        "endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com",
        "instanceName": "zhuoran-search",
        "tableName": "testDstTable",
        "accessId": "",
        "accessKey": "",
        "batchWriteCount": 100
    }
}
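
Note that endTime is given as a local-time string, while the replication logic compares it against timestamps in epoch milliseconds, so it has to be parsed first. Below is a minimal sketch of that conversion; the EndTimeParser name and the UTC+8 time zone (the local time of the cn-hangzhou region) are assumptions for illustration, since the original article does not show its parsing code.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class EndTimeParser {
    // Converts the config's "endTime" string ("yyyy-MM-dd HH:mm:ss") into
    // epoch milliseconds. The UTC+8 zone is an assumption: the config does
    // not state a time zone, so we pin one for reproducibility.
    public static long toEpochMillis(String endTime) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        try {
            return fmt.parse(endTime).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad endTime: " + endTime, e);
        }
    }

    public static void main(String[] args) {
        // The sample endTime from the config above.
        System.out.println(toEpochMillis("2019-06-19 17:00:00")); // prints 1560934800000
    }
}
```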

2. Write the Main Logic

sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
        config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(),
        config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName());
if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) {
    System.out.println("Table already exists: " + config.getWriteConf().getTableName());
} else {
    // Clone the source table's schema onto the destination table.
    DescribeTableResponse describeTableResponse = sourceClient.describeTable(
            new DescribeTableRequest(config.getReadConf().getTableName()));
    describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName());
    describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000);
    CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(),
            describeTableResponse.getTableOptions(),
            new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit()));
    destClient.createTable(createTableRequest);
    System.out.println("Create table success: " + config.getWriteConf().getTableName());
}
sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
        config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel(
        new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos();
String tunnelId = null;
TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos);
if (tunnelInfo != null) {
    tunnelId = tunnelInfo.getTunnelId();
    System.out.println(String.format("Tunnel already exists, TunnelName: %s, TunnelId: %s",
            config.getReadConf().getTunnelName(), tunnelId));
} else {
    CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel(
            new CreateTunnelRequest(config.getReadConf().getTableName(),
                    config.getReadConf().getTunnelName(), TunnelType.BaseAndStream));
    // Keep the new tunnel's id so the worker below can attach to it.
    tunnelId = createTunnelResponse.getTunnelId();
    System.out.println("Create tunnel success: " + tunnelId);
}
backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "background-checker-" + counter.getAndIncrement());
    }
});
backgroundExecutor.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest(
                config.getReadConf().getTableName(), config.getReadConf().getTunnelName()));
        // Synchronization has finished.
        if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) {
            System.out.println("Table copy finished, program exit!");
            // Exit the backup program.
            shutdown();
        }
    }
}, 0, 2, TimeUnit.SECONDS);
if (tunnelId != null) {
    sourceWorkerConfig = new TunnelWorkerConfig(
            new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient));
    sourceWorkerConfig.setHeartbeatIntervalInSec(15);
    sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig);
    sourceWorker.connectAndWorking();
}
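
The snippet relies on a shutdown() helper that is not shown. The underlying pattern, a scheduled checker that polls a progress point and stops everything once it passes endTime, can be demonstrated without the Tablestore SDK. In the sketch below the tunnel consume point is simulated by a plain counter; CopyWatcher and all its members are invented names for illustration, not SDK API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CopyWatcher {
    // Simulated "consume point"; in the real program this value comes from
    // DescribeTunnelResponse.getTunnelConsumePoint().
    private final AtomicLong consumePoint = new AtomicLong(0);
    private final long endTime;
    private final ScheduledExecutorService checker =
            Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch finished = new CountDownLatch(1);

    public CopyWatcher(long endTime) {
        this.endTime = endTime;
    }

    public void start() {
        // Poll periodically, just like the 2-second describeTunnel loop above.
        checker.scheduleAtFixedRate(() -> {
            if (consumePoint.get() > endTime) {
                checker.shutdown(); // stop polling once the copy is done
                finished.countDown();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
    }

    public void advance(long point) {
        consumePoint.set(point);
    }

    public boolean awaitFinish(long millis) {
        try {
            return finished.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CopyWatcher w = new CopyWatcher(100);
        w.start();
        w.advance(150); // consume point passes endTime
        System.out.println(w.awaitFinish(2000)); // prints "true"
    }
}
```

In the real program, shutdown() would additionally stop the TunnelWorker and close the clients; the latch here stands in for that cleanup.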

3. Write the Data Synchronization Logic (OtsReaderProcessor)

public void process(ProcessRecordsInput input) {
    System.out.println(String.format("Begin process %d records.", input.getRecords().size()));
    BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
    int count = 0;
    for (StreamRecord record : input.getRecords()) {
        // Skip records written after the configured end time.
        if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) {
            System.out.println(String.format("skip record timestamp %d larger than endTime %d",
                    record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime()));
            continue;
        }
        count++;
        switch (record.getRecordType()) {
            case PUT:
                RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey());
                putChange.addColumns(getColumns(record));
                batchWriteRowRequest.addRowChange(putChange);
                break;
            case UPDATE:
                RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(),
                        record.getPrimaryKey());
                for (RecordColumn column : record.getColumns()) {
                    switch (column.getColumnType()) {
                        case PUT:
                            updateChange.put(column.getColumn());
                            break;
                        case DELETE_ONE_VERSION:
                            updateChange.deleteColumn(column.getColumn().getName(),
                                    column.getColumn().getTimestamp());
                            break;
                        case DELETE_ALL_VERSION:
                            updateChange.deleteColumns(column.getColumn().getName());
                            break;
                        default:
                            break;
                    }
                }
                batchWriteRowRequest.addRowChange(updateChange);
                break;
            case DELETE:
                RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(),
                        record.getPrimaryKey());
                batchWriteRowRequest.addRowChange(deleteChange);
                break;
            default:
                break;
        }
        if (count == writeConf.getBatchWriteCount()) {
            System.out.println("BatchWriteRow: " + count);
            writeClient.batchWriteRow(batchWriteRowRequest);
            batchWriteRowRequest = new BatchWriteRowRequest();
            count = 0;
        }
    }
    // Write the final partial batch.
    if (!batchWriteRowRequest.isEmpty()) {
        System.out.println("BatchWriteRow: " + count);
        writeClient.batchWriteRow(batchWriteRowRequest);
    }
}
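
The flush-every-batchWriteCount pattern in process() can be isolated from the SDK. Below is a generic sketch of it; BatchFlusher and Sink are illustrative names, not part of the Tablestore SDK, and the sink stands in for writeClient.batchWriteRow.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlusher<T> {
    // Stand-in for the batch write call; a functional interface so tests
    // can supply a lambda.
    public interface Sink<T> {
        void write(List<T> batch);
    }

    private final int batchSize;
    private final Sink<T> sink;
    private List<T> buffer = new ArrayList<>();

    public BatchFlusher(int batchSize, Sink<T> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Mirrors addRowChange(...) followed by the count == batchWriteCount check.
    public void add(T row) {
        buffer.add(row);
        if (buffer.size() == batchSize) {
            flush();
        }
    }

    // Mirrors the "write the final partial batch" step at the end of process().
    public void flush() {
        if (!buffer.isEmpty()) {
            sink.write(buffer);
            buffer = new ArrayList<>();
        }
    }
}
```

With a batch size of 100, feeding 250 rows through add() and then calling flush() produces three writes of 100, 100, and 50 rows. Keeping batchWriteCount at or below the service's per-request row limit (200 rows for BatchWriteRow, per the Tablestore documentation) is what makes this buffering necessary.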

4. Technical Annotation

The Road Ahead

Original Source:


Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com
