当前位置: 首页 > article >正文

Spring Boot+RabbitMQ+Canal 解决数据一致性

目录大纲

    • 一、环境配置
      • 1.1 docker-compose.yml 配置
      • 1.2 docker-compose 常用命令
      • 1.3 镜像服务启动状态
    • 二、MySQL binlog 配置
      • 2.1 docker-compose command 配置 binlog
      • 2.2 创建canal用户,以及查看是否开启binlog
    • 三、canal 相关配置文件
      • 3.1 canal.properties 完整文件
      • 3.2 instance.properties 完整文件
      • 3.3 检查配置是否与宿主机一致
      • 3.4 开启相关端口防火墙配置
    • 四、代码实现
      • 4.1 相关pom依赖引入
      • 4.2 完整pom.xml
      • 4.3 application.yml 配置
      • 4.4 完整application.yml配置
      • 4.5 RabbitConstants 基础常量配置
      • 4.6 CanalMqConfigure MQ队列交换机配置
      • 4.7 CanalConsumer 消费者
    • 五、运行与测试

一、环境配置

1.1 docker-compose.yml 配置

version: '3.8'

services:
  redis:
    container_name: redis
    image: redis:6.2.7
    restart: always
    networks:
      - app_net
    ports:
      - "6379:6379"
    volumes:
      - /usr/local/docker/redis/data:/data
      - /usr/local/docker/redis/config/redis.conf:/usr/local/redis/config/redis.conf
      - /usr/local/docker/redis/logs:/logs
    command: [ "redis-server","/usr/local/redis/config/redis.conf" ]

  mysql:
    container_name: mysql
    image: mysql:8.0.30
    restart: always
    networks:
      - app_net
    ports:
      - "3306:3306"
    volumes:
      - /usr/local/docker/mysql/data:/var/lib/mysql
      - /usr/local/docker/mysql/config:/etc/mysql/conf.d
    environment:
      MYSQL_ROOT_PASSWORD: root
      TZ: Asia/Shanghai
    command:
      --default-authentication-plugin=mysql_native_password
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_general_ci
      --explicit_defaults_for_timestamp=true
      --lower_case_table_names=1
      --log-bin=/var/lib/mysql/mysql-bin
      --server-id=1
      --binlog-format=ROW
      --expire_logs_days=7
      --max_binlog_size=500M

  canal:
    image: canal/canal-server:v1.1.5
    container_name: canal
    restart: always
    ports:
      - 11110:11110
      - 11111:11111
      - 11112:11112
    volumes:
      - /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /usr/local/docker/canal/logs:/home/admin/canal-server/logs
    networks:
      - app_net
    depends_on:
      - mysql
      - rabbitmq

  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    restart: always
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - /usr/local/docker/rabbitmq/data/:/var/lib/rabbitmq/
      - /usr/local/docker/rabbitmq/log/:/var/log/rabbitmq/
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    networks:
      - app_net

networks:
  app_net:
    driver: bridge

1.2 docker-compose 常用命令

# 后台启动容器编排文件
docker-compose up -d [service]

# 停止up命令所启动的容器,并移除网络
docker-compose down

# 进入指定容器
docker-compose exec [service]

# 列出项目中所有的容器
docker-compose ps [service]

# 重启项目中容器
docker-compose restart [service]

# 删除项目中所有容器
docker-compose rm -f [service]

# 启动项目中容器(或指定容器)
docker-compose start [service]

# 暂停项目中容器(或指定容器)
docker-compose stop [service]

1.3 镜像服务启动状态

[root@lavm-13jmyj9ugf docker]# docker ps -a
CONTAINER ID   IMAGE                           COMMAND                  CREATED          STATUS          PORTS                                                                                                                                                 NAMES
0d4260bc557b   canal/canal-server:v1.1.5       "/alidata/bin/main.s…"   38 minutes ago   Up 38 minutes   9100/tcp, 0.0.0.0:11110-11112->11110-11112/tcp, :::11110-11112->11110-11112/tcp                                                                       canal
c66b3f1f13a9   mysql:8.0.30                    "docker-entrypoint.s…"   38 minutes ago   Up 38 minutes   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                                                                                                  mysql
645e27bd4001   rabbitmq:3-management           "docker-entrypoint.s…"   5 hours ago      Up 49 minutes   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
f55d42cbbd8e   redis:6.2.7                     "docker-entrypoint.s…"   3 days ago       Up 49 minutes   0.0.0.0:6379->6379/tcp, :::6379->6379/tcp                                                                                                             redis

二、MySQL binlog 配置

2.1 docker-compose command 配置 binlog

--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M

2.2 创建canal用户,以及查看是否开启binlog

mysql> CREATE USER canal IDENTIFIED BY 'canal';  
Query OK, 0 rows affected (0.05 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.05 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.05 sec)

mysql> select * from mysql.user where User = 'canal';
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| Host | User  | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin                | authentication_string                     | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| %    | canal | Y           | N           | N           | N           | N           | N         | N           | N             | N            | N         | N          | N               | N          | N          | N            | N          | N                     | N                | N            | Y               | Y                | N                | N              | N                   | N                  | N                | N          | N            | N                      |          |            |             |              |             0 |           0 |               0 |                    0 | mysql_native_password | *E3619321C1A937C46A0D8BD1DAC39F93B27D4458 | N                |   2025-03-10 11:53:49 | NULL              | N              | N                | N              | NULL                   | NULL                | NULL                     | NULL            |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
1 row in set (0.08 sec)

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.30 sec)

三、canal 相关配置文件

canal.properties 主要核心配置

canal.serverMode=rabbitMQ:选择 RabbitMQ 作为通知服务模型。
rabbitmq.host=rabbitmq:基于 Docker 同一网络下,可以使用容器名称代替 host。
rabbitmq.queuerabbitmq.routingKeyrabbitmq.exchange : RabbitMQ 的三件套,用于后续创建具体通道监听。

#################################################
#########               common argument         #############
#################################################
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 支持的服务模型,tcp直连或mq,此处我选择RabbitMQ
canal.serverMode=rabbitMQ


##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=

instance.properties 主要核心配置
canal.instance.master.address 数据库地址
canal.instance.dbUsername 数据库用户名
canal.instance.dbPassword 数据库密码
canal.mq.topic RabbitMQ路由

# 数据地址,此处mysql,是因为canal和mysql是同一network下,可以使用容器名称代替具体ip
canal.instance.master.address=mysql:3306

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root

# mq config
canal.mq.topic=canal-routing-key

3.1 canal.properties 完整文件

#################################################
#########               common argument         #############
#################################################
canal.ip=
canal.register.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.admin.port=11110
canal.admin.user=admin
canal.admin.passwd=
canal.zkServers=
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
canal.serverMode=rabbitMQ
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
canal.instance.memory.buffer.size=16384
canal.instance.memory.buffer.memunit=1024 
canal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true
canal.instance.detecting.enable=false
canal.instance.detecting.sql=select 1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false
canal.instance.transaction.size=1024
canal.instance.fallbackIntervalInSeconds=60
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false
canal.instance.binlog.format=ROW,STATEMENT,MIXED 
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation=false
canal.instance.parser.parallel=true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize=256
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
canal.instance.tsdb.snapshot.interval=24
canal.instance.tsdb.snapshot.expire=360
#################################################
#########               destinations            #############
#################################################
canal.destinations=example
canal.conf.dir=../conf
canal.auto.scan=true
canal.auto.scan.interval=5
canal.auto.reset.latest.pos.mode=false
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode=spring
canal.instance.global.lazy=false
canal.instance.global.manager.address=${canal.admin.manager}
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
##################################################
#########             MQ Properties      #############
##################################################
canal.aliyun.accessKey=
canal.aliyun.secretKey=
canal.aliyun.uid=
canal.mq.flatMessage=true
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.accessChannel=local
canal.mq.database.hash=true
canal.mq.send.thread.size=30
canal.mq.build.thread.size=8
##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers=127.0.0.1:9092
kafka.acks=all
kafka.compression.type=none
kafka.batch.size=16384
kafka.linger.ms=1
kafka.max.request.size=1048576
kafka.buffer.memory=33554432
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.kerberos.enable=false
kafka.kerberos.krb5.file=../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file=../conf/kerberos/jaas.conf
##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group=test
rocketmq.enable.message.trace=false
rocketmq.customized.trace.topic=
rocketmq.namespace=
rocketmq.namesrv.addr=127.0.0.1:9876
rocketmq.retry.times.when.send.failed=0
rocketmq.vip.channel.enabled=false
rocketmq.tag=
##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
##################################################
#########                     Pulsar         #############
##################################################
pulsarmq.serverUrl=
pulsarmq.roleToken=
pulsarmq.topicTenantPrefix=

3.2 instance.properties 完整文件

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# position info
canal.instance.master.address=mysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# multi stream for polardbx
canal.instance.multi.stream.on=false

# ssl
#canal.instance.master.sslMode=DISABLED
#canal.instance.master.tlsVersions=
#canal.instance.master.trustCertificateKeyStoreType=
#canal.instance.master.trustCertificateKeyStoreUrl=
#canal.instance.master.trustCertificateKeyStorePassword=
#canal.instance.master.clientCertificateKeyStoreType=
#canal.instance.master.clientCertificateKeyStoreUrl=
#canal.instance.master.clientCertificateKeyStorePassword=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=admin123!@#
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

3.3 检查配置是否与宿主机一致

进入容器内部:docker exec -it canal bash

检查配置文件内容是否与宿主机一致:

  • cat /home/admin/canal-server/conf/canal.properties
  • cat /home/admin/canal-server/conf/example/instance.properties

3.4 开启相关端口防火墙配置

  • canal:11110、11111、11112
  • mysql:3306
  • redis:6379
  • RabbitMQ: 15672、5672
    在这里插入图片描述

四、代码实现

4.1 相关pom依赖引入

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.7.15</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- Spring Boot MQ 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  <!-- canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

4.2 完整pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.neo</groupId>
    <artifactId>code-repository</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <name>code-repository</name>

    <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hutool.version>5.8.20</hutool.version>
        <mysql.version>8.0.30</mysql.version>
        <mybatis-plus.version>3.5.3.1</mybatis-plus.version>
        <redis.version>3.1.0</redis.version>
        <druid.version>1.2.16</druid.version>
        <fastjson.version>1.2.83</fastjson.version>
        <sa-token.version>1.37.0</sa-token.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>

        <dependency>
            <groupId>cn.dev33</groupId>
            <artifactId>sa-token-spring-boot-starter</artifactId>
            <version>${sa-token.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.dev33</groupId>
            <artifactId>sa-token-redis-jackson</artifactId>
            <version>${sa-token.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

4.3 application.yml 配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true

4.4 完整application.yml配置

server:
  port: 8088
  servlet:
    context-path: /api

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/db_v1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
    druid:
      initial-size: 5
      min-idle: 5
      max-active: 20
      max-wait: 60000
      validation-query: SELECT 1
      test-while-idle: true
      stat-view-servlet:
        enabled: true
        url-pattern: /druid/*
        login-username: admin
        login-password: admin123
      web-stat-filter:
        enabled: true
        url-pattern: /*
        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"
      filter:
        stat:
          enabled: true
          log-slow-sql: true
          slow-sql-millis: 1000
        wall:
          enabled: true
          config:
            drop-table-allow: false
  redis:
    host: localhost
    port: 6379
    password: 123456
    database: 0
    timeout: 5000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
  mail:
    host: smtp.aliyun.com
    username: 
    password: 
    port: 25
    properties:
      mail:
        smtp:
          auth: true
        starttls:
          enable: true
          required: true
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true

mybatis-plus:
  mapper-locations: classpath*:mapper/*_Mapper.xml
  global-config:
    db-config:
      logic-delete-field: delFlag
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

4.5 RabbitConstants 基础常量配置

public interface RabbitConstants {

    interface Canal {
        String QUEUE = "canal-queue";
        String EXCHANGE = "canal-exchange";
        String ROUTING = "canal-routing-key";
    }

    interface EventType {
        String INSERT = "INSERT";
        String UPDATE = "UPDATE";
        String DELETE = "DELETE";
    }

}

4.6 CanalMqConfigure MQ队列交换机配置

@Configuration
public class CanalMqConfigure {

    @Bean
    public Queue queue() {
        return new Queue(RabbitConstants.Canal.QUEUE, true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(RabbitConstants.Canal.EXCHANGE, true, false);
    }

    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(queue())
                .to(directExchange())
                .with(RabbitConstants.Canal.ROUTING);
    }
}

4.7 CanalConsumer 消费者

package com.neo.core.canal;

import com.neo.core.constant.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Canal.QUEUE)
public class CanalConsumer {
    @RabbitHandler
    public void execute(Map<String, Object> msg) {
        log.info("canal消息监听事件触发,消息内容:{}", msg);

        boolean isDdl = (boolean) msg.get("isDdl");

        if (isDdl) {
            return;
        }

        String database = (String) msg.get("database");
        String table = (String) msg.get("table");
        String type = (String) msg.get("type");
        List<?> data = (List<?>) msg.get("data");

        log.info("database:{}.table:{}", database, table);

        if (RabbitConstants.EventType.INSERT.equalsIgnoreCase(type)) {
            System.out.println("INSERT");
        } else if (RabbitConstants.EventType.UPDATE.equalsIgnoreCase(type)) {
            System.out.println("UPDATE");
        } else if (RabbitConstants.EventType.DELETE.equalsIgnoreCase(type)) {
            System.out.println("DELETE");
        } else {
            // 其他事件
        }
    }
}

五、运行与测试

当MySQL数据出现变动后,会触发canal-queue的监听事件,后续可根据具体业务逻辑实现业务处理。

25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test1, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596660000, id=5, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596660932, type=DELETE}
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
DELETE
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596663000, id=6, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596664038, type=INSERT}
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
INSERT
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test1, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596668000, id=7, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=[{account=test}], pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596668545, type=UPDATE}
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
UPDATE

以上是RabbitMQ+Canal数据一致性的完整解决方案,包括环境配置、代码实现以及运行测试等环节,确保了数据在不同系统间的一致性和可靠性。


http://www.kler.cn/a/580005.html

相关文章:

  • 【时间序列聚类】Feature-driven Time Series Clustering(特征驱动的时间序列聚类)
  • 为什么大模型网站使用 SSE 而不是 WebSocket?
  • 【JAVA】之路启航——初识Java篇
  • 基于Spring Cloud Alibaba的电商系统微服务化实战:从拆分到高可用部署
  • 物理服务器是指的什么?
  • 【实战ES】实战 Elasticsearch:快速上手与深度实践-7.1.1Spark Streaming实时写入ES
  • java学习总结三:springMVC
  • 【批量图片识别改名】如何自动识别图片文字并命名,一次性识别多张图片内容作为文件名,基于WPF和百度OCR,教你如何实现
  • 从头开始开发基于虹软SDK的人脸识别考勤系统(python+RTSP开源)(四)
  • Java本地方法根据线上地址下载图片到本地然后返回本地可以访问的地址
  • c语言笔记 一维数组与二维数组
  • python爬虫:Android自动化工具Auto.js的详细使用
  • RabbitMQ高级特性----生产者确认机制
  • craco.config.js是什么?
  • Java剪刀石头布
  • 小程序实现存储用户注册信息功能 前后端+数据库联调
  • 【2025】基于php+vue的舞蹈培训机构管理系统(源码+文档+调试+图文修改+答疑)
  • 静态网页的爬虫(以电影天堂为例)
  • 基于SpringBoot实现旅游酒店平台功能三
  • 【Academy】Web 缓存欺骗 ------ Web cache deception