当前位置: 首页 > article >正文

Flink定时器

flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。简单来说事件时间就相当于人出生的时间,一般数据生成的时候也会有创建时间。而事件处理时间则相当于人具体做某件事的时间,一条数据可能是2023年生成的,但是到2024年才被处理,这个2024年便被称为这个事件的处理时间。

一、事件时间定时器(event time),这是基于事件时间来触发的,这里面有一个小坑。当第一个事件到的时候创建一个定时器10秒后触发。对我们大部分人来说我既然已经创建了这个定时器,那么10秒后,他就会自动触发。但事实上他10秒后如果没有事件到来他并不会触发。大概意思就是前一个事件创建的定时器需要后一个事件的时间来触发。下面是事件时间定时器的一种实现方式。

import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

public class EventTime {
    public static void main(String[] args) throws Exception {
        SourceTemperature mySourceTemperature = new SourceTemperature();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
        WatermarkStrategy<Temperature> twsDS
                = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());

        SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);

        KeyedStream<Temperature, String> keyByDS = tSSODS.keyBy(temperature -> temperature.getDay());

        SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {

            ListState<Temperature> temperatureListState;
            ValueState<Temperature> temperatureState;
            ValueState<Integer> size;
            ValueState<Long> temperature;
            @Override
            public void open(OpenContext openContext) throws Exception {
                ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);
                temperatureListState = getRuntimeContext().getListState(listStateDescriptor);
                temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));
                size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));
                temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));
            }

            @Override
            public void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {

                Temperature value1 = temperatureState.value();
                //System.out.println(ctx.timestamp());
                if(value1 == null){
                    temperatureState.update(value);
                    temperatureListState.add(value);
                    size.update(1);
                    //System.out.printf("当前事件处理:"+DateFormat.getDateTime(ctx.timestamp()));
                    //System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));
                    temperature.update(value.getTimestamp());
                    ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);
                }else{
                    if(value1.getTemperature() < value.getTemperature()){
                        temperatureState.update(value);
                        temperatureListState.add(value);
                        size.update(size.value()+1);
                        //System.out.println(size.value());
                        if(size.value()>= 3){
                            System.out.printf("警告警告:");
                            Iterator<Temperature> iterator = temperatureListState.get().iterator();
                            while(iterator.hasNext()){
                                out.collect(iterator.next());
                            }
                            temperatureListState.clear();
                            temperatureState.clear();
                            size.clear();
                            ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);
                        }
                    }else{
                        System.out.println("温度降低了");
                        temperatureState.update(value);
                        temperatureListState.clear();
                        temperatureListState.add(value);
                        size.update(1);
                        ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);
                        temperature.update(value.getTimestamp());
                        ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);
                    }
                }
            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {
                System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));
                temperatureListState.clear();
                temperatureState.clear();
                size.clear();
                if(temperature.value() != null)
                    ctx.timerService().deleteEventTimeTimer(temperature.value() + 10*1000);
            }
        });
        process.print("当前警告温度为:");
        env.execute();
    }
}

//自己定义数据源
 class SourceTemperature extends RichSourceFunction<Temperature> {

    @Override
    public void run(SourceContext<Temperature> ctx) throws Exception {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            Temperature temperature = new Temperature();
            System.out.print("请输入温度: ");
            //double temp = Math.random()*40;
            double temp = scanner.nextDouble();
            //System.out.println(temp);
            temperature.setTemperature(temp);
            temperature.setTimestamp(new Date().getTime());
            ctx.collect(temperature);
            //Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {

    }
}

//自定义实体类
class Temperature1 {
    public Temperature1(double temperature, long timestamp) {
        this.temperature = temperature;
        this.timestamp = timestamp;
    }

    public Temperature1(){};

    //温度
    private double temperature;
    //时间
    private long timestamp;
    //id
    private String day = "2024-12-24";

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    @Override
    public String toString() {
        return "Temperature1{" +
                "temperature=" + temperature +
                ", timestamp=" + timestamp +
                ", day='" + day + '\'' +
                '}';
    }
}

下面我们做一个测试,来验证一下这个解释:前一个事件创建的定时器需要后一个事件的时间来触发。他们的时间间隔超过了10秒钟,但是时间并没有触发,而是下一个事件到的时候才触发的。

二、事件处理时间,事件处理时间触发有系统时间有关

package com.xcj;
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        SourceTemperature mySourceTemperature = new SourceTemperature();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
//        WatermarkStrategy<Temperature> twsDS
//                = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
//                .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());
//
//        SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);

        KeyedStream<Temperature, String> keyByDS = tDSSource.keyBy(temperature -> temperature.getDay());

        SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {

            ListState<Temperature> temperatureListState;
            ValueState<Temperature> temperatureState;
            ValueState<Integer> size;
            ValueState<Long> temperature;
            @Override
            public void open(OpenContext openContext) throws Exception {
                ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);
                temperatureListState = getRuntimeContext().getListState(listStateDescriptor);
                temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));
                size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));
                temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));
            }

            @Override
            public void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {

                Temperature value1 = temperatureState.value();
                //System.out.println(ctx.timestamp());
                System.out.printf("当前事件时间:"+DateFormat.getDateTime(value.getTimestamp()));
                System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));
                if(value1 == null){
                    temperatureState.update(value);
                    temperatureListState.add(value);
                    size.update(1);
                    temperature.update(ctx.timerService().currentProcessingTime());
                    ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
                }else{
                    if(value1.getTemperature() < value.getTemperature()){
                        temperatureState.update(value);
                        temperatureListState.add(value);
                        size.update(size.value()+1);
                        //System.out.println(size.value());
                        if(size.value()>= 3){
                            System.out.printf("警告警告:");
                            Iterator<Temperature> iterator = temperatureListState.get().iterator();
                            while(iterator.hasNext()){
                                out.collect(iterator.next());
                            }
                            temperatureListState.clear();
                            temperatureState.clear();
                            size.clear();
                            ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);
                        }
                    }else{
                        System.out.println("温度降低了");
                        temperatureState.update(value);
                        temperatureListState.clear();
                        temperatureListState.add(value);
                        size.update(1);
                        ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);
                        temperature.update(value.getTimestamp());
                        ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
                    }
                }
            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {
                System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));
                temperatureListState.clear();
                temperatureState.clear();
                size.clear();
                if(temperature.value() != null)
                    ctx.timerService().deleteProcessingTimeTimer(temperature.value() + 10*1000);
            }
        });
        process.print("当前警告温度为:");
        env.execute();
    }
}

//自己定义数据源
class SourceTemperature extends RichSourceFunction<Temperature> {

    @Override
    public void run(SourceContext<Temperature> ctx) throws Exception {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            Temperature temperature = new Temperature();
            System.out.print("请输入温度: ");
            //double temp = Math.random()*40;
            double temp = scanner.nextDouble();
            //System.out.println(temp);
            temperature.setTemperature(temp);
            temperature.setTimestamp(new Date().getTime());
            ctx.collect(temperature);
            //Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {

    }
}

//自定义实体类
class Temperature1 {
    public Temperature1(double temperature, long timestamp) {
        this.temperature = temperature;
        this.timestamp = timestamp;
    }

    public Temperature1(){};

    //温度
    private double temperature;
    //时间
    private long timestamp;
    //id
    private String day = "2024-12-24";

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    @Override
    public String toString() {
        return "Temperature1{" +
                "temperature=" + temperature +
                ", timestamp=" + timestamp +
                ", day='" + day + '\'' +
                '}';
    }
}

事件处理时间是不需要下一个事件触发的

三、总结

事件时间(event time) 与事件处理时间(process time)定时器整体代码其实差不多,主要是在注册定时器的时候选择的方法

//事件时间
ctx.timerService().registerEventTimeTimer(value.getTimestamp());
//事件处理事件            
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);

和不同定时器的逻辑。注意:事件时间定时器是需要下一个事件来触发上一个事件的定时任务,但是事件处理时间定时器是不需要下一个事件来触发的,他是根据注册时间和系统时间的差值来触发的。

上面我把注册时间改为了过去很久的时间,来一个就触发一次定时任务,因为注册时间与当前系统时间相差>10秒,所以会直接触发。


http://www.kler.cn/a/451972.html

相关文章:

  • 嵌入式学习-QT-Day08
  • Word窗体联动Excel实现级联组合框
  • 机试题——最大时间
  • STM32单片机芯片与内部41 DAC TIM触发双DAC DMA搬运同步输出正弦波
  • Matrix-Breakout 2 Morpheus
  • 使用vcpkg安装opencv>=4.9后#include<opencv2/opencv.hpp>#include<opencv2/core.hpp>无效
  • C语言-09内存管理
  • MR-GDINO: Efficient Open-World Continual Object Detection
  • vue中做一个最多输入一位小数且可以为负数的输入框(包含最前面最后面为小数点及多个-符号与前导零校验)
  • PaginationInnerInterceptor,spring中pojo
  • WebRTC搭建与应用(五)-Coturn踩坑记
  • 游戏APP如何设计混合变现,最大化变现收益?
  • Unity 重写GridLayoutGroup使居中对齐
  • HarmonyOS NEXT 实战之元服务:静态案例效果---最近播放音乐
  • imx6ull qt多页面控制系统(正点原子imx系列驱动开发)
  • ASN.1 轻松入门2
  • HarmonyOS NEXT 实战之元服务:静态案例效果(二)
  • 131、sqlserver中使用mybatis中的Page进行分页查询时,SQL成功执行(控制台已打印),Page的Records没值bug1.代码复现:
  • NUCLEO-F446RE测试板验证DS100示波器功能
  • 【视觉惯性SLAM:编译及编译工具】