【Redis】Redis缓存击穿
1. 概述
缓存击穿:缓存击穿问题也叫热点key问题,一个高并发的key或重建缓存耗时长(复杂)的key失效了,此时大量的请求给数据库造成巨大的压力。如下图,线程1还在构建缓存时,线程2,3,4也来查询缓存,未命中目标到达数据库中查询数据并重建缓存。
2. 解决方案
针对缓存击穿,有两种解决方案。分别是互斥锁和逻辑过期。
2.1 互斥锁
思想:在众多的线程中,只有一个线程可以获得锁,获得锁的线程才能够重建缓存,再释放锁。没有获得锁的线程让其休眠一段时间后再次查询缓存,如果命中目标就返回数据了,如果还是没有命中目标,就再次尝试获得锁,如果获得锁就可以重建缓存,否则再休眠一段时间去缓存中查询,查看是否能命中目标,一直这样循环直到获得目标数据。
获得锁:这个锁不是我们常用的lock, synchronized锁,这两种锁拿到了就会执行,没拿到就等待。但我们这里的锁需要自定义拿到锁和未拿到锁需要干什么。这学习redis基本语法的时候,redis中有个命令和上诉的功能类似,那就是 setnx,如果key不存在就添加,返回结果是1,否则不添加,返回结果是0。
释放锁:释放锁直接将其删除就好了。del setnx
注意:为了避免锁没有被释放而造成死锁原因,最好设置一个有效期作为兜底,即便没有释放锁,有效期过后自动删除,就不会造成死锁了。
这种方式有一个缺点,如果重建缓存比较久,因为加了锁的原因,重建缓存的这段时间其它线程只能等待,性能不高。万一某个因素导致锁没有释放,会发生死锁的情况。
2.2 逻辑过期
逻辑过期,顾名思义,不是真正意义的过期,也可简单理解为永不过期。出现缓存击穿的问题也是key失效导致的,那么我们就不给缓存设置过期时间ttl了。不设置过期时间怎么维护这些缓存呢?总不能一直存在缓存中吧?当然不是了,我们可以在存储数据时再额外存入一个过期时间,后续我们只要维护这个额外的过期时间就好了。
但是换一个角度来看,这个过期时间是由开发人员添加的,redis并不会帮我们管理这些数据,也就是说,这些数据一旦存入redis中,在某种意义上这些数据是持久性的。
一般来说,这些热点key都是在商品做活动的时候用的多,我们会提前把这些高并发数据导入到缓存中,导入数据时就为它们添加逻辑过期时间,等活动结束后,将它们移除即可。另外,查询这些数据理论上来说是一定能命中的,如果没有命中,说明这个数据不是活动数据。所以说只需要判断这些数据是否逻辑过期即可。
那逻辑过期了,也就是说缓存中的是旧数据,需要重建缓存,为了解决线程安全问题,这里也是需要加锁的,但值得一提的是,获得锁的线程(线程1)并不会自己去重建缓存,而是重开一个线程(线程2),委托新线程(线程2)去重建缓存,线程1会先凑合使用旧数据。如果线程2在重建缓存期间,来了一个线程3,因为缓存过期了,必然会尝试获取锁,但锁已经被线程2获取了,所以线程3肯定是获取锁失败的,此时线程3知道了有人帮我们做缓存更新了,于是线程3也拿到过期的数据返回了。就在这时,线程2已经重建好了缓存,并把锁释放了。刚好来了一个线程4,在缓存中命中了目标数据,并返回了最新的数据。
2.3 总结
互斥锁就是在缓存重建的过程,让其他线程进行等待,从而确保数据一致性,但线程需要等待,如果锁没有释放,还会导致服务阻塞,甚至不可用的状态。
逻辑过期是保证在缓存重建期间服务依然可用,但不能保证数据一致性。
3. 实现
3.1 基于互斥锁解决缓存击穿
思想:利用redis的setnx方法来表示获取锁,该方法含义是如果redis中没有这个key,则插入成功,返回1。但是在spring中它帮我们转为了Boolean,因此在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false,我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程。
// tryLock:尝试获取锁。锁就是redis中的一个key,所以key由使用者传给我们,我们就不在这写死了
private boolean tryLock(String key) {
// 执行setnx,ctrl + p查看参数,可以发现它在存的时候是可以同时设置有效期的
// 有效期的时长跟你的业务有关,一般正常你的业务执行时间是多少,你这个锁的有效期就比它长一点,长个10倍20倍(避免异常情况),例如这里就设置为10秒钟
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
// 这里不要直接将flag返回,因为直接返回它是会做拆箱的,在拆箱的过程中是有可能出现空指针的,因此这里建议大家使用一个工具类BooleanUtil,是hutool包中的,它可以帮你做一个判断(isTrue、isFalse方法),返回的是一个基本数据类型;或者它也可以直接帮你拆箱(isBollean方法)
return BooleanUtil.isTrue(flag);
}
// unlock:释放锁
private void unlock(String key) {
// 之前分析过了,方法锁就是将锁删掉
stringRedisTemplate.delete(key);
}
缓存击穿和缓存穿透的逻辑非常相似,可以在缓存穿透的基础上按照上面的流程图修改。
实现类
public Shop queryWithMutex(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1、从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2、判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 存在,直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}
//判断命中的值是否是空值
if (shopJson != null) {
//返回一个错误信息
return null;
}
// 4.实现缓存重构,缓存重建业务比较复杂,不是一步两步就能搞定的
// 4.1 获取互斥锁,是一个key
String lockKey = "lock:shop:" + id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
// 4.2 判断否获取成功
if(!isLock){
// 4.3 失败,则休眠并重试
// 休眠不要花费太长时间,这里可以先休眠50毫秒试一试,这个方法有异常,最后解决它
Thread.sleep(50);
// 重试就是递归即可
return queryWithMutex(id);
}
// PS:获取锁成功应该再次检测redis缓存是否存在,做DoubleCheck。如果存在则无需重建缓存。但是这里先不检查了。
// 4.4 成功,根据id查询数据库
shop = getById(id);
// 5.不存在,返回错误 // 这个是解决缓存穿透的
if(shop == null){
//将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
//返回错误信息
return null;
}
// 6.写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_NULL_TTL,TimeUnit.MINUTES);
// 最后ctrl + T用try-catch-finally将代码包起来
} catch (Exception e){
// 这里异常我们就不去做处理了,因为sleep是打断的异常,直接往外抛即可
throw new RuntimeException(e);
}
finally {
// 7.释放互斥锁,因为抛异常的情况下,也是需要执行unlock的,因此需要放到unlock
unlock(lockKey);
}
// 返回
return shop;
}
根据上面的逻辑,为空直接返回null, 为了给用户一个良好的操作体验,查询数据时对返回结果做一个非空判断,给用户一个提示。
@Override
public Result queryById(Long id) {
// 缓存穿透
// Shop shop = queryWithPassThrough(id);
// 互斥锁解决缓存击穿
Shop shop = queryWithMutex(id);
if (shop == null) {
return Result.fail("店铺不存在!");
}
return Result.ok(shop);
}
3.2 基于逻辑过期解决缓存击穿
思想:当用户请求数据时,首先到redis缓存中查询,理论上讲这个是不会出现未命中的情况,因为现在key是不会过期的,因此我们可以认为,一旦这个key添加到了缓存里面,它应该会是永久存在的,除非活动结束,然后我们再删除。像这种热点key往往是一些参加活动的一些商品,我们会提前给它们加入缓存,在那个时候就会给它设置一下逻辑时间。但是在为了健壮性考虑,还是判断一下它有没有命中,真的未命中我们也不需要去做一些击穿、穿透这样的一些解决方案,我们直接给它返回空即可。
核心逻辑其实就是默认它命中了,在命中的情况下,我们需要判断的是它有没有过期,也就是它的逻辑过期时间,这个结果有两种:过期和不过期。如果没有过期,则直接返回redis中的数据,如果过期,那就说明它需要重新加载,去做缓存处理。但是不是任何线程都可以重建,因此这里需要有一个争抢,即它需要先尝试去获取互斥锁,然后判断获取是否成功,如果获取失败,说明在这之前有线程去获取数据库数据,那这个更新我们就不用管了,直接返回旧的即可。而获取锁成功的线程,就需要执行缓存重建,但是也不是自己去执行,而是开启一个独立的线程,由这个线程去执行缓存重建,它自己也是返回旧的数据先用着。
1. 设置逻辑过期时间
由于这个字段是我们为了解决缓存击穿才出现的,所以这个字段在实体类中必然是不存在的,有以下3中方式添加字段。
方式一:在实体类中添加字段,修改了原有代码,具有代码侵入性。(不推荐)
方式二:另外创建一个实体类存放逻辑过期字段,然后在实体类中继承新创建的类,也修改了原有代码,具有代码侵入性。(不推荐)
方式三:在 RedisData
中添加一个Object属性,也就是 RedisData
它自己带有过期时间,并且它里面带有数据,这个数据就是你想存进redis的数据,例如Shop、或者其他的数据,因此它是一个万能的存储对象。这种方案就完全不用对原来的实体类做任何修改
package com.hmdp.utils;
@Data
public class RedisData {
// 设置的逻辑过期时间
private LocalDateTime expireTime;
private Object data;
}
2. 缓存预热
这种热点数据,是需要提前将缓存导入进去的,实际开发中可能会有一个后台管理系统,可以把某一些热点提前在后台添加到缓存中,但由于我们现在没有一个后台管理的系统,因此基于单元测试方式来把数据加入到缓存中,充当是提前做一个缓存的预热。
下面这个方法将查询的数据写入到了缓存中,并为其封装了逻辑过期时间
// saveShop2Redis:将shop添加到redis中
public void saveShop2Redis(Long id, Long expireSeconds) {
// 1.查询店铺数据
Shop shop = getById(id);
// 2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
// 过期时间由参数传进来
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
// 3.写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
这里直接调用上面封装好的代码,模拟热点数据写入缓存中
@Test
void testSaveShop() {
shopService.saveShop2Redis(1L, 10L);
}
3. 处理缓存击穿实现代码
3.1 设置一个常量类存放key, 和锁的过期时间
public static final String LOCK_SHOP_KEY = "lock:shop:"; // 店铺获取的锁(key)的前缀
public static final Long LOCK_SHOP_TTL = 10L; // 锁的过期时间
3.2 缓存穿透核心代码块
@Override
public Result queryById(Long id) {
// 缓存穿透
// Shop shop = queryWithPassThrough(id);
// 互斥锁解决缓存击穿
// Shop shop = queryWithMutex(id);
// 逻辑过期解决缓存击穿
Shop shop = queryWithLogicalExpire(id);
if (shop == null) {
return Result.fail("店铺不存在!");
}
return Result.ok(shop);
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire( Long id ) {
String key = CACHE_SHOP_KEY + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否命中
if (StrUtil.isBlank(json)) {
// 3.未命中,直接返回null
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
// redisData.getData()返回的是Object类型,因为RedisData中的data类型是Object,所以使用JSON工具在做反序列化的时候,它并不知道你的类型是不是店铺Shop。此时redisData.getData()的返回值的本质其实是JSONObject,因此这里可以直接强转
JSONObject data = (JSONObject) redisData.getData();
// 当拿到JSONObject类型后,依旧使用JSON工具类,toBean除了可以接收JSON字符串以外,还可以接收JSONObject,然后告诉它我的实际类型是店铺,此时它就能返回给你一个店铺结果了
Shop shop = JSONUtil.toBean(data, Shop.class);
// 当然上面两步有点多余,完全可以放一步,但这里为了方便理解,依旧分为两步
// Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期:过期时间是不是在当前时间之后?
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return shop;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock){
// 获取锁成功应该再次检测redis缓冲是否过期,做DoubleCheck。如果存在则无需重建缓存。
// 6.3 成功,开启独立线程实现缓存重建。建议:使用线程池,不要自己去写一个线程,那一定话性能不太好,经常的创建和销毁。
// 提交任务,这个任务我们可以写成一个Lambda表达式的形式
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
// 重建缓存,直接调用之前封装好的方法即可。
// 这里过期时间准确来讲应该设置为30分钟,但是我们为了等一会测试,就先设置成20秒,我们期待的是缓存到底了,然后看看它会不会触发缓存重建的线程安全问题,因此设置短一点,方便我们观察效果
this.saveShop2Redis(id, 20L);
} catch (Exception e){
throw new RuntimeException(e);
} finally {
// 重建缓存一定要释放锁,并且释放锁的动作最好写到finally中
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return shop;
}
为了模拟重建缓存有延迟,这里休眠200毫秒。休眠时间越长,越容易引发线程安全问题。