当前位置: 首页 > article >正文

webflux版定时任务实现方案

通常定时任务我们一般采用spring注解@EnableScheduling来启动,但如何与webflux响应式代码结合实现定时任务呢?下面给出了一个企业内使用的真实案例,希望能帮到你。

@Component
@EnableScheduling
@Slf4j
public class TestTask {
 
    @Resource
    private ReactiveStringRedisTemplate redisTemplate; 

    private final AtomicBoolean isRunning=new AtomicBoolean(false); 
    private final Sinks.Many<MapRecord<String, Object, Object>> sinks = Sinks.many().unicast().onBackpressureBuffer();

    @PostConstruct
    public void init(){
        //初始化时订阅sinks,等待有数据流进来,这里并发执行流中数据
        sinks.asFlux().onErrorResume(err->{
            log.error("出现异常",err);
            return Mono.empty();
        }).parallel().runOn(Schedulers.boundedElastic()).flatMap(record->{
            String orderNo=(String) record.getValue().getOrDefault("orderNo","");
            if(orderNo.isEmpty()){
                return Mono.empty();
            }else{
                //添加业务处理逻辑,处理成功后删除队列中数据即可完成任务调度逻辑
            } 
            }).onErrorResume(err->{
                log.error("orderNo={} 异常",orderNo,err);
                return Mono.empty();
            });
        }).subscribe();
    }
 
    @Scheduled(cron = "0/1 * * * * ?")
    public void timer() {
        //扫描要处理的数据发送到sinks,sinks连接下游处理器源源不断进行处理。
        //定时任务每秒执行一次,如果遇到前一个调度任务没有完成则等待,防止重复调度
        if (!isRunning.getAndSet(true)) {
            doTask().doFinally((v)->{
                isRunning.set(false);
            }).subscribe();
        }
    }
    public Mono<Void> doTask() {
        //每次从redis队列中读取100条要处理的数据发送给sinks
        Consumer consumer = Consumer.from("mygroup","myconsumer");
        return redisTemplate.opsForStream().read(consumer, StreamReadOptions.empty().count(100),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()))
                .flatMap(record->{
            try {
                 sinks.tryEmitNext(record);
            }catch (Exception e){
                log.error("发送数据到sinks异常:{}",record.getValue(),e);
            }
            return Mono.empty();
        }).then();
    }
}


http://www.kler.cn/a/456695.html

相关文章:

  • 【系统分析师】- 案例 -数据库特训
  • Spring boot处理跨域问题
  • 【SpringMVC】REST 风格
  • HALCON中用于分类的高斯混合模型create_class_gmm
  • 华为消费级QLC SSD来了
  • Python Celery快速入门教程
  • LeetCode 242. 有效的字母异位词 (C++实现)
  • 超短波自组网如何守护森防安全?
  • Jmeter自学【8】- 使用JMeter模拟设备通过MQTT发送数据
  • AI开发 - 算法基础 递归 的概念和入门(一) 递归算法的常见应用 PYTHON
  • STM32第十一课:STM32-基于标准库的42步进电机的简单IO控制(附电机教程,看到即赚到)
  • Gavin Wood 的 Polkadot 2024 年度回顾:技术突破与未来的无限可能
  • 汽车的hmi设计还能有哪些创新?要从哪些方面下手
  • 接口测试面试题
  • 【网络】什么是VLAN(Virtual Local Area Network虚拟局域网)?物理网络划分多个子网络的技术
  • 最新的序列数据预测模型SOFTS详解
  • fineReport_“数据保留N位小数“
  • 记一次内存泄漏分析(待写稿)
  • 11.MySQL视图特性
  • 镍氢电池材料合金在电池中的应用与性能优化
  • 《计算机视觉:开启智能感知新时代》
  • openEuler 下安装nginx
  • elasticsearch-java客户端jar包中各模块的应用梳理
  • 洪水防洪预警功能:水库水雨情监测系统的应急响应机制
  • Tomcat调优相关理解
  • 华为 IPD,究竟有什么特点?(一)