Redis篇--常见问题篇3--缓存击穿(数据查询上锁,异步操作,熔断降级,三种缓存问题综合优化策略)
1、缓存击穿
(1)、概述
缓存击穿是指某个热点数据在缓存中过期后,大量并发请求同时访问该数据,导致这些请求全部穿透到数据库,形成瞬间的高负载,给数据库和服务带来巨大的压力,甚至会崩溃。
这种情况通常发生在高并发场景下,尤其是在某些热点数据的缓存过期时。
示意图:
(2)、解决方案
1、加锁机制
当缓存失效时,可以使用分布式锁来确保只有一个线程能够去查询数据库并更新缓存,其他线程等待锁释放后再从缓存中获取数据。这样可以避免多个线程同时查询数据库。
示例如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class CacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private DataService dataService; // 假设这是你的数据服务,负责从数据库获取数据
private static final String LOCK_KEY = "cache:lock";
private static final long LOCK_EXPIRE_TIME = 5; // 锁的过期时间(秒)
/**
* 获取数据,使用分布式锁防止缓存击穿
* @param key 数据的唯一标识
* @return 数据
*/
public String getData(String key) {
// 1. 尝试从缓存中获取数据
String cachedData = (String) redisTemplate.opsForValue().get(key);
if (cachedData != null) {
return cachedData;
}
// 2. 尝试获取分布式锁
Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "locked", LOCK_EXPIRETime, TimeUnit.SECONDS);
if (lockAcquired) {
try {
// 3. 锁定成功,去数据库中加载数据
String data = dataService.getDataByKey(key);
if (data != null) {
// 4. 将数据存入缓存
redisTemplate.opsForValue().set(key, data, 60, TimeUnit.SECONDS); // 设置60秒的TTL
}
return data;
} finally {
// 5. 释放锁
redisTemplate.delete(LOCK_KEY);
}
} else {
// 6. 锁定失败,等待一段时间后重试
try {
Thread.sleep(100); // 等待 100 毫秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 7. 重试获取数据
return getData(key);
}
}
}
2、异步更新缓存
当缓存失效时,后端可以返回给前端一个标识,告诉前端继续使用之前的旧缓存数据,并异步更新缓存。这样可以保证用户不会感知到缓存失效,同时减轻数据库的压力。
注意:这种方式往往需要搭配其他策略一起实现,如搭配限流或熔断机制。因为击穿往往在很大并发量下才会出现,异步操作就会导致启动大量子线程去执行任务,虽然这些子线程可以被分配到不同的CPU核心去处理,但对于我们业务而言,实际上只需要一个处理成功即可,其他的线程都是多余的,造成CPU资源浪费,所以此时结合熔断或限流机制就非常合适。
代码示例1:异步操作,不采用熔断和限流
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
public class CacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private DataService dataService; // 假设这是你的数据服务,负责从数据库获取数据
/**
* 获取数据,使用异步加载防止缓存击穿
* @param key 数据的唯一标识
* @return 数据
*/
public String getData(String key) {
// 1. 尝试从缓存中获取数据
String cachedData = (String) redisTemplate.opsForValue().get(key);
if (cachedData != null) {
return cachedData;
}
// 2. 异步加载新数据
CompletableFuture.supplyAsync(() -> dataService.getDataByKey(key))
.thenAccept(data -> { // data这里是返回值
if (data != null) {
// 3. 将新数据data存入缓存
redisTemplate.opsForValue().set(key, data, 60, TimeUnit.SECONDS); // 设置 60 秒的 TTL
}
});
// 4. 返回旧的缓存数据(如果有)
return cachedData;
}
}
代码示例2:异步操作,结合限流
限流可以作用在接口上,也可以作用在数据库的查询上,示例如下
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
// 配置限流器,每秒最多允许 10 个异步任务
RateLimiter rateLimiter = RateLimiter.of("cacheUpdateLimiter",
RateLimiterConfig.custom()
.limitForPeriod(10) // 每秒最多 10 个请求
.limitRefreshPeriod(Duration.ofSeconds(1)) // 每秒刷新一次
.timeoutDuration(Duration.ofMillis(500)) // 超时时间为 500 毫秒
.build());
// 在异步更新缓存时使用限流器
CompletableFuture.supplyAsync(() -> {
if (rateLimiter.acquirePermission()) { // 校验是否限流
// 未达到限流限制,执行读取数据库获取数据的任务
return dataService.getDataByKey(key);
} else {
// 超过限流限制,直接返回null或其他默认值
return null;
}
}).thenAccept(data -> {
if (data != null) { // 返回数据不为空,即查询数据库有返回结果
// 更新本地缓存和 Redis
localCache.put(key, data);
redisTemplate.opsForValue().set(key, data, getRandomTtl(), TimeUnit.SECONDS);
}
});
代码示例3:异步操作,结合熔断
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
// 配置熔断器,当错误率达到 50% 时跳闸,等待 10 秒后重试
CircuitBreaker circuitBreaker = CircuitBreaker.of("cacheUpdateCircuit",
CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 错误率达到 50% 时跳闸
.waitDurationInOpenState(Duration.ofSeconds(10)) // 等待 10 秒后重试
.slidingWindowSize(10) // 使用 10 个请求的滑动窗口
.build());
// 在异步更新缓存时使用熔断器
CompletableFuture.supplyAsync(() -> {
try {
return circuitBreaker.executeSupplier(() -> dataService.getDataByKey(key)); // 使用熔断器监听查询数据库的任务结果,如果执行失败次数超出熔断器配置,熔断器会主动跳闸(抛出异常),以保护当前的服务正常运行。
} catch (Exception e) {
// 熔断器跳闸,返回 null
return null;
}
}).thenAccept(data -> {
if (data != null) { // 执行成功
// 更新本地缓存和 Redis
localCache.put(key, data);
redisTemplate.opsForValue().set(key, data, getRandomTtl(), TimeUnit.SECONDS);
}
});
3、多级缓存策略
使用双缓存策略,即一个主缓存和一个影子缓存。当主缓存失效时,先从影子缓存中获取数据,影子缓存的过期时间比主缓存稍长。这样可以减少缓存失效时的冲击。如,使用本地缓缓+分布式缓存的方式,示例可以参考缓存雪崩篇的代码示例。
4、缓存不过期,定时更新
对于某些非常重要的热点数据,可以考虑将其设置为永不过期的缓存。通过定时任务或后台线程定期更新这些数据,确保缓存中的数据始终是最新的。
示例代码:
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class CacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private DataService dataService; // 假设这是你的数据服务,负责从数据库获取数据
/**
* 定时更新热点数据
*/
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void updateHotData() {
// 1. 获取热点数据的 key 列表
List<String> hotKeys = dataService.getHotKeys();
// 2. 更新每个热点数据
for (String key : hotKeys) {
String data = dataService.getDataByKey(key);
if (data != null) {
// 3. 将数据存入缓存,设置较长的 TTL
redisTemplate.opsForValue().set(key, data, 3600, TimeUnit.SECONDS); // 设置 1 小时的 TTL
}
}
}
}
2、三种缓存问题综合优化策略
为了同时应对缓存穿透、缓存雪崩和缓存击穿,我们可以结合多种技术手段,构建一个健壮的缓存系统。
(1)、综合策略概述
缓存穿透:
- 对于不存在的数据,缓存空结果(如null),并设置较短的过期时间。(推荐)
- 使用布隆过滤器快速判断数据是否存在。(不太推荐,过滤器准确度以及需要维护成本)
缓存雪崩:
- 为每个缓存项设置随机的过期时间,避免所有缓存项在同一时间点失效。(推荐)
- 实现缓存预热,在系统启动时或定期加载常用数据到缓存中。(推荐)
缓存击穿:
- 使用分布式锁确保只有一个线程去查询数据库并更新缓存,其他线程等待锁释放后再从缓存中获取数据。(推荐)
- 实现 双缓存策略,即主缓存和影子缓存,减少缓存失效时的冲击。
- 异步更新缓存,确保用户不会感知到缓存失效。(可以在锁机制的基础上,适当添加限流或熔断机制)
(2)、代码示例
本例仅以结合以上4中优化方式为例。
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@Service
public class ComprehensiveCacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// Redisson 客户端用于分布式锁
@Autowired
private RedissonClient redissonClient;
// 主缓存前缀
private static final String MAIN_CACHE_PREFIX = "main:";
// 模拟数据库查询
private Object queryFromDatabase(String key) {
// 模拟数据库查询逻辑
System.out.println("Querying from database for key: " + key);
return "data-" + key; // 返回模拟数据
}
/**
* 获取数据,综合处理缓存穿透、缓存雪崩和缓存击穿问题
**/
public Object getData(String key) {
// 1、尝试从主缓存中获取数据
Object mainCachedValue = redisTemplate.opsForValue().get(MAIN_CACHE_PREFIX + key);
if (mainCachedValue != null) {
return mainCachedValue;
}
// 2、使用分布式锁,确保只有一个线程去查询数据库,防止击穿
RLock lock = redissonClient.getLock(key);
try {
boolean isLocked = lock.tryLock(10, 100, TimeUnit.MILLISECONDS);
if (!isLocked) {
// 如果获取锁失败,直接返回 null 或者旧的缓存数据
return null;
}
// 3、再次检查缓存,防止其他线程已经更新了缓存
mainCachedValue = redisTemplate.opsForValue().get(MAIN_CACHE_PREFIX + key);
if (mainCachedValue != null) {
return mainCachedValue;
}
// 4、如果缓存中没有数据,查询数据库
Object dbValue = queryFromDatabase(key);
// 5、更新缓存,设置相对随机的过期时间,防止雪崩
int randomExpiration = ThreadLocalRandom.current().nextInt(500, 700); // 随机 500-700 秒
redisTemplate.opsForValue().set(MAIN_CACHE_PREFIX + key, dbValue, randomExpiration, TimeUnit.SECONDS);
// 6、如果数据库中没有数据,缓存空对象结果60秒,防止穿透
if (dbValue == null) {
redisTemplate.opsForValue().set(MAIN_CACHE_PREFIX + key, "NULL", 60, TimeUnit.SECONDS); // 缓存空结果60秒
return null;
}
return dbValue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to acquire lock.", e);
} finally {
// 7、释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
/**
* 缓存预热方法,可以通过一些项目初始化执行的方式执行,进行预热
**/
@PostConstruct
public void warmUpCache() {
// 模拟预热一些常用的数据
for (int i = 1; i <= 100; i++) {
String key = "key-" + i;
Object dbValue = queryFromDatabase(key);
int randomExpiration = ThreadLocalRandom.current().nextInt(500, 700); // 随机 500-700 秒
redisTemplate.opsForValue().set(MAIN_CACHE_PREFIX + key, dbValue, randomExpiration, TimeUnit.SECONDS);
}
System.out.println("Cache pre-warmed with 100 keys.");
}
}
附录:
熔断器介绍:
1、概述
在高并发场景下,熔断器(Circuit Breaker)的作用是保护系统免受故障服务的影响。当后端服务出现故障时,熔断器会根据配置的规则自动跳闸,阻止更多的请求继续穿透到故障服务,从而避免系统过载或崩溃。随着故障的恢复,熔断器会逐渐从"打开"状态进入"半开"状态,尝试恢复正常的请求处理。
2、熔断器的状态
熔断器有三种状态:
- 关闭状态(Closed):正常工作状态,允许所有请求通过。
- 打开状态(Open):当错误率超过阈值时,熔断器会进入"打开"状态,阻止后续请求执行,直接返回默认值或抛出异常。
- 半开状态(Half-Open):在"打开"状态等待一段时间后,熔断器会进入"半开"状态,允许少量请求通过,以检测后端服务是否恢复正常。如果这些请求成功,熔断器会回到"关闭"状态;否则,它会再次进入"打开"状态。
3、熔断器行为解释
3.1、初始状态
我们定义了一个熔断器实例circuitBreaker,并指定了熔断器的名称为“cacheUpdateCircuit”,并配置了熔断器的行为如下。此时的熔断器处于关闭状态(Closed),会放行所有的请求通过。
CircuitBreaker circuitBreaker = CircuitBreaker.of("cacheUpdateCircuit",
CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 错误率达到 50% 时跳闸
.waitDurationInOpenState(Duration.ofSeconds(10)) // 等待 10 秒后重试
.slidingWindowSize(10) // 使用 10 个请求的滑动窗口
.permittedNumberOfCallsInHalfOpenState(3) // 在半开状态下允许3个请求通过
.minimumNumberOfCalls(3) // 在半开状态下至少需要3个请求才能评估
.successRateThreshold(75) // 成功率达到 75% 时才恢复
.build());
3.2、跳闸行为
定义的熔断器实例circuitBreaker用于检测dataService.getDataByKey(key)的执行结果。
因为我们上面配置的窗口大小是10个请求,这里以10个请求为例进行说明。
try {
return circuitBreaker.executeSupplier(() -> dataService.getDataByKey(key));
} catch (Exception e) {
// 熔断器跳闸,返回 null 或其他默认值
return null;
}
情况1:如果连续的10次请求中,超过50%都请求都成功执行了任务,则熔断器会继续处于关闭状态,继续进行下一个窗口周期的10次统计。(注意:窗口是滚动的)
情况2:如果连续的10次请求中,大于等于50%都请求都失败了,那么熔断器会立即进入开启状态(即发生跳闸),直接拒绝接下来10秒(如上配置为10秒)内的所有请求。
3.3、重复跳闸与恢复
当熔断器进入"打开"状态后,它会等待10秒(配置中的waitDurationInOpenState的时间),这10秒内直接拒绝所有的请求。10秒后会自动进入"半开"状态下,此时熔断器会允许少量请求通过(默认为1个,如果配置了permittedNumberOfCallsInHalfOpenState,则以配置为准),以检测后端服务是否已经恢复正常。如上的示例中,我们配置为3个。
情况1:放行了3个请求后,这3次的成功率超过75%(successRateThreshold配置),则认为服务恢复,熔断器进入关闭状态,允许所有的请求通过。
情况2:放行了3个请求后,这3次的成功率低于75%,熔断器会认为服务仍然不可用,并再次进入"打开"状态,拒绝所有请求。10秒后,再次进入半开状态,以此往复。
4、熔断器总结
-
跳闸条件:当滑动窗口内的错误率达到配置的阈值(例如 50%)时,熔断器会跳闸,进入 "打开"状态。此时,所有新的请求都会被拒绝,直接返回默认值,以防止更多的请求继续穿透到故障服务。
-
半开状态:在"打开"状态等待一段时间后,熔断器会进入"半开"状态,允许少量请求通过,以检测后端服务是否恢复正常。如果这些请求成功,熔断器会回到"关闭"状态;如果失败,熔断器会再次进入"打开"状态。
-
并发场景:在高并发场景下,熔断器能够快速响应错误率的变化,及时跳闸保护系统,并在后端服务恢复后逐步恢复正常请求处理。
熔断器可以在高并发场景下有效地保护系统免受故障服务的影响,确保系统的稳定性和可用性。