当前位置: 首页 > article >正文

Java通过Redis进行延时队列,定时发布消息(根据用户选择时间进行发布)

前言

目前很多产品都用到过定时发布或者定时推送等功能,定时推送有两种定义,一种是后台自己有相关规则,通过定时器设置好相应的时间进行推送(例如定时任务框架QuartZ、xxl-job等实现,或者通过springboot自带定时任务@Scheduled注解等实现),这些都是基于后台设定的规则来进行定时推送。
还有一种场景便是根据用户自己选择想要的时间进行推送,这时候再用到上面的方法来做会比较麻烦和复杂,就需要用到延时队列来实现

实现方式

在做这个功能之前,我在网上查阅了想要实现这种根据用户选择时间来推送的相关资料,发现方式还是挺多的,包括但不限于以下几种:

  1. 最简单暴力的方法,通过上述的定时任务框架或者springboot自带的定时器来实现,把cron表达式书写为每分钟一次,然后每分钟都去检查是否和用户设置的时间能匹配上,如能匹配上就进行相关的业务操作
  2. 通过实现springboot自带的SchedulingConfigurer接口来进行动态任务调用
  3. 通过DelayQueue队列进行实现
  4. 通过MQ中间件的发送消费来实现
  5. 通过Redis设置key过期时间触发来进行实现

    上面的几种实现第1、2点比较简单,也很有效果,但是容易出现效率问题和准确性的问题,下面45点的比较不错,但是相比较起来学习成本会高一些,具体实现的思路差不太多,这些都有相关的资料,通过上面的关键字搜索便能查阅到

功能实现

看了那么多的方案之后再结合自身的项目,最终决定用一种新的方案来实现,通过Redis自带的DelayedQueue延时队列来完成,和上面的第45点其实思路差不太多,只不过这个更简单方便一点

定义一个实体类来进行配置

@Data
public class TaskBodyDto implements Serializable {

		/**
     * 重试最大次数
     */
    public static final int MAX_RETRY = 3;

    private String idKey;

    private String beanName;

    private String methodName;

    private Map<String, Object> paramMap;

    /**
     * 重试计时器
     */
    private int cnt;
    /**
     * 延迟的时间
     */
    private long delay;
    /**
     * 延迟的时间单位
     */
    private TimeUnit timeUnit;


}

定义RedisDelayedQueueListener接口

public interface RedisDelayedQueueListener<T> {

    void invoke(T t);

}

然后再配置好监听器,在监听器里面通过反射获取到相关的方法然后执行里面的业务

@Slf4j
@Component
public class TaskListener implements RedisDelayedQueueListener<TaskBodyDto> {

    private static final List<Class> WRAP_CLASS = Arrays.asList(Integer.class, Boolean.class, Double.class, Byte.class, Short.class, Long.class, Float.class, Double.class, BigDecimal.class, String.class);

		//队列Queue
    @Autowired
    private RedissonDelayQueue redissonDelayQueue;
    @Autowired
    private TaskSender taskSender;

    @Override
    public void invoke(TaskBodyDto reqVo) {
        log.info("开始执行监听...{}", reqVo);
        reqVo.setCnt(reqVo.getCnt() + 1);
        try {
            Object bean = ApplicationContextUtil.getBean(reqVo.getBeanName());
            Method method = ReflectUtil.getMethodByName(bean.getClass(), reqVo.getMethodName());
            Class target = AopUtils.getTargetClass(bean);
            Method targetMethod = ReflectUtil.getMethodByName(target, reqVo.getMethodName());
            List<Object> objects = getMethodParamList(targetMethod, reqVo.getParamMap());
            method.invoke(bean, objects.toArray());
        } catch (Exception e) {
            log.error("invoke task err!", e);
            if (reqVo.getCnt() > TaskBodyDto.MAX_RETRY) {
                log.error("重试次数超过最大次数,不再重试。");
                DeadQueDto deadQueDto = new DeadQueDto();
                deadQueDto.setBeanName(reqVo.getBeanName());
                deadQueDto.setMethodName(reqVo.getMethodName());
                deadQueDto.setParamMap(reqVo.getParamMap());
                taskSender.sendTask(deadQueDto);
            } else {
                //重试,30分钟后重试,秒为单位则用原数据
                if (reqVo.getTimeUnit().name().equals(TimeUnit.DAYS.name()) || reqVo.getTimeUnit().name().equals(TimeUnit.HOURS.name())) {
                    reqVo.setDelay(30);
                    reqVo.setTimeUnit(TimeUnit.MINUTES);
                    redissonDelayQueue.add(reqVo);
                } else if (reqVo.getTimeUnit().name().equals(TimeUnit.MINUTES.name()) && reqVo.getDelay() > 30) {
                    reqVo.setDelay(30);
                    reqVo.setTimeUnit(TimeUnit.MINUTES);
                    redissonDelayQueue.add(reqVo);
                } else {
                    redissonDelayQueue.add(reqVo);
                }
            }
        }
    }

    private List<Object> getMethodParamList(Method method, Map<String, Object> paramMap) throws Exception {
        List<Object> objectList = new ArrayList<>();

        // 利用Spring提供的类获取方法形参名
        DefaultParameterNameDiscoverer nameDiscoverer = new DefaultParameterNameDiscoverer();
        String[] param = nameDiscoverer.getParameterNames(method);

        for (int i = 0; i < method.getParameterTypes().length; i++) {
            Class<?> parameterType = method.getParameterTypes()[i];

            Object object = null;
            // 基本类型不支持,支持包装类
            String paramKey = param[i];
            if (WRAP_CLASS.contains(parameterType)) {
                if (param != null && paramMap.containsKey(paramKey)) {
                    object = paramMap.get(paramKey);
                    object = ConvertUtils.convert(object, parameterType);
                }
            } else if (!parameterType.isPrimitive()) {
                if (parameterType.isAssignableFrom(List.class) || parameterType.isAssignableFrom(Map.class) || parameterType.isAssignableFrom(Set.class)) {
                    object = paramMap.get(paramKey);
                } else {
                    object = parameterType.newInstance();
                    BeanUtils.populate(object, paramMap);
                }
            }
            objectList.add(object);
        }
        return objectList;
    }

}

加入环境上下文,以便在项目启动的时候开始监听并执行对redis队列的消费,然后开启一个新的线程去实现相关的业务

@Slf4j
@Component
public class QueueContextAware implements ApplicationContextAware {

    @Autowired
    private RedissonClient redissonClient;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Thread thread = new Thread(() -> {
            Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
            for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
                String listenerName = taskEventListenerEntry.getValue().getClass().getName();
                startThread(listenerName, taskEventListenerEntry.getValue());
            }
        });
        thread.setName("redis-delayqueue");
        thread.start();
    }

    /**
     * 启动线程获取队列*
     *
     * @param queueName                 queueName
     * @param redisDelayedQueueListener 任务监听回调
     * @param <T>                       泛型
     * @return
     */
    private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        Thread thread = new Thread(() -> {
            log.info("启动监听队列" + queueName);
            while (true) {
                try {
                    T t = blockingFairQueue.take();
                    log.info("监听队列{},获取到值:{}", queueName, t);
                    new Thread(() -> {
                        redisDelayedQueueListener.invoke(t);
                    }).start();
                } catch (Exception e) {
                    log.info("监听队列错误,", e);
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException ex) {
                    }
                }
            }
        });
        thread.setName(queueName);
        thread.start();
    }

}

定义RedissonDelayQueue类

@Slf4j
@Component
public class RedissonDelayQueueDemo {

    @Resource
    private RedissonClient redissonClient;

    private RBlockingQueue rBlockingQueue;
    private RDelayedQueue rDelayedQueue;

    @PostConstruct
    private void init() {
        rBlockingQueue = redissonClient.getBlockingQueue(TaskListener.class.getName());
        rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
    }

    public void add(TaskBodyDto reqVo) {
        rDelayedQueue.offer(reqVo, reqVo.getDelay(), reqVo.getTimeUnit());
        log.info("增加了延时队列{}", reqVo);
    }

    /**
     * 增加订单延时队列 -单位为秒
     *
     * @param id         id,传入一个唯一标识,可以是业务ID
     * @param beanName   类名
     * @param methodName 方法名
     * @param paramMap   参数
     * @param delay      延迟时间
     */
    public void add(String id, String beanName, String methodName, Map<String, Object> paramMap, long delay) {
        TaskBodyDto reqVo= new TaskBodyDto();
        String idKey = beanName + ":" + methodName + ":" + id;
        log.info("增加了延时队列" + idKey);
        reqVo.setIdKey(idKey);
        reqVo.setBeanName(beanName);
        reqVo.setMethodName(methodName);
        reqVo.setParamMap(paramMap);
        reqVo.setTimeUnit(TimeUnit.SECONDS);
        reqVo.setDelay(delay);
        this.add(reqVo);
    }

    /**
     * 删除延时队列
     *
     * @param id         id,传入一个唯一标识,可以是业务ID
     * @param beanName   类名
     * @param methodName 方法名
     */
    public void remove(String id, String beanName, String methodName) {
        String idKey = beanName + ":" + methodName + ":" + id;
        log.info("删除了延时队列:" + idKey);
        RDelayedQueue<TaskBodyDto> delayedQueue = rDelayedQueue;
        Stream<TaskBodyDto> stream = delayedQueue.stream().filter(s -> idKey.equals(s.getIdKey()));
        List<TaskBodyDto> c = stream.collect(Collectors.toList());
        if (!c.isEmpty()) {
//            log.info("删除延时队列{}", c);
            delayedQueue.remove(c.get(0));
        }
    }


}

然后写一个工具类方便调用

@Component
public class RedissionDelayQueueUtils {

    @Autowired
    private RedissonDelayQueue redissonDelayQueue;

    @Autowired
    private static RedissonDelayQueue staticRedissonDelayQueue;

    @PostConstruct
    public void init() {
        staticRedissonDelayQueue = redissonDelayQueue;
    }

    /**
     * 添加定时任务
     * @param id 唯一标识,可以是业务ID
     * @param paramMap 参数 key-value
     * @param beanName bean类名称 注意类名需要小写
     * @param methodName 方法名
     * @param seconds 延迟时间 单位为秒
     */
    public static void addDelayQueue(String id, Map<String, Object> paramMap, String beanName, String methodName, Integer seconds) {
        staticRedissonDelayQueue.add(id, beanName, methodName, paramMap, seconds);
    }

    /**
     * 删除定时任务
     * @param id 唯一标识,可以是业务ID
     * @param beanName bean类名称 注意类名需要小写
     * @param methodName 方法名
     */
    public static void removeDelayQueue(String id, String beanName, String methodName) {
        staticRedissonDelayQueue.remove(id, beanName, methodName);
    }

}

都配置好之后,可以写个方法进行测试
比如我要三十分钟之后执行test方法

public class Test {

    private void test(String name,String value){
        //执行业务代码
    }

}

然后在需要执行这个功能的地方进行调用,比如用户在界面选择了发布时间之后,后端接口收到请求进行处理

						//延时队列
            Map<String, Object> map = new HashMap<>();
            map.put("name","张三");
            map.put("value","这是value");
            RedissionDelayQueueUtils.addDelayQueue("唯一标识",map,"test","test", (int) DateUtil.between("用户选择的时间",new Date(), DateUnit.SECOND));
            

注意事项:这上面的map便是被执行的方法需要的一些参数,切记不能直接传入Object类,只能通过基本数据类型进行传递,传入的bean类名也需要小写,DateUtil.between()这个方法是用的hutool工具类里面的日期工具类,为了算出用户选择的时间和当前时间相差多少秒,可自行更改为适合自己的方法,反正最后只需要取到两者时间差多少秒即可

后续redis的配置那些照常配置即可

总结

总结下来其实思路还是比较明确,就是通过redis延时队列的机制,这边配置好相关的参数然后加入到redis里面去,配置好监听器之后由redis进行监听触发,然后再通过反射的方式取到需要执行的bean和方法进行执行即可,其实延时队列的方法很多,我上面还推荐了一些其他的方法,通过给出的关键字即可查阅相关的资料,总之根据自身的情况选择最适合的方法就行

最后不管采取哪种方式,建议在触发以及执行的地方及时把日志打印出来,方便后期调试以及对问题的定位


http://www.kler.cn/a/155386.html

相关文章:

  • 65,【5】buuctf web [SUCTF 2019]Upload Labs 2
  • 卸载和安装Git小乌龟、git基本命令
  • C++经典例题
  • 大模型GUI系列论文阅读 DAY2续:《一个具备规划、长上下文理解和程序合成能力的真实世界Web代理》
  • HTML知识点复习
  • 3 前端(中):JavaScript
  • python爬虫抓取网页图片教程
  • Spring事务管理介绍
  • yolo.txt格式与voc格式互转,超详细易上手
  • Centos图形化界面封装OpenStack Ubuntu镜像
  • Electron+Ts+Vue+Vite桌面应用系列:TypeScript常用时间处理工具
  • Python ctypes:揭秘高级Python与底层交互秘籍
  • JavaScript编程基础 – For循环
  • ChatGPT等大语言模型为什么没有智能
  • JavaWeb | 表单开发
  • 智能优化算法应用:基于原子搜索算法无线传感器网络(WSN)覆盖优化 - 附代码
  • 二叉树在线OJ
  • python-迭代器与生成器
  • 强化学习(一)——基本概念及DQN
  • matlab科学计算
  • 如何使用注解实现接口的幂等性校验
  • Linux下activemq的安装与安装成功确认
  • 面试题:千万量级数据中查询 10W 量级的数据有什么方案?
  • Java架构师技术为业务赋能
  • 【DPDK】Trace Library
  • 【目标检测实验系列】YOLOv5创新点改进实验:通过转置卷积,动态学习参数,减少上采用过程特征丢失,提高模型对目标的检测精度!(超详细改进代码流程)