当前位置: 首页 > article >正文

canal详解及demo

提示:如何保证Redis中的数据与数据库中的数据一致性?数据同步canal的介绍和demo、大型企业如何实现mysql到redis的同步?使用binlog实时更新redis缓存、canal的接入教程、win下canal的服务器端、canal客户端的创建、连接、测试教程、数据同步方式canal

文章目录

  • 前言
  • 一、canal是什么?
    • 1.1、工作原理
  • 二、canal服务端demo
    • 1.先看一下mysql的配置
    • 2.修改配置
    • 3.编辑my.ini 文件
    • 4.创建canal用户
    • 5.启动canal服务
    • 6.修改canal.properties
    • 7.修改instance.properties
    • 8.启动canal
    • 9.创建canal客户端
  • 三、canal客户端demo
    • 1.maven依赖
    • 2.demo代码
  • 三、测试
    • 1.启动服务器端
    • 2.启动测试端
    • 3.修改数据库
  • 总结


前言

很多时候我们需要数据库和redis(或者其他中间件,mq、es等)有交互。数据量少、并发量少的时候还好,那一旦并发上来了,怎么保证redis和数据库的数据一致性呢?这时就用到数据库和redis的同步工具-canal了。


一、canal是什么?

canal是阿里中基于java写的一个组件,他的官网是:canal官网
。他的作用是读取mysql数据的binlog日志,然后将其转换为对应的数据(数据的变化或者变化后的数据,跟配置有关),并且同步到相关中间件(本次demo中是用的reids中间件)

1.1、工作原理

原理是(从官网截的图)
在这里插入图片描述

二、canal服务端demo

1.先看一下mysql的配置

  • show variables like ‘%log_bin%’; – 判断binlong是on还是off,默认是off,需要打开
  • show variables like ‘binlog_format’; – 判断binlog的记录方式,有row和statement、fix,这里是用row,row是记录数据数据的变化,变成了啥,statement是记录执行的sql,看不到数据的变化以及最终的结果,fix是俩的混合
  • show variables like ‘%server_id%’; 是配置主从的,一般主节点设为1
  • show master status; 表示当前mysql中的binlog日志记录在哪里了,现在记录的多大内存了

2.修改配置

如果配置不满足,则需要修改mysql的配置文件: my.ini
win下的查找配置文件的方式: 服务 -》 属性 -》 可执行路径。 本次我这里的是:
“C:\Program Files\MySQL\MySQL Server 5.7\bin\mysqld.exe” --defaults-file=“C:\ProgramData\MySQL\MySQL Server 5.7\my.ini”
在这里插入图片描述

3.编辑my.ini 文件

搜索log-bin,在这个log-bin下面新建两条:
log-bin=mysql-bin
binlog-format=ROW
在这里插入图片描述
然后重新启动
在这里插入图片描述
再次执行下看结果:
在这里插入图片描述

4.创建canal用户

创建用户,专门给canal使用

-- 创建一个canal用户(专门去给canal这个同步数据软件使用)
create user canal IDENTIFIED by 'canal';
-- 给canal赋予select、repl权限
grant select ,REPLICATION SLAVE,REPLICATION CLIENT on *.* to 'canal'@'%';
-- 刷新权限
flush privileges;

5.启动canal服务

此时拿到一个canal的服务包。 我的百度云盘:云盘

6.修改canal.properties

解压后,进入到conf目录,找到 canal.properties 文件。这里需要注意的其实就一行:
在这里插入图片描述
新增这个:

canal.destinations = example
//如果有多个的话,就用逗号分隔,例如:
//canal.destinations = promption,seckill

这里是一个,就需要在conf中有一个文件夹,文件夹的名字与这个配置的名字一致,如果是两个,就需要两个文件夹了
在这里插入图片描述

7.修改instance.properties

进入这个example文件夹里面,还有一个配置文件(instance.properties),这个就是我们要改的配置文件了
在这里插入图片描述
这个配置文件中,比较重要的几个是:

# 要监听的数据库的地址
canal.instance.master.address=127.0.0.1:3306
# 当前binlog记录的位置
canal.instance.master.journal.name=mysql-bin.000002
# 当前binlog记录到哪了
canal.instance.master.position=154
# canal连接的账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 要监听的数据库表,例如: micromall.sms_home,miscromall.sms_brand
canal.instance.filter.regex=.*\\..*
# 不需要监听的数据库表,例如:mysql\\.test_.*
canal.instance.filter.black.regex=

其中canal.instance.master.journal.name对应的是file。
canal.instance.master.position 对应的是position, 这俩只需要配置一次,后续就会有一个缓存文件(meta.dat)去自己记录最大缓存到哪了
在这里插入图片描述

8.启动canal

到这里以后,就可以启动了,直接双击bin文件夹下的 start.bat
在这里插入图片描述

9.创建canal客户端

然后创建一个canal客户端去测试一下。

三、canal客户端demo

可以参考官网: https://github.com/alibaba/canal/wiki/ClientExample

在这里插入图片描述

1.maven依赖

<!-- canan 的依赖 这个最好是和服务端的版本一致 -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2.demo代码

默认是127.0.0

package com.zheng.canal;
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
 * @author: ztl
 * @date: 2024/11/24 22:31
 * @desc:
 */
public class MyCanal {

    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }




}

三、测试

1.启动服务器端

启动服务器端,也就是canal中的服务器,我本次是 D:\java\ruanjian\canal\canal.deployer-1.1.4\bin 下的 startup.bat 脚本![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/34deb7542e564e3a9cab0b03e95438cc.png

2.启动测试端

运行步骤2中的main方法: 此时看到的效果是不断的再刷新
在这里插入图片描述

3.修改数据库

修改数据库中的数据,然后看控制台中的变化:
(本次我修改了mytest库中的course_2023表中的第一条数据,控制台的变化如下:)
在这里插入图片描述
到此,我们就能拿到变化后的数据了,后面的操作我不用说大家也知道:将数据转化为实体,然后set进redis、es等各个地方。


总结

以上是一个简单的demo,大家可以基于demo或者官网去在自己项目中做一些个性化的处理。


http://www.kler.cn/a/443995.html

相关文章:

  • 深度学习每周学习总结J9(Inception V3 算法实战与解析 - 天气识别)
  • K8s HPA的常用功能介绍
  • 数据库管理系统——数据库设计
  • Set集合进行!contains判断IDEA提示Unnecessary ‘contains()‘ check
  • Datawhale AI冬令营——Chat-悟空设计
  • 【CVE-2024-56145】PHP 漏洞导致 Craft CMS 出现 RCE
  • 如何防范网络安全攻击,防止敏感数据被恶意窃取?
  • 【信息系统项目管理师-论文真题】2018下半年论文详解(包括解题思路和写作要点)
  • PaddlePaddle推理模型利用Paddle2ONNX转换成onnx模型
  • 使用phaser打僵尸游戏
  • Google Chrome浏览器安装到其它盘的方法
  • 小红书飞书素材库 | AI改写 | 无水印下载 | 多维表格 | 采集同步 | 影刀RPA
  • OpenCV函数及其应用
  • 大模型与呼叫中心结合的呼入机器人系统
  • jquery虚拟键盘插件jqkeyboard
  • 红队规范:减少工具上传,善用系统自带程序
  • CSS学习第二天
  • golang结构体转map
  • dify.ai和fastgpt,各有什么优缺点,有什么区别
  • 【计算机视觉基础CV】03-深度学习图像分类实战:鲜花数据集加载与预处理详解
  • 使用Python脚本进行编写批量根据源IP进行查询的语句用于态势感知攻击行为的搜索
  • 省略内容在句子中间
  • 安装配置Ubuntu 22.04
  • Binder 大小
  • 线性规划中的几种逻辑表达式
  • C#核心(18)面向对象多态vob