基于redis完成延迟队列
添加依赖
使用redisson完成延迟队列效果
<!-- redisson依赖 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.4</version> <!-- 请使用最新版本 -->
</dependency>
添加配置类
@Configuration
public class MyRedissonConfig {
/**
* 所有对Redisson的使用都是通过RedissonClient对象
* @return
*/
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient(){
// 创建配置 指定redis地址及节点信息
Config config = new Config();
config.useSingleServer().setAddress("redis://ip:port");
// 根据config创建出RedissonClient实例
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
简单使用
新增常量类 ShoppingConstants
public class ShoppingConstants {
/**
* 商品延迟队列
*/
public static final String REDISSON_DELAY1 = "redisson_delay1";
}
业务代码
@Autowired
private RedissonClient redissonClient;
//创建延迟队列
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(ShoppingConstants.REDISSON_DELAY1);
//把阻塞队列(对应redis List结构)和延迟队列(对应redis ZSet结构)进行关系绑定
RDelayedQueue<String> rDelayedQueue = redissonClient.getDelayedQueue(blockingDeque);
//延迟队列工作原理:延迟队列里面消息到期后 会把消息从延迟队列中移除 并放入阻塞队列
//存入数据
rDelayedQueue.offer(order.toJsonString(),10, TimeUnit.SECONDS);
//消费队列 此代码会阻塞线程直到获取到数据才会往下执行
String take = blockingQueue.take();
进阶使用(延迟队列效果)
生产者
//创建延迟队列
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(ShoppingConstants.REDISSON_DELAY1);
//把阻塞队列(对应redis List结构)和延迟队列(对应redis ZSet结构)进行关系绑定
RDelayedQueue<String> rDelayedQueue = redissonClient.getDelayedQueue(blockingDeque);
//延迟队列工作原理:延迟队列里面消息到期后 会把消息从延迟队列中移除 并放入阻塞队列
//存入数据
rDelayedQueue.offer(order.toJsonString(),10, TimeUnit.SECONDS);
消费者
@Configuration
public class RedissonDelayOrder {
@Autowired
private RedissonClient redissonClient;
//监听线程
private Thread listenerThread;
//是否循环
private volatile boolean running = true;
Logger logger = LoggerFactory.getLogger(RedissonDelayOrder.class);
@PostConstruct
public void listener() throws Exception {
//如果在主线程做阻塞操作,可能导致spring无法初始化其他任务
listenerThread = new Thread(() -> {
while (running) {
try {
//获取延迟队列
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(ShoppingConstants.REDISSON_DELAY1);
//消费队列
String take = blockingQueue.take();
//获取到数据
if (take != null){
logger.info("redisson的延迟队列消费者获取到的消息:{}",take);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
throw new RuntimeException("订单取消监听线程中断", e);
}
}
});
listenerThread.start();
}
@PreDestroy
public void stopListener() {
running = false;
if (listenerThread != null) {
listenerThread.interrupt();
try {
listenerThread.join(5000); // 等待最多5秒让线程结束
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("等待监听线程结束时被中断", e);
}
}
}
}
测试
下单后十秒钟未支付释放库存效果