redis实现延迟任务
定时任务:有固定周期,有明确的触发时间
延迟任务:没有固定的开始时间,由一个事件触发,在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟执行。
场景1:订单下单30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消。
场景2:接口对接接口出现网络问题,1分钟后重试,如果失败,2分钟后重试,直到出现阈值终止。
实现方式
- DelayQueue,基于JVM
- RabbitMQ:TTL+死信队列
- Redis:zset(本项目实现)
实现思路
- 添加任务的时候会先放入数据库中
- list队列存储的是立即执行的任务,如果
执行时间<=当前时间
,则任务进入list队列中,立即执行。 - zset队列中存储的是未来要执行的任务,如果
执行时间<=当前时间+5min
(不需要把所有未来执行的任务都放入zset中,如果执行时间>当前时间+5min
,则还是放入数据库中),则任务进入zset队列,每分钟定时刷新把到期的任务再放入list队列中。
延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储到数据库中可以保证数据安全。
任务量过大以后,zset的性能会下降
如果任务数据特别大,为了防止阻塞,所以只需要把未来几分钟要执行的数据存入缓存即可,是一种优化的形式。
数据库自身解决并发的两种策略
-
悲观锁:每次拿数据的时候都认为别人会修改,所以在每次拿数据的时候都会上锁。
-
乐观锁:每次拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据(版本号)。
乐观锁的实现步骤
- 在版本号上加
@Version
注解
@Version
private Integer version;
- 在mybatis-plus开启乐观锁支持
/**
* mybatis-plus乐观锁支持
* @return
*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return interceptor;
}
配置redis
- 安装redis:
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
- 项目中集成redis
- 导入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
- 添加配置:
spring:
redis:
host: 192.168.140.102
password: leadnews
port: 6379
实现
1. 拉取任务
/**
* 拉取任务
* @param task
*/
public long addTask(Task task) {
// 1. 添加任务到数据库中
boolean success = addTaskToDb(task);
if (!success) {
throw new RuntimeException("添加任务到数据库失败");
}
// 2. 添加任务到redis中
addTaskToCache(task);
return task.getTaskId();
}
/**
* 添加任务到数据库中
* @param task
* @return
*/
private boolean addTaskToDb(Task task) {
try {
// 1. 保存任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);
// 2. 保存任务日志数据
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsMapper.insert(taskinfoLogs);
// 设置taskId
task.setTaskId(taskinfo.getTaskId());
} catch (BeansException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 添加任务到redis中
* @param task
*/
private void addTaskToCache(Task task) {
String key = task.getTaskType() + "_" + task.getPriority();
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
long nextScheduledTime = calendar.getTimeInMillis();
if(task.getExecuteTime() <= System.currentTimeMillis()) {
// 如果任务的执行时间≤当前时间,存入list中
cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
}else if(task.getExecuteTime() <= nextScheduledTime) {
// 如果任务的执行时间>当前时间 && 任务的执行时间>预设时间(未来五分钟),存入zset中
cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
}
}
- 在数据库中保存一份,要保存到任务表和任务数据表中
- 添加任务到redis中,如果≤当前时间的,要存入list中,如果≤预设时间的,要存入zset中。
2. 取消任务
/**
* 取消任务
* @param taskId
* @return
*/
@Override
public boolean cancelTask(long taskId) {
// 1.删除任务,更新任务日志
Task task = updateDb(taskId, ScheduleConstants.CANCELLED);
if(task == null) {
throw new RuntimeException("删除任务失败");
}
// 2.删除redis的数据
removeTaskFromCache(task);
return true;
}
/**
* 删除缓存中的数据
* @param task
*/
private void removeTaskFromCache(Task task) {
try {
String key = task.getTaskType() + "_" + task.getPriority();
if(task.getExecuteTime() <= System.currentTimeMillis()) {
// 如果任务的执行时间≤当前时间,从list中删除
cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));
}else {
// task任务不是在list中,就是在zset中,这里就不需要再判断一次了
cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 删除任务,更新任务日志
* @param taskId
* @param status
* @return
*/
private Task updateDb(long taskId, int status) {
Task task = null;
try {
// 删除任务
taskinfoMapper.deleteById(taskId);
// 更新任务日志
TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
taskinfoLogs.setStatus(status);
taskinfoLogsMapper.updateById(taskinfoLogs);
task = new Task();
BeanUtils.copyProperties(taskinfoLogs, task);
task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
} catch (Exception e) {
throw new RuntimeException(e);
}
return task;
}
- 在数据库中,从任务表中删除taskId的任务,更新任务日志表中的日志状态
- 在缓存中,删除list或zset中的任务
3. 消费任务
从list中消费任务
public Task poll(int type, int priority) {
Task task = null;
try {
// 从redis中拉取数据
String key = type + "_" + priority;
String taskJson = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
if(taskJson != null) {
task = JSON.parseObject(taskJson, Task.class);
// 修改数据库信息
updateDb(task.getTaskId(), ScheduleConstants.EXECUTED);
}
} catch (Exception e) {
e.printStackTrace();
}
return task;
}
未来数据定时刷新
从zset中定时刷新任务到list中
获取zset中所有的key:
- keys模糊匹配:效率低
Set<String> keys = cacheService.keys("future_*");
- scan命令:是一个基于游标的迭代器,每次被调用之后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新的游标作为SCAN命令的游标参数。
Set<String> scan = cacheService.scan("future_*");
- 添加定时任务,每分钟刷新,从zset中取出符合条件的数据后放入redis管道中
/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
log.info("refresh...");
// 获取未来数据的keys集合
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
// 按照key和分值查询符合条件的数据
for (String futureKey : futureKeys) { // future_100_50
// 获取当前数据的topicKey
String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
// 0到当前时间所有查询符合条件的数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
if(!tasks.isEmpty()) {
cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
}
}
}
- 在启动类上开启定时任务
@EnableScheduling // 开启定时任务
【目前存在的问题】:如果启动了两个服务,那么这两个服务会在同一时间去执行定时任务,会导致方法抢占的问题。但是我们的需求是只要时间到了,有一台服务在执行定时任务即可。
【解决办法】:分布式锁来解决方法抢占的问题
分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。
服务A在访问的Redis服务的时候,先加锁,此时只有A可以访问Redis服务,B访问Redis服务会失败。A释放锁后,B才可以访问。
A获取锁后,其他客户端不能操作,只能等待A释放锁以后,其他客户端才能操作。
- 加锁代码:
/**
* 加锁
*
* @param name
* @param expire
* @return
*/
public String tryLock(String name, long expire) {
name = name + "_lock";
String token = UUID.randomUUID().toString();
RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
//参考redis命令:
//set key value [EX seconds] [PX milliseconds] [NX|XX]
Boolean result = conn.set(
name.getBytes(),
token.getBytes(),
Expiration.from(expire, TimeUnit.MILLISECONDS),
RedisStringCommands.SetOption.SET_IF_ABSENT //NX
);
if (result != null && result)
return token;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory,false);
}
return null;
}
- 修改refresh()方法:
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
String token = cacheService.tryLock("FUTRUE_TASK_SYNC", 1000 * 30);
if(StringUtils.isNotBlank(token)) {
// 原本的refresh逻辑...
}
}
4. 数据库任务定时同步到redis中
/**
* 数据库任务定时同步到redis中
*/
@PostConstruct // 微服务启动的时候执行
@Scheduled(cron = "0 */5 * * * ?")// 每五分钟执行
public void reloadData() {
// 清理缓存中的数据 list、zset
clearCache();
// 查询符合条件的任务(小于未来五分钟的数据)
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
long nextScheduledTime = calendar.getTimeInMillis();
List<Taskinfo> taskinfoList = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
// 把任务添加到redis中
if(taskinfoList != null && !taskinfoList.isEmpty()) {
for (Taskinfo taskinfo : taskinfoList) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo, task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
addTaskToCache(task);
}
}
log.info("数据库任务同步到redis中");
}