Logstash 迁移索引元数据(设置和映射)
https://help.aliyun.com/zh/es/use-cases/use-logstash-to-migrate-full-or-incremental-data-from-self-managed-elasticsearch-to-alibaba-cloud-elasticsearch
在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议您在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。
您可以通过Python脚本创建目标索引,具体操作步骤如下:
适配 Python 3.10.9
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import base64
import http.client
import json
## 源集群host。
oldClusterHost = "localhost:9200"
## 源集群用户名,可为空。
oldClusterUserName = "elastic"
## 源集群密码,可为空。
oldClusterPassword = "xxxxxx"
## 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。
newClusterHost = "jiankunking****.elasticsearch.aliyuncs.com:9200"
## 目标集群用户名。
newClusterUser = "elastic"
## 目标集群密码。
newClusterPassword = "xxxxxx"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
conn = http.client.HTTPConnection(host)
headers = {}
if (username != ""):
'Hello {name}, your age is {age} !'.format(name='Tom', age='20')
up = ('{username}:{password}'.format(username=username, password=password))
# print(up)
# print(up.encode())
# base64string = base64.encodestring(
# up.encode()).replace('\n', '')
base64string = base64.b64encode(up.encode()).decode()
print(base64string)
headers["Authorization"] = "Basic %s" % base64string;
if "GET" == method:
headers["Content-Type"] = "application/x-www-form-urlencoded"
conn.request(method=method, url=endpoint, headers=headers)
else:
headers["Content-Type"] = "application/json"
conn.request(method=method, url=endpoint, body=params, headers=headers)
response = conn.getresponse()
res = response.read()
return res
def httpGet(host, endpoint, username="", password=""):
return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
endpoint = "/_cat/indices"
indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
indicesList = indicesResult.decode().split("\n")
indexList = []
for indices in indicesList:
if (indices.find("open") > 0):
indexList.append(indices.split()[2])
return indexList
def getSettings(index, host, username="", password=""):
endpoint = "/" + index + "/_settings"
indexSettings = httpGet(host, endpoint, username, password)
print(index + " 原始settings如下:\n" + indexSettings.decode())
settingsDict = json.loads(indexSettings)
## 分片数默认和源集群索引保持一致。
number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
## 副本数默认为0。
number_of_replicas = DEFAULT_REPLICAS
newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (
number_of_shards, number_of_replicas)
return newSetting
def getMapping(index, host, username="", password=""):
endpoint = "/" + index + "/_mapping"
indexMapping = httpGet(host, endpoint, username, password)
print(index + " 原始mapping如下:\n" + indexMapping.decode())
mappingDict = json.loads(indexMapping)
mappings = json.dumps(mappingDict[index]["mappings"])
newMapping = "\"mappings\" : " + mappings
return newMapping
def createIndexStatement(oldIndexName):
settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
return createstatement
def createIndex(oldIndexName, newIndexName=""):
if (newIndexName == ""):
newIndexName = oldIndexName
createstatement = createIndexStatement(oldIndexName)
print("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
endpoint = "/" + newIndexName
createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print("新索引 " + newIndexName + " 创建结果:" + createResult.decode())
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
if (index.startswith(".")):
systemIndex.append(index)
else:
createIndex(index, index)
if (len(systemIndex) > 0):
for index in systemIndex:
print(index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")