How to Migrate Data from an ECS-hosted User-created Elasticsearch Cluster to an Alibaba Cloud Elasticsearch Cluster

Prerequisites

Create Indexes

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# File name: indiceCreate.py
import base64
import http.client
import json

## Old cluster host (ip+port)
oldClusterHost = "old-cluster.com"
## Username of old cluster, can be empty
oldClusterUserName = "old-username"
## Password of old cluster, can be empty
oldClusterPassword = "old-password"
## New cluster host (ip+port)
newClusterHost = "new-cluster.com"
## Username of new cluster, can be empty
newClusterUser = "new-username"
## Password of new cluster, can be empty
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0


def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = http.client.HTTPConnection(host)
    headers = {}
    if username != "":
        # Build the HTTP Basic authentication header.
        credentials = "{username}:{password}".format(username=username, password=password)
        base64string = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
        headers["Authorization"] = "Basic %s" % base64string
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else:
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params.encode("utf-8"), headers=headers)
    response = conn.getresponse()
    res = response.read().decode("utf-8")
    return res


def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)


def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)


def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)


def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(host, endpoint, username, password)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        # Only open indexes are migrated; the index name is the third column.
        if indices.find("open") > 0:
            indexList.append(indices.split()[2])
    return indexList


def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print(index + " original settings:\n" + indexSettings)
    settingsDict = json.loads(indexSettings)
    ## Shard number is the same as that of old cluster index by default
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## Number of replicas is 0 by default
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting


def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print(index + " original mapping:\n" + indexMapping)
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping


def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement


def createIndex(oldIndexName, newIndexName=""):
    if newIndexName == "":
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print("Settings and mapping of new index " + newIndexName + ":\n" + createstatement)
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print("Creation result of new index " + newIndexName + ": " + createResult)


## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if index.startswith("."):
        systemIndex.append(index)
    else:
        createIndex(index, index)
if len(systemIndex) > 0:
    for index in systemIndex:
        print(index + " may be a system index and is not re-created. Handle it separately if needed.")
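
The script above assembles the create-index request by concatenating strings. As a minimal sketch of the same step, the body could instead be built as a dictionary and serialized with `json.dumps`, which avoids quoting mistakes. The helper name `build_create_body` and the sample responses are assumptions made for illustration; they are not part of the original script.

```python
import json


def build_create_body(index, settings_response, mapping_response, replicas=0):
    """Build the create-index request body from the old cluster's responses.

    settings_response and mapping_response are the parsed JSON documents
    returned by GET /<index>/_settings and GET /<index>/_mapping.
    """
    old_settings = settings_response[index]["settings"]["index"]
    body = {
        "settings": {
            # Keep the shard count of the old index.
            "number_of_shards": int(old_settings["number_of_shards"]),
            # Start with the given replica count (0 by default, as in the script).
            "number_of_replicas": replicas,
        },
        # Copy the mappings of the old index unchanged.
        "mappings": mapping_response[index]["mappings"],
    }
    return json.dumps(body)


if __name__ == "__main__":
    # Made-up sample responses, shaped like the output of the two GET requests.
    settings = {"testindex": {"settings": {"index": {"number_of_shards": "5"}}}}
    mapping = {"testindex": {"mappings": {"testtype": {"properties": {"uid": {"type": "long"}}}}}}
    print(build_create_body("testindex", settings, mapping))
```

The returned string could be passed to `httpPut` in place of the concatenated `createstatement`.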

Migrate Data

#!/bin/bash
# file: reindex.sh
indexName="your-index-name"
newClusterUser="new-cluster-username"
newClusterPass="new-cluster-password"
newClusterHost="new-cluster-host"
oldClusterUser="old-cluster-username"
oldClusterPass="old-cluster-password"
# Old cluster host must be [scheme]://[host]:[port], for example http://10.37.1.1:9200
oldClusterHost="old-cluster-host"
curl -u "${newClusterUser}:${newClusterPass}" -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'"${oldClusterHost}"'",
            "username": "'"${oldClusterUser}"'",
            "password": "'"${oldClusterPass}"'"
        },
        "index": "'"${indexName}"'",
        "query": {
            "match_all": {}
        }
    },
    "dest": {
        "index": "'"${indexName}"'"
    }
}'

#!/bin/bash
# file: circleReindex.sh
# CONTROLLING STARTUP:
# This script rebuilds an index by running reindex against a remote cluster. Requirements are as follows:
# 1. The index is already created in the new cluster, or automatic index creation and dynamic mapping are supported.
# 2. An IP whitelist must be configured in the yml of the new cluster, for example reindex.remote.whitelist: 172.16.123.*:9200
# 3. Host must be [scheme]://[host]:[port]
USAGE="Usage: bash circleReindex.sh <count>
       count: number of executions. Run once, or run multiple times incrementally (a negative number loops indefinitely)
Example:
        bash circleReindex.sh 1
        bash circleReindex.sh 5
        bash circleReindex.sh -1"
indexName="your-index-name"
newClusterUser="new-cluster-username"
newClusterPass="new-cluster-password"
oldClusterUser="old-cluster-username"
oldClusterPass="old-cluster-password"
## For example http://myescluster.com
newClusterHost="new-cluster-host"
# Old cluster host must be [scheme]://[host]:[port], for example http://10.37.1.1:9200
oldClusterHost="old-cluster-host"
# Name of the field that stores the update time
timeField="update-time-field"
reindexTimes=0
lastTimestamp=0
curTimestamp=$(date +%s)
hasError=false
function reIndexOP() {
    reindexTimes=$((reindexTimes + 1))
    curTimestamp=$(date +%s)
    # Copy only the documents updated in [lastTimestamp, curTimestamp).
    ret=$(curl -u "${newClusterUser}:${newClusterPass}" -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
        "source": {
            "remote": {
                "host": "'"${oldClusterHost}"'",
                "username": "'"${oldClusterUser}"'",
                "password": "'"${oldClusterPass}"'"
            },
            "index": "'"${indexName}"'",
            "query": {
                "range" : {
                    "'"${timeField}"'" : {
                        "gte" : '"${lastTimestamp}"',
                        "lt" : '"${curTimestamp}"'
                    }
                }
            }
        },
        "dest": {
            "index": "'"${indexName}"'"
        }
    }')
    lastTimestamp=${curTimestamp}
    echo "reIndex run #${reindexTimes}, updated up to ${lastTimestamp}, result: ${ret}"
    if [[ ${ret} == *error* ]]; then
        hasError=true
        echo "This run failed. Subsequent runs are aborted. Please check."
    fi
}
function start() {
    ## Loop execution is not stopped if it is a negative number
    if [[ $1 -lt 0 ]]; then
        while :
        do
            reIndexOP
        done
    elif [[ $1 -gt 0 ]]; then
        k=0
        while [[ ${k} -lt $1 ]] && [[ ${hasError} == false ]]; do
            reIndexOP
            k=$((k + 1))
        done
    fi
}
## main
if [ $# -lt 1 ]; then
    echo "$USAGE"
    exit 1
fi
echo "Starting the reindex operation for index ${indexName}"
start "$1"
echo "Performed ${reindexTimes} reindex operation(s) in total"

#!/bin/bash
# file: miss.sh
indexName="your-index-name"
newClusterUser="new-cluster-username"
newClusterPass="new-cluster-password"
newClusterHost="new-cluster-host"
oldClusterUser="old-cluster-username"
oldClusterPass="old-cluster-password"
# Old cluster host must be [scheme]://[host]:[port], for example http://10.37.1.1:9200
oldClusterHost="old-cluster-host"
timeField="updatetime"
# Copy only the documents that do not contain the time field.
curl -u "${newClusterUser}:${newClusterPass}" -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'"${oldClusterHost}"'",
            "username": "'"${oldClusterUser}"'",
            "password": "'"${oldClusterPass}"'"
        },
        "index": "'"${indexName}"'",
        "query": {
            "bool": {
                "must_not": {
                    "exists": {
                        "field": "'"${timeField}"'"
                    }
                }
            }
        }
    },
    "dest": {
        "index": "'"${indexName}"'"
    }
}'
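
The three scripts above send the same reindex-from-remote request and differ only in the `query` part. The sketch below shows how that request body might be assembled in Python rather than by splicing shell variables into a JSON string. The function names are hypothetical and chosen for illustration; the field names in the body are the ones used in the scripts above.

```python
import json


def build_reindex_body(index, old_host, old_user="", old_password="", query=None):
    """Return the JSON body for POST /_reindex that pulls from a remote cluster."""
    remote = {"host": old_host}  # must be [scheme]://[host]:[port]
    if old_user:
        # Credentials are only included when the old cluster requires them.
        remote["username"] = old_user
        remote["password"] = old_password
    body = {
        "source": {
            "remote": remote,
            "index": index,
            # Default to copying every document, as reindex.sh does.
            "query": query if query is not None else {"match_all": {}},
        },
        "dest": {"index": index},
    }
    return json.dumps(body)


def time_window_query(time_field, start, end):
    """Query used by circleReindex.sh: documents updated in [start, end)."""
    return {"range": {time_field: {"gte": start, "lt": end}}}


def missing_field_query(time_field):
    """Query used by miss.sh: documents that lack the time field."""
    return {"bool": {"must_not": {"exists": {"field": time_field}}}}


if __name__ == "__main__":
    print(build_reindex_body(
        "your-index-name",
        "http://10.37.1.1:9200",
        "old-cluster-username",
        "old-cluster-password",
        time_window_query("updatetime", 0, 1700000000),
    ))
```

Building the body as a dictionary keeps it valid JSON even when a value contains quotes or spaces, which the shell splicing does not guarantee.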

FAQs

# Obtain information about all indexes in the old cluster. If the cluster does not require authentication, remove "-u user:pass". Replace oldClusterHost with the host of the old cluster.
curl -u user:pass -XGET "http://oldClusterHost/_cat/indices" | awk '{print $3}'
# From the indexes returned above, obtain the settings and mapping of the user index that needs to be migrated. Replace indexName with the name of the index you want to query.
curl -u user:pass -XGET "http://oldClusterHost/indexName/_settings,_mapping?pretty=true"
# Using the settings and mapping obtained above, create the index in the new cluster. The number of index replicas can be set to 0 first to speed up data synchronization. After the data migration is completed, set the number of replicas to 1.
# newClusterHost is the host of the new cluster, testindex is the name of the index to create, and testtype is the type of the index.
# This example assumes that the corresponding index in the old cluster has 5 shards and the mapping shown below. The number of index replicas is set to 0.
curl -u user:pass -XPUT "http://<newClusterHost>/<testindex>" -H "Content-Type: application/json" -d '{
    "settings" : {
        "number_of_shards" : "5",
        "number_of_replicas" : "0"
    },
    "mappings" : {
        "testtype" : {
            "properties" : {
                "uid" : {
                    "type" : "long"
                },
                "name" : {
                    "type" : "text"
                },
                "create_time" : {
                    "type" : "long"
                }
            }
        }
    }
}'
# Before migrating index data, set the number of index replicas to 0 and disable refresh to speed up the migration.
curl -u user:password -XPUT "http://<host:port>/indexName/_settings" -H "Content-Type: application/json" -d '{
    "number_of_replicas" : 0,
    "refresh_interval" : "-1"
}'
# After the migration is completed, set the number of index replicas to 1 and the refresh interval to 1s (1s is the default value).
curl -u user:password -XPUT "http://<host:port>/indexName/_settings" -H "Content-Type: application/json" -d '{
    "number_of_replicas" : 1,
    "refresh_interval" : "1s"
}'
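
The last two requests differ only in their values, so the two payloads can be kept in one place. A minimal sketch, assuming a hypothetical helper named `migration_settings` that is not part of the original article:

```python
import json


def migration_settings(migrating):
    """Return the body for PUT /<index>/_settings before or after a migration."""
    if migrating:
        # No replicas and no refresh while data is being copied.
        return json.dumps({"number_of_replicas": 0, "refresh_interval": "-1"})
    # Restore one replica and the default 1s refresh interval afterwards.
    return json.dumps({"number_of_replicas": 1, "refresh_interval": "1s"})


if __name__ == "__main__":
    print(migration_settings(True))
    print(migration_settings(False))
```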

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com
