当前位置: 首页 > article >正文

浅谈canal实例 在docker里面安装canal镜像 Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch 示例

目录

1. 环境准备

1.1 MySQL配置

1.2 部署Canal Server

2. Spring Boot项目配置

2.1 添加依赖

2.2 配置参数

3. 实现Canal监听与同步

3.1 Canal客户端监听

3.2 同步到Redis

3.3 同步到Elasticsearch

4. 注意事项


在Spring Boot中通过Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch,可按照以下步骤实现:


1. 环境准备

1.1 MySQL配置
  • 开启Binlog并设置为ROW模式:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
  • 创建Canal用户并授权:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
1.2 部署Canal Server
  1. 下载Canal Server:Canal Releases
  2. 修改配置 conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306 
canal.instance.dbUsername=canal 
canal.instance.dbPassword=canal 
canal.instance.filter.regex=.*\\..*  # 监听所有库表,或指定如test.user

2. Spring Boot项目配置

2.1 添加依赖
<!-- Canal客户端 -->
<dependency>
    <groupId>com.alibaba.otter</groupId> 
    <artifactId>canal.client</artifactId> 
    <version>1.1.6</version>
</dependency>
<!-- Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Elasticsearch -->
<dependency>
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2.2 配置参数

application.yml

canal:
  server: 127.0.0.1:11111
  destination: example
  username: canal
  password: canal

spring:
  redis:
    host: localhost
    port: 6379
  data:
    elasticsearch:
      cluster-nodes: localhost:9200

3. 实现Canal监听与同步

3.1 Canal客户端监听
@Component
public class CanalListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private ElasticsearchRestTemplate esTemplate;

    @PostConstruct
    public void init() {
        CanalConnector connector = CanalConnectors.newSingleConnector( 
                new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal");
        
        Thread thread = new Thread(() -> {
            connector.connect(); 
            connector.subscribe(".*\\..*"); 
            while (true) {
                Message message = connector.getWithoutAck(100); 
                long batchId = message.getId(); 
                if (batchId != -1) {
                    processEntry(message.getEntries()); 
                    connector.ack(batchId); 
                }
            }
        });
        thread.start(); 
    }

    private void processEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType()  == EntryType.ROWDATA) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 
                for (RowData rowData : rowChange.getRowDatasList())  {
                    String tableName = entry.getHeader().getTableName(); 
                    EventType eventType = rowChange.getEventType(); 
                    
                    // 解析变更前后的数据
                    Map<String, String> before = parseColumns(rowData.getBeforeColumnsList()); 
                    Map<String, String> after = parseColumns(rowData.getAfterColumnsList()); 
                    
                    // 根据事件类型同步数据
                    switch (eventType) {
                        case INSERT:
                        case UPDATE:
                            syncToRedis(tableName, after);
                            syncToElasticsearch(tableName, after);
                            break;
                        case DELETE:
                            deleteFromRedis(tableName, before);
                            deleteFromElasticsearch(tableName, before);
                            break;
                    }
                }
            }
        }
    }

    private Map<String, String> parseColumns(List<Column> columns) {
        return columns.stream() 
                .collect(Collectors.toMap(Column::getName,  Column::getValue));
    }
}
3.2 同步到Redis
private void syncToRedis(String tableName, Map<String, String> data) {
    String key = tableName + ":" + data.get("id");  // 假设主键为id
    redisTemplate.opsForValue().set(key,  data);
}

private void deleteFromRedis(String tableName, Map<String, String> data) {
    String key = tableName + ":" + data.get("id"); 
    redisTemplate.delete(key); 
}
3.3 同步到Elasticsearch
private void syncToElasticsearch(String tableName, Map<String, String> data) {
    IndexQuery indexQuery = new IndexQueryBuilder()
            .withId(data.get("id")) 
            .withObject(data)
            .build();
    esTemplate.index(indexQuery,  IndexCoordinates.of(tableName)); 
}

private void deleteFromElasticsearch(String tableName, Map<String, String> data) {
    esTemplate.delete(data.get("id"),  IndexCoordinates.of(tableName)); 
}

4. 注意事项

  1. 异常处理:增加重试机制或记录错误日志,确保网络波动时的数据一致性。
  2. 性能优化:批量处理Canal消息,减少Redis/ES的频繁写入。
  3. 数据结构:确保Elasticsearch的索引Mapping与MySQL表结构兼容。
  4. 事务管理:如需强一致性,可结合本地事务表或消息队列(如RocketMQ)做可靠投递。

通过以上步骤,Spring Boot应用能够实时监听MySQL变更,并自动同步到Redis和Elasticsearch,保障数据一致性。


http://www.kler.cn/a/593768.html

相关文章:

  • 共筑智慧城市新生态!YashanDB与荣科科技完成兼容互认证
  • Hive高频SQL及典型应用场景总结
  • 【Pandas】pandas Series plot
  • STC89C52单片机学习——第28节: [12-2] AT24C02数据存储秒表(定时器扫描按键数码管)
  • 外星人入侵-Python-三
  • C++学习之nginx+fastDFS
  • 深入解析 Redis 原理:架构、数据结构与高效存储
  • 基于开源模型的微调训练及瘦身打造随身扫描仪方案__用AI把手机变成文字识别小能手
  • 【Vue3】01-vue3的基础 + ref reactive
  • Pygame实现记忆拼图游戏14
  • 实时数仓和离线数仓
  • subprocess执行系统命令简明用法
  • 「低延迟+快速集成:Amazon IVS如何重塑实时互动视频体验?」
  • Linux与HTTP中的Cookie和Session
  • 头歌实训--数据预处理Pandas--共三关
  • 黄金屋 #2 我应该将产品开源吗?
  • 雅可比行列式
  • fontTools工具的使用介绍
  • [DeepRetrieval] 用DeepSeek-R1-Zero的思路教会模型怎么用搜索引擎找文本
  • Python数据可视化实战:从基础图表到高级分析