当前位置: 首页 > article >正文

Flink在指定时间窗口内统计均值,超过阈值后报警

1、需求

统计物联网设备收集上来的温湿度数据,如果5分钟内的均值超过阈值(30摄氏度)则发出告警消息,要求时间窗口和阈值可在管理后台随时修改,实时生效(完成当前窗口后下一个窗口使用最新配置)。

物联网设备的数据从kafka中读取,配置数据从mysql中读取,有个管理后台可以调整窗口和阈值大小。

2、思路

使用flink的双流join,配置数据使用广播流,设备数据使用普通流。

3、实现代码

package cu.iot.flink;

import com.alibaba.fastjson2.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import window.alert.AlertConfig;
import window.alert.EnrichedSensorData;
import window.alert.SensorData;

import java.time.Duration;
import java.util.Properties;


public class BroadcastDemo {

    private static String KAFKA_SERVERS = "192.168.213.1:9092,192.168.213.2:9092,192.168.213.3:9092";
    private static String KAFKA_GROUP_ID = "public-system-group-dev";
    private static String KAFKA_CONSUMER_TOPIC = "public-system-collect-data-dev";
    private static String KAFKA_PRODUCER_TOPIC = "public-system-collect-data-dev-output";
    private static String KAFKA_PRODUCER_SLIDE_TOPIC = "public-system-collect-data-dev-slide-output";

    private static String MYSQL_URL = "jdbc:mysql://10.20.72.1:8190/alerting?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8";
    private static String MYSQL_USERNAME = "root";
    private static String MYSQL_PASSWORD = "xxxxx";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.enableCheckpointing(1000);
        env.setParallelism(1);
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KAFKA_SERVERS);
        kafkaProps.setProperty("group.id", KAFKA_GROUP_ID);
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(KAFKA_CONSUMER_TOPIC, new SimpleStringSchema(), kafkaProps);
        kafkaConsumer.setStartFromLatest();
        // 侧输出
        OutputTag<EnrichedSensorData> lateData = new OutputTag<>("lateData", Types.GENERIC(EnrichedSensorData.class));

        DataStream<SensorData> sensorStream = env.addSource(kafkaConsumer)
                .map((MapFunction<String, SensorData>) value -> JSON.parseObject(value,SensorData.class));
        Properties dbProps = new Properties();
        dbProps.setProperty("url", MYSQL_URL);
        dbProps.setProperty("username", MYSQL_USERNAME);
        dbProps.setProperty("password", MYSQL_PASSWORD);
        DataStreamSource<window.alert.AlertConfig> streamSource = env.addSource(new MySQLSourceFunction(dbProps));
        SingleOutputStreamOperator<window.alert.AlertConfig> streamOperator = streamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<window.alert.AlertConfig>forMonotonousTimestamps()
                        .withTimestampAssigner((o, timestamp) -> o.getTimestamp()));
        MapStateDescriptor<String, window.alert.AlertConfig> broadcastStateDescriptor = new MapStateDescriptor<>("alertConfig",
                        TypeInformation.of(new TypeHint<String>() {}),
                        TypeInformation.of(new TypeHint<window.alert.AlertConfig>() {}));
        // 设置事件时间和水印
        SingleOutputStreamOperator<String> alertStream = sensorStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<SensorData>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((o, timestamp) -> o.getTimestamp()))
                .connect(streamOperator.broadcast(broadcastStateDescriptor))
                .process(new BroadcastProcessFunction<SensorData, AlertConfig, EnrichedSensorData>() {
                    @Override
                    public void processElement(SensorData value, ReadOnlyContext ctx, Collector<EnrichedSensorData> out) throws Exception {
                        ReadOnlyBroadcastState<String, AlertConfig> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);
                        AlertConfig alertConfig = broadcastState.get("alertConfig");
                        if (alertConfig != null) {
                            EnrichedSensorData enrichedSensorData = new EnrichedSensorData(value, alertConfig);
                            //System.out.println("out.collect = "+enrichedSensorData);
                            out.collect(enrichedSensorData);
                        }
                    }
                    @Override
                    public void processBroadcastElement(AlertConfig value, BroadcastProcessFunction<SensorData, AlertConfig, EnrichedSensorData>.Context ctx, Collector<EnrichedSensorData> collector) throws Exception {
                        BroadcastState<String, AlertConfig> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);
                        //System.out.println("broadcastState.put = "+value);
                        broadcastState.put("alertConfig", value);
                    }
                })
                .keyBy(EnrichedSensorData::getSensorId)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2))
                .sideOutputLateData(lateData)
//                .aggregate(
//                        new AggregateFunction<EnrichedSensorData, Tuple2<Double, Integer>, Double>() {
//                    @Override
//                    public Tuple2<Double, Integer> createAccumulator() {
//                        return new Tuple2<>(0.0D, 0);
//                    }
//
//                    @Override
//                    public Tuple2<Double, Integer> add(EnrichedSensorData val, Tuple2<Double, Integer> accumulator) {
//                        return new Tuple2<>(accumulator.f0 + val.getTemperature() , accumulator.f1 + 1);
//                    }
//
//                    @Override
//                    public Double getResult(Tuple2<Double, Integer> accumulator) {
//                        Double rs = accumulator.f0 / accumulator.f1;
//                        System.out.println("getResult...accumulator.f0 ="+accumulator.f0+", accumulator.f1 = "+accumulator.f1+","+rs);
//                        return rs;
//                    }
//
//                    @Override
//                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
//                        System.out.println("merge...  a.f0 + b.f0="+(a.f0 + b.f0)+", a.f1 + b.f1="+(a.f1 + b.f1));
//                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
//                    }
//                }, new ProcessWindowFunction<Double, String, String, TimeWindow>() {
//                    @Override
//                    public void process(String s, ProcessWindowFunction<Double, String, String, TimeWindow>.Context ctx, Iterable<Double> list, Collector<String> out) {
//                        TimeWindow window = ctx.window();
//                        int count = 0;
//                        double sum = 0.0;
//                        for (Double v : list) {
//                            sum += v;
//                            count++;
//                        }
//                        out.collect("<Alert> ID:"+s+",window:["+window.getStart()+","+window.getEnd()+") avg="+(count > 0 ? sum / count : 0));
//                    }
//                }
//                )
                .apply(new WindowFunction<EnrichedSensorData, String, String, TimeWindow>() {
                    @Override
                    public void apply(String sensorId, TimeWindow window, Iterable<EnrichedSensorData> input, Collector<String> out) {
                        double sumTemp = 0;
                        int count = 0;
                        AlertConfig alertConfig = null;
                        for (EnrichedSensorData data : input) {
                            //System.out.println("Processing event: " + data);
                            sumTemp += data.getTemperature();
                            count++;
                            alertConfig = data.getAlertConfig();
                        }
                        double avgTemp = count > 0 ? sumTemp / count : 0;
                        if (alertConfig != null && avgTemp > alertConfig.getThreshold()) {
                            out.collect("<ALERT> window: [" + window.getStart() + "," + window.getEnd() + ") sensorId:"+sensorId+",均值:"+avgTemp+",阈值:"+alertConfig.getThreshold());
                        }
                    }
                })
                ;
        alertStream.print("正常输出>>>");
        SingleOutputStreamOperator<String> out = alertStream;
        SideOutputDataStream<EnrichedSensorData> output = out.getSideOutput(lateData);
        output.printToErr("侧输出>>>");
        // Kafka sink for alerts
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(KAFKA_SERVERS, KAFKA_PRODUCER_TOPIC, new SimpleStringSchema());
        FlinkKafkaProducer<EnrichedSensorData> kafkaProducer2 = new FlinkKafkaProducer<>(KAFKA_SERVERS, KAFKA_PRODUCER_SLIDE_TOPIC, new JsonSerializationSchema<>());
        alertStream.addSink(kafkaProducer);
        output.addSink(kafkaProducer2);
        env.execute("Sensor Alerts");
    }
}
package cu.iot.flink;


import cn.hutool.core.util.RandomUtil;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import window.alert.AlertConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;


public class MySQLSourceFunction implements SourceFunction<AlertConfig> {

    private Properties p;

    private volatile boolean running = true;

    public MySQLSourceFunction(Properties p) {
        this.p = p;
    }

    @Override
    public void run(SourceContext<AlertConfig> ctx) throws Exception {
        while (running) {
            AlertConfig config = fetchAlertConfig();
            ctx.collect(config);
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
    private AlertConfig fetchAlertConfig() {
        AlertConfig config = new AlertConfig();
        try (Connection conn = DriverManager.getConnection(p.getProperty("url"), p.getProperty("username"), p.getProperty("password"));
             PreparedStatement stmt = conn.prepareStatement("SELECT time_window,upper_limit FROM t_alert_rule WHERE id = 3 AND rule_type = 'timeWindow'");
             ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
                config.setWindowSizeMillis(rs.getLong("time_window"));
                config.setThreshold(rs.getDouble("upper_limit"));
                config.setTimestamp(System.currentTimeMillis()- RandomUtil.randomInt(1000, 10000));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return config;
    }
}
package window.alert;


public class AlertConfig {
    private Long windowSizeMillis;
    private Double threshold;
    private Long timestamp;

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Long getWindowSizeMillis() {
        return windowSizeMillis;
    }

    public void setWindowSizeMillis(Long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    public Double getThreshold() {
        return threshold;
    }

    public void setThreshold(Double threshold) {
        this.threshold = threshold;
    }

    public AlertConfig() {}

    public AlertConfig(long windowSize, double threshold) {
        this.windowSizeMillis = windowSize;
        this.threshold = threshold;
    }

    @Override
    public String toString() {
        return "AlertConfig{" +
                "windowSizeMillis=" + windowSizeMillis +
                ", threshold=" + threshold +
                ", timestamp=" + timestamp +
                '}';
    }
}
package window.alert;

public class SensorData {

    private String sensorId;
    private double temperature;
    private double humidity;
    private long timestamp;

    public SensorData() {}

    public String getSensorId() {
        return sensorId;
    }

    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public double getHumidity() {
        return humidity;
    }

    public void setHumidity(double humidity) {
        this.humidity = humidity;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public SensorData(String sensorId, double temperature, double humidity, long timestamp) {
        this.sensorId = sensorId;
        this.temperature = temperature;
        this.humidity = humidity;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "SensorData{" +
                "sensorId='" + sensorId + '\'' +
                ", temperature=" + temperature +
                ", humidity=" + humidity +
                ", timestamp=" + timestamp +
                '}';
    }
}
package window.alert;


public class EnrichedSensorData {

    private SensorData sensorData;
    private AlertConfig alertConfig;

    public EnrichedSensorData(SensorData sensorData, AlertConfig alertConfig) {
        this.sensorData = sensorData;
        this.alertConfig = alertConfig;
    }

    public SensorData getSensorData() {
        return sensorData;
    }

    public void setSensorData(SensorData sensorData) {
        this.sensorData = sensorData;
    }

    public String getSensorId() {
        return sensorData.getSensorId();
    }

    public double getTemperature() {
        return sensorData.getTemperature();
    }

    public AlertConfig getAlertConfig() {
        return alertConfig;
    }

    @Override
    public String toString() {
        return "EnrichedSensorData{" +
                "sensorData=" + sensorData +
                ", alertConfig=" + alertConfig +
                '}';
    }
}


http://www.kler.cn/a/549923.html

相关文章:

  • 架构设计系列(三):架构模式
  • 备战蓝桥杯 Day2 枚举 Day3 进制转换
  • FFmpeg源码:av_strlcpy函数分析
  • kamailio中的PV,PV Headers,App Lua,Dialog,UUID,Dianplan等模块的讲解
  • Unity状态机的实现方法二
  • Vue3(3)
  • 【Oracle】层次查询步骤,理解 where 条件执行顺序
  • 项目上传github步骤
  • DeepSeek与医院电子病历的深度融合路径:本地化和上云差异化分析
  • MATLAB图像处理:图像分割方法
  • 【go语言规范】关于接口设计
  • YOLOv11目标检测:解密mAP50与mAP的背后秘密
  • 代码随想录刷题攻略---动态规划---子序列问题1---子序列
  • java八股文-redis
  • python的类装饰器
  • 【系列专栏】银行IT的云原生架构-存储架构-数据库部署 10
  • 青少年编程与数学 02-009 Django 5 Web 编程 16课题、权限管理
  • rtsp rtmp 跟 http 区别
  • Kubernetes控制平面组件:etcd高可用集群搭建
  • 250214-java类集框架