MySQL Binlog 监听:Canal + Spring Boot 实战指南
1. 什么是 Canal?
Canal 是阿里巴巴开源的一个基于 MySQL binlog 的增量订阅和消费组件,主要用于 MySQL 主从同步、数据同步、变更通知等场景。它可以监听 MySQL 的 binlog 变化并将数据解析后推送到消息队列(如 RabbitMQ、Kafka),方便后续处理。
2. Canal 配置及安装
2.1 下载 Canal Server
-
前往 Canal GitHub 仓库 下载最新的 Canal Server。
-
解压后进入
conf
目录,修改example
配置文件。
2.2 配置 Canal Server
修改 conf/example/instance.properties
# 指定 MySQL 地址、端口、用户名、密码
db.host=127.0.0.1
db.port=3306
db.username=canal
db.password=Canal.123456
db.name=hcuc_synchro_oa
table.regex=.*
# 订阅的数据库和表
db.include=.*
修改 conf/canal.properties
# Canal 服务端口
canal.port=11111
# Canal 日志级别
canal.log.level=INFO
# Canal 实例配置
destinations=example
2.3 启动 Canal Server
# 启动 Canal
sh bin/startup.sh
# 查看日志,确认是否启动成功
cat logs/canal/canal.log
3. Spring Boot 整合 Canal
3.1 引入 Maven 依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
3.2 编写 Canal 监听客户端
@Slf4j
@Component
public class CanalClient {
private final static int BATCH_SIZE = 1000;
@Autowired
private RabbitTemplate rabbitTemplate;
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "canal", "Canal.123456");
try {
connector.connect();
connector.subscribe("hcuc_synchro_oa.td_user|hcuc_synchro_oa.department");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(2000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
}
} catch (Exception e) {
log.error("Canal 异常", e);
} finally {
connector.disconnect();
}
}
private void dataHandle(List<Entry> entryList) {
for (Entry entry : entryList) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
JSONObject json = generateJSON(eventType, tableName, rowData);
if (json != null) {
log.info("生成的 JSON: {}", json.toJSONString());
send(json);
}
}
} catch (Exception e) {
log.error("Canal 解析异常", e);
}
}
}
private JSONObject generateJSON(EventType eventType, String tableName, RowData rowData) {
JSONObject json = new JSONObject();
json.put("tableName", tableName);
JSONObject data = new JSONObject();
if (eventType == EventType.INSERT) {
json.put("state", "ADD");
for (Column column : rowData.getAfterColumnsList()) {
data.put(column.getName(), column.getValue());
}
} else if (eventType == EventType.DELETE) {
json.put("state", "DELETE");
for (Column column : rowData.getBeforeColumnsList()) {
data.put(column.getName(), column.getValue());
}
} else if (eventType == EventType.UPDATE) {
json.put("state", "UPDATE");
for (Column column : rowData.getAfterColumnsList()) {
data.put(column.getName(), column.getValue());
}
}
json.put("data", data);
return json;
}
public void send(JSONObject message) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(uuid);
rabbitTemplate.convertAndSend("synchro.exchange.topic", "synchro.key.topic", message.toString(), correlationData);
}
}
4. 运行项目并验证
-
启动 Canal Server sh bin/startup.sh
-
启动 Spring Boot 项目
-
在 MySQL 中执行数据变更测试
INSERT INTO td_user (id, name) VALUES (1, '张三'); UPDATE td_user SET name = '李四' WHERE id = 1; DELETE FROM td_user WHERE id = 1;
-
检查 Canal Client 日志,确认数据变更是否成功推送到 MQ。5. 总结
-
本教程详细介绍了 Canal 的安装、配置以及如何在 Spring Boot 中监听 MySQL Binlog,并将数据变更推送到 RabbitMQ,实现数据库数据同步和消息通知。
如果你有任何问题或建议,欢迎留言交流!