当前位置: 首页 > article >正文

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步

  • 0 背景
  • 1 配置MySQL
    • 1.1 开启MySQL的binlog功能
      • 1.1.1 找到mysql配置文件my.ini的位置
      • 1.1.2 开启binlog
    • 1.2 创建canal用户
  • 2 下载配置canal
    • 2.1 canal 1.1.5下载
    • 2.2 配置canal
    • 2.3 启动canal
    • 2.4 测试
  • 3 canal实现双写一致
    • 3.1 Redis操作封装
    • 3.2 编写监听器
    • 3.3 修改ShopServiceImpl.java
    • 3.4 测试
  • 参考资料

0 背景

【黑马点评优化】之使用Caffeine+Redis实现应用级二层缓存_caffeine redis二级缓存-CSDN博客

当时使用Redis+Caffeine实现对商铺信息的应用层两级缓存。文章提到了两级缓存Redis+Caffeine可以解决缓存雪等问题也可以提高接口的性能,但是可能会出现缓存一致性问题。如果数据频繁的变更,可能会导致Redis和Caffeine数据不一致的问题。

为此,使用Canel来解决这一问题。

MySQL工作原理如下:

一句话总结(详细工作原理,可以查看下面的介绍)

模拟MySQL从库读取binlog实现数据变更监听。它支持数据过滤、转换,并能将变更数据推送到不同的下游系统,如消息队列和其他数据库。

我的相关软件版本
mysql:8.0.36
redis:6.2.6
canal:1.1.5

1 配置MySQL

1.1 开启MySQL的binlog功能

1.1.1 找到mysql配置文件my.ini的位置

开始的时候找不到mysql配置文件(my.ini)的位置

通过下列方法找到:

  • 先连接mysql

mysql -h localhost -u root -p123456 (注意,有密码的话,才-p123456)

  • 输入以下命令

show variables like 'datadir';

这样就找到了配置文件所在的位置

1.1.2 开启binlog

my.ini的最后几行加上以下内容

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

1.2 创建canal用户

DROP USER IF EXISTS 'canal'@'%';

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

但是由于mysql8.0之后 如果只设置了 % 的访问权限,
会导致localhost无法访问
所以 我们需要把当前权限更新为 localhost 再执行一遍
继续执行下述命令

select mysql;

update user set host = 'localhost' where user = 'canal' and host='%';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;
查看所有用户访问权限结果如下:

-- 查看所有用户访问权限
 SELECT DISTINCT CONCAT('User: ''',user,'''@''',host,''';') AS query FROM mysql.user;

在这里插入图片描述

自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password

解决:修改canal用户对应的身份验证插件为mysql_native_password

因此,接着执行下列命令

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

ALTER USER 'canal'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';

之后我们再次查看canal用户对应的身份验证插件,如下即修改成功

在这里插入图片描述

2 下载配置canal

2.1 canal 1.1.5下载

在这里我们选择1.1.15版本的canal,因为canal1.1.15版本以后的canal不兼容Java 1.8

Release v1.1.5 · alibaba/canal (github.com)

选择canal.deployer-1.1.5.tar.gz下载,之后解压到自己的软件目录下。

在这里插入图片描述

2.2 配置canal

进入解压后的Canal目录,找到conf目录下的example实例,通常情况下,你可以通过修改conf/example/instance.properties文件来配置Canal连接到MySQL的参数,主要配置项包括:

canal.instance.master.address:MySQL服务器地址和端口。
canal.instance.dbUsername和canal.instance.dbPassword:用于连接MySQL的用户名和密码。
canal.instance.connectionCharset:数据库的字符集,通常为UTF-8。
canal.instance.tsdb.enable:是否启用表结构历史记录功能,建议开启。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.3 启动canal

当安装好canal的时候,在window中启动bat的时候有些问题,JDK17版本已经不持’PermSize=128m’

因此,删除start.bat中的这一行

在这里插入图片描述

之后,双击bin/start.bat运行即可。

结果如下:

在这里插入图片描述

2.4 测试

pom.xml中导入依赖

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.7</version>
        </dependency>

之后,编写测试类如下:


import java.net.InetSocketAddress;
import java.util.List;



import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import org.jetbrains.annotations.NotNull;

public class CanalTest {


    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmtryCount = 1200;
            while (emptyCount < totalEmtryCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(@NotNull List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(@NotNull List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

运行后,能看到,监听到了数据库的变化

在这里插入图片描述

3 canal实现双写一致

导入依赖

        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>

新建com.hmdp.cache.handler包

3.1 Redis操作封装

新建ShopRedisHandler类

package com.hmdp.cache.handler;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.github.benmanes.caffeine.cache.Cache;
import com.hmdp.entity.Shop;
import com.hmdp.service.IShopService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;

@Component
public class ShopRedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IShopService shopService;


    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Resource
    private Cache<String, Object> shopCache;

    // 缓存预热

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化缓存
        // 1.查询商品信息
        List<Shop> shopList = shopService.list();
        // 2.放入缓存
        for (Shop shop : shopList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(shop);
            // 2.2 存入caffeind
            String key = CACHE_SHOP_KEY + shop.getId();
            shopCache.put(key, shop);
            // 2.2.存入redis
            redisTemplate.opsForValue().set(key, json);
        }

//        // 3.查询商品库存信息
//        List<ItemStock> stockList = stockService.list();
//        // 4.放入缓存
//        for (ItemStock stock : stockList) {
//            // 2.1.item序列化为JSON
//            String json = MAPPER.writeValueAsString(stock);
//            // 2.2.存入redis
//            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
//        }
    }

    public void saveShop(Shop shop) {
        try {
            String json = MAPPER.writeValueAsString(shop);
            String key = CACHE_SHOP_KEY  + shop.getId();
            redisTemplate.opsForValue().set(key + shop.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteShopById(Long id) {
        String key = CACHE_SHOP_KEY  + id;
        redisTemplate.delete(key + id);
    }
}

3.2 编写监听器

新建ShopHandler类

通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:

  • 实现类通过@CanalTable("tb_item")指定监听的表信息
  • EntryHandler的泛型是与表对应的实体类
package com.hmdp.cache.handler;

import com.github.benmanes.caffeine.cache.Cache;
import com.hmdp.entity.Shop;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

import javax.annotation.Resource;

import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;

@CanalTable(value = "tb_shop")
@Component
public class ShopHandler implements EntryHandler<Shop>{


    @Autowired
    private ShopRedisHandler redisHandler;

    @Resource
    private Cache<String, Object> shopCache;

    @Override
    public void insert(Shop shop) {
        // 写数据到JVM进程缓存
        String key = CACHE_SHOP_KEY + shop.getId();
        shopCache.put(key , shop);
        // 写数据到redis
        redisHandler.saveShop(shop);
    }

    @Override
    public void update(Shop before, Shop after) {
        // 写数据到JVM进程缓存
        String key = CACHE_SHOP_KEY + after.getId();
        shopCache.put(key, after);
        // 写数据到redis
        redisHandler.saveShop(after);
    }

    @Override
    public void delete(Shop shop) {
        // 删除数据到JVM进程缓存
        String key = CACHE_SHOP_KEY + shop.getId();
        shopCache.invalidate(key);
        // 删除数据到redis
        redisHandler.deleteShopById(shop.getId());
    }
}

3.3 修改ShopServiceImpl.java

修改ShopServiceImpl中的update方法。注释掉删除缓存的操作。

缓存更新由监听器完成。

    @Override
    public Result update(Shop shop) {

        Long id = shop.getId();
        if(id == null){
            return Result.fail("店铺id不能为空");
        }
        //1.更新数据库
        updateById(shop);

        // @TODO 现在不再需要删除缓存了,由canal监听数据库的变化,然后更新缓存
        //2.删除缓存
//        stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
        return Result.ok();
    }

3.4 测试

接下来可以自行调断点测试,也可以运行项目,然后更改数据库,查看终端输出。
在这里插入图片描述

参考资料

mysql 8.0找不到my.ini配置文件解决方案_mysql80没有my.ini-CSDN博客

Docker整合canal 踩坑实录_com.alibaba.otter.canal.parse.exception.canalparse-CSDN博客

Canal 1.1.5 启动报错:caching_sha2_password Auth failed_canal启动出现canal.deployer-1.1.5.jar:na-CSDN博客

Canal启动和运行出现的问题_canal启动闪退-CSDN博客


http://www.kler.cn/a/553563.html

相关文章:

  • 负载均衡 方式
  • 【Elasticsearch】搜索时分片路由
  • Go入门之函数
  • 高并发系统架构设计全链路指南
  • 自制AirTag,支持安卓/鸿蒙/PC/Home Assistant,无需拥有iPhone
  • 应用程序中的网络协议:原理、应用与挑战
  • Java 大视界 -- 企业数字化转型中的 Java 大数据战略与实践(93)
  • 什么是网关,网关的作用是什么?网络安全零基础入门到精通实战教程!
  • 当Qt遇见IOCP:用C++打造高并发服务器
  • 【物种分布】基于R语言、MaxEnt模型融合技术的物种分布模拟、参数优化方法、结果分析制图与论文写作
  • 51单片机学习之旅——在LCD1602上显示时钟
  • FPGA实现GTY光口视频转USB3.0传输,基于FT601+Aurora 8b/10b编解码架构,提供2套工程源码和技术支持
  • DeepSeek-R1:通过强化学习激励大型语言模型的推理能力
  • javaSE学习笔记22-线程(thread)-线程通信、线程池
  • 应用接入gpt
  • React之旅-01 初识
  • windows使用clion运行lua文件,并且使用cjson
  • 【etcd】etcd_go操作与etcd锁实现原理
  • 通过例子学 rust 个人精简版 3-1
  • 【系统架构】分布式事务模型详解