How to Implement Data Replication in Tablestore Using Tunnel Service

Preface

Using DataX for Data Replication in Tablestore

Using Tunnel Service for Data Replication in Tablestore

1. Configure the Replication Scope

{
  "ots-reader": {
    "endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com",
    "instanceName": "zhuoran-high",
    "tableName": "testSrcTable",
    "accessId": "",
    "accessKey": "",
    "tunnelName": "testTunnel",
    "endTime": "2019-06-19 17:00:00"
  },
  "ots-writer": {
    "endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com",
    "instanceName": "zhuoran-search",
    "tableName": "testDstTable",
    "accessId": "",
    "accessKey": "",
    "batchWriteCount": 100
  }
}
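
The `endTime` value above is a human-readable string, while the code in the later steps compares `getEndTime()` against numeric timestamps. The article does not show that conversion. The following is a minimal sketch of one way it could be done, assuming the target unit is epoch milliseconds and that the string is interpreted in a time zone chosen by the caller. The class name `EndTimeParser` and the method `toEpochMillis` are hypothetical and not part of the Tablestore SDK.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the Tablestore SDK.
final class EndTimeParser {
    // Matches the "endTime" format used in the configuration file.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Converts the configured string to epoch milliseconds.
    // Assumption: the caller decides which time zone the string refers to.
    static long toEpochMillis(String endTime, ZoneId zone) {
        return LocalDateTime.parse(endTime, FORMAT)
            .atZone(zone)
            .toInstant()
            .toEpochMilli();
    }

    public static void main(String[] args) {
        // Asia/Shanghai is an assumption based on the cn-hangzhou endpoints.
        long millis = toEpochMillis("2019-06-19 17:00:00", ZoneId.of("Asia/Shanghai"));
        System.out.println(millis); // prints 1560934800000
    }
}
```

Passing the zone explicitly avoids depending on the default time zone of the machine that runs the replication program.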

2. Write the Main Logic

// Create clients for the source and destination instances.
sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
    config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(),
    config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName());
// Create the destination table from the source table's schema if it does not exist yet.
if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) {
    System.out.println("Table already exists: " + config.getWriteConf().getTableName());
} else {
    DescribeTableResponse describeTableResponse = sourceClient.describeTable(
        new DescribeTableRequest(config.getReadConf().getTableName()));
    describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName());
    // Relax the allowed timestamp deviation so that replicated cells can keep their original version timestamps.
    describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000);
    CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(),
        describeTableResponse.getTableOptions(),
        new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit()));
    destClient.createTable(createTableRequest);
    System.out.println("Create table success: " + config.getWriteConf().getTableName());
}
// Look up the tunnel on the source table, or create it if it does not exist yet.
sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
    config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel(
    new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos();
String tunnelId = null;
TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos);
if (tunnelInfo != null) {
    tunnelId = tunnelInfo.getTunnelId();
    System.out.println(String.format("Tunnel already exists, TunnelName: %s, TunnelId: %s",
        config.getReadConf().getTunnelName(), tunnelId));
} else {
    // BaseAndStream replicates the existing (full) data first, then the incremental changes.
    CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel(
        new CreateTunnelRequest(config.getReadConf().getTableName(),
            config.getReadConf().getTunnelName(), TunnelType.BaseAndStream));
    // Keep the new ID; otherwise tunnelId stays null and the worker is never started on the first run.
    tunnelId = createTunnelResponse.getTunnelId();
    System.out.println("Create tunnel success: " + tunnelId);
}
// Check every 2 seconds whether the tunnel has consumed past the configured end time.
backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "background-checker-" + counter.getAndIncrement());
    }
});
backgroundExecutor.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest(
            config.getReadConf().getTableName(), config.getReadConf().getTunnelName()
        ));
        // Replication has caught up to the configured end time.
        if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) {
            System.out.println("Table copy finished, program exit!");
            // Exit the replication program.
            shutdown();
        }
    }
}, 0, 2, TimeUnit.SECONDS);
if (tunnelId != null) {
    sourceWorkerConfig = new TunnelWorkerConfig(
        new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient));
    sourceWorkerConfig.setHeartbeatIntervalInSec(15);
    sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig);
    sourceWorker.connectAndWorking();
}
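
The main logic calls a `getTunnelInfo` helper that the article does not show. A minimal sketch of what such a lookup might look like follows, written generically so that it runs without the Tablestore SDK on the classpath. The class `TunnelLookup` and the method `findByName` are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of the Tablestore SDK.
final class TunnelLookup {
    // Returns the first item whose name equals wantedName, or null if none matches.
    static <T> T findByName(String wantedName, List<T> items, Function<T, String> nameOf) {
        for (T item : items) {
            if (wantedName.equals(nameOf.apply(item))) {
                return item;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Stand-in data: each entry is {tunnelName, tunnelId}.
        List<String[]> tunnels = Arrays.asList(
            new String[] {"otherTunnel", "id-1"},
            new String[] {"testTunnel", "id-2"});
        String[] hit = findByName("testTunnel", tunnels, t -> t[0]);
        System.out.println(hit == null ? "not found" : hit[1]); // prints id-2
    }
}
```

With the SDK types, the call would presumably read `findByName(tunnelName, tunnelInfos, TunnelInfo::getTunnelName)`, assuming `TunnelInfo` exposes a `getTunnelName()` getter alongside the `getTunnelId()` used above.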

3. Write the Data Synchronization Logic (OtsReaderProcessor)

public void process(ProcessRecordsInput input) {
    System.out.println(String.format("Begin process %d records.", input.getRecords().size()));
    BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
    int count = 0;
    for (StreamRecord record : input.getRecords()) {
        // Skip records newer than endTime. The record timestamp is divided by 1000
        // to match the unit of endTime (assumed here: microseconds to milliseconds).
        if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) {
            System.out.println(String.format("skip record timestamp %d larger than endTime %d",
                record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime()));
            continue;
        }
        count++;
        // Translate each source record into the matching row change on the destination table.
        switch (record.getRecordType()) {
            case PUT:
                RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey());
                putChange.addColumns(getColumns(record));
                batchWriteRowRequest.addRowChange(putChange);
                break;
            case UPDATE:
                RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(),
                    record.getPrimaryKey());
                for (RecordColumn column : record.getColumns()) {
                    switch (column.getColumnType()) {
                        case PUT:
                            updateChange.put(column.getColumn());
                            break;
                        case DELETE_ONE_VERSION:
                            updateChange.deleteColumn(column.getColumn().getName(),
                                column.getColumn().getTimestamp());
                            break;
                        case DELETE_ALL_VERSION:
                            updateChange.deleteColumns(column.getColumn().getName());
                            break;
                        default:
                            break;
                    }
                }
                batchWriteRowRequest.addRowChange(updateChange);
                break;
            case DELETE:
                RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(),
                    record.getPrimaryKey());
                batchWriteRowRequest.addRowChange(deleteChange);
                break;
            default:
                break;
        }
        // Flush once the configured batch size is reached.
        if (count == writeConf.getBatchWriteCount()) {
            System.out.println("BatchWriteRow: " + count);
            writeClient.batchWriteRow(batchWriteRowRequest);
            batchWriteRowRequest = new BatchWriteRowRequest();
            count = 0;
        }
    }
    // Write the final, partially filled batch.
    if (!batchWriteRowRequest.isEmpty()) {
        System.out.println("BatchWriteRow: " + count);
        writeClient.batchWriteRow(batchWriteRowRequest);
    }
}
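
The batching pattern in `process` (flush whenever `batchWriteCount` rows have accumulated, then flush whatever remains at the end) can be isolated from the SDK calls. The sketch below is one possible way to express it; `BatchBuffer` is a hypothetical class introduced here, and its `sink` callback stands in for the `writeClient.batchWriteRow(...)` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper illustrating the flush-at-N pattern; not part of the Tablestore SDK.
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Buffers one item and flushes as soon as the batch is full.
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Hands any buffered items to the sink; does nothing when the buffer is empty.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(pending);
        pending = new ArrayList<>();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBuffer<Integer> buffer = new BatchBuffer<>(100, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 250; i++) {
            buffer.add(i);
        }
        buffer.flush(); // final partial batch
        System.out.println(batchSizes); // prints [100, 100, 50]
    }
}
```

Counting buffered items rather than processed records means that record types which add no row change do not trigger an early flush.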

4. Technical Annotation

The Road Ahead


Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com
