redis 过期消息订阅实现(java实现)
一、开启redis消息通知功能
方法1: 修改conf文件
编辑/etc/redis/redis.conf文件,添加或启用以下内容(key过期通知):
notify-keyspace-events Ex
方法2: 使用命令
- 登陆redis-cli
- 输入下列命令
config set notify-keyspace-events Ex
关键字介绍:
上面Ex就是其中的关键字之一
- K:keyspace事件,事件以__keyspace@__为前缀进行发布
- E:keyevent事件,事件以__keyevent@__为前缀进行发布
- g:一般性的,非特定类型的命令,比如del,expire,rename等
- $:字符串特定命令
- l:列表特定命令
- s:集合特定命令
- h:哈希特定命令
- z:有序集合特定命令
- x:过期事件,当某个键过期并删除时会产生该事件
- e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件
- A:g$lshzxe的别名,因此AKE意味着所有事件
订阅者介绍
- onMessage: 收到消息回调
- onSubscribe: 订阅频道(channel)
- onUnsubscribe: 取消订阅频道(channel)
- onPMessage: 收到消息回调-p模式
- onPSubscribe: 订阅频道(channel)p模式
- onPUnsubscribe: 取消订阅频道(channel)p模式
带P的就是可以在订阅的时候支持表达式, 一次性订阅多个频道,
例如:
__keyevent@*__:expired ``` 其中的*标识订阅所有db的key过期事件
二、在pom文件中引入需要的redis依赖
<!--添加redis依赖-->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.4.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
三、编写基本配置文件 redis.properties
redis.hostName=192.168.6.138
redis.port=6379
redis.password=123321
# 连接超时时间
redis.timeout=10000
#最大空闲数
redis.maxIdle=300
#控制一个pool可分配多少个jedis实例,用来替换上面的redis.maxActive,如果是jedis 2.4以后用该属性
redis.maxTotal=1000
#最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
redis.maxWaitMillis=1000
#连接的最小空闲时间 默认1800000毫秒(30分钟)
redis.minEvictableIdleTimeMillis=300000
#每次释放连接的最大数目,默认3
redis.numTestsPerEvictionRun=1024
#逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
redis.timeBetweenEvictionRunsMillis=30000
#是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个
redis.testOnBorrow=true
#在空闲时检查有效性, 默认false
redis.testWhileIdle=true
四、编写配置类 RedisConfig
package com.lanqiaobei.ssm.yjk.config;
import com.lanqiaobei.ssm.yjk.util.RedisKeyExpirationListener;
//import com.liuyanzhao.ssm.blog.util.RedisLockUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.jcache.config.JCacheConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.nio.charset.StandardCharsets;
@Configuration
@PropertySource("classpath:redis.properties")
public class RedisConfig extends JCacheConfigurerSupport {
@Autowired
private Environment environment;
@Bean
public RedisConnectionFactory redisConnectionFactory() {
JedisConnectionFactory fac = new JedisConnectionFactory();
fac.setHostName(environment.getProperty("redis.hostName"));
fac.setPort(Integer.parseInt(environment.getProperty("redis.port")));
fac.setPassword(environment.getProperty("redis.password"));
fac.setTimeout(Integer.parseInt(environment.getProperty("redis.timeout")));
fac.getPoolConfig().setMaxIdle(Integer.parseInt(environment.getProperty("redis.maxIdle")));
fac.getPoolConfig().setMaxTotal(Integer.parseInt(environment.getProperty("redis.maxTotal")));
fac.getPoolConfig().setMaxWaitMillis(Integer.parseInt(environment.getProperty("redis.maxWaitMillis")));
fac.getPoolConfig().setMinEvictableIdleTimeMillis(
Integer.parseInt(environment.getProperty("redis.minEvictableIdleTimeMillis")));
fac.getPoolConfig()
.setNumTestsPerEvictionRun(Integer.parseInt(environment.getProperty("redis.numTestsPerEvictionRun")));
fac.getPoolConfig().setTimeBetweenEvictionRunsMillis(
Integer.parseInt(environment.getProperty("redis.timeBetweenEvictionRunsMillis")));
fac.getPoolConfig().setTestOnBorrow(Boolean.parseBoolean(environment.getProperty("redis.testOnBorrow")));
fac.getPoolConfig().setTestWhileIdle(Boolean.parseBoolean(environment.getProperty("redis.testWhileIdle")));
return fac;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
// 创建RedisTemplate对象
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 设置连接工厂
template.setConnectionFactory(connectionFactory);
// 创建JSON序列化工具
GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
// 设置Key的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(StandardCharsets.UTF_8);
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
// 设置Value的序列化
template.setValueSerializer(jsonRedisSerializer);
template.setHashValueSerializer(jsonRedisSerializer);
template.setDefaultSerializer(stringRedisSerializer);
// 返回
return template;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory redisConnectionFactory,
RedisKeyExpirationListener redisKeyExpirationListener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
// 监听 __keyevent@0__:expired 频道,这里的0指数据库编号为 0;
container.addMessageListener(redisKeyExpirationListener,
new PatternTopic("__keyevent@0__:expired"));
return container;
}
@Bean
public RedisKeyExpirationListener redisKeyExpirationListener() {
return new RedisKeyExpirationListener();
}
// 其他 Bean 定义
}
五、实现监听类 RedisKeyExpirationListener
package com.lanqiaobei.ssm.yjk.util;
import cn.hutool.core.util.StrUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static cn.hutool.core.util.IdUtil.randomUUID;
@Component
public class RedisKeyExpirationListener implements MessageListener {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Autowired
private PublisherMQ publisherMQ;
//分布式锁过期时间 s 可以根据业务自己调节
private static final Long LOCK_REDIS_TIMEOUT = 2000L;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取过期的 Key,需要利用byte[]录入和接收,不然会出现中文乱码
byte[] body = message.getBody();
String allKey = redisTemplate.getStringSerializer().deserialize(body);
String expiredKey = StrUtil.removePrefix(allKey, "todo:");//hutool工具里面去掉首位字符
// System.out.println(expiredKey);
// 处理相应的业务逻辑
//————————————————————————————————————————————————————————
String key = "todolock:"+expiredKey;
String value = randomUUID();
//redis尝试获取锁,加锁
Boolean getLock = getLock(key,value);
if(getLock){
publisherMQ.sendMessage("lqb.direct","queueLQBKey",expiredKey);
releaseLock(key,value);
}
}
/**
* 加锁
**/
public Boolean getLock(String key,String value){
Boolean lockStatus = redisTemplate.opsForValue().setIfAbsent(key,value);
if (lockStatus) {
System.out.println("Set key-value successfully!");
redisTemplate.expire(key, LOCK_REDIS_TIMEOUT, TimeUnit.MILLISECONDS);//毫秒级
} else {
System.out.println("Key already exists!");
}
return lockStatus;
}
/**
* 释放锁
**/
public Long releaseLock(String key,String value){
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript,Long.class);
Long releaseStatus = (Long)this.redisTemplate.execute(redisScript, Collections.singletonList(key),value);
return releaseStatus;
}
}
监听redis过期消息提醒,同一个数据(键)过期会有多次通知提醒。原因是:可能是由于 Redis 的主从复制或者分片集群等机制导致的。在主从复制或者分片集群中,可能会发生多个节点同时订阅了相同的键空间通知,从而导致同一个键空间事件被多次触发。
我的解决方法是:给键过期后提醒的回调函数加锁,收到多个通知提醒,回调函数加锁后最终只会有一个执行,其他没有获得锁的回调不会执行,这样就避免了重复执行任务代码。
这里的实现方法在另一个文章:
https://blog.csdn.net/m0_46652188/article/details/130394484