How to Migrate Data from an ECS-hosted User-created Elasticsearch Cluster to an Alibaba Cloud Elasticsearch Cluster

Prerequisites

Before you migrate data by following the steps in this article, make sure that the following requirements are met. If they are not, choose another migration plan. For more information, see Logstash deployment.

  • Run the scripts in this article on an intermediate server that can connect to both Elasticsearch clusters over port 9200.
  • The security group (VPC network type) of the ECS instance that hosts the user-created Elasticsearch cluster must not restrict access by IP whitelist, and port 9200 must be open.
  • The IP addresses of the nodes in the Alibaba Cloud Elasticsearch cluster are added to the security group of the ECS instance that hosts the user-created Elasticsearch cluster. You can query these IP addresses in the Kibana console of the Alibaba Cloud Elasticsearch cluster.
  • The user-created Elasticsearch cluster and the Alibaba Cloud Elasticsearch cluster can reach each other. To test connectivity quickly, run curl -XGET http://<host>:9200 on the intermediate server.
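The curl test in the last prerequisite can also be scripted, which is convenient when you want to check both clusters at once. The following is a minimal Python 3 sketch that uses only the standard library; `basic_auth_header` and `check_cluster` are hypothetical helper names, not part of the original article. It sends `GET /` to a cluster and returns the version number that Elasticsearch reports, or `None` if the cluster cannot be reached or the reply is not JSON.

```python
import base64
import json
import urllib.request


def basic_auth_header(username, password):
    """Return the value of an HTTP Basic Authorization header."""
    token = base64.b64encode("{0}:{1}".format(username, password).encode("utf-8"))
    return "Basic " + token.decode("ascii")


def check_cluster(url, username="", password="", timeout=5):
    """GET <url>/ and return the Elasticsearch version string, or None on failure."""
    request = urllib.request.Request(url.rstrip("/") + "/")
    if username:
        request.add_header("Authorization", basic_auth_header(username, password))
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            info = json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError):
        # Connection refused, timeout, HTTP error status, or a non-JSON reply
        return None
    # The root endpoint reports {"version": {"number": "..."}, ...}
    return info.get("version", {}).get("number")
```

For example, run `check_cluster("http://10.37.1.1:9200", "username", "password")` on the intermediate server once for each cluster; a version string means the cluster is reachable. The address and credentials are placeholders.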

Create Indexes

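Create indexes on the Alibaba Cloud Elasticsearch cluster based on the index settings of the user-created Elasticsearch cluster, for example with the following script. Alternatively, you can enable automatic index creation and dynamic mapping on the Alibaba Cloud Elasticsearch cluster so that indexes and mappings are created automatically. However, we do not recommend relying on dynamic mapping, because the inferred field types can differ from the original mappings.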

```python
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# File name: indiceCreate.py
# Re-creates every open user index of the old cluster on the new cluster,
# copying its shard count and mappings. Requires Python 3.
import base64
import http.client
import json

## Old cluster host (ip:port)
oldClusterHost = "old-cluster.com"
## Username of the old cluster; use "" if security is disabled
oldClusterUserName = "old-username"
## Password of the old cluster; use "" if security is disabled
oldClusterPassword = "old-password"
## New cluster host (ip:port)
newClusterHost = "new-cluster.com"
## Username of the new cluster; use "" if security is disabled
newClusterUser = "new-username"
## Password of the new cluster; use "" if security is disabled
newClusterPassword = "new-password"
## Replicas are disabled during migration to speed up indexing
DEFAULT_REPLICAS = 0

def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = http.client.HTTPConnection(host)
    headers = {}
    if username != "":
        # HTTP Basic authentication
        token = base64.b64encode("{0}:{1}".format(username, password).encode("utf-8"))
        headers["Authorization"] = "Basic " + token.decode("ascii")
    if method == "GET":
        conn.request(method, endpoint, headers=headers)
    else:
        # Elasticsearch 6.0+ requires a supported Content-Type for request bodies
        headers["Content-Type"] = "application/json"
        conn.request(method, endpoint, body=params.encode("utf-8"), headers=headers)
    res = conn.getresponse().read().decode("utf-8")
    conn.close()
    return res

def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)

def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)

def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)

def getIndices(host, username="", password=""):
    # Each _cat/indices row: health status index uuid pri rep ...
    indicesResult = httpGet(host, "/_cat/indices", username, password)
    indexList = []
    for line in indicesResult.split("\n"):
        columns = line.split()
        if len(columns) > 2 and columns[1] == "open":
            indexList.append(columns[2])
    return indexList

def getSettings(index, host, username="", password=""):
    indexSettings = httpGet(host, "/" + index + "/_settings", username, password)
    print(index + " original settings:\n" + indexSettings)
    settingsDict = json.loads(indexSettings)
    ## Keep the shard count of the old index
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## Use DEFAULT_REPLICAS (0) replicas during migration
    number_of_replicas = DEFAULT_REPLICAS
    return "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (
        number_of_shards, number_of_replicas)

def getMapping(index, host, username="", password=""):
    indexMapping = httpGet(host, "/" + index + "/_mapping", username, password)
    print(index + " original mapping:\n" + indexMapping)
    mappingDict = json.loads(indexMapping)
    return "\"mappings\": " + json.dumps(mappingDict[index]["mappings"])

def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    return "{\n" + settingStr + ",\n" + mappingStr + "\n}"

def createIndex(oldIndexName, newIndexName=""):
    if newIndexName == "":
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print("Settings and mapping of new index " + newIndexName + ":\n" + createstatement)
    createResult = httpPut(newClusterHost, "/" + newIndexName, createstatement,
                           newClusterUser, newClusterPassword)
    print("Result of creating new index " + newIndexName + ": " + createResult)

## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if index.startswith("."):
        systemIndex.append(index)
    else:
        createIndex(index, index)
for index in systemIndex:
    print(index + " may be a system index and is not re-created. Handle it separately if needed.")
```
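The script above builds the create-index body by concatenating strings. As an alternative, here is a minimal Python 3 sketch, assuming the same `GET /<index>/_settings` and `GET /<index>/_mapping` responses, that assembles the body as a dictionary and serializes it with `json.dumps`, so quoting and escaping are handled by the json module. `build_create_body` is a hypothetical name; like the script, it copies only the shard count and the mappings.

```python
import json


def build_create_body(index, settings_response, mapping_response, replicas=0):
    """Build a create-index body from the old cluster's _settings and _mapping responses.

    settings_response and mapping_response are the parsed JSON returned by
    GET /<index>/_settings and GET /<index>/_mapping on the old cluster.
    """
    old_settings = settings_response[index]["settings"]["index"]
    body = {
        "settings": {
            # Elasticsearch returns setting values as strings, e.g. "5"
            "number_of_shards": int(old_settings["number_of_shards"]),
            # Replicas can be raised again after the migration
            "number_of_replicas": replicas,
        },
        "mappings": mapping_response[index]["mappings"],
    }
    return json.dumps(body)
```

The returned string can be sent as the body of `PUT /<index>` on the new cluster.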

Migrate Data

Use one of the following methods to migrate data. Select an appropriate method based on the amount of data to be migrated and specific business needs.

  • If you connect to the source Elasticsearch cluster by IP address and port, first modify the YML configuration of the destination Elasticsearch cluster and add the address of the source cluster to the reindex whitelist, for example reindex.remote.whitelist: 1.1.1.1:9200,1.2.*.*:*.
  • If you access the source cluster by domain name, do not use the http://host:port/path format. The remote host cannot contain a path.
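Both rules above, a remote host in the [scheme]://[host]:[port] form without a path and the matching host:port entry in reindex.remote.whitelist, can be checked before you run any script. The following is a minimal Python 3 sketch; `validate_remote_host` and `whitelist_entry` are hypothetical helpers. It checks only the format described in this article and does not read the whitelist actually configured on the cluster.

```python
from urllib.parse import urlsplit


def validate_remote_host(url):
    """True if url has the form [scheme]://[host]:[port] with no path, query, or fragment."""
    parts = urlsplit(url)
    try:
        port = parts.port  # raises ValueError for a non-numeric or out-of-range port
    except ValueError:
        return False
    return (
        parts.scheme in ("http", "https")
        and bool(parts.hostname)
        and port is not None
        and parts.path == ""
        and parts.query == ""
        and parts.fragment == ""
    )


def whitelist_entry(url):
    """The host:port pair to list in reindex.remote.whitelist for this remote URL."""
    parts = urlsplit(url)
    return "{0}:{1}".format(parts.hostname, parts.port)
```

For example, `whitelist_entry("http://10.37.1.1:9200")` gives the `10.37.1.1:9200` entry to add to the destination cluster's YML configuration.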
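To migrate all documents of an index in a single request, use reindex.sh:

```bash
#!/bin/bash
# file: reindex.sh
indexName="your-index-name"
newClusterUser="new-cluster-username"
newClusterPass="new-cluster-password"
# New cluster host as [host]:[port]; the script adds http://
newClusterHost="new-cluster-host"
oldClusterUser="old-cluster-username"
oldClusterPass="old-cluster-password"
# Old cluster host must be [scheme]://[host]:[port], for example http://10.37.1.1:9200
oldClusterHost="old-cluster-host"
# Copy every document (match_all) of the index from the old cluster to the new cluster
curl -u "${newClusterUser}:${newClusterPass}" -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
  "source": {
    "remote": {
      "host": "'"${oldClusterHost}"'",
      "username": "'"${oldClusterUser}"'",
      "password": "'"${oldClusterPass}"'"
    },
    "index": "'"${indexName}"'",
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "'"${indexName}"'"
  }
}'
```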
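The shell scripts splice variables into a JSON string, which produces invalid JSON if a value such as a password contains a double quote or backslash. The following minimal Python 3 sketch builds the same `_reindex` body as a dictionary and sends it with the standard library; `build_reindex_body` and `post_reindex` are hypothetical names. The long default timeout is an assumption, chosen because a `_reindex` request without `wait_for_completion=false` keeps the connection open until the copy finishes.

```python
import base64
import json
import urllib.request


def build_reindex_body(index, old_host, old_user="", old_pass="", query=None, dest_index=None):
    """Build a reindex-from-remote body equivalent to the one in reindex.sh."""
    remote = {"host": old_host}  # must be [scheme]://[host]:[port]
    if old_user:
        remote["username"] = old_user
        remote["password"] = old_pass
    return {
        "source": {
            "remote": remote,
            "index": index,
            # Default to copying every document, as reindex.sh does
            "query": query if query is not None else {"match_all": {}},
        },
        "dest": {"index": dest_index or index},
    }


def post_reindex(new_cluster_url, body, username="", password="", timeout=3600):
    """POST the body to <new_cluster_url>/_reindex and return the parsed JSON reply."""
    request = urllib.request.Request(
        new_cluster_url.rstrip("/") + "/_reindex",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    if username:
        token = base64.b64encode("{0}:{1}".format(username, password).encode("utf-8"))
        request.add_header("Authorization", "Basic " + token.decode("ascii"))
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, `post_reindex("http://new-cluster.com:9200", build_reindex_body("logs", "http://10.37.1.1:9200", "user", "pass"), "user", "pass")` sends the same request as reindex.sh; all hosts, names, and credentials are placeholders.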
To migrate incrementally, use circleReindex.sh. It relies on a time field that records when each document was last updated, and each pass copies only the documents updated since the previous pass:

```bash
#!/bin/bash
# file: circleReindex.sh
# Incrementally rebuilds an index on the new cluster by reindexing from the remote (old) cluster.
# Requirements:
# 1. The index exists on the new cluster, or automatic index creation and dynamic mapping are enabled.
# 2. The old cluster is whitelisted in the YML configuration of the new cluster, for example reindex.remote.whitelist: 172.16.123.*:9200
# 3. Hosts must be in the [scheme]://[host]:[port] format.
USAGE="Usage: bash circleReindex.sh <count>
count: number of reindex passes. A positive number runs that many incremental passes
       (1 runs once); a negative number keeps looping until a pass fails.
Example:
bash circleReindex.sh 1
bash circleReindex.sh 5
bash circleReindex.sh -1"
indexName="your-index-name"
newClusterUser="new-cluster-username"
newClusterPass="new-cluster-password"
oldClusterUser="old-cluster-username"
oldClusterPass="old-cluster-password"
# New cluster host including the scheme, for example http://myescluster.com
newClusterHost="new-cluster-host"
# Old cluster host must be [scheme]://[host]:[port], for example http://10.37.1.1:9200
oldClusterHost="old-cluster-host"
# Update-time field; it is compared against epoch seconds from `date +%s`
timeField="update-time-field"
reindexTimes=0
lastTimestamp=0
curTimestamp=$(date +%s)
hasError=false
function reIndexOP() {
  reindexTimes=$((reindexTimes + 1))
  curTimestamp=$(date +%s)
  # Copy documents whose time field is in [lastTimestamp, curTimestamp)
  ret=$(curl -s -u "${newClusterUser}:${newClusterPass}" -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
      "remote": {
        "host": "'"${oldClusterHost}"'",
        "username": "'"${oldClusterUser}"'",
        "password": "'"${oldClusterPass}"'"
      },
      "index": "'"${indexName}"'",
      "query": {
        "range": {
          "'"${timeField}"'": {
            "gte": '"${lastTimestamp}"',
            "lt": '"${curTimestamp}"'
          }
        }
      }
    },
    "dest": {
      "index": "'"${indexName}"'"
    }
  }')
  lastTimestamp=${curTimestamp}
  echo "Reindex pass ${reindexTimes}, documents updated before ${lastTimestamp}, result: ${ret}"
  if [[ ${ret} == *error* ]]; then
    hasError=true
    echo "This pass failed. Stopping further passes; please check the error."
  fi
}
function start() {
  if [[ $1 -lt 0 ]]; then
    ## A negative count loops until a pass fails
    while [[ ${hasError} == false ]]; do
      reIndexOP
    done
  elif [[ $1 -gt 0 ]]; then
    k=0
    while (( k < $1 )) && [[ ${hasError} == false ]]; do
      reIndexOP
      let ++k
    done
  fi
}
## main
if [ $# -lt 1 ]; then
  echo "$USAGE"
  exit 1
fi
echo "Starting reindex of index ${indexName}"
start "$1"
echo "Ran ${reindexTimes} reindex passes in total"
```
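circleReindex.sh advances a half-open time window [lastTimestamp, curTimestamp) on each pass, starting from 0, so the first pass copies everything updated before that pass started and later passes copy only newer updates. The following minimal Python 3 sketch reproduces just that windowing logic; `time_windows` and `range_query` are hypothetical names, and, like the script, it assumes the time field holds epoch seconds as returned by `date +%s`. Because each window starts exactly where the previous one ended, no time interval is skipped or covered twice.

```python
import time


def time_windows(rounds, now=time.time, start=0):
    """Yield (gte, lt) epoch-second windows the way circleReindex.sh computes them.

    A positive `rounds` yields that many windows; a negative one yields forever.
    `now` is injectable so the logic can be checked without waiting.
    """
    last = start
    produced = 0
    while rounds < 0 or produced < rounds:
        current = int(now())
        yield last, current
        last = current  # the next window starts where this one ended
        produced += 1


def range_query(time_field, gte, lt):
    """Range query selecting documents with gte <= time_field < lt."""
    return {"range": {time_field: {"gte": gte, "lt": lt}}}
```

Each `(gte, lt)` pair maps to one pass of the script, with `range_query` producing the `query` object of that pass.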
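Documents that have no value in the time field never match the range query of circleReindex.sh. To migrate them, use miss.sh:

```bash
#!/bin/bash
# file: miss.sh
indexName="your-index-name"
newClusterUser="new-cluster-username"
newClusterPass="new-cluster-password"
# New cluster host as [host]:[port]; the script adds http://
newClusterHost="new-cluster-host"
oldClusterUser="old-cluster-username"
oldClusterPass="old-cluster-password"
# Old cluster host must be [scheme]://[host]:[port], for example http://10.37.1.1:9200
oldClusterHost="old-cluster-host"
# Replace with the update-time field used by circleReindex.sh
timeField="updatetime"
# Copy only the documents that do not have the time field
curl -u "${newClusterUser}:${newClusterPass}" -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
  "source": {
    "remote": {
      "host": "'"${oldClusterHost}"'",
      "username": "'"${oldClusterUser}"'",
      "password": "'"${oldClusterPass}"'"
    },
    "index": "'"${indexName}"'",
    "query": {
      "bool": {
        "must_not": {
          "exists": {
            "field": "'"${timeField}"'"
          }
        }
      }
    }
  },
  "dest": {
    "index": "'"${indexName}"'"
  }
}'
```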
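Before running miss.sh, it can help to know how many documents lack the time field, and after all scripts finish, comparing document counts between the two clusters is a quick sanity check. The following minimal Python 3 sketch uses the `_count` API; `missing_field_query` and `count_documents` are hypothetical names. Counts on the new cluster include only refreshed data, so if `refresh_interval` is still `-1`, restore it or call `POST /<index>/_refresh` before comparing.

```python
import base64
import json
import urllib.request


def missing_field_query(field):
    """Query for documents with no value in `field`, the same query miss.sh uses."""
    return {"bool": {"must_not": {"exists": {"field": field}}}}


def count_documents(cluster_url, index, query=None, username="", password="", timeout=30):
    """Return how many documents in `index` match `query`, using POST /<index>/_count."""
    body = {"query": query if query is not None else {"match_all": {}}}
    request = urllib.request.Request(
        "{0}/{1}/_count".format(cluster_url.rstrip("/"), index),
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    if username:
        token = base64.b64encode("{0}:{1}".format(username, password).encode("utf-8"))
        request.add_header("Authorization", "Basic " + token.decode("ascii"))
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))["count"]
```

For example, compare `count_documents(old_url, "logs")` with `count_documents(new_url, "logs")`, and pass `missing_field_query("updatetime")` as the query against the old cluster to see how many documents miss.sh will copy. The URLs and index name are placeholders.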

FAQs

  • Problem: The system displays {"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406} when I run the curl command.
  • Solution: Add the -H "Content-Type: application/json" parameter to the curl command and try again.
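The same rule applies to any HTTP client, not only curl: a request that carries a JSON body needs a JSON Content-Type header. Python's `urllib.request`, for instance, labels a request body as `application/x-www-form-urlencoded` unless you set the header yourself, which leads to the same 406 error. A minimal Python 3 sketch; `json_request` is a hypothetical helper.

```python
import json
import urllib.request


def json_request(url, body, method="POST"):
    """Build a request whose JSON body is labeled Content-Type: application/json.

    Without the explicit header, urllib would send
    application/x-www-form-urlencoded for a request with a body.
    """
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )
```

The request can then be sent with `urllib.request.urlopen(json_request("http://new-cluster.com:9200/_reindex", body))`, where the URL is a placeholder.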
You can also create indexes manually by running the following commands:

```bash
# Obtain all indexes of the old cluster. Replace oldClusterHost with the host of the old cluster,
# and remove "-u user:pass" if security is disabled.
curl -u user:pass -XGET "http://oldClusterHost/_cat/indices" | awk '{print $3}'
# For each user index returned above that you want to migrate, obtain its settings and mapping.
# Replace indexName with the name of the index.
curl -u user:pass -XGET "http://oldClusterHost/indexName/_settings?pretty"
curl -u user:pass -XGET "http://oldClusterHost/indexName/_mapping?pretty"
# Create the index on the new cluster based on those settings and mapping. Set the number of replicas
# to 0 first to speed up data synchronization, and set it to 1 after the migration is completed.
# newClusterHost is the host of the new cluster, testindex is the name of the index to create, and
# testtype is the mapping type. Elasticsearch 7.x and later no longer use mapping types, so drop the
# "testtype" level there.
# This example assumes the old index has 5 shards and the mapping below.
curl -u user:pass -XPUT "http://<newClusterHost>/<testindex>" -H "Content-Type: application/json" -d '{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 0
  },
  "mappings": {
    "testtype": {
      "properties": {
        "uid": { "type": "long" },
        "name": { "type": "text" },
        "create_time": { "type": "long" }
      }
    }
  }
}'
```
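The first command above relies on the index name being the third column of `_cat/indices`. The cat APIs also accept an `h` parameter that selects columns by name, for example `GET /_cat/indices?h=index,status`. Here is a minimal Python 3 sketch, with the hypothetical name `user_indices`, that parses that two-column output and, like the Python script earlier in this article, keeps only open indexes whose names do not start with a dot.

```python
def user_indices(cat_output):
    """Parse `GET /_cat/indices?h=index,status` output into open, non-system index names."""
    names = []
    for line in cat_output.splitlines():
        columns = line.split()
        # Expect exactly two columns: index name and status (open/close)
        if len(columns) == 2 and columns[1] == "open" and not columns[0].startswith("."):
            names.append(columns[0])
    return names
```

Each returned name can then be used in place of indexName in the `_settings` and `_mapping` commands.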
  • Problem: Data migration is slow for a large index.
  • Solution: Before migration, set the number of replicas to 0 and the refresh interval to -1, which disables refresh. After the data is migrated, restore the original values. This speeds up synchronization.

```bash
# Before migration: set the number of replicas to 0 and disable refresh to speed up data migration.
curl -u user:password -XPUT "http://<host:port>/indexName/_settings" -H "Content-Type: application/json" -d '{
  "number_of_replicas": 0,
  "refresh_interval": "-1"
}'
# After migration: set the number of replicas to 1 and the refresh interval to 1s (the default value).
curl -u user:password -XPUT "http://<host:port>/indexName/_settings" -H "Content-Type: application/json" -d '{
  "number_of_replicas": 1,
  "refresh_interval": "1s"
}'
```
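The commands above restore 1 replica and the default 1s refresh interval. If the index originally used other values, you can record them before tuning and restore exactly those. A minimal Python 3 sketch, where `migration_settings` is a hypothetical helper that takes the `index` object from the `GET /<index>/_settings` response; it relies on `refresh_interval` appearing there only when it was set explicitly, and falls back to 1s otherwise.

```python
def migration_settings(index_settings):
    """Return (tuning, restore) bodies for PUT /<index>/_settings.

    index_settings is settings_response[index]["settings"]["index"] from
    GET /<index>/_settings, taken before the migration starts.
    """
    # Applied before migration: no replicas, refresh disabled
    tuning = {"number_of_replicas": 0, "refresh_interval": "-1"}
    restore = {
        # Setting values come back as strings, e.g. "2".
        # Assumption: fall back to 1 replica if the value is missing.
        "number_of_replicas": int(index_settings.get("number_of_replicas", 1)),
        # refresh_interval is listed only if it was set explicitly; 1s is the default
        "refresh_interval": index_settings.get("refresh_interval", "1s"),
    }
    return tuning, restore
```

Send `tuning` before the reindex starts and `restore` after it finishes, each as the JSON body of `PUT /<index>/_settings`.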
