当前位置: 首页 > article >正文

flink判断两个事件之间有没有超时(不使用CEP)

1.为啥不使用cep呢,cep的超时时间设置不好配置化,无法满足扩展要求

2.超时怎么界定。A事件发生后,过了N时间,还没有收到B事件,算超时。

代码如下:


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

@Slf4j
public class AsyncModelTimeoutHandler extends KeyedProcessFunction<String, JSONObject, JSONObject> {

    private static final long serialVersionUID = -61608451659272532L;
    private transient ValueState<Long> firstDataTime;

    private transient ValueState<Long> secondDataTime;

    private transient ValueState<String> eventType;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> firstDataDescriptor = new ValueStateDescriptor<>("firstDataTime", Long.class);
        firstDataTime = getRuntimeContext().getState(firstDataDescriptor);

        ValueStateDescriptor<Long> secondDataDescriptor = new ValueStateDescriptor<>("secondDataTime", Long.class);
        secondDataTime = getRuntimeContext().getState(secondDataDescriptor);

        ValueStateDescriptor<String> eventTypeDescriptor = new ValueStateDescriptor<>("eventType", String.class);
        eventType = getRuntimeContext().getState(eventTypeDescriptor);
    }


    @Override
    public void processElement(JSONObject value, KeyedProcessFunction<String, JSONObject, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
        Long currentTimestamp = value.getLong("ts");
        if (value.containsKey("timeout")) {
            //异步请求消息
            long timeout = value.getLong("timeout");
            firstDataTime.update(currentTimestamp + timeout);
            eventType.update(value.getString("event"));
            ctx.timerService().registerProcessingTimeTimer(currentTimestamp + timeout);
        } else {
            secondDataTime.update(currentTimestamp);
        }
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, JSONObject, JSONObject>.OnTimerContext ctx, Collector<JSONObject> out) throws Exception {
        Long firstTime = firstDataTime.value();
        Long lastTime = secondDataTime.value();
        if (lastTime == null || (firstTime != null && lastTime >= firstTime)) {
            //超时了
            log.info("AsyncModelTimeoutHandler onTimer handle triggerTime={}, firstTime={}, secondTime={},key={}", timestamp, firstTime, lastTime, ctx.getCurrentKey());
            JSONObject r = new JSONObject();
            r.put("id", ctx.getCurrentKey());
            r.put("judgeTime", timestamp);
            r.put("event", eventType.value());
            out.collect(r);
        }
        firstDataTime.clear();
        secondDataTime.clear();
        eventType.clear();
    }
}


http://www.kler.cn/a/540002.html

相关文章:

  • 使用Redis实现业务信息缓存(缓存详解,缓存更新策略,缓存三大问题)
  • JAVA中的抽象学习
  • 【算法】【高精度】acwing算法基础 794. 高精度除法
  • 链表(LinkedList) 1
  • U3D支持webgpu阅读
  • DeepSeek和ChatGPT的优劣或者区别(答案来DeepSeek和ChatGPT)
  • jmeter 性能测试Linux 常用的安装
  • 设计模式 ->模板方法模式(Template Method Pattern)
  • matlab simulink 船舶模糊pid控制仿真
  • 网络安全行业的冬天
  • 5.攻防世界 fileinclude
  • xss闯关
  • 【深度学习】基于MXNet的多层感知机的实现
  • 华为OD最新机试真题-考勤信息-C++-OD统一考试(E卷)
  • Java语言的正则表达式
  • 快速在wsl上部署学习使用c++轻量化服务器-学习笔记
  • 金三银四软件测试面试题(800道)
  • 学习threejs,tga格式图片文件贴图
  • C++(进阶六)--STL--unordered_map和unordered_set
  • UI自动化测试框架:PO 模式+数据驱动
  • amis组件crud使用踩坑
  • leetcode 80. 删除有序数组中的重复项 II
  • 音视频协议
  • webpack配置之---output.chunkLoadTimeout
  • 如何解决 javax.xml.crypto.dsig.TransformException: 转换异常问题?亲测有效的解决方法!
  • 项目顺利交付,几个关键阶段