使用canal增量同步ES索引库数据
Canal增量数据同步利器
Canal介绍
canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。
github地址:https://github.com/alibaba/canal
版本下载地址:https://github.com/alibaba/canal/releases
文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart
Canal应用场景
1.电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎;
2.价格、库存发生变更实时同步到redis;
3.数据库异地备份、数据同步;
4.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
MySQL主从复制原理
1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
2.MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
Canal工作原理
1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3.canal 解析 binary log 对象(原始为 byte 流)
Canal安装
参考文档:https://github.com/alibaba/canal/wiki/QuickStart
MySQL Bin-log开启
1)MySQL开启bin-log
a.进入mysql容器
docker exec -it -u root mysql /bin/bash
b.开启mysql的binlog
cd /etc/mysql/mysql.conf.d
在mysqld.cnf最下面添加如下配置
# 开启 binlog
log-bin=/var/lib/mysql/mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server-id=12345
c.创建账号并授权
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
d.重启mysql
docker restart mysql
开启bin-log后,我们可以用sql语句查看下:
show variables like '%log_bin%'
效果如下:
Canal安装
1)拉取镜像
docker pull canal/canal-server:v1.1.1
2)安装容器
a.安装canal-server容器
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
b.配置canal-server
修改/home/admin/canal-server/conf/canal.properties,将它的id属性修改成和mysql数据库中server-id不同的值,如下图:
c.修改/home/admin/canal-server/conf/example/instance.properties,配置要监听的数据库服务地址和监听数据变化的数据库以及表,修改如下:
指定监听数据库表的配置如下canal.instance.filter.regex:
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
重启canal:
docker restart canal
Canal微服务
我们搭建一个微服务,用于读取canal监听到的变更日志,微服务名字叫seckill-canal。该项目我们需要引入canal-spring-boot-autoconfigure包,并且需要实现EntryHandler接口,该接口中有3个方法,分别为insert、update、delete,这三个方法用于监听数据增删改变化。
参考地址:https://github.com/NormanGyllenhaal/canal-client
1)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seckill-service</artifactId>
<groupId>com.seckill</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seckill-canal</artifactId>
<dependencies>
<!--web-->
<dependency>
<groupId>com.seckill</groupId>
<artifactId>seckill-web</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--esAPI-->
<dependency>
<groupId>com.seckill</groupId>
<artifactId>seckill-search-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--goodsAPI-->
<dependency>
<groupId>com.seckill</groupId>
<artifactId>seckill-goods-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--canal-->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-autoconfigure</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- 指定该Main Class为全局的唯一入口 -->
<mainClass>com.seckill.CanalApplication</mainClass>
<layout>ZIP</layout>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal><!--可以把依赖的包都打包到生成的Jar包中-->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
bootstrap.yml配置
server:
port: 18088
spring:
application:
name: seckill-canal
cloud:
nacos:
config:
file-extension: yaml
server-addr: nacos-server:8848
discovery:
#Nacos的注册地址
server-addr: nacos-server:8848
#超时配置
ribbon:
ReadTimeout: 3000000
#Canal配置
canal:
server: canal-server:11111
destination: example
#日志
logging:
level:
root: error
2)创建com.seckill.handler.SkuHandler实现EntryHandler接口,代码如下:
@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {
/***
* 增加数据
* @param sku
*/
@Override
public void insert(Sku sku) {
System.out.println("===========insert:"+sku);
}
/***
* 修改数据
* @param before
* @param after
*/
@Override
public void update(Sku before, Sku after) {
System.out.println("===========update-before:"+before);
System.out.println("===========update-after:"+after);
}
/***
* 删除数据
* @param sku
*/
@Override
public void delete(Sku sku) {
System.out.println("===========delete:"+sku);
}
}
3)创建启动类
@SpringBootApplication
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class,args);
}
}
程序启动后,修改tb_sku数据,可以看到控制会打印修改前后的数据:
索引库同步
当tb_sku秒杀商品发生变化时,我们应该同时变更索引库中的索引数据,比如秒杀商品增加,则需要同步增加秒杀商品的索引,如果有秒杀商品删除,则需要同步移除秒杀商品。
修改seckill-canal中的com.seckill.handler.SkuHandler的增删改方法,代码如下:
@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {
@Autowired
private SkuInfoFeign skuInfoFeign;
/***
* 增加数据
* @param sku
*/
@Override
public void insert(Sku sku) {
//将Sku转换成SkuInfo
SkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(sku) ,SkuInfo.class);
//同步索引
skuInfoFeign.modify(1,skuInfo);
}
/***
* 修改数据
* @param before
* @param after
*/
@Override
public void update(Sku before, Sku after) {
int type=2;
//将Sku转换成SkuInfo
SkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(after) ,SkuInfo.class);
if(skuInfo.getStatus()==1 || after.getSeckillNum()<=0){
//商品变成了普通商品,或者商品库存为0,则需要删除索引数据
type=3;
}
//同步索引
skuInfoFeign.modify(type,skuInfo);
}
/***
* 删除数据
* @param sku
*/
@Override
public void delete(Sku sku) {
//将Sku转换成SkuInfo
SkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(sku) ,SkuInfo.class);
//同步索引
skuInfoFeign.modify(3,skuInfo);
}
}
开启Feign功能:@EnableFeignClients(basePackages = {“com.seckill.search.feign”})
此时对数据库中tb_sku表进行增删改的时候,会同步到索引库中。