单机环境下Caffeine和Redis两级缓存的实现与问题解决
1. Caffeine和Redis介绍
Caffeine 是一个高性能、基于 Java 的缓存库。它被设计用来作为 Java 应用程序的本地缓存解决方案,提供了比传统缓存更高效的功能和性能。Caffeine 可以缓存任意类型的数据,并具有丰富的配置选项,以满足不同应用的缓存需求。
Caffeine 通过使用多种缓存策略(如基于大小、时间、引用等),支持自动过期、最大容量限制、以及基于异步加载的缓存操作。它的设计目标是高效、低延迟,并能在多线程环境中保持性能。
Redis 是一个开源的、基于内存的数据结构存储系统,支持多种数据结构,如字符串(string)、哈希(hash)、列表(list)、集合(set)、有序集合(sorted set)等。它不仅可以用作数据库,还可以作为缓存和消息中间件。由于其速度非常快(大多数操作在内存中进行),Redis 被广泛应用于需要快速读取和写入的场景,例如缓存、会话管理、实时数据分析、消息队列等。
在本文章中,我们就详细介绍一下,如何使用Caffeine和Redis搭建一个本地+远程的二级缓存结构并且让它们的交互变得更加丝滑。
2. 搭建过程
2.1 SpringBoot 项目引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.8</version>
</dependency>
2.2 注册Caffeine中cache对象为Bean对象
@Bean
public Cache<String, Object> localCache() {
return Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.initialCapacity(100)
.maximumSize(1000)
.build();
}
后续使用只需要注入这个类型为Cache的bean对象就可以调用其中的增删查改方法.
基本使用:
cache.put("key1","value1");//存入key-value键值对
cache.getIfPresent("key1");//根据key删除键值对
cache.invalidate("key1");//根据key删除键值对
2.3 注入操作Redis客户端的对象stringRedisTemplate
@Autowired
private StringRedisTemplate stringRedisTemplate;
基本使用:
stringRedisTemplate.opsForValue().set("key1", "value1");//存入key-value键值对
stringRedisTemplate.opsForValue().get("key1");//根据key获取键值对
stringRedisTemplate.opsForValue().del("key1");//根据key删除键值对
2.4 缓存工具类CacheUtil结合使用
我们给出get方法作为示例:
@Configuration
public class CacheUtil {
@Autowired
private Cache cache;
@Autowired
private StringRedisTemplate stringRedisTemplate;
public boolean set(String key, String value) {
String localCache = cache.getIfPresent("key1";//Caffeine本地缓存获取key-value键值对
String remoteCache = stringRedisTemplate.opsForValue().get("key1");//Redis远程缓存获取key-value键值对
return true;
} catch (Exception e) {
log.error("CacheUtil error, set({}, {})", key, value, e, tryConnectTime.isDegrade());
return false;
}
}
}
3. 进阶问题
3.1 如何让两者产生关联?
我们上述的例子实际上仅仅是把这两个缓存的get相关代码"放到了一起",实际上并没有让这两者产生任何关联性.思考一下,在Redis中查询不到数据的时候,我们是不是会查询数据库,再把对应数据放入Redis中.那么,我们实际上可以使用"桥接模式",仍然让Caffeine中的缓存数据从Redis中获取.
注册bean对象----本地cache和redis客户端
/**
* 配置Caffeine本地缓存
* @return
*/
@Bean
public Cache<String, Object> localCache() {
return Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.initialCapacity(100)
.maximumSize(1000)
.build();
}
/**
* 获取redis本地客户端
* @param lettuceConnectionFactory
* @return
*/
@Bean
public RedisClient redisClient(@Autowired LettuceConnectionFactory lettuceConnectionFactory) {
return (RedisClient) lettuceConnectionFactory.getNativeClient();
}
其中redis客户端的获取使用了"工厂模式",直接使用注入的工厂即可.
注册缓存上下文(cacheFrontendContext)为bean对象----结合本地cache和redis客户端对象
/**
* 缓存上下文
* @param redisClient
* @param cache
* @return
*/
@Bean
public CacheFrontendContext cacheFrontendContext(@Autowired RedisClient redisClient, @Autowired Cache cache) {
return new CacheFrontendContext(redisClient, cache);
}
这个对象可以用来帮助我们设置Caffeine跟踪Redis中的数据变化.
public class CacheFrontendContext {
private static final Logger log = LoggerFactory.getLogger(CacheFrontendContext.class);
@Getter
private CacheFrontend cacheFrontend;
private RedisClient redisClient;
@Getter
private Cache cache;
StatefulRedisConnection<String, String> connection;
public CacheFrontendContext(RedisClient redisClient, Cache cache) {
this.redisClient = redisClient;
this.cache = cache;
}
}
核心追踪方法
connection.addListener(message -> {
List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
log.info("type:{}, content:{}",message.getType(), content);
if (message.getType().equalsIgnoreCase("invalidate")) {
List<String> keys = (List<String>) content.get(1);
keys.forEach(key -> {
cache.invalidate(key);
});
}
});
当Redis中数据变动后,当我们尝试从Caffeine中根据某个key值获取对应value时,会监听消息的类型,如果类型为"invalidate"(已经变动),就自动清除Caffeine中这对key-value缓存,再重新将Redis中的数据放入Caffeine.
Caffeine-Redis桥接关键
connection = redisClient.connect();
CacheFrontend<String, String> frontend = ClientSideCaching.enable(
new CaffeineCacheAccessor(cache),
connection,
TrackingArgs.Builder.enabled()
);
this.cacheFrontend = frontend;
通过这个代码片段设置对Redis消息的追踪.
CaffeineCacheAccessor----Caffeine-Redis桥接类
这个类的是十分必要的,Caffeine和Redis本身是毫不相干的两个组件,要将它们结合在一起,除了追踪以外,仍然需要告诉Caffeine:获取到Redis中的最新数据后,应该怎么处理这些数据,再存放到Caffeine中.
public class CaffeineCacheAccessor implements CacheAccessor {
private static final Logger log = LoggerFactory.getLogger(CaffeineCacheAccessor.class);
private Cache cache;
public CaffeineCacheAccessor(Cache cache) {
this.cache = cache;
}
@Override
public Object get(Object key) {
log.info("caffeine get => {}", key);
return cache.getIfPresent(key);
}
@Override
public void put(Object key, Object value) {
log.info("caffeine set => {}:{}", key, value);
cache.put(key, value);
}
@Override
public void evict(Object key) {
log.info("caffeine evict => {}", key);
cache.invalidate(key);
}
}
通过这样一系列的设置,我们就能够把get部分的代码简化:
@Autowired
private CacheFrontendContext cacheFrontendContext;
public String get(String key) {
return cacheFrontendContext.getCacheFrontend().get(key).toString();
}
在存数据时只需要将数据存入Redis中即可,在读取时优先读取Caffeine中的数据,如果不存在或者过期了,Caffeine会自动从Redis中读取数据,成功让Caffeine和Redis产生了依赖关系.
3.2 Redis挂了怎么办?(熔断降级与断开重连的实现)
经过上述的操作,Caffeine的确和Redis产生了依赖关系,但是如果Redis挂了怎么办?我们就不能再向Redis中存数据了.那么我们就需要实现熔断降级,就是解开这些组件的耦合关系.在这个案例中,就是实现Caffeine本地缓存的存取数据,不至于影响到整个系统.那么我们实际上可以构造一个check方法轮询来确保Redis的连接状态,如果连接断开了我们就尝试重新连接,如果多次重新连接失败,就利用Caffeine来存取数据.
check方法
@SneakyThrows
public void check() {
if (connection != null) {
if (connection.isOpen()) {
return;
}
}
try {
tryConnectTime.increase();
//成功降级就减少重连的频率
if (tryConnectTime.isDegrade()) Thread.sleep(5000);
//重新建立连接
connection = redisClient.connect();
CacheFrontend<String, String> frontend = ClientSideCaching.enable(
new CaffeineCacheAccessor(cache),
connection,
TrackingArgs.Builder.enabled()
);
this.cacheFrontend = frontend;
//添加监听,如果redis数据变动,caffeine获取时自动清除缓存
connection.addListener(message -> {
List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
log.info("type:{}, content:{}",message.getType(), content);
if (message.getType().equalsIgnoreCase("invalidate")) {
List<String> keys = (List<String>) content.get(1);
keys.forEach(key -> {
cache.invalidate(key);
});
}
});
log.warn("The redis client side connection had been reconnected.");
}catch (RuntimeException e) {
log.error("The redis client side connection had been disconnected, waiting reconnect...", e);
}
}
TryConnectTime----用于计重连次数的类
public class TryConnectTime {
private volatile AtomicInteger time = new AtomicInteger(0);
public void increase() {
time.incrementAndGet();
}
public void reset() {
time.set(0);
}
public boolean isDegrade() {
return time.get() > 5;//五次尝试重连失败,就熔断降级
}
}
@Bean
public TryConnectTime tryConnectTime() {
return new TryConnectTime();
}
CacheUtil工具类set方法示例:
public boolean set(String key, String value) {
try {
if (tryConnectTime.isDegrade()) {
return setByCaffeine(key, value);
}
return setByRedis(key, value);
} catch (Exception e) {
log.error("CacheUtil error, set({}, {}), isDegrade:{}", key, value, e, tryConnectTime.isDegrade());
return false;
}
}
3.3 Redis重连后的数据一致性问题
那么我们之前说Caffeine是依赖于Redis中的数据的,如果Redis重启后,在这段Redis挂掉期间的缓存数据是存放在Caffeine中的,当Redis服务又可用时会清除它自己的所有缓存数据,会不会把Caffeine中实际有用的数据当作过期的数据,从而进行覆盖呢?实际上是有可能的.
解决方法也十分简单,我们只需要记录下这段期间(熔断降级后到Redis服务可用)内对Caffeine缓存数据的变动,另外设置一个Caffeine的bean对象,把这些数据在Redis重新成功连接时,再设置回到Redis中.(因为有两个Cache类型的bean对象,需要使用@Qualifier根据名称注入,@Autowired默认是根据类型注入)
@Bean
public Cache<String, Object> waitingSyncDataCache() {
return Caffeine.newBuilder()
.expireAfterWrite(120, TimeUnit.MINUTES)
.initialCapacity(100)
.maximumSize(3000)
.build();
}
check完整方法
public class CacheFrontendContext {
private static final Logger log = LoggerFactory.getLogger(CacheFrontendContext.class);
@Getter
private CacheFrontend cacheFrontend;
private RedisClient redisClient;
@Getter
private Cache cache;
@Qualifier("waitingSyncDataCache")
private Cache waitingSyncDataCache;
@Autowired
private TryConnectTime tryConnectTime;
@Autowired
private StringRedisTemplate stringRedisTemplate;
StatefulRedisConnection<String, String> connection;
public CacheFrontendContext(RedisClient redisClient, Cache cache) {
this.redisClient = redisClient;
this.cache = cache;
}
@SneakyThrows
public void check() {
if (connection != null) {
if (connection.isOpen()) {
if (!waitingSyncDataCache.asMap().entrySet().isEmpty()) {
syncDataToRedis(waitingSyncDataCache);
}
tryConnectTime.reset();
return;
}
}
try {
tryConnectTime.increase();
if (tryConnectTime.isDegrade()) Thread.sleep(5000);
//重新建立连接
connection = redisClient.connect();
CacheFrontend<String, String> frontend = ClientSideCaching.enable(
new CaffeineCacheAccessor(cache),
connection,
TrackingArgs.Builder.enabled()
);
this.cacheFrontend = frontend;
if (!waitingSyncDataCache.asMap().entrySet().isEmpty()) {
syncDataToRedis(waitingSyncDataCache);
}
//添加监听,如果redis数据变动,caffeine自动清除缓存
connection.addListener(message -> {
List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
log.info("type:{}, content:{}",message.getType(), content);
if (message.getType().equalsIgnoreCase("invalidate")) {
List<String> keys = (List<String>) content.get(1);
keys.forEach(key -> {
cache.invalidate(key);
});
}
});
log.warn("The redis client side connection had been reconnected.");
}catch (RuntimeException e) {
log.error("The redis client side connection had been disconnected, waiting reconnect...", e);
}
}
private void syncDataToRedis(Cache waitingSyncDataCache) {
Set<Map.Entry<String, String>> entrySet = waitingSyncDataCache.asMap().entrySet();
for (Map.Entry<String, String> entry : entrySet) {
if (!stringRedisTemplate.hasKey(entry.getKey())) {
stringRedisTemplate.opsForValue().set(entry.getKey(), entry.getValue());
log.info("同步key:{},value:{}到Redis客户端",entry.getKey(), entry.getValue());
}
waitingSyncDataCache.invalidate(entry.getKey());
}
}
}