当前位置: 首页 > article >正文

MySQL 和 Elasticsearch 之间的数据同步

MySQL 和 Elasticsearch 之间的数据同步是常见的需求,通常用于将结构化数据从关系型数据库同步到 Elasticsearch 以实现高效的全文搜索、聚合分析和实时查询。以下是几种常用的同步方案及其实现方法:


1. 应用层双写(双写模式)

原理

在业务代码中同时向 MySQL 和 Elasticsearch 写入数据,保证两者数据一致。

实现步骤

  1. 在写入 MySQL 的事务中,同步或异步写入 Elasticsearch。
  2. 需处理可能的写入失败问题(如 Elasticsearch 宕机),通过重试机制或补偿机制(如消息队列)确保最终一致性。

优点

  • 实现简单,对架构改动较小。
  • 实时性强,写入即生效。

缺点

  • 双写可能引入数据不一致风险(如 MySQL 成功但 Elasticsearch 失败)。
  • 业务逻辑耦合度高,维护成本增加。

适用场景

  • 小规模数据同步,对实时性要求高。
  • 业务逻辑简单,可接受双写风险。

2. 使用 Logstash 定时同步

原理

通过 Logstash 的 jdbc 插件定期轮询 MySQL,将增量或全量数据同步到 Elasticsearch。

实现步骤

  1. 配置 Logstash 输入(Input):使用 jdbc 插件连接 MySQL,定义 SQL 查询(如按时间戳增量拉取)。
  2. 配置 Logstash 输出(Output):将数据写入 Elasticsearch。
  3. 定时任务:通过 schedule 参数设置轮询间隔(如每分钟一次)。

示例 Logstash 配置

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"  # 每分钟执行一次
    statement => "SELECT * FROM products WHERE updated_at > :sql_last_value"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "products"
    document_id => "%{id}"
  }
}

优点

  • 配置简单,无需修改业务代码。
  • 支持增量同步。

缺点

  • 实时性较差(依赖轮询间隔)。
  • 频繁轮询可能对 MySQL 造成压力。

适用场景

  • 对实时性要求不高(如 T+1 数据同步)。
  • 数据量较小,无需复杂转换的场景。

3. 基于 Binlog 的实时同步

原理

通过解析 MySQL 的 Binlog 日志(记录数据变更),将变更事件实时同步到 Elasticsearch。
常用工具:

  • Canal(阿里开源工具)
  • Debezium(基于 Kafka Connect)
  • Maxwell

实现步骤(以 Canal 为例)

  1. 开启 MySQL Binlog

    # 在 MySQL 配置文件中启用 Binlog
    server-id = 1
    log_bin = /var/log/mysql/mysql-bin.log
    binlog_format = ROW  # 必须为 ROW 模式
    
  2. 部署 Canal Server

    • Canal 伪装为 MySQL 从库,订阅 Binlog 变更。
    • 解析 Binlog 并转发到消息队列(如 Kafka)或直接调用 Elasticsearch API。
  3. 数据消费与写入 Elasticsearch

    • 编写消费者程序(如 Java/Python),将 Binlog 中的增删改事件转换为 Elasticsearch 的写入/更新/删除操作。

优点

  • 实时性高(毫秒级延迟)。
  • 对业务代码无侵入。

缺点

  • 部署复杂度较高,需维护中间件(如 Canal、Kafka)。
  • 需处理数据格式转换(如关系表到 JSON 文档)。

适用场景

  • 大规模数据实时同步。
  • 对数据一致性要求高的场景。

4. 使用消息队列解耦

原理

将 MySQL 的变更事件发送到消息队列(如 Kafka、RabbitMQ),由消费者异步写入 Elasticsearch。

实现步骤

  1. 捕获 MySQL 变更
    • 使用 Binlog 工具(如 Debezium)将变更事件发送到 Kafka。
  2. 消费 Kafka 消息
    • 编写消费者程序,处理消息并写入 Elasticsearch。

示例架构

MySQL → Debezium → Kafka → Consumer → Elasticsearch

优点

  • 高可靠性,消息队列提供持久化和重试机制。
  • 解耦生产者和消费者,扩展性强。

缺点

  • 架构复杂度高,需维护多个组件。

适用场景

  • 高并发、高可靠性的生产环境。
  • 需要灵活扩展和数据缓冲的场景。

5. 第三方工具

工具推荐

  • Go-MySQL-Elasticsearch:基于 Go 开发的工具,直接读取 MySQL Binlog 并同步到 Elasticsearch。
  • Elasticsearch River(已弃用):旧版 Elasticsearch 插件,不建议使用。

实现步骤(以 Go-MySQL-Elasticsearch 为例)

  1. 配置 MySQL 连接信息和 Elasticsearch 地址。
  2. 定义表到索引的映射规则。
  3. 启动服务,自动监听 Binlog 并同步数据。

优点

  • 开箱即用,无需开发代码。

缺点

  • 灵活性和可定制性较差。

总结与选型建议

方案实时性复杂度可靠性适用场景
应用层双写小规模,强实时性
Logstash 定时同步离线分析,非实时场景
Binlog 同步(Canal)大规模,实时性要求高
消息队列(Kafka)高并发,需解耦和扩展
第三方工具快速实现,无需定制开发

注意事项

  1. 数据结构转换:需将 MySQL 的行数据转换为 Elasticsearch 的 JSON 文档,可能涉及嵌套对象或父子关系处理。
  2. 幂等性:确保同步操作的幂等性(如通过唯一ID),避免重复写入。
  3. 错误处理:监控同步失败的情况,提供重试或人工干预机制。
  4. 性能优化
    • 批量写入 Elasticsearch(使用 _bulk API)。
    • 调整 Elasticsearch 的刷新间隔(refresh_interval)提升写入性能。

通过合理选择方案并配合监控工具(如 Kibana、Prometheus),可实现高效可靠的 MySQL 到 Elasticsearch 数据同步。


http://www.kler.cn/a/566704.html

相关文章:

  • docker里面pgadmin4自动备份pg数据库操作
  • NIM平台开发基于提示工程的大语言模型(LLM)应用
  • 笔记20250225
  • IP-----双重发布
  • 使用PDFMiner.six解析PDF数据
  • 【Rust中级教程】题外话:Rust + Python联合编程(基于Maturin)
  • 城电科技|会追日的智能花,光伏太阳花开启绿色能源新篇章
  • 一个便捷的web截图库~
  • 回溯算法(C/C++)
  • mysql中事务的基本概念
  • 开源PDF解析工具olmOCR
  • next.js-学习3
  • USRP4120-通用软件无线电平台
  • C语言一维数组的全面解析
  • CAN总线通信协议学习1——物理层
  • HarmonyOS 5.0应用开发——多线程Worker和@Sendable的使用方法
  • 版图自动化连接算法开发 00004 ------ 给定一个点,添加一个中间点实现 Manhattan 方式连接两个给定的坐标点
  • I2S音频开发(使用USB音频进行验证)
  • ESP32 S3开发笔记(环境搭建,成功烧录)
  • Windows逆向工程入门之MASM整数存储机制