当前位置: 首页 > article >正文

redisson 延迟队列实现任务过期监听

一、需求:

任务超过一个小时以后,如果还为待执行状态,则自动转为结束状态。

二、实现:

  1. 创建延迟队列的监听任务RedisDelayedQueueListener,消费延迟队列;
  2. 创建新增延迟队列的类,用于创建延迟队列;
  3. 整体初始化,把监听任务与spring绑定,扫描各个监听延迟队列的实现类,并开启单独线程,监听任务;
  4. 创建延迟任务。

三、实现步骤:

1.引入redisson依赖,这里直接引入springboot整合好的依赖,如果引用原生的依赖,需要自己配置redissonClient Bean。

<dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.10.5</version>
</dependency>

2.创建延时队列监听接口,定义延时队列到期事件处理方法,消费延时队列

/**
 * redis 队列事件监听,需要实现这个方法
 * @param <T>
 */
public interface RedisDelayedQueueListener<T> {

    /**
     * 执行方法
     * @param t
     */
    void invoke(T t);
}

3.具体的延时队列消费实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 订单支付过期监听
 */
@Component
@Slf4j
public class OrderPayExpirationListener implements RedisDelayedQueueListener<String>{

    @Autowired
    private ITOrderService orderService;

    @Override
    public void invoke(String orderId) {
        log.info("===" + orderId + ===");
        //查询到订单,判断为未支付,修改订单状态
        TOrder order = orderService.lambdaQuery().eq(TOrder::getOrderId, orderId).one();
        if (order.getOrderStatus() == 1) { //订单未支付
            TOrder tOrder = new TOrder();
            tOrder.setOrderId(orderId);
            tOrder.setOrderStatus(0); //更新订单为取消状态
            orderService.updateById(tOrder);
        }
    }
}

4.初始化,把监听任务与spring绑定

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * redis 延时队列初始化
 */
@Component
@Slf4j
public class RedisDelayedQueueInit implements ApplicationContextAware {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 获取应用上下文并获取相应的接口实现类
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
        for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
            String listenerName = taskEventListenerEntry.getValue().getClass().getName();
            startThread(listenerName, taskEventListenerEntry.getValue());
        }
    }

    /**
     * 启动线程获取队列
     * @param queueName 队列名称
     * @param redisDelayedQueueListener 任务回调监听
     */
    private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        //由于此线程需要常驻,可以新建线程,不用交给线程池管理
        Thread thread = new Thread(() -> {
            log.info("启动监听队列线程" + queueName);
            while (true) {
                try {
                    T t = blockingFairQueue.take();
                    log.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));
                    redisDelayedQueueListener.invoke(t);
                } catch (Exception e) {
                    log.info("监听队列线程错误,", e);
                }
            }
        });
        thread.setName(queueName);
        thread.start();
    }
}

5.创建延时任务

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

/**
 * Redis 延时队列
 */
@Component
@Slf4j
public class RedisDelayedQueue {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加对象进延时队列
     * @param putInData 添加数据
     * @param delay     延时时间
     * @param timeUnit  时间单位
     * @param queueName 队列名称
     * @param <T>
     */
    private <T> void addQueue(T putInData,long delay, TimeUnit timeUnit, String queueName){
        log.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,putInData);
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(putInData, delay, timeUnit);
    }

    /**
     * 添加队列-秒
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());
    }

    /**
     * 添加队列-分
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());
    }

    /**
     * 添加队列-时
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.HOURS, clazz.getName());
    }
    /**
     * 添加队列-天
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.DAYS, clazz.getName());
    }
}

6.此时只需要再下单成功的方法里面新增以下逻辑即可

@Autowired
 private RedisDelayedQueue redisDelayedQueue;

 //将订单id放入延时队列,配置过期监听的处理类
redisDelayedQueue.addQueueHours(id,2, OrderPayExpirationListener.class);

以上参考:https://www.cnblogs.com/huaixiaonian/p/16978606.html

四、我的优化

4.1 此场景中,ApplicationContextAware存在的问题

介绍ApplicationContextAware和ApplicationRunner的区别

  • ApplicationContextAware:在Bean初始化过程中initializeBean()函数中;(项目没启动完成)
  • ApplicationRunner:在所有bean都初始化完成后调用,在AfterFinish中执行;

因此ApplicationContextAware初始化会有两个问题:

  1. 未完全启动完成就监听,可能会导致消费队列的相关类未全部加载完成,导致在启动完成前这段时间,消息消费异常;
  2. 代码里是新建线程异步消费,当有系统启动异常时,线程还在启动着,会不断打印log.info(“监听队列线程错误,”, e);

4.1.2 优化一:ApplicationRunner替代ApplicationContextAware

@Slf4j
@Component
public class RedisDelayedQueueInitRunner implements ApplicationRunner {
 .......
}
@Override
    public void run(ApplicationArguments args) {
        String listenerName = String.format("XX", redisDelayedQueueListener.getClass().getSimpleName());
        startThread(listenerName, redisDelayedQueueListener);
    }

4.2 上次关闭的时候的消息到期了,不会马上发送

上次关闭的时候的消息到期了,不会马上发送,要等新消息来,才会消费。

原因:因为是在添加消息的时候才初始化管道的:
在这里插入图片描述
解决方法:这个地方吧管道开启就可以了
在这里插入图片描述
这个是在启动的时候去执行 要在invoke 方法里面捕获,防止启动失败了。


http://www.kler.cn/a/317845.html

相关文章:

  • C# 对象和类型(结构)
  • 【HTML+CSS+JS+VUE】web前端教程-2-HTML5介绍和基础骨架
  • Agentic RAG 解释
  • 每日一题-两个链表的第一个公共结点
  • 深度学习与计算机视觉 (博士)
  • 腾讯云AI代码助手编程挑战赛-凯撒密码解码编码器
  • Hbase操作手册
  • git笔记之重置本地仓库所有分支和远程保持一致、工作区恢复干净,像刚clone下来一样
  • 阅读记录:Gradient Episodic Memory for Continual Learning
  • 十三 系统架构设计(考点篇)
  • 【python】数据类型
  • react hooks--useCallback
  • 误删系统引导如何恢复?如何创建系统引导?
  • Vue 内存泄漏分析:如何避免开发过程中导致的内存泄漏问题
  • Appium高级话题:混合应用与原生应用测试策略
  • Mysql 常用方法和函数(查询)
  • 数据结构应试-树和二叉树
  • 这个浏览器插件:提高测试效率且好用!
  • Haskell网络编程:代理服务器的高级使用技巧
  • mac安装JetBtains全家桶新版本时报错:Cannot start the IDE
  • GitLab将会持续支持FluxCD
  • Vulkan 学习(9)---- vkSuraceKHR 创建
  • Matlab simulink建模与仿真 第十七章(补充离散库和补充数学库)
  • DevOps在提升软件质量方面的作用
  • 动手学深度学习8.5. 循环神经网络的从零开始实现-笔记练习(PyTorch)
  • Linux——常用系统设置和快捷键操作指令