当前位置: 首页 > article >正文

Logstash同步MySQL到ES

Logstash同步MySQL到ES

前提

软件版本

  • Logstash: 7.10.2

  • ES: 7.10.2

  • MySQL: 5.7

Logstash和ES安装在Windows中, MySQL安装在虚拟机中

配置

user表建表语句

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `name` varchar(255) DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `update_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

位置信息

创建userDTS_last_value.txt, 不需要内容, 到时候logstash会写入

同步user数据的userDTS.sql

SELECT 
	id as id, 
	name as name, 
	DATE_FORMAT(create_date,'%Y-%m-%d %H:%i:%S') as createDate, 
	DATE_FORMAT(update_date,'%Y-%m-%d %H:%i:%S') as updateDate 
FROM 
	user
WHERE
	# 增量数据的查询条件,一条数据怎么才算增量数据就是由这个条件确定的,而其中的::sql_last_value就是第2步中配置项中的last_run_metadata_path里配置的文件路径对应的文件中的值,也就是每次同步数据后最新一条的updateDate值,那么下次同步数据的时候,sql根据这个条件就不会查到非增量数据,同时第2步中配置项中的tracking_column配置的值就是updateDate,实际使用的时候可以根据场景更换这个字段 
	# 解决Logstash的sql_last_value 和MySQL时区不一致问题
	update_date >= CONVERT_TZ(:sql_last_value, '+08:00', '+00:00') 
    AND CONVERT_TZ(:sql_last_value, '+08:00', '+00:00') < now()

logstash-mysql-es.conf

注意事项

  1. MySQL驱动器jar包需要指定, 要和MySQL的版本兼容
  2. logstash的同步不是实时同步的, 使用同步可以使用Canel, Flink CDC监听Binlog的方式, 可以参考博主的SpringBoot集成Flink-CDC
  3. 存量更新和存量更新配置根据自己的需求开启
  4. 相关位置改成自己, 比如mysql, sql存放位置
  5. 启动logstash的时候需要指定logstash-mysql-es.conf配置启动
    • 我的配置存放logstash安装目录的config中, 进入bin目录, 执行cmd, 输入启命令为logstash -f …\config\logstash-mysql-es.conf
# 输入源
input {
   jdbc { 
		# 数据库连接参数
		jdbc_connection_string => "jdbc:mysql://192.168.132.10:3306/whitebrocade?characterEncoding=UTF-8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai"
		# mysql用户名
		jdbc_user => "root"
		# mysql密码
		jdbc_password => "12345678"
		
		# mysql驱动器jar包, 我这里用的是5.7的MySQL
		jdbc_driver_library => "E:/software/Maven/repository/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar"
		# 驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		# 开启分页
		jdbc_paging_enabled => "true"
		# 最大页码
		jdbc_page_size => "1000"
		
		# 这个type可以用来做多个输入源和多个输出源区分  这里举例所以只有一个
		type => "user_DTS" 
		# 用于同步的查询sql
		statement_filepath => "E:/software/Logstash/test_data/userDTS.sql"
		# 直接书写sql语句
		# statement => "select * from user"
		# 加上jdbc时区, 要不然logstash的时间会不准确
		jdbc_default_timezone => "Asia/Shanghai"
		# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
		lowercase_column_names => "false"
		 # 处理中文乱码问题
		codec => plain { charset => "UTF-8" }
		
		# 日志级别
		sql_log_level => warn
		
		# 如果需要增量更新的话,则需要在input/jdbc下添加如下配置
		# 如果要使用其它字段追踪, 而不是用时间开启这个配置
		use_column_value => true
		# 这个就是追踪字段的类型,只有数值和时间两个类型(numeric和timestamp,默认numeric) 这个值会记录到last_run_metadata_path 配置的文件中 如果配置是numeric 那么默认值为0 如果配置为timestamp 那么默认值为1970年
		tracking_column_type => "timestamp"
		# 追踪的字段  这个字段只有在上面的lowercase_column_names配置为false的时候才会区分大小写  因为不配置为false的话默认是true  查询结果字段默认会变成全小写
		tracking_column => "updateDate"
		# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
		record_last_run => true
		# 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值  这个就是增量数据同步的关键
		last_run_metadata_path => "E:/software/Logstash/test_data/userDTS_last_value.txt"
		# 是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录
		clean_run => false
		# 设置监听间隔  各字段含义秒(由左至右)分、时、天、月、年
		# 这里配置为每15秒扫描一次MySQL
		schedule => "*/15 * * * * *"

    }
}

# 过滤
filter {
	mutate {
		# 去掉没用字段, 注意了type不要去除掉, 这里需要根据type判断是否同步到ES
		remove_field  => ["@version", "@timestamp"]
	}
}

# 输出源
output {
	# 判断类型是否位user_DTS(即是否为user表相关的, 如果是就同步到ES中)
    if[type] == "user_DTS"{
		elasticsearch {
			# elasticsearch url
			hosts => ["http://localhost:9200"]
			# 用户名&密码
			# user => "whitebrocade"
			# password => "whitebrocade"
			# 下面两个参数可以开启更新模式
			action => "update"
			doc_as_upsert => true
			# 索引名
			index => "user"
			# 文档id 设置成数据库的id
			document_id => "%{id}"
       }
    }
    stdout {
        # 以json格式输出到控制台, 用于调试使用, 正式运行时可以注释掉
        codec => json_lines
    }
} 

ES创建user索引

Logstash同步的时候, 并不会创建user的索引, 所以这里我们得自己手动创建一下

推荐一个ES的客户端工具,es-clint, 下边使用这工具创建

PUT /user
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text"
      },
      "createDate": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
      },
      "updateDate": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
      }
    }
  }
}

创建结果

image-20250307141251259

image-20250307141348283

测试

现在的MySQL和ES都没有数据

新增

往MySQL中新建一条数据
INSERT INTO user (name, create_date, update_date) 
VALUES ('xm', NOW(), NOW());

image-20250307141701181

logstash日志

image-20250307142724014

查看ES的user索引

image-20250307142748466

修改

修改id为1的数据
UPDATE 
  user 
SET
  name = 'whiteBrocade',
  update_date = NOW() 
WHERE id = 1;

image-20250307143205419

logstash日志

image-20250307143249717

查看ES的user索引

image-20250307143322543

参考资料

Running Logstash on Docker | Logstash Reference8.17

Docker 安装 LogStash的详细过程

Docker部署Logstash同步Mysql数据到ES方式

使用docker安装logstash的具体方法


http://www.kler.cn/a/576829.html

相关文章:

  • 从0到1入门Linux
  • MongoDB(一) - MongoDB安装教程(Windows + Linux)
  • STM32使用无源蜂鸣器
  • 深度解读DeepSeek:从原理到模型(二)
  • 小程序 wxml 语法 —— 37 setData() - 修改对象类型数据
  • [视频编码]rkmpp 实现硬件编码
  • 群晖DS 223 Docker:开启私有云
  • PCI 总线学习笔记(四)
  • 【linux网络编程】套接字编程API详细介绍
  • 怎么用vscode 写 markdown 文档
  • RK3568平台(音频篇)audio_policy_volumes_drc.xml解析
  • 硬件基础(4):(1)AD采集电路设计
  • Golang中的 “...” 操作符
  • 【大厂AI实践】美团:事件图谱在美团智能客服问答中的应用(基于交互的推理)
  • im即时聊天客服系统SaaS还是私有化部署:成本、安全与定制化的权衡策略
  • React基础之受控表单绑定
  • 头歌作业-数据库实验一:数据库和数据表的建立,修改和删除
  • 深入解析 JVM —— 从基础概念到实战调优的全链路学习指南
  • 【机械臂】Windows 11安装Mujoco200并运行基于强化学习的多任务机械臂Meta-word基准
  • 论文阅读_LMLPA_用大语言模型实现人格评测