webflux版定时任务实现方案
通常定时任务我们一般采用spring注解@EnableScheduling来启动,但如何与webflux响应式代码结合实现定时任务呢?下面给出了一个企业内使用的真实案例,希望能帮到你。
@Component
@EnableScheduling
@Slf4j
public class TestTask {
@Resource
private ReactiveStringRedisTemplate redisTemplate;
private final AtomicBoolean isRunning=new AtomicBoolean(false);
private final Sinks.Many<MapRecord<String, Object, Object>> sinks = Sinks.many().unicast().onBackpressureBuffer();
@PostConstruct
public void init(){
//初始化时订阅sinks,等待有数据流进来,这里并发执行流中数据
sinks.asFlux().onErrorResume(err->{
log.error("出现异常",err);
return Mono.empty();
}).parallel().runOn(Schedulers.boundedElastic()).flatMap(record->{
String orderNo=(String) record.getValue().getOrDefault("orderNo","");
if(orderNo.isEmpty()){
return Mono.empty();
}else{
//添加业务处理逻辑,处理成功后删除队列中数据即可完成任务调度逻辑
}
}).onErrorResume(err->{
log.error("orderNo={} 异常",orderNo,err);
return Mono.empty();
});
}).subscribe();
}
@Scheduled(cron = "0/1 * * * * ?")
public void timer() {
//扫描要处理的数据发送到sinks,sinks连接下游处理器源源不断进行处理。
//定时任务每秒执行一次,如果遇到前一个调度任务没有完成则等待,防止重复调度
if (!isRunning.getAndSet(true)) {
doTask().doFinally((v)->{
isRunning.set(false);
}).subscribe();
}
}
public Mono<Void> doTask() {
//每次从redis队列中读取100条要处理的数据发送给sinks
Consumer consumer = Consumer.from("mygroup","myconsumer");
return redisTemplate.opsForStream().read(consumer, StreamReadOptions.empty().count(100),
StreamOffset.create("mystream", ReadOffset.lastConsumed()))
.flatMap(record->{
try {
sinks.tryEmitNext(record);
}catch (Exception e){
log.error("发送数据到sinks异常:{}",record.getValue(),e);
}
return Mono.empty();
}).then();
}
}