Syncing elasticsearch data with logstash
1 Overview
logstash is a tool that extracts, transforms and outputs data, and it can connect to many kinds of data sources and targets. This article shows how to use it to sync data between elasticsearch clusters.
2 Environment
The experiment only needs one logstash machine and two elasticsearch machines (elasticsearch v7.1.0). This article simulates them with docker and starts the elasticsearch containers with docker-compose.
elasticsearch machine A is 192.168.243.128:9200.
elasticsearch machine B is 192.168.243.128:19200.
2.1 Install elasticsearch machine A
2.1.1 Write docker-compose.yml
version: '2.2'
services:
  cerebro:
    image: lmenezes/cerebro:0.8.3
    container_name: cerebro
    ports:
      - "9000:9000"
    command:
      - -Dhosts.0.host=http://elasticsearch:9200
  kibana:
    image: docker.elastic.co/kibana/kibana:7.1.0
    container_name: kibana7
    environment:
      - I18N_LOCALE=zh-CN
      - XPACK_GRAPH_ENABLED=true
      - TIMELION_ENABLED=true
      - XPACK_MONITORING_COLLECTION_ENABLED="true"
    ports:
      - "5601:5601"
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.1.0
    container_name: es7_01
    environment:
      - cluster.name=xttblog
      - node.name=es7_01
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.seed_hosts=es7_01
      - cluster.initial_master_nodes=es7_01,es7_02
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - es7data1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
  elasticsearch2:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.1.0
    container_name: es7_02
    environment:
      - cluster.name=xttblog
      - node.name=es7_02
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.seed_hosts=es7_01
      - cluster.initial_master_nodes=es7_01,es7_02
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - es7data2:/usr/share/elasticsearch/data
volumes:
  es7data1:
    driver: local
  es7data2:
    driver: local
2.1.2 Adjust vm.max_map_count
cat >> /etc/sysctl.conf << EOF
vm.max_map_count = 2621440
EOF
sysctl -p
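To confirm that the kernel setting took effect, a quick check:
sysctl vm.max_map_count
# expected output: vm.max_map_count = 2621440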
2.1.3 Start
docker-compose up -d
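Once the containers are up, a quick health check from the docker host confirms that the two-node cluster has formed:
curl -s 'http://192.168.243.128:9200/_cluster/health?pretty'
# "status" of green or yellow with "number_of_nodes": 2 means the cluster is ready.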
2.1.4 Import sample data
localhost:5601 opens kibana, which is used to import the sample data.
localhost:9000 opens cerebro.
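After importing the kibana sample datasets (the flights and ecommerce samples are used later in logstash.conf), their indices should be visible on machine A, for example:
curl -s 'http://192.168.243.128:9200/_cat/indices/kibana_sample_data_*?v'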
2.2 Install elasticsearch machine B
2.2.1 Write docker-compose.yml
version: '2.2'
services:
  cerebro:
    image: lmenezes/cerebro:0.8.3
    container_name: cerebro-2
    ports:
      - "19000:9000"
    command:
      - -Dhosts.0.host=http://elasticsearch:9200
  kibana:
    image: docker.elastic.co/kibana/kibana:7.1.0
    container_name: kibana7-2
    environment:
      - I18N_LOCALE=zh-CN
      - XPACK_GRAPH_ENABLED=true
      - TIMELION_ENABLED=true
      - XPACK_MONITORING_COLLECTION_ENABLED="true"
    ports:
      - "15601:5601"
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.1.0
    container_name: es7_03
    environment:
      - cluster.name=xttblog
      - node.name=es7_03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.seed_hosts=es7_03
      - cluster.initial_master_nodes=es7_03,es7_04
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - es7data3:/usr/share/elasticsearch/data
    ports:
      - 19200:9200
  elasticsearch2:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.1.0
    container_name: es7_04
    environment:
      - cluster.name=xttblog
      - node.name=es7_04
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.seed_hosts=es7_03
      - cluster.initial_master_nodes=es7_03,es7_04
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - es7data4:/usr/share/elasticsearch/data
volumes:
  es7data3:
    driver: local
  es7data4:
    driver: local
2.2.2 Adjust vm.max_map_count
cat >> /etc/sysctl.conf << EOF
vm.max_map_count = 2621440
EOF
sysctl -p
2.2.3 Start
docker-compose up -d
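The same health check works for machine B, which is published on port 19200:
curl -s 'http://192.168.243.128:19200/_cluster/health?pretty'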
2.3 Prepare the python environment on the logstash machine
Run the following commands on a centos7 machine to install python:
yum install python2
yum install python-pip
The python scripts import the requests and yaml modules, so install those modules with pip:
pip install pyyaml
pip install requests
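A one-liner verifies that both modules import cleanly before running the scripts:
python -c "import yaml, requests; print('python deps ok')"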
2.4 Prepare the python scripts on the logstash machine
2.4.1 Prepare migrateConfig.yaml
This file describes the source and target clusters and whether SSL is used for the connection.
Usually only src_ip and dest_ip need to be changed.
cat > /tmp/migrateConfig.yaml << EOF
es_cluster_new:
  # Name of the source cluster
  clustername: es_cluster_new
  # Access address of the source Elasticsearch cluster, prefixed with "http://".
  src_ip: http://x.x.x.x:9200
  # Username and password for the source Elasticsearch cluster; leave as "" for a non-secured cluster.
  src_username: ""
  src_password: ""
  # Access address of the target Elasticsearch cluster, prefixed with "http://".
  dest_ip: http://x.x.x.x:9200
  # Username and password for the target Elasticsearch cluster; leave as "" for a non-secured cluster.
  dest_username: ""
  dest_password: ""
  # only_mapping is optional and defaults to false. It is used together with "migrateMapping.py" and controls whether only the indices listed under "mapping" below are processed.
  # When set to false, all indices in the source cluster except ".kibana" and ".*" are migrated.
  # When set to true, only the indices whose names match the keys of "mapping" below are migrated; during migration the index name is matched against "mapping", and if a match is found the mapping value is used as the index name in the target cluster, otherwise the original source index name is used.
  only_mapping: false
  # When only_mapping = true, "mapping" lists the indices to migrate; the key is the source index name and the value is the target index name.
  mapping:
    test_index_1: test_index_1
  # only_compare_index is optional and defaults to false. It is used together with "checkIndices.py"; when false, both the indices and the document counts are compared, and when true only the indices are compared.
  only_compare_index: false
EOF
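For this lab, src_ip would be http://192.168.243.128:9200 and dest_ip http://192.168.243.128:19200. Before running the scripts it is worth confirming that both endpoints answer:
curl -s http://192.168.243.128:9200
curl -s http://192.168.243.128:19200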
2.4.2 Prepare migrateTemplate.py
This script migrates index templates. It does not need any modification; copy it as-is.
cat > /tmp/migrateTemplate.py << EOF
# -*- coding:UTF-8 -*-
import sys
import yaml
import requests
import json
import os


def printDividingLine():
    print("<=============================================================>")


def loadConfig(argv):
    if argv is None or len(argv) != 2:
        config_yaml = "migrateConfig.yaml"
    else:
        config_yaml = argv[1]
    config_file = open(config_yaml)
    # config = yaml.load(config_file, Loader=yaml.FullLoader)
    return yaml.load(config_file)


def put_template_to_target(url, template, cluster, template_name, dest_auth=None):
    headers = {'Content-Type': 'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False)
    if not os.path.exists("templateLogs"):
        os.makedirs("templateLogs")
    if create_resp.status_code != 200:
        print(
            "create template " + url + " failed with response: " + str(
                create_resp) + ", source template is " + template_name)
        print(create_resp.text)
        filename = "templateLogs/" + str(cluster) + "#" + template_name
        with open(filename + ".json", "w") as f:
            json.dump(template, f)
        return False
    else:
        return True


def main(argv):
    requests.packages.urllib3.disable_warnings()
    print("begin to migrate template!")
    config = loadConfig(argv)
    src_clusters = config.keys()
    print("process cluster name:")
    for name in src_clusters:
        print(name)
    print("cluster total number:" + str(src_clusters.__len__()))
    for name, value in config.items():
        printDividingLine()
        source_user = value["src_username"]
        source_passwd = value["src_password"]
        source_auth = None
        if source_user != "":
            source_auth = (source_user, source_passwd)
        dest_user = value["dest_username"]
        dest_passwd = value["dest_password"]
        dest_auth = None
        if dest_user != "":
            dest_auth = (dest_user, dest_passwd)
        print("start to process cluster name:" + name)
        source_url = value["src_ip"] + "/_template"
        response = requests.get(source_url, auth=source_auth, verify=False)
        if response.status_code != 200:
            print("*** get all template failed. resp statusCode:" + str(
                response.status_code) + " response is " + response.text)
            continue
        all_template = response.json()
        migrate_itemplate = []
        for template in all_template.keys():
            if template.startswith(".") or template == "logstash":
                continue
            if "index_patterns" in all_template[template]:
                for t in all_template[template]["index_patterns"]:
                    # if "kibana" in template:
                    if t.startswith("."):
                        continue
                    migrate_itemplate.append(template)
        for template in migrate_itemplate:
            dest_index_url = value["dest_ip"] + "/_template/" + template
            result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth)
            if result is True:
                print('[success] migrate template success, cluster: %-10s, template %-10s ' % (str(name), str(template)))
            else:
                print('[failure] migrate template failure, cluster: %-10s, template %-10s ' % (str(name), str(template)))


if __name__ == '__main__':
    main(sys.argv)
EOF
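After the script runs, the migrated templates can be listed on the target cluster to confirm that they arrived:
curl -s 'http://192.168.243.128:19200/_cat/templates?v'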
2.4.3 Prepare migrateMapping.py
This script migrates the index mappings (the structure of the indices). It does not need any modification; copy it as-is.
cat > /tmp/migrateMapping.py << EOF
# -*- coding:UTF-8 -*-
import sys
import yaml
import requests
import re
import json
import os


def printDividingLine():
    print("<=============================================================>")


def loadConfig(argv):
    if argv is None or len(argv) != 2:
        config_yaml = "migrateConfig.yaml"
    else:
        config_yaml = argv[1]
    config_file = open(config_yaml)
    # config = yaml.load(config_file, Loader=yaml.FullLoader)
    return yaml.load(config_file)


def get_cluster_version(url, auth=None):
    response = requests.get(url, auth=auth)
    if response.status_code != 200:
        print("*** get ElasticSearch message failed. resp statusCode:" + str(
            response.status_code) + " response is " + response.text)
        return False
    cluster = response.json()
    version = cluster["version"]["number"]
    return True


def process_mapping(index_mapping, dest_index):
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]
    if "lifecycle" in index_mapping["settings"]["index"]:
        del index_mapping["settings"]["index"]["lifecycle"]
    # check alias
    aliases = index_mapping["aliases"]
    for alias in list(aliases.keys()):
        if alias == dest_index:
            print(
                "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    # if index_mapping["settings"]["index"].has_key("lifecycle"):
    if "lifecycle" in index_mapping["settings"]["index"]:
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro = {"opendistro": {"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    # replace synonyms_path
    if "analysis" in index_mapping["settings"]["index"]:
        analysis = index_mapping["settings"]["index"]["analysis"]
        if "filter" in analysis:
            filter = analysis["filter"]
            if "my_synonym_filter" in filter:
                my_synonym_filter = filter["my_synonym_filter"]
                if "synonyms_path" in my_synonym_filter:
                    index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][
                        "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt"
    return index_mapping


def getAlias(source, source_auth):
    # get all indices
    response = requests.get(source + "/_alias", auth=source_auth)
    if response.status_code != 200:
        print("*** get all index failed. resp statusCode:" + str(
            response.status_code) + " response is " + response.text)
        exit()
    all_index = response.json()
    system_index = []
    create_index = []
    for index in list(all_index.keys()):
        if (index.startswith(".")):
            system_index.append(index)
        else:
            create_index.append(index)
    return system_index, create_index


def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None):
    headers = {'Content-Type': 'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False)
    if not os.path.exists("mappingLogs"):
        os.makedirs("mappingLogs")
    if create_resp.status_code != 200:
        print(
            "create index " + url + " failed with response: " + str(create_resp) +
            ", source index is " + str(source_index))
        print(create_resp.text)
        filename = "mappingLogs/" + str(cluster) + "#" + str(source_index)
        with open(filename + ".json", "w") as f:
            json.dump(mapping, f)
        return False
    else:
        return True


def main(argv):
    requests.packages.urllib3.disable_warnings()
    print("begin to migrate index mapping!")
    config = loadConfig(argv)
    src_clusters = config.keys()
    print("begin to process cluster name :")
    for name in src_clusters:
        print(name)
    print("cluster count:" + str(src_clusters.__len__()))
    for name, value in config.items():
        printDividingLine()
        source = value["src_ip"]
        source_user = value["src_username"]
        source_passwd = value["src_password"]
        source_auth = None
        if source_user != "":
            source_auth = (source_user, source_passwd)
        dest = value["dest_ip"]
        dest_user = value["dest_username"]
        dest_passwd = value["dest_password"]
        dest_auth = None
        if dest_user != "":
            dest_auth = (dest_user, dest_passwd)
        print("start to process cluster: " + name)
        # only deal with mapping list
        if 'only_mapping' in value and value["only_mapping"]:
            for source_index, dest_index in value["mapping"].iteritems():
                print("start to process source index" + source_index + ", target index: " + dest_index)
                source_url = source + "/" + source_index
                response = requests.get(source_url, auth=source_auth)
                if response.status_code != 200:
                    print("*** get ElasticSearch message failed. resp statusCode:" + str(
                        response.status_code) + " response is " + response.text)
                    continue
                mapping = response.json()
                index_mapping = process_mapping(mapping[source_index], dest_index)
                dest_url = dest + "/" + dest_index
                result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth)
                if result is False:
                    print("cluster name:" + name + ", " + source_index + ":failure")
                    continue
                print("cluster name:" + name + ", " + source_index + ":success")
        else:
            # get all indices
            system_index, create_index = getAlias(source, source_auth)
            success_index = 0
            for index in create_index:
                source_url = source + "/" + index
                index_response = requests.get(source_url, auth=source_auth)
                if index_response.status_code != 200:
                    print("*** get ElasticSearch message failed. resp statusCode:" + str(
                        index_response.status_code) + " response is " + index_response.text)
                    continue
                mapping = index_response.json()
                dest_index = index
                if 'mapping' in value:
                    if index in value["mapping"].keys():
                        dest_index = value["mapping"][index]
                index_mapping = process_mapping(mapping[index], dest_index)
                dest_url = dest + "/" + dest_index
                result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth)
                if result is False:
                    print("[failure]: migrate mapping cluster name: " + name + ", " + index)
                    continue
                print("[success]: migrate mapping cluster name: " + name + ", " + index)
                success_index = success_index + 1
            print("create index mapping success total: " + str(success_index))


if __name__ == '__main__':
    main(sys.argv)
EOF
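Once the mappings have been migrated, the (still empty) indices should exist on the target cluster; the documents themselves are copied by logstash in section 3.2:
curl -s 'http://192.168.243.128:19200/_cat/indices?v'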
3 Syncing the data
3.1 Sync the metadata
python /tmp/migrateTemplate.py
python /tmp/migrateMapping.py
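Both scripts default to reading migrateConfig.yaml from the current working directory, so either run them from /tmp or pass the config path explicitly as the first argument, for example:
cd /tmp
python migrateTemplate.py /tmp/migrateConfig.yaml
python migrateMapping.py /tmp/migrateConfig.yaml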
3.2 Sync the actual data
Prepare a logstash.conf file and place it in the /tmp/ directory. It specifies the source, the target, and which indices to sync.
The fields that actually need to be changed are input.elasticsearch.hosts, input.elasticsearch.index and output.elasticsearch.hosts.
Run vim /tmp/logstash.conf and enter the following content:
input {
  elasticsearch {
    # Access address of the source Elasticsearch cluster; do not add a protocol prefix, adding "https://" here causes an error.
    hosts => ["192.168.243.128:9200"]
    # Username and password for the source cluster; not needed for a non-secured cluster.
    # user => "css_logstash"
    # password => "*****"
    # Indices to migrate; separate multiple indices with commas, wildcards such as "index*" are supported.
    index => "kibana_sample_data_flights,kibana_sample_data_ecommerce"
    docinfo => true
    slices => 3
    size => 3000
    # When the source cluster is accessed over HTTPS, enable the ssl setting.
    # ssl => true
  }
}
# Remove some fields added by logstash.
filter {
  mutate {
    remove_field => ["@metadata", "@version"]
  }
}
output {
  elasticsearch {
    # Access address of the target Elasticsearch cluster.
    hosts => ["192.168.243.128:19200"]
    # Username and password for the target cluster; not needed for a non-secured cluster.
    # user => "css_logstash"
    # password => "*****"
    # Target index names; the settings below keep them identical to the source and can be left as-is.
    index => "%{[@metadata][_index]}"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    # When the target cluster is accessed over HTTPS, the following extra settings are required.
    # HTTPS certificate of the cluster; for a CSS cluster keep the path below unchanged.
    #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs/CloudSearchService.cer"
    # Whether to enable HTTPS; set to true for an HTTPS cluster.
    #ssl => true
    # Whether to verify the server certificate; false means do not verify it.
    #ssl_certificate_verification => false
  }
}
Run the following command to start the logstash process; it exits automatically once it finishes:
docker run -it --rm -v /tmp/logstash.conf:/tmp/logstash.conf docker.elastic.co/logstash/logstash:7.1.0 logstash -f /tmp/logstash.conf
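When the logstash process exits, the indices and document counts on the source and target can be compared to confirm the migration completed:
curl -s 'http://192.168.243.128:9200/_cat/indices/kibana_sample_data_*?v'
curl -s 'http://192.168.243.128:19200/_cat/indices/kibana_sample_data_*?v'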
4 Summary
logstash can be used to sync elasticsearch data. Besides the full sync shown here, it can also do incremental syncs: if the documents carry a time field, querying on that field is enough to extract only the new data. This article does not demonstrate an incremental sync, but a sketch follows below.
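A minimal sketch of that incremental approach, assuming the documents have a time field named timestamp (as the kibana flights sample does); the elasticsearch input's query and schedule options let logstash periodically pull only recent documents instead of exiting after one pass:
cat > /tmp/logstash-incremental.conf << 'EOF'
input {
  elasticsearch {
    hosts => ["192.168.243.128:9200"]
    index => "kibana_sample_data_flights"
    docinfo => true
    # Only pull documents whose time field falls within the last hour (the field name is an assumption).
    query => '{ "query": { "range": { "timestamp": { "gte": "now-1h" } } } }'
    # Re-run the query every 5 minutes.
    schedule => "*/5 * * * *"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.243.128:19200"]
    index => "%{[@metadata][_index]}"
    # Reusing the source _id keeps repeated pulls idempotent: documents are overwritten, not duplicated.
    document_id => "%{[@metadata][_id]}"
  }
}
EOF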