当前位置: 首页 > article >正文

MySQL Binlog 监听:Canal + Spring Boot 实战指南

1. 什么是 Canal?

Canal 是阿里巴巴开源的一个基于 MySQL binlog 的增量订阅和消费组件,主要用于 MySQL 主从同步、数据同步、变更通知等场景。它可以监听 MySQL 的 binlog 变化并将数据解析后推送到消息队列(如 RabbitMQ、Kafka),方便后续处理。

2. Canal 配置及安装

2.1 下载 Canal Server

  1. 前往 Canal GitHub 仓库 下载最新的 Canal Server。

  2. 解压后进入 conf 目录,修改 example 配置文件。

2.2 配置 Canal Server

修改 conf/example/instance.properties
# 指定 MySQL 地址、端口、用户名、密码
db.host=127.0.0.1
db.port=3306
db.username=canal
db.password=Canal.123456

db.name=hcuc_synchro_oa
table.regex=.*

# 订阅的数据库和表
db.include=.*
修改 conf/canal.properties
# Canal 服务端口
canal.port=11111
# Canal 日志级别
canal.log.level=INFO
# Canal 实例配置
destinations=example

2.3 启动 Canal Server

# 启动 Canal
sh bin/startup.sh
# 查看日志,确认是否启动成功
cat logs/canal/canal.log

3. Spring Boot 整合 Canal

3.1 引入 Maven 依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
</dependency>

3.2 编写 Canal 监听客户端

@Slf4j
@Component
public class CanalClient {

    private final static int BATCH_SIZE = 1000;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example", "canal", "Canal.123456");

        try {
            connector.connect();
            connector.subscribe("hcuc_synchro_oa.td_user|hcuc_synchro_oa.department");
            connector.rollback();

            while (true) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    Thread.sleep(2000);
                } else {
                    dataHandle(message.getEntries());
                }
                connector.ack(batchId);
            }
        } catch (Exception e) {
            log.error("Canal 异常", e);
        } finally {
            connector.disconnect();
        }
    }

    private void dataHandle(List<Entry> entryList) {
        for (Entry entry : entryList) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                String tableName = entry.getHeader().getTableName();

                for (RowData rowData : rowChange.getRowDatasList()) {
                    JSONObject json = generateJSON(eventType, tableName, rowData);
                    if (json != null) {
                        log.info("生成的 JSON: {}", json.toJSONString());
                        send(json);
                    }
                }
            } catch (Exception e) {
                log.error("Canal 解析异常", e);
            }
        }
    }

    private JSONObject generateJSON(EventType eventType, String tableName, RowData rowData) {
        JSONObject json = new JSONObject();
        json.put("tableName", tableName);
        JSONObject data = new JSONObject();

        if (eventType == EventType.INSERT) {
            json.put("state", "ADD");
            for (Column column : rowData.getAfterColumnsList()) {
                data.put(column.getName(), column.getValue());
            }
        } else if (eventType == EventType.DELETE) {
            json.put("state", "DELETE");
            for (Column column : rowData.getBeforeColumnsList()) {
                data.put(column.getName(), column.getValue());
            }
        } else if (eventType == EventType.UPDATE) {
            json.put("state", "UPDATE");
            for (Column column : rowData.getAfterColumnsList()) {
                data.put(column.getName(), column.getValue());
            }
        }

        json.put("data", data);
        return json;
    }

    public void send(JSONObject message) {
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend("synchro.exchange.topic", "synchro.key.topic", message.toString(), correlationData);
    }
}

4. 运行项目并验证

  1. 启动 Canal Server    sh bin/startup.sh

  2. 启动 Spring Boot 项目

  3. 在 MySQL 中执行数据变更测试

    INSERT INTO td_user (id, name) VALUES (1, '张三');
    UPDATE td_user SET name = '李四' WHERE id = 1;
    DELETE FROM td_user WHERE id = 1;

  4. 检查 Canal Client 日志,确认数据变更是否成功推送到 MQ。5. 总结

  5. 本教程详细介绍了 Canal 的安装、配置以及如何在 Spring Boot 中监听 MySQL Binlog,并将数据变更推送到 RabbitMQ,实现数据库数据同步和消息通知。

    如果你有任何问题或建议,欢迎留言交流!


http://www.kler.cn/a/555549.html

相关文章:

  • 【分布式理论14】分布式数据库存储:分表分库、主从复制与数据扩容策略
  • 网络协议相关知识有哪些?
  • 一次交换机故障导致的云平台(opensatck+ceph)不可用的记录
  • Compose 定制UI视图
  • Netty入门详解
  • pyside6学习专栏(二):程序图像资源的加载方式
  • 庙算兵棋推演AI开发初探(5-数据处理)
  • 阿里云子账号管理ECS权限配置全指南
  • Canvas进阶-2、可视化应用
  • C# 中关于补位的写法 PadLeft,PadRight 函数
  • 猎板PCB百科——键盘PCB
  • cesium(vue)一些面试问题(包含Three.js)
  • el-tree选中数据重组成树
  • 【Python爬虫(24)】Redis:Python爬虫的秘密武器
  • AI 编程助手 cursor的系统提示词 prompt
  • 基于大数据的工业废水处理解决方案分享
  • Python迭代器知多少
  • 网络运维学习笔记 014网工初级(HCIA-Datacom与CCNA-EI)ACL访问控制列表
  • DeepSeek大模型下半场:开源、普惠与生态重构的技术革命
  • redis的缓存击穿,雪崩,穿透