当前位置: 首页 > article >正文

Logstash 迁移索引元数据(设置和映射)

https://help.aliyun.com/zh/es/use-cases/use-logstash-to-migrate-full-or-incremental-data-from-self-managed-elasticsearch-to-alibaba-cloud-elasticsearch

在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议您在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。

您可以通过Python脚本创建目标索引,具体操作步骤如下:

适配 Python 3.10.9

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import base64
import http.client
import json

## 源集群host。
oldClusterHost = "localhost:9200"
## 源集群用户名,可为空。
oldClusterUserName = "elastic"
## 源集群密码,可为空。
oldClusterPassword = "xxxxxx"
## 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。
newClusterHost = "jiankunking****.elasticsearch.aliyuncs.com:9200"
## 目标集群用户名。
newClusterUser = "elastic"
## 目标集群密码。
newClusterPassword = "xxxxxx"
DEFAULT_REPLICAS = 0


def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = http.client.HTTPConnection(host)
    headers = {}
    if (username != ""):
        'Hello {name}, your age is {age} !'.format(name='Tom', age='20')
        up = ('{username}:{password}'.format(username=username, password=password))
        # print(up)
        # print(up.encode())
        # base64string = base64.encodestring(
        #     up.encode()).replace('\n', '')
        base64string = base64.b64encode(up.encode()).decode()
        print(base64string)
        headers["Authorization"] = "Basic %s" % base64string;
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else:
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res


def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)


def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)


def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)


def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
    indicesList = indicesResult.decode().split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList


def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print(index + "  原始settings如下:\n" + indexSettings.decode())
    settingsDict = json.loads(indexSettings)
    ## 分片数默认和源集群索引保持一致。
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## 副本数默认为0。
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (
        number_of_shards, number_of_replicas)
    return newSetting


def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print(index + " 原始mapping如下:\n" + indexMapping.decode())
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping


def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement


def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == ""):
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print("新索引 " + newIndexName + " 创建结果:" + createResult.decode())


## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else:
        createIndex(index, index)
if (len(systemIndex) > 0):
    for index in systemIndex:
        print(index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")


http://www.kler.cn/a/372665.html

相关文章:

  • JWT在线解密/JWT在线解码 - 加菲工具
  • PHP企业IM客服系统
  • 计算机网络 (49)网络安全问题概述
  • [NOIP2012 提高组] 借教室
  • Android SystemUI——CarSystemBar添加到窗口(十)
  • uni-app vue3 常用页面 组合式api方式
  • Word中遇到的问题记录(页眉,页码分节符,跨页断行)
  • 《Web性能权威指南》-浏览器API与协议-读书笔记
  • 搭建普通 Spring IoC 项目
  • 白立新:人工智能爆发,倒逼人类走向“三体全能”
  • 阿里巴巴店铺商品API返回值中的商品分类与筛选条件
  • QT如何给视频打时标
  • PG数据库之事务处理
  • 域渗透AD渗透攻击利用 python脚本攻击之IPC连接 以及 python生成exe可执行程序讲解方式方法
  • 「Mac畅玩鸿蒙与硬件7」鸿蒙开发环境配置篇7 - 使用命令行工具和本地模拟器管理项目
  • Spring Boot 安全 API 构建:加密解密功能的卓越实践
  • Linux 上安装 conda 步骤实现
  • 一、ARMv8寄存器之通用、状态、特殊寄存器
  • String常量池
  • 【秋招笔试-支持在线评测】10.30花子秋招(已改编)-三语言题解
  • Codeforces Global Round 27 D.Yet Another Real Number Problem
  • 双11来了,云计算优惠大集合
  • android 10 后台启动activity
  • Unity Editor 快速移动资源
  • VB中如何创建和使用自定义控件
  • 【动手学电机驱动】 STM32-FOC(1)IHM03 电机控制套件的介绍