canal详解及demo
提示:如何保证Redis中的数据与数据库中的数据一致性?数据同步canal的介绍和demo、大型企业如何实现mysql到redis的同步?使用binlog实时更新redis缓存、canal的接入教程、win下canal的服务器端、canal客户端的创建、连接、测试教程、数据同步方式canal
文章目录
- 前言
- 一、canal是什么?
- 1.1、工作原理
- 二、canal服务端demo
- 1.先看一下mysql的配置
- 2.修改配置
- 3.编辑my.ini 文件
- 4.创建canal用户
- 5.启动canal服务
- 6.修改canal.properties
- 7.修改instance.properties
- 8.启动canal
- 9.创建canal客户端
- 三、canal客户端demo
- 1.maven依赖
- 2.demo代码
- 三、测试
- 1.启动服务器端
- 2.启动测试端
- 3.修改数据库
- 总结
前言
很多时候我们需要数据库和redis(或者其他中间件,mq、es等)有交互。数据量少、并发量少的时候还好,那一旦并发上来了,怎么保证redis和数据库的数据一致性呢?这时就用到数据库和redis的同步工具-canal了。
一、canal是什么?
canal是阿里中基于java写的一个组件,他的官网是:canal官网
。他的作用是读取mysql数据的binlog日志,然后将其转换为对应的数据(数据的变化或者变化后的数据,跟配置有关),并且同步到相关中间件(本次demo中是用的reids中间件)
1.1、工作原理
原理是(从官网截的图)
二、canal服务端demo
1.先看一下mysql的配置
- show variables like ‘%log_bin%’; – 判断binlong是on还是off,默认是off,需要打开
- show variables like ‘binlog_format’; – 判断binlog的记录方式,有row和statement、fix,这里是用row,row是记录数据数据的变化,变成了啥,statement是记录执行的sql,看不到数据的变化以及最终的结果,fix是俩的混合
- show variables like ‘%server_id%’; 是配置主从的,一般主节点设为1
- show master status; 表示当前mysql中的binlog日志记录在哪里了,现在记录的多大内存了
2.修改配置
如果配置不满足,则需要修改mysql的配置文件: my.ini
win下的查找配置文件的方式: 服务 -》 属性 -》 可执行路径。 本次我这里的是:
“C:\Program Files\MySQL\MySQL Server 5.7\bin\mysqld.exe” --defaults-file=“C:\ProgramData\MySQL\MySQL Server 5.7\my.ini”
3.编辑my.ini 文件
搜索log-bin,在这个log-bin下面新建两条:
log-bin=mysql-bin
binlog-format=ROW
然后重新启动
再次执行下看结果:
4.创建canal用户
创建用户,专门给canal使用
-- 创建一个canal用户(专门去给canal这个同步数据软件使用)
create user canal IDENTIFIED by 'canal';
-- 给canal赋予select、repl权限
grant select ,REPLICATION SLAVE,REPLICATION CLIENT on *.* to 'canal'@'%';
-- 刷新权限
flush privileges;
5.启动canal服务
此时拿到一个canal的服务包。 我的百度云盘:云盘
6.修改canal.properties
解压后,进入到conf目录,找到 canal.properties 文件。这里需要注意的其实就一行:
新增这个:
canal.destinations = example
//如果有多个的话,就用逗号分隔,例如:
//canal.destinations = promption,seckill
这里是一个,就需要在conf中有一个文件夹,文件夹的名字与这个配置的名字一致,如果是两个,就需要两个文件夹了
7.修改instance.properties
进入这个example文件夹里面,还有一个配置文件(instance.properties),这个就是我们要改的配置文件了
这个配置文件中,比较重要的几个是:
# 要监听的数据库的地址
canal.instance.master.address=127.0.0.1:3306
# 当前binlog记录的位置
canal.instance.master.journal.name=mysql-bin.000002
# 当前binlog记录到哪了
canal.instance.master.position=154
# canal连接的账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 要监听的数据库表,例如: micromall.sms_home,miscromall.sms_brand
canal.instance.filter.regex=.*\\..*
# 不需要监听的数据库表,例如:mysql\\.test_.*
canal.instance.filter.black.regex=
其中canal.instance.master.journal.name对应的是file。
canal.instance.master.position 对应的是position, 这俩只需要配置一次,后续就会有一个缓存文件(meta.dat)去自己记录最大缓存到哪了
8.启动canal
到这里以后,就可以启动了,直接双击bin文件夹下的 start.bat
9.创建canal客户端
然后创建一个canal客户端去测试一下。
三、canal客户端demo
可以参考官网: https://github.com/alibaba/canal/wiki/ClientExample
1.maven依赖
<!-- canan 的依赖 这个最好是和服务端的版本一致 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
2.demo代码
默认是127.0.0
package com.zheng.canal;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
* @author: ztl
* @date: 2024/11/24 22:31
* @desc:
*/
public class MyCanal {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
三、测试
1.启动服务器端
启动服务器端,也就是canal中的服务器,我本次是 D:\java\ruanjian\canal\canal.deployer-1.1.4\bin 下的 startup.bat 脚本![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/34deb7542e564e3a9cab0b03e95438cc.png
2.启动测试端
运行步骤2中的main方法: 此时看到的效果是不断的再刷新
3.修改数据库
修改数据库中的数据,然后看控制台中的变化:
(本次我修改了mytest库中的course_2023表中的第一条数据,控制台的变化如下:)
到此,我们就能拿到变化后的数据了,后面的操作我不用说大家也知道:将数据转化为实体,然后set进redis、es等各个地方。
总结
以上是一个简单的demo,大家可以基于demo或者官网去在自己项目中做一些个性化的处理。