Logstash同步MySQL到ES
Logstash同步MySQL到ES
前提
软件版本
Logstash: 7.10.2
ES: 7.10.2
MySQL: 5.7
Logstash和ES安装在Windows中, MySQL安装在虚拟机中
配置
user表建表语句
CREATE TABLE `user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`name` varchar(255) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
位置信息
创建userDTS_last_value.txt, 不需要内容, 到时候logstash会写入
同步user数据的userDTS.sql
SELECT
id as id,
name as name,
DATE_FORMAT(create_date,'%Y-%m-%d %H:%i:%S') as createDate,
DATE_FORMAT(update_date,'%Y-%m-%d %H:%i:%S') as updateDate
FROM
user
WHERE
# 增量数据的查询条件,一条数据怎么才算增量数据就是由这个条件确定的,而其中的::sql_last_value就是第2步中配置项中的last_run_metadata_path里配置的文件路径对应的文件中的值,也就是每次同步数据后最新一条的updateDate值,那么下次同步数据的时候,sql根据这个条件就不会查到非增量数据,同时第2步中配置项中的tracking_column配置的值就是updateDate,实际使用的时候可以根据场景更换这个字段
# 解决Logstash的sql_last_value 和MySQL时区不一致问题
update_date >= CONVERT_TZ(:sql_last_value, '+08:00', '+00:00')
AND CONVERT_TZ(:sql_last_value, '+08:00', '+00:00') < now()
logstash-mysql-es.conf
注意事项
- MySQL驱动器jar包需要指定, 要和MySQL的版本兼容
- logstash的同步不是实时同步的, 使用同步可以使用Canel, Flink CDC监听Binlog的方式, 可以参考博主的SpringBoot集成Flink-CDC
- 存量更新和存量更新配置根据自己的需求开启
- 相关位置改成自己, 比如mysql, sql存放位置
- 启动logstash的时候需要指定logstash-mysql-es.conf配置启动
- 我的配置存放logstash安装目录的config中, 进入bin目录, 执行cmd, 输入启命令为logstash -f …\config\logstash-mysql-es.conf
# 输入源
input {
jdbc {
# 数据库连接参数
jdbc_connection_string => "jdbc:mysql://192.168.132.10:3306/whitebrocade?characterEncoding=UTF-8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai"
# mysql用户名
jdbc_user => "root"
# mysql密码
jdbc_password => "12345678"
# mysql驱动器jar包, 我这里用的是5.7的MySQL
jdbc_driver_library => "E:/software/Maven/repository/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 开启分页
jdbc_paging_enabled => "true"
# 最大页码
jdbc_page_size => "1000"
# 这个type可以用来做多个输入源和多个输出源区分 这里举例所以只有一个
type => "user_DTS"
# 用于同步的查询sql
statement_filepath => "E:/software/Logstash/test_data/userDTS.sql"
# 直接书写sql语句
# statement => "select * from user"
# 加上jdbc时区, 要不然logstash的时间会不准确
jdbc_default_timezone => "Asia/Shanghai"
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
lowercase_column_names => "false"
# 处理中文乱码问题
codec => plain { charset => "UTF-8" }
# 日志级别
sql_log_level => warn
# 如果需要增量更新的话,则需要在input/jdbc下添加如下配置
# 如果要使用其它字段追踪, 而不是用时间开启这个配置
use_column_value => true
# 这个就是追踪字段的类型,只有数值和时间两个类型(numeric和timestamp,默认numeric) 这个值会记录到last_run_metadata_path 配置的文件中 如果配置是numeric 那么默认值为0 如果配置为timestamp 那么默认值为1970年
tracking_column_type => "timestamp"
# 追踪的字段 这个字段只有在上面的lowercase_column_names配置为false的时候才会区分大小写 因为不配置为false的话默认是true 查询结果字段默认会变成全小写
tracking_column => "updateDate"
# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
record_last_run => true
# 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 这个就是增量数据同步的关键
last_run_metadata_path => "E:/software/Logstash/test_data/userDTS_last_value.txt"
# 是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
# 设置监听间隔 各字段含义秒(由左至右)分、时、天、月、年
# 这里配置为每15秒扫描一次MySQL
schedule => "*/15 * * * * *"
}
}
# 过滤
filter {
mutate {
# 去掉没用字段, 注意了type不要去除掉, 这里需要根据type判断是否同步到ES
remove_field => ["@version", "@timestamp"]
}
}
# 输出源
output {
# 判断类型是否位user_DTS(即是否为user表相关的, 如果是就同步到ES中)
if[type] == "user_DTS"{
elasticsearch {
# elasticsearch url
hosts => ["http://localhost:9200"]
# 用户名&密码
# user => "whitebrocade"
# password => "whitebrocade"
# 下面两个参数可以开启更新模式
action => "update"
doc_as_upsert => true
# 索引名
index => "user"
# 文档id 设置成数据库的id
document_id => "%{id}"
}
}
stdout {
# 以json格式输出到控制台, 用于调试使用, 正式运行时可以注释掉
codec => json_lines
}
}
ES创建user索引
Logstash同步的时候, 并不会创建user的索引, 所以这里我们得自己手动创建一下
推荐一个ES的客户端工具,es-clint, 下边使用这工具创建
PUT /user
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text"
},
"createDate": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"updateDate": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
创建结果
测试
现在的MySQL和ES都没有数据
新增
往MySQL中新建一条数据
INSERT INTO user (name, create_date, update_date)
VALUES ('xm', NOW(), NOW());
logstash日志
查看ES的user索引
修改
修改id为1的数据
UPDATE
user
SET
name = 'whiteBrocade',
update_date = NOW()
WHERE id = 1;
logstash日志
查看ES的user索引
参考资料
Running Logstash on Docker | Logstash Reference8.17
Docker 安装 LogStash的详细过程
Docker部署Logstash同步Mysql数据到ES方式
使用docker安装logstash的具体方法