redis缓存雪崩、击穿、穿透
1.redis雪崩
1.1redis为什么会产生雪崩
Redis雪崩 是指在 Redis 缓存系统中,当大量缓存同时失效时,所有请求直接打到数据库,导致数据库瞬间压力激增,甚至崩溃的现象。雪崩问题通常出现在高并发的系统中,因为缓存的失效导致后端数据库承受不了巨大的请求量。
1.2redis雪崩解决方案
- 缓存预热:在系统上线之前,可以提前将一些常用数据缓存到 Redis 中,避免上线后大量请求直接打到数据库。这可以通过后台线程预先加载一些热门数据,也可以手动设置缓存。
- 设置不同的缓存过期时间:如果所有的缓存数据设置相同的过期时间,当缓存到期后,可能会出现大量缓存同时失效的情况。为了避免这种情况,可以为不同的缓存设置不同的过期时间,或者在设置缓存时加上一个随机的时间差。
- 缓存永不过期:对于一些热点数据,特别是经常被访问但又很少变化的数据,可以设置缓存永不过期,同时在后台更新缓存。
- 缓存降级:当 Redis 宕机或者出现异常时,可以使用缓存降级策略,允许某些非核心数据的读取失败。也可以通过服务降级手段,限制对数据库的访问,从而保护数据库。
- 互斥锁(防止击穿):当大量缓存同时失效时,如果多个线程同时请求数据库并写入缓存,可能会导致数据库压力剧增。可以使用互斥锁的方式,确保只有一个线程能够更新缓存,其他线程等待缓存更新完成后再读取缓存。
- 数据持久化与集群:使用 Redis 的持久化机制(如 RDB、AOF)或搭建 Redis 集群来保证缓存的高可用性。当某个节点失效时,可以自动切换到其他节点,避免缓存服务器宕机导致雪崩。
- 请求限流和熔断:对系统进行限流和熔断保护,当缓存失效时,限制对数据库的请求数量,防止数据库过载。
1.3 、解决方案的具体实现
1.3.1、缓存预热
通过提前加载一些常用的缓存数据,避免在系统刚启动时,所有请求直接打到数据库。这可以通过手动加载或者后台任务实现。
@Service
public class CachePrewarmService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void preloadCache() {
// 假设我们要预热一些数据
String key = "hot_data_key";
Object data = loadDataFromDB(); // 从数据库加载数据
redisTemplate.opsForValue().set(key, data, 1, TimeUnit.HOURS); // 设置缓存,并设定1小时过期
}
private Object loadDataFromDB() {
// 模拟从数据库加载数据
return new Object(); // 返回数据库中的数据
}
}
缓存预热实现代码
1.3.2. 随机过期时间(解决大规模缓存同时失效)
我们可以通过在设置缓存过期时间时,给每个缓存增加一个随机值,避免同时过期导致雪崩。
@Service
public class CacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void setCacheWithRandomTTL(String key, Object value) {
// 设置基础的缓存时间,比如1小时
long baseTime = 60 * 60;
// 添加一个随机的过期时间,避免同一时间大量缓存同时失效
long randomTime = new Random().nextInt(300); // 随机增加0~300秒
redisTemplate.opsForValue().set(key, value, baseTime + randomTime, TimeUnit.SECONDS);
}
}
1.3.3. 使用互斥锁防止缓存击穿
缓存击穿是指某个热点数据的缓存失效后,瞬间大量请求直接打到数据库,导致数据库压力骤增。可以使用分布式锁,确保在缓存失效时,只有一个线程能请求数据库,其他线程等待缓存重新生成。
@Service
public class CacheWithLockService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 获取数据时,使用分布式锁
public Object getCacheWithLock(String key) {
Object value = redisTemplate.opsForValue().get(key);
if (value == null) {
// 使用 Redis 的 setIfAbsent (NX) 命令实现分布式锁
String lockKey = key + "_lock";
Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "LOCK", 5, TimeUnit.SECONDS);
if (lockAcquired != null && lockAcquired) {
try {
// 缓存失效且获得锁,查询数据库并更新缓存
value = loadDataFromDB();
redisTemplate.opsForValue().set(key, value, 60, TimeUnit.SECONDS);
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
} else {
// 未获得锁,等待缓存更新
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return redisTemplate.opsForValue().get(key); // 再次尝试获取缓存
}
}
return value;
}
private Object loadDataFromDB() {
// 模拟从数据库加载数据
return new Object(); // 返回数据库中的数据
}
}
1.3.4. 缓存降级
当 Redis 不可用时,系统可以通过降级策略,直接访问数据库或者返回一些默认值。我们可以通过 try-catch
捕获 Redis 异常,来实现降级逻辑。
@Service
public class CacheDegradeService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public Object getData(String key) {
try {
Object value = redisTemplate.opsForValue().get(key);
if (value != null) {
return value;
}
} catch (Exception e) {
// Redis 发生异常时,执行降级逻辑
System.out.println("Redis不可用,执行降级策略");
}
// Redis不可用或者缓存失效,直接从数据库获取数据
return loadDataFromDB();
}
private Object loadDataFromDB() {
// 模拟从数据库加载数据
return new Object(); // 返回数据库中的数据
}
}
1.3.5. 数据持久化与集群
Redis 提供了 RDB 和 AOF 的持久化机制来保证数据不会因为 Redis 崩溃而丢失。同时,通过 Redis 的集群模式,我们可以将数据分布在多个节点上,提升系统的可靠性和可用性。
# 开启 AOF 持久化
appendonly yes
# 每秒同步一次 AOF 文件
appendfsync everysec
# Redis Cluster 配置,启动多个节点,配置集群
cluster-enabled yes
cluster-config-file nodes-6379.conf
cluster-node-timeout 5000
1.4总结
- Redis 雪崩 是在缓存失效后,大量请求直接打到数据库,导致数据库压力骤增甚至崩溃的问题。在高并发场景下,Redis 雪崩可能会带来严重后果。
- 为了避免 Redis 雪崩,可以采取多种措施,如 缓存预热、设置不同过期时间、使用互斥锁防止缓存击穿、缓存降级、限流与熔断机制 等。
- 持久化与集群 是提升 Redis 可用性的关键,确保即便在单个节点失效的情况下,服务依然能够正常工作。
2.redis穿透
2.1、什么是redis穿透
缓存穿透:缓存和数据库中都没有的数据,可用户还是源源不断的发起请求,导致每次请求都会到数据库,从而压垮数据库。
2.2、解决方案
①、业务层校验
用户发过来的请求,根据请求参数进行校验,对于明显错误的参数,直接拦截返回。
比如,请求参数为主键自增id,那么对于请求小于0的id参数,明显不符合,可以直接返回错误请求。
②、不存在数据设置短过期时间
对于某个查询为空的数据,可以将这个空结果进行Redis缓存,但是设置很短的过期时间,比如30s,可以根据实际业务设定。注意一定不要影响正常业务。
③、布隆过滤器
关于布隆过滤器,后面会详细介绍。布隆过滤器是一种数据结构,利用极小的内存,可以判断大量的数据“一定不存在或者可能存在”。
对于缓存击穿,我们可以将查询的数据条件都哈希到一个足够大的布隆过滤器中,用户发送的请求会先被布隆过滤器拦截,一定不存在的数据就直接拦截返回了,从而避免下一步对数据库的压力。
2.3、解决方案的具体实现
2.3.1、缓存空对象
/**
* 缓存空值解决缓存穿透
* @param id
* @return
*/
@Override
public Result queryById(Long id) {
String cacheShopKey = CACHE_SHOP_KEY + id;
// 1.从Redis中查询id
String shopJson = stringRedisTemplate.opsForValue().get(cacheShopKey);
if (StrUtil.isNotBlank(shopJson)) {
// 2.命中返回商铺信息
return Result.ok(JSONUtil.toBean(shopJson, Shop.class));
}else if(shopJson != null){
// 2.1.缓存为空直接返回错误信息
return Result.fail("店铺信息不存在!");
}
// 3.未命中查询数据库
Shop shop = getById(id);
// 4.商铺不在数据库
if(shop == null){
// 4.1.缓存空值
stringRedisTemplate.opsForValue().set(cacheShopKey, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 4.2.返回错误信息
return Result.fail("店铺不存在");
}
// 5.商铺在数据库
// 5.1.保存到Redis中
stringRedisTemplate.opsForValue().set(cacheShopKey, JSONUtil.toJsonStr(shop));
stringRedisTemplate.expire(cacheShopKey, CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 5.2.返回商铺信息
return Result.ok(shop);
}
2.3.2.布隆过滤器
布隆过滤器(Bloom Filter)是一种空间效率高的概率数据结构,用于测试一个元素是否属于一个集合。它可以有效地表示一个集合,并快速判断某个元素是否在集合中。布隆过滤器的核心思想是通过多个哈希函数对数据进行多次映射,从而减少存储空间,并提供快速的查询操作。
工作原理:
1.初始化: 初始化一个大小为 m 的位数组,每个位置初始值为 0。选择 k 个独立的哈希函数,每个哈希函数将输入元素映射到 [0, m-1] 范围内的一个位置。
2.添加元素:
对于要加入集合的每个元素,用 k 个哈希函数对该元素进行哈希运算,得到 k 个哈希值。
将这 k 个哈希值对应的位数组中的位置设为 1。
3.查询元素:
对于要查询的元素,同样使用 k 个哈希函数对该元素进行哈希运算,得到 kkk 个哈希值。
检查位数组中这 k 个位置,如果其中有任何一个位置为 0,则该元素一定不在集合中。
如果这 k 个位置全部为 1,则该元素可能在集合中。
布隆过滤器的优缺点:
优点:
1.空间效率高: 相比于直接存储所有元素,布隆过滤器使用的存储空间更少。
2.插入和查询速度快: 插入和查询操作的时间复杂度都是 O(k),其中 k 是哈希函数的个数。
缺点:
1.存在误判率: 布隆过滤器只能告诉你一个元素“可能在”集合中,或者“肯定不在”集合中。它会有一定的误判率,即可能会误认为一个不在集合中的元素在集合中。
2.删除操作困难: 一旦元素插入布隆过滤器后,就很难删除它们,因为无法确定某个位上的 1 是否是由多个元素共同设置的。
实现:
添加Guava依赖
在 pom.xml 中添加 Guava 依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
定义布隆过滤器
在你的服务类中定义和初始化布隆过滤器:
/**
* 布隆过滤器解决缓存穿透
* @param id
* @return
*/
private BloomFilter bloomFilter;
@PostConstruct
public void init(){
// 初始化布隆过滤器,假设最多有1000000个店铺,误判率为0.01
bloomFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8),
100000, 0.1);
// 预加载布隆过滤器
List<Shop> shopList = list();
for(Shop shop : shopList){
bloomFilter.put(CACHE_SHOP_KEY + shop.getId());
}
}
@Override
public Result queryById(Long id) {
String cacheShopKey = CACHE_SHOP_KEY + id;
// 1.布隆过滤器过滤,不存在直接返回错误信息
if(!bloomFilter.mightContain(cacheShopKey)){
return Result.fail("店铺不存在");
}
// 2.可能存在,从Redis中查询id
String shopJson = stringRedisTemplate.opsForValue().get(cacheShopKey);
if (StrUtil.isNotBlank(shopJson)) {
// 2.1命中返回商铺信息
return Result.ok(JSONUtil.toBean(shopJson, Shop.class));
}
// 3.未命中查询数据库
Shop shop = getById(id);
// 4.商铺不在数据库,返回错误信息
if(shop == null){
return Result.fail("店铺不存在");
}
// 5.商铺在数据库
// 5.1.保存到Redis中
stringRedisTemplate.opsForValue().set(cacheShopKey, JSONUtil.toJsonStr(shop));
stringRedisTemplate.expire(cacheShopKey, CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 5.2.返回商铺信息
return Result.ok(shop);
}
3.redis击穿
3.1、什么是缓存击穿
缓存击穿是部分key过期导致的严重后果。
为什么大量key过期会产生问题而少量的key也会有问题?
缓存击穿问题也叫热点Key问题,就是⼀个被高并发访问并且缓存重建业务较复杂的key突然失效了,
无数的请求访问会在瞬间给数据库带来巨大的冲击。
具体情况如下图所示:
上述:假设此时该热点key的TTL时间到(失效了),则查询缓存未命中,会继续查询数据库,并进行缓存重建工作。但是由于查询SQL逻辑比较复杂、重建缓存的时间较久,并且该key又是热点key,短时间内有大量的线程对其进行访问,所以请求会直接 “打到” 数据库中,数据库就有可能崩掉!
3.2、缓存击穿解决方案(2种)
(1)互斥锁
简单的来说:
并不是所有的线程都有 “ 资格 ” 去访问数据库,只有持有锁的线程才可以对其进行操作。
不过该操作有一个很明显的问题,就是会出现相互等待的情况。
2)逻辑过期
不设置TTL
之前所说导致缓存击穿的原因就是该key的TTL到期了,所以我们在这就不设置TTL了,
而是使用一个字段,例如:expire表示过期时间(逻辑上的)。当我们想让它 “ 过期 ” 的时候,
我们可以直接手动将其删除(热点key,即只是在一段时间内,其被访问的频次很高)。
这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。
3. 互斥锁与逻辑过期的对比分析
4、利用互斥锁解决缓存击穿问题
核心思路:
相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,
如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有得到,
则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入 redis,再释放锁,返回数据,
利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿。
代码实现
(1)首先,我们声明一下获取锁、释放锁的方法,tryLock()、unLock()
/**
* 获取锁
* @param key
* @return
*/
private boolean tryLock(String key) {
// setnx 就是 setIfAbsent 如果存在
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.MINUTES);
// 装箱是将值类型装换成引用类型的过程;拆箱就是将引用类型转换成值类型的过程
// 不要直接返回flag,可能为null
return BooleanUtil.isTrue(flag);
}
/**
* 释放锁
* @param key
*/
private void unLock(String key) {
stringRedisTemplate.delete(key);
}
注意:这里的锁不是真正的线程锁,而是redis里面的一个特殊的key。
(2)互斥锁解决缓存击穿 queryWithMutex()
/**
* 互斥锁解决缓存击穿 queryWithMutex()
* @param id
* @return
*/
public Shop queryWithMutex(Long id) {
// 1.从redis查询商铺缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
return JSONUtil.toBean(shopJson, Shop.class);
}
// 判断空值
if (shopJson != null) {
// 返回一个错误信息
return null;
}
String lockKey = "lock:shop:" + id;
Shop shop = null;
try {
// 4.实现缓存重建
// 4.1获取互斥锁
boolean isLock = tryLock(lockKey);
// 4.2判断是否成功
if (!isLock) {
// 4.3失败,则休眠并重试
Thread.sleep(50);
// 递归
return queryWithMutex(id);
}
// 4.4成功,根据id查询数据库
shop = getById(id);
// 模拟延迟
Thread.sleep(200);
// 5.不存在,返回错误
if (shop == null) {
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
// 6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
} finally {
// 7.释放锁
unLock(lockKey);
}
// 8.返回
return shop;
}
5、利用逻辑过期解决缓存击穿问题
需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
注意:这里的key是否过期,不是由redis控制的,而是由我们自己去手动编写逻辑去控制的。
代码实现
(1)添加逻辑过期时间的字段
之前的Shop中是没有逻辑过期的字段,要如何让它带有这个属性,又不修改之前的代码呢?
新建一个RedisData对象,里面的data指的是Shop对象,而expireTime是逻辑过期时间。
即:我们可以使用 JSONUtil.toBean 将Shop对象通过序列化、反序列化到RedisData类的data属性中。
@Data
public class RedisData {
// LocalDateTime : 同时含有年月日时分秒的日期对象
// 并且LocalDateTime是线程安全的!
private LocalDateTime expireTime;
private Object data;
}
(2)逻辑过期解决缓存击穿问题 queryWithLogicalExpire()
缓存重建
/**
* 重建缓存,先缓存预热一下,否则queryWithLogicalExpire() 的expire为null
* @param id
* @param expireSeconds
*/
public void saveShopRedis(Long id, Long expireSeconds) {
// 1.查询店铺数据
Shop shop = getById(id);
// 2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); // 过期时间
// 3.写入redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 逻辑过期解决缓存击穿问题 queryWithLogicalExpire()
* 测试前要先缓存预热一下!不然 data 与 expireTime 的缓存值是null!
* @param id
* @return
*/
public Shop queryWithLogicalExpire(Long id) {
// 1.从redis查询商铺缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(shopJson)) {
return null;
}
// 4.命中,需要将json反序列化为对象
// redisData没有数据
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
// 5.1未过期,直接返回店铺信息
return shop;
}
// 5.2已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean islock = tryLock(lockKey);
// 6.2.判断是否获取互斥锁成功
if (islock) {
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit( () -> {
try {
// 重建缓存,过期时间为20L
saveShopRedis(id,20L);
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
unLock(lockKey);
}
});
}
// 6.4.返回过期店铺信息
return shop;
}
可以看到在测试的时候,name的值为:“100XXXX”
修改一下数据库,将值改为:“900XXXX”,看看并发情况下缓存重建能否正确!
通过Jmeter做压力测试
再查看Redis中的数据,可以看到name的值已经被修改了,而且上面的jmeter的每一个http都是正常的!