SpringBoot整合Canal+RabbitMQ监听数据变更
需求
在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求。例如,在电商系统中,当订单表发生更新时,可能需要同步这些变更到搜索服务、缓存服务或者通知其他微服务。传统的解决方案包括定时轮询数据库或通过触发器将变更写入消息队列等方法,但这些方案要么效率低下,要么实现复杂。而使用 Canal + RabbitMQ 可以提供一种高效且可靠的方式来捕获 MySQL 数据库的变更,并将其发送到 RabbitMQ 中供其他服务消费。
Canal 是阿里巴巴开源的一个用于增量订阅和消费 MySQL 数据库 Binlog 的工具,它模拟 MySQL 主从复制机制,无需侵入业务逻辑即可捕获数据库变更。RabbitMQ 是一个流行的开源消息代理,支持多种协议并提供了丰富的特性来确保消息传递的可靠性。结合这两者,可以构建一个强大的实时数据变更监听和处理系统。
步骤
- 环境搭建
- 整合SpringBoot与Canal实现客户端
- Canal整合RabbitMQ
- SpringBoot整合RabbitMQ
环境搭建
1. 安装MySQL
确保你有一个正在运行的 MySQL 实例,并启用了 binlog 日志记录功能。这是 Canal 捕获数据库变更的基础。
# 修改 MySQL 配置文件 my.cnf 或 my.ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
重启 MySQL 服务使配置生效。
2. 安装Canal Server
下载最新版本的 Canal Server 并解压到合适的位置。根据官方文档进行必要的配置,特别是 instance.properties
文件中的数据库连接信息。
3. 安装RabbitMQ
可以通过 Docker 快速安装 RabbitMQ:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
访问 http://localhost:15672 登录管理界面,默认用户名/密码为 guest/guest。
整合SpringBoot与Canal实现客户端
创建SpringBoot项目
使用 Spring Initializr 创建一个新的 Spring Boot 项目,添加 Web, JPA, 和 AMQP(用于后续整合 RabbitMQ)依赖。
引入Canal依赖
在 pom.xml
中添加 Canal Client 的依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
编写Canal客户端代码
创建一个 Canal 客户端类,用来监听 MySQL 数据库的变化,并将变更事件转发给 RabbitMQ。
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;
public class CanalClient {
private final CanalConnector connector;
private final Channel channel;
public CanalClient(CanalConnector connector, Channel channel) {
this.connector = connector;
this.channel = channel;
}
public void start() {
// Canal 连接配置
connector.connect();
connector.subscribe(".*\\..*"); // 订阅所有数据库和表
connector.rollback();
while (true) {
int batchSize = 1000;
EntryBatch batch = connector.getWithoutAck(batchSize); // 获取一批次数据
long batchId = batch.getId();
int size = batch.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(batch.getEntries());
connector.ack(batchId); // 提交确认
}
if (Thread.currentThread().isInterrupted()) {
break;
}
}
connector.disconnect();
}
private void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
sendToRabbitMQ(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
sendToRabbitMQ(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
sendToRabbitMQ(rowData.getBeforeColumnsList());
System.out.println("-------> after");
sendToRabbitMQ(rowData.getAfterColumnsList());
}
}
}
}
private void sendToRabbitMQ(List<Column> columns) {
StringBuilder message = new StringBuilder();
for (Column column : columns) {
message.append(column.getName()).append("=").append(column.getValue()).append(",");
}
try {
channel.basicPublish("", "canal_exchange", null, message.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}
Canal整合RabbitMQ
配置Canal Server
确保 Canal Server 已正确配置并启动,能够监听 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 实例,并设置适当的过滤规则。
配置RabbitMQ Exchange
在 RabbitMQ 中创建一个名为 canal_exchange
的 exchange,类型可以根据需要选择,如 fanout
, direct
, topic
或 headers
。
rabbitmqadmin declare exchange name=canal_exchange type=fanout
SpringBoot整合RabbitMQ
添加依赖
确保在 pom.xml
中已经包含了 RabbitMQ 的 Spring AMQP 依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置RabbitMQ连接信息
在 application.yml
或 application.properties
中配置 RabbitMQ 的连接参数。
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
创建消费者
编写一个消费者类来接收来自 RabbitMQ 的消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class CanalMessageConsumer {
@RabbitListener(queues = "canal_queue")
public void receive(String message) {
System.out.println("Received message: " + message);
}
}
配置队列和绑定
确保在应用程序启动时自动创建所需的队列,并将它们绑定到之前创建的 exchange 上。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue canalQueue() {
return new Queue("canal_queue", false);
}
@Bean
public TopicExchange canalExchange() {
return new TopicExchange("canal_exchange");
}
@Bean
public Binding binding(Queue canalQueue, TopicExchange canalExchange) {
return BindingBuilder.bind(canalQueue).to(canalExchange).with("#");
}
}
总结
通过以上步骤,我们成功地将 Canal 与 RabbitMQ 整合到了 Spring Boot 应用程序中。这使得我们可以实时监听 MySQL 数据库的变更,并将这些变更作为消息发布到 RabbitMQ 中供其他微服务消费。这种方法不仅提高了系统的响应速度,也简化了数据同步的过程,降低了开发和维护成本。