Spring Boot+RabbitMQ+Canal 解决数据一致性
目录大纲
- 一、环境配置
- 1.1 docker-compose.yml 配置
- 1.2 docker-compose 常用命令
- 1.3 镜像服务启动状态
- 二、MySQL binlog 配置
- 2.1 docker-compose command 配置 binlog
- 2.2 创建canal用户,以及查看是否开启binlog
- 三、canal 相关配置文件
- 3.1 canal.properties 完整文件
- 3.2 instance.properties 完整文件
- 3.3 检查配置是否与宿主机一致
- 3.4 开启相关端口防火墙配置
- 四、代码实现
- 4.1 相关pom依赖引入
- 4.2 完整pom.xml
- 4.3 application.yml 配置
- 4.4 完整application.yml配置
- 4.5 RabbitConstants 基础常量配置
- 4.6 CanalMqConfigure MQ队列交换机配置
- 4.7 CanalConsumer 消费者
- 五、运行与测试
一、环境配置
1.1 docker-compose.yml 配置
version: '3.8'
services:
redis:
container_name: redis
image: redis:6.2.7
restart: always
networks:
- app_net
ports:
- "6379:6379"
volumes:
- /usr/local/docker/redis/data:/data
- /usr/local/docker/redis/config/redis.conf:/usr/local/redis/config/redis.conf
- /usr/local/docker/redis/logs:/logs
command: [ "redis-server","/usr/local/redis/config/redis.conf" ]
mysql:
container_name: mysql
image: mysql:8.0.30
restart: always
networks:
- app_net
ports:
- "3306:3306"
volumes:
- /usr/local/docker/mysql/data:/var/lib/mysql
- /usr/local/docker/mysql/config:/etc/mysql/conf.d
environment:
MYSQL_ROOT_PASSWORD: root
TZ: Asia/Shanghai
command:
--default-authentication-plugin=mysql_native_password
--character-set-server=utf8mb4
--collation-server=utf8mb4_general_ci
--explicit_defaults_for_timestamp=true
--lower_case_table_names=1
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M
canal:
image: canal/canal-server:v1.1.5
container_name: canal
restart: always
ports:
- 11110:11110
- 11111:11111
- 11112:11112
volumes:
- /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
- /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
- /usr/local/docker/canal/logs:/home/admin/canal-server/logs
networks:
- app_net
depends_on:
- mysql
- rabbitmq
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
restart: always
ports:
- "5672:5672"
- "15672:15672"
volumes:
- /usr/local/docker/rabbitmq/data/:/var/lib/rabbitmq/
- /usr/local/docker/rabbitmq/log/:/var/log/rabbitmq/
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
networks:
- app_net
networks:
app_net:
driver: bridge
1.2 docker-compose 常用命令
# 后台启动容器编排文件
docker-compose up -d [service]
# 停止up命令所启动的容器,并移除网络
docker-compose down
# 进入指定容器
docker-compose exec [service]
# 列出项目中所有的容器
docker-compose ps [service]
# 重启项目中容器
docker-compose restart [service]
# 删除项目中所有容器
docker-compose rm -f [service]
# 启动项目中容器(或指定容器)
docker-compose start [service]
# 暂停项目中容器(或指定容器)
docker-compose stop [service]
1.3 镜像服务启动状态
[root@lavm-13jmyj9ugf docker]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0d4260bc557b canal/canal-server:v1.1.5 "/alidata/bin/main.s…" 38 minutes ago Up 38 minutes 9100/tcp, 0.0.0.0:11110-11112->11110-11112/tcp, :::11110-11112->11110-11112/tcp canal
c66b3f1f13a9 mysql:8.0.30 "docker-entrypoint.s…" 38 minutes ago Up 38 minutes 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp mysql
645e27bd4001 rabbitmq:3-management "docker-entrypoint.s…" 5 hours ago Up 49 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
f55d42cbbd8e redis:6.2.7 "docker-entrypoint.s…" 3 days ago Up 49 minutes 0.0.0.0:6379->6379/tcp, :::6379->6379/tcp redis
二、MySQL binlog 配置
2.1 docker-compose command 配置 binlog
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M
2.2 创建canal用户,以及查看是否开启binlog
mysql> CREATE USER canal IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.05 sec)
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.05 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.05 sec)
mysql> select * from mysql.user where User = 'canal';
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| Host | User | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin | authentication_string | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| % | canal | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | | | | 0 | 0 | 0 | 0 | mysql_native_password | *E3619321C1A937C46A0D8BD1DAC39F93B27D4458 | N | 2025-03-10 11:53:49 | NULL | N | N | N | NULL | NULL | NULL | NULL |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
1 row in set (0.08 sec)
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.30 sec)
三、canal 相关配置文件
canal.properties
主要核心配置
canal.serverMode=rabbitMQ
:选择 RabbitMQ 作为通知服务模型。
rabbitmq.host=rabbitmq
:基于 Docker 同一网络下,可以使用容器名称代替 host。
rabbitmq.queue
、rabbitmq.routingKey
、rabbitmq.exchange
: RabbitMQ 的三件套,用于后续创建具体通道监听。
#################################################
######### common argument #############
#################################################
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 支持的服务模型,tcp直连或mq,此处我选择RabbitMQ
canal.serverMode=rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
instance.properties
主要核心配置
canal.instance.master.address
数据库地址
canal.instance.dbUsername
数据库用户名
canal.instance.dbPassword
数据库密码
canal.mq.topic
RabbitMQ路由
# 数据地址,此处mysql,是因为canal和mysql是同一network下,可以使用容器名称代替具体ip
canal.instance.master.address=mysql:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# mq config
canal.mq.topic=canal-routing-key
3.1 canal.properties 完整文件
#################################################
######### common argument #############
#################################################
canal.ip=
canal.register.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.admin.port=11110
canal.admin.user=admin
canal.admin.passwd=
canal.zkServers=
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
canal.serverMode=rabbitMQ
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
canal.instance.memory.buffer.size=16384
canal.instance.memory.buffer.memunit=1024
canal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true
canal.instance.detecting.enable=false
canal.instance.detecting.sql=select 1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false
canal.instance.transaction.size=1024
canal.instance.fallbackIntervalInSeconds=60
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false
canal.instance.binlog.format=ROW,STATEMENT,MIXED
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation=false
canal.instance.parser.parallel=true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize=256
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
canal.instance.tsdb.snapshot.interval=24
canal.instance.tsdb.snapshot.expire=360
#################################################
######### destinations #############
#################################################
canal.destinations=example
canal.conf.dir=../conf
canal.auto.scan=true
canal.auto.scan.interval=5
canal.auto.reset.latest.pos.mode=false
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode=spring
canal.instance.global.lazy=false
canal.instance.global.manager.address=${canal.admin.manager}
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
##################################################
######### MQ Properties #############
##################################################
canal.aliyun.accessKey=
canal.aliyun.secretKey=
canal.aliyun.uid=
canal.mq.flatMessage=true
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.accessChannel=local
canal.mq.database.hash=true
canal.mq.send.thread.size=30
canal.mq.build.thread.size=8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers=127.0.0.1:9092
kafka.acks=all
kafka.compression.type=none
kafka.batch.size=16384
kafka.linger.ms=1
kafka.max.request.size=1048576
kafka.buffer.memory=33554432
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.kerberos.enable=false
kafka.kerberos.krb5.file=../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file=../conf/kerberos/jaas.conf
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group=test
rocketmq.enable.message.trace=false
rocketmq.customized.trace.topic=
rocketmq.namespace=
rocketmq.namesrv.addr=127.0.0.1:9876
rocketmq.retry.times.when.send.failed=0
rocketmq.vip.channel.enabled=false
rocketmq.tag=
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl=
pulsarmq.roleToken=
pulsarmq.topicTenantPrefix=
3.2 instance.properties 完整文件
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# position info
canal.instance.master.address=mysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# multi stream for polardbx
canal.instance.multi.stream.on=false
# ssl
#canal.instance.master.sslMode=DISABLED
#canal.instance.master.tlsVersions=
#canal.instance.master.trustCertificateKeyStoreType=
#canal.instance.master.trustCertificateKeyStoreUrl=
#canal.instance.master.trustCertificateKeyStorePassword=
#canal.instance.master.clientCertificateKeyStoreType=
#canal.instance.master.clientCertificateKeyStoreUrl=
#canal.instance.master.clientCertificateKeyStorePassword=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=admin123!@#
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
3.3 检查配置是否与宿主机一致
进入容器内部:docker exec -it canal bash
检查配置文件内容是否与宿主机一致:
cat /home/admin/canal-server/conf/canal.properties
cat /home/admin/canal-server/conf/example/instance.properties
3.4 开启相关端口防火墙配置
canal
:11110、11111、11112mysql
:3306redis
:6379RabbitMQ
: 15672、5672
四、代码实现
4.1 相关pom依赖引入
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Spring Boot MQ 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
4.2 完整pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.neo</groupId>
<artifactId>code-repository</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<name>code-repository</name>
<properties>
<java.version>17</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hutool.version>5.8.20</hutool.version>
<mysql.version>8.0.30</mysql.version>
<mybatis-plus.version>3.5.3.1</mybatis-plus.version>
<redis.version>3.1.0</redis.version>
<druid.version>1.2.16</druid.version>
<fastjson.version>1.2.83</fastjson.version>
<sa-token.version>1.37.0</sa-token.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-spring-boot-starter</artifactId>
<version>${sa-token.version}</version>
</dependency>
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-redis-jackson</artifactId>
<version>${sa-token.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4.3 application.yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
4.4 完整application.yml配置
server:
port: 8088
servlet:
context-path: /api
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/db_v1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
druid:
initial-size: 5
min-idle: 5
max-active: 20
max-wait: 60000
validation-query: SELECT 1
test-while-idle: true
stat-view-servlet:
enabled: true
url-pattern: /druid/*
login-username: admin
login-password: admin123
web-stat-filter:
enabled: true
url-pattern: /*
exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"
filter:
stat:
enabled: true
log-slow-sql: true
slow-sql-millis: 1000
wall:
enabled: true
config:
drop-table-allow: false
redis:
host: localhost
port: 6379
password: 123456
database: 0
timeout: 5000
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
mail:
host: smtp.aliyun.com
username:
password:
port: 25
properties:
mail:
smtp:
auth: true
starttls:
enable: true
required: true
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
mybatis-plus:
mapper-locations: classpath*:mapper/*_Mapper.xml
global-config:
db-config:
logic-delete-field: delFlag
logic-delete-value: 1
logic-not-delete-value: 0
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
4.5 RabbitConstants 基础常量配置
public interface RabbitConstants {
interface Canal {
String QUEUE = "canal-queue";
String EXCHANGE = "canal-exchange";
String ROUTING = "canal-routing-key";
}
interface EventType {
String INSERT = "INSERT";
String UPDATE = "UPDATE";
String DELETE = "DELETE";
}
}
4.6 CanalMqConfigure MQ队列交换机配置
@Configuration
public class CanalMqConfigure {
@Bean
public Queue queue() {
return new Queue(RabbitConstants.Canal.QUEUE, true);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(RabbitConstants.Canal.EXCHANGE, true, false);
}
@Bean
public Binding bindingCanal() {
return BindingBuilder.bind(queue())
.to(directExchange())
.with(RabbitConstants.Canal.ROUTING);
}
}
4.7 CanalConsumer 消费者
package com.neo.core.canal;
import com.neo.core.constant.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Canal.QUEUE)
public class CanalConsumer {
@RabbitHandler
public void execute(Map<String, Object> msg) {
log.info("canal消息监听事件触发,消息内容:{}", msg);
boolean isDdl = (boolean) msg.get("isDdl");
if (isDdl) {
return;
}
String database = (String) msg.get("database");
String table = (String) msg.get("table");
String type = (String) msg.get("type");
List<?> data = (List<?>) msg.get("data");
log.info("database:{}.table:{}", database, table);
if (RabbitConstants.EventType.INSERT.equalsIgnoreCase(type)) {
System.out.println("INSERT");
} else if (RabbitConstants.EventType.UPDATE.equalsIgnoreCase(type)) {
System.out.println("UPDATE");
} else if (RabbitConstants.EventType.DELETE.equalsIgnoreCase(type)) {
System.out.println("DELETE");
} else {
// 其他事件
}
}
}
五、运行与测试
当MySQL数据出现变动后,会触发canal-queue的监听事件,后续可根据具体业务逻辑实现业务处理。
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test1, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596660000, id=5, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596660932, type=DELETE}
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
DELETE
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596663000, id=6, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596664038, type=INSERT}
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
INSERT
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test1, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596668000, id=7, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=[{account=test}], pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596668545, type=UPDATE}
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
UPDATE
以上是RabbitMQ+Canal数据一致性的完整解决方案,包括环境配置、代码实现以及运行测试等环节,确保了数据在不同系统间的一致性和可靠性。