当前位置: 首页 > article >正文

redis实现延迟任务

定时任务:有固定周期,有明确的触发时间
延迟任务:没有固定的开始时间,由一个事件触发,在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟执行。
在这里插入图片描述

场景1:订单下单30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消。
场景2:接口对接接口出现网络问题,1分钟后重试,如果失败,2分钟后重试,直到出现阈值终止。

实现方式

  1. DelayQueue,基于JVM
  2. RabbitMQ:TTL+死信队列
  3. Redis:zset(本项目实现)

实现思路

  1. 添加任务的时候会先放入数据库中
  2. list队列存储的是立即执行的任务,如果执行时间<=当前时间,则任务进入list队列中,立即执行。
  3. zset队列中存储的是未来要执行的任务,如果执行时间<=当前时间+5min(不需要把所有未来执行的任务都放入zset中,如果执行时间>当前时间+5min,则还是放入数据库中),则任务进入zset队列,每分钟定时刷新把到期的任务再放入list队列中。
    在这里插入图片描述

延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储到数据库中可以保证数据安全。
任务量过大以后,zset的性能会下降
如果任务数据特别大,为了防止阻塞,所以只需要把未来几分钟要执行的数据存入缓存即可,是一种优化的形式。

数据库自身解决并发的两种策略

  1. 悲观锁:每次拿数据的时候都认为别人会修改,所以在每次拿数据的时候都会上锁。

  2. 乐观锁:每次拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据(版本号)。

乐观锁的实现步骤

  1. 在版本号上加@Version注解
@Version
private Integer version;
  1. 在mybatis-plus开启乐观锁支持
/**
 * mybatis-plus乐观锁支持
 * @return
 */
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
    return interceptor;
}

配置redis

  1. 安装redis:
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
  1. 项目中集成redis
  • 导入依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
  • 添加配置:
spring:
  redis:
    host: 192.168.140.102
    password: leadnews
    port: 6379

实现

1. 拉取任务

/**
 * 拉取任务
 * @param task
 */
public long addTask(Task task) {
    // 1. 添加任务到数据库中
    boolean success = addTaskToDb(task);
    if (!success) {
        throw new RuntimeException("添加任务到数据库失败");
    }
    // 2. 添加任务到redis中
    addTaskToCache(task);
    return task.getTaskId();
}

/**
 * 添加任务到数据库中
 * @param task
 * @return
 */
private boolean addTaskToDb(Task task) {
    try {
        // 1. 保存任务表
        Taskinfo taskinfo = new Taskinfo();
        BeanUtils.copyProperties(task, taskinfo);
        taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
        taskinfoMapper.insert(taskinfo);
        // 2. 保存任务日志数据
        TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
        BeanUtils.copyProperties(taskinfo, taskinfoLogs);
        taskinfoLogs.setVersion(1);
        taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
        taskinfoLogsMapper.insert(taskinfoLogs);
        // 设置taskId
        task.setTaskId(taskinfo.getTaskId());
    } catch (BeansException e) {
        e.printStackTrace();
        return false;
    }
    return true;
}

/**
 * 添加任务到redis中
 * @param task
 */
private void addTaskToCache(Task task) {
    String key = task.getTaskType() + "_" + task.getPriority();
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);
    long nextScheduledTime = calendar.getTimeInMillis();
    if(task.getExecuteTime() <= System.currentTimeMillis()) {
        // 如果任务的执行时间≤当前时间,存入list中
        cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
    }else if(task.getExecuteTime() <= nextScheduledTime) {
        // 如果任务的执行时间>当前时间 && 任务的执行时间>预设时间(未来五分钟),存入zset中
        cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
    }
}
  1. 在数据库中保存一份,要保存到任务表和任务数据表中
  2. 添加任务到redis中,如果≤当前时间的,要存入list中,如果≤预设时间的,要存入zset中。

2. 取消任务

在这里插入图片描述

/**
 * 取消任务
 * @param taskId
 * @return
 */
@Override
public boolean cancelTask(long taskId) {
    // 1.删除任务,更新任务日志
    Task task = updateDb(taskId, ScheduleConstants.CANCELLED);
    if(task == null) {
        throw new RuntimeException("删除任务失败");
    }
    // 2.删除redis的数据
    removeTaskFromCache(task);
    return true;
}
/**
 * 删除缓存中的数据
 * @param task
 */
private void removeTaskFromCache(Task task) {
    try {
        String key = task.getTaskType() + "_" + task.getPriority();
        if(task.getExecuteTime() <= System.currentTimeMillis()) {
            // 如果任务的执行时间≤当前时间,从list中删除
            cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));
        }else {
            // task任务不是在list中,就是在zset中,这里就不需要再判断一次了
            cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * 删除任务,更新任务日志
 * @param taskId
 * @param status
 * @return
 */
private Task updateDb(long taskId, int status) {
    Task task = null;
    try {
        // 删除任务
        taskinfoMapper.deleteById(taskId);
        // 更新任务日志
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(status);
        taskinfoLogsMapper.updateById(taskinfoLogs);
        task = new Task();
        BeanUtils.copyProperties(taskinfoLogs, task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return task;
}
  1. 在数据库中,从任务表中删除taskId的任务,更新任务日志表中的日志状态
  2. 在缓存中,删除list或zset中的任务

3. 消费任务

从list中消费任务

在这里插入图片描述

public Task poll(int type, int priority) {
    Task task = null;
    try {
        // 从redis中拉取数据
        String key = type + "_" + priority;
        String taskJson = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if(taskJson != null) {
            task = JSON.parseObject(taskJson, Task.class);
            // 修改数据库信息
            updateDb(task.getTaskId(), ScheduleConstants.EXECUTED);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return task;
}

未来数据定时刷新

从zset中定时刷新任务到list中
在这里插入图片描述
获取zset中所有的key

  • keys模糊匹配:效率低
Set<String> keys = cacheService.keys("future_*");
  • scan命令:是一个基于游标的迭代器,每次被调用之后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新的游标作为SCAN命令的游标参数。
    在这里插入图片描述
Set<String> scan = cacheService.scan("future_*");
  1. 添加定时任务,每分钟刷新,从zset中取出符合条件的数据后放入redis管道中
    在这里插入图片描述
/**
* 未来数据定时刷新
 */
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    log.info("refresh...");
    // 获取未来数据的keys集合
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
    // 按照key和分值查询符合条件的数据
    for (String futureKey : futureKeys) { // future_100_50
        // 获取当前数据的topicKey
        String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
        // 0到当前时间所有查询符合条件的数据
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if(!tasks.isEmpty()) {
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
        }
    }
}
  1. 在启动类上开启定时任务
@EnableScheduling // 开启定时任务

【目前存在的问题】:如果启动了两个服务,那么这两个服务会在同一时间去执行定时任务,会导致方法抢占的问题。但是我们的需求是只要时间到了,有一台服务在执行定时任务即可。
【解决办法】:分布式锁来解决方法抢占的问题

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。
在这里插入图片描述

服务A在访问的Redis服务的时候,先加锁,此时只有A可以访问Redis服务,B访问Redis服务会失败。A释放锁后,B才可以访问。
A获取锁后,其他客户端不能操作,只能等待A释放锁以后,其他客户端才能操作。

  1. 加锁代码:
/**
 * 加锁
 *
 * @param name
 * @param expire
 * @return
 */
public String tryLock(String name, long expire) {
    name = name + "_lock";
    String token = UUID.randomUUID().toString();
    RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
    RedisConnection conn = factory.getConnection();
    try {

        //参考redis命令:
        //set key value [EX seconds] [PX milliseconds] [NX|XX]
        Boolean result = conn.set(
                name.getBytes(),
                token.getBytes(),
                Expiration.from(expire, TimeUnit.MILLISECONDS),
                RedisStringCommands.SetOption.SET_IF_ABSENT //NX
        );
        if (result != null && result)
            return token;
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory,false);
    }
    return null;
}
  1. 修改refresh()方法:
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    String token = cacheService.tryLock("FUTRUE_TASK_SYNC", 1000 * 30);
    if(StringUtils.isNotBlank(token)) {
        // 原本的refresh逻辑...
    }
}

4. 数据库任务定时同步到redis中

/**
* 数据库任务定时同步到redis中
  */
 @PostConstruct // 微服务启动的时候执行
 @Scheduled(cron = "0 */5 * * * ?")// 每五分钟执行
 public void reloadData() {
     // 清理缓存中的数据 list、zset
     clearCache();
     // 查询符合条件的任务(小于未来五分钟的数据)
     Calendar calendar = Calendar.getInstance();
     calendar.add(Calendar.MINUTE, 5);
     long nextScheduledTime = calendar.getTimeInMillis();
     List<Taskinfo> taskinfoList = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
     // 把任务添加到redis中
     if(taskinfoList != null && !taskinfoList.isEmpty()) {
         for (Taskinfo taskinfo : taskinfoList) {
             Task task = new Task();
             BeanUtils.copyProperties(taskinfo, task);
             task.setExecuteTime(taskinfo.getExecuteTime().getTime());
             addTaskToCache(task);
         }
     }
     log.info("数据库任务同步到redis中");
 }

http://www.kler.cn/a/532716.html

相关文章:

  • 排序算法--选择排序
  • Baklib探讨如何通过内容中台提升组织敏捷性与市场竞争力
  • 【爬虫】JS逆向解决某药的商品价格加密
  • Rust中使用ORM框架diesel报错问题
  • AJAX笔记原理篇
  • 15 刚体变换模块(rigid.rs)
  • 结构体排序 C++ 蓝桥杯
  • C++引用练习题
  • 基于springboot的电影评论网站(源码+数据库+文档)
  • PVE纵览-实现极致性能:在Proxmox VE中配置硬盘直通
  • Office / WPS 公式、Mathtype 公式输入花体字、空心字
  • 【C# 】图像资源的使用
  • 结合 vim-plug 安装并使用 Gruvbox 主题教程
  • 使用Posix共享内存区实现进程间通信
  • 二维数组 C++ 蓝桥杯
  • vue生命周期及其作用
  • 基于机器学习的布伦特原油价格的分析与研究
  • 通向AGI之路:人工通用智能的技术演进与人类未来
  • 数据库索引:秋招面试中的经典高频题目 [特殊字符](索引原理/操作/优缺点/B+树)
  • module_init宏是什么?
  • web-XSS-CTFHub
  • python学opencv|读取图像(五十六)使用cv2.GaussianBlur()函数实现图像像素高斯滤波处理
  • 线程创建与管理 - 创建线程、线程同步(C++)
  • git进阶--6---git stash
  • 一文了解边缘计算
  • 数据降维技术研究:Karhunen-Loève展开与快速傅里叶变换的理论基础及应用