【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步
【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步
- 0 背景
- 1 配置MySQL
- 1.1 开启MySQL的binlog功能
- 1.1.1 找到mysql配置文件my.ini的位置
- 1.1.2 开启binlog
- 1.2 创建canal用户
- 2 下载配置canal
- 2.1 canal 1.1.5下载
- 2.2 配置canal
- 2.3 启动canal
- 2.4 测试
- 3 canal实现双写一致
- 3.1 Redis操作封装
- 3.2 编写监听器
- 3.3 修改ShopServiceImpl.java
- 3.4 测试
- 参考资料
0 背景
【黑马点评优化】之使用Caffeine+Redis实现应用级二层缓存_caffeine redis二级缓存-CSDN博客
当时使用Redis+Caffeine实现对商铺信息的应用层两级缓存。文章提到了两级缓存Redis+Caffeine可以解决缓存雪等问题也可以提高接口的性能,但是可能会出现缓存一致性问题。如果数据频繁的变更,可能会导致Redis和Caffeine数据不一致的问题。
为此,使用Canel来解决这一问题。
MySQL工作原理如下:
一句话总结(详细工作原理,可以查看下面的介绍)
模拟MySQL从库读取binlog实现数据变更监听。它支持数据过滤、转换,并能将变更数据推送到不同的下游系统,如消息队列和其他数据库。
我的相关软件版本
mysql:8.0.36
redis:6.2.6
canal:1.1.5
1 配置MySQL
1.1 开启MySQL的binlog功能
1.1.1 找到mysql配置文件my.ini的位置
开始的时候找不到mysql配置文件(my.ini)的位置
通过下列方法找到:
- 先连接mysql
mysql -h localhost -u root -p123456
(注意,有密码的话,才-p123456)
- 输入以下命令
show variables like 'datadir';
这样就找到了配置文件所在的位置
1.1.2 开启binlog
my.ini的最后几行加上以下内容
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
1.2 创建canal用户
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
但是由于mysql8.0之后 如果只设置了 % 的访问权限,
会导致localhost无法访问
所以 我们需要把当前权限更新为 localhost 再执行一遍
继续执行下述命令
select mysql;
update user set host = 'localhost' where user = 'canal' and host='%';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
查看所有用户访问权限结果如下:
-- 查看所有用户访问权限
SELECT DISTINCT CONCAT('User: ''',user,'''@''',host,''';') AS query FROM mysql.user;
自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password
解决:修改canal用户对应的身份验证插件为mysql_native_password
因此,接着执行下列命令
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
ALTER USER 'canal'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
之后我们再次查看canal用户对应的身份验证插件,如下即修改成功
2 下载配置canal
2.1 canal 1.1.5下载
在这里我们选择1.1.15版本的canal,因为canal1.1.15版本以后的canal不兼容Java 1.8
Release v1.1.5 · alibaba/canal (github.com)
选择canal.deployer-1.1.5.tar.gz下载,之后解压到自己的软件目录下。
2.2 配置canal
进入解压后的Canal目录,找到conf目录下的example实例,通常情况下,你可以通过修改conf/example/instance.properties文件来配置Canal连接到MySQL的参数,主要配置项包括:
canal.instance.master.address:MySQL服务器地址和端口。
canal.instance.dbUsername和canal.instance.dbPassword:用于连接MySQL的用户名和密码。
canal.instance.connectionCharset:数据库的字符集,通常为UTF-8。
canal.instance.tsdb.enable:是否启用表结构历史记录功能,建议开启。
2.3 启动canal
当安装好canal的时候,在window中启动bat的时候有些问题,JDK17版本已经不持’PermSize=128m’
因此,删除start.bat中的这一行
之后,双击bin/start.bat运行即可。
结果如下:
2.4 测试
pom.xml中导入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.7</version>
</dependency>
之后,编写测试类如下:
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import org.jetbrains.annotations.NotNull;
public class CanalTest {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(@NotNull List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(@NotNull List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
运行后,能看到,监听到了数据库的变化
3 canal实现双写一致
导入依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
新建com.hmdp.cache.handler包
3.1 Redis操作封装
新建ShopRedisHandler类
package com.hmdp.cache.handler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.hmdp.entity.Shop;
import com.hmdp.service.IShopService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;
@Component
public class ShopRedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IShopService shopService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Resource
private Cache<String, Object> shopCache;
// 缓存预热
@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Shop> shopList = shopService.list();
// 2.放入缓存
for (Shop shop : shopList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(shop);
// 2.2 存入caffeind
String key = CACHE_SHOP_KEY + shop.getId();
shopCache.put(key, shop);
// 2.2.存入redis
redisTemplate.opsForValue().set(key, json);
}
// // 3.查询商品库存信息
// List<ItemStock> stockList = stockService.list();
// // 4.放入缓存
// for (ItemStock stock : stockList) {
// // 2.1.item序列化为JSON
// String json = MAPPER.writeValueAsString(stock);
// // 2.2.存入redis
// redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
// }
}
public void saveShop(Shop shop) {
try {
String json = MAPPER.writeValueAsString(shop);
String key = CACHE_SHOP_KEY + shop.getId();
redisTemplate.opsForValue().set(key + shop.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteShopById(Long id) {
String key = CACHE_SHOP_KEY + id;
redisTemplate.delete(key + id);
}
}
3.2 编写监听器
新建ShopHandler类
通过实现EntryHandler<T>
接口编写监听器,监听Canal消息。注意两点:
- 实现类通过
@CanalTable("tb_item")
指定监听的表信息 - EntryHandler的泛型是与表对应的实体类
package com.hmdp.cache.handler;
import com.github.benmanes.caffeine.cache.Cache;
import com.hmdp.entity.Shop;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
import javax.annotation.Resource;
import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;
@CanalTable(value = "tb_shop")
@Component
public class ShopHandler implements EntryHandler<Shop>{
@Autowired
private ShopRedisHandler redisHandler;
@Resource
private Cache<String, Object> shopCache;
@Override
public void insert(Shop shop) {
// 写数据到JVM进程缓存
String key = CACHE_SHOP_KEY + shop.getId();
shopCache.put(key , shop);
// 写数据到redis
redisHandler.saveShop(shop);
}
@Override
public void update(Shop before, Shop after) {
// 写数据到JVM进程缓存
String key = CACHE_SHOP_KEY + after.getId();
shopCache.put(key, after);
// 写数据到redis
redisHandler.saveShop(after);
}
@Override
public void delete(Shop shop) {
// 删除数据到JVM进程缓存
String key = CACHE_SHOP_KEY + shop.getId();
shopCache.invalidate(key);
// 删除数据到redis
redisHandler.deleteShopById(shop.getId());
}
}
3.3 修改ShopServiceImpl.java
修改ShopServiceImpl中的update方法。注释掉删除缓存的操作。
缓存更新由监听器完成。
@Override
public Result update(Shop shop) {
Long id = shop.getId();
if(id == null){
return Result.fail("店铺id不能为空");
}
//1.更新数据库
updateById(shop);
// @TODO 现在不再需要删除缓存了,由canal监听数据库的变化,然后更新缓存
//2.删除缓存
// stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok();
}
3.4 测试
接下来可以自行调断点测试,也可以运行项目,然后更改数据库,查看终端输出。
参考资料
mysql 8.0找不到my.ini配置文件解决方案_mysql80没有my.ini-CSDN博客
Docker整合canal 踩坑实录_com.alibaba.otter.canal.parse.exception.canalparse-CSDN博客
Canal 1.1.5 启动报错:caching_sha2_password Auth failed_canal启动出现canal.deployer-1.1.5.jar:na-CSDN博客
Canal启动和运行出现的问题_canal启动闪退-CSDN博客