Flink在指定时间窗口内统计均值,超过阈值后报警
1、需求
统计物联网设备收集上来的温湿度数据,如果5分钟内的均值超过阈值(30摄氏度)则发出告警消息,要求时间窗口和阈值可在管理后台随时修改,实时生效(完成当前窗口后下一个窗口使用最新配置)。
物联网设备的数据从 Kafka 中读取,配置数据从 MySQL 中读取;管理后台可以随时调整时间窗口和阈值的大小。
2、思路
使用 Flink 的双流 connect 配合广播状态(Broadcast State):配置数据作为广播流,设备数据作为普通流,设备数据在处理时读取广播状态中的最新配置。
3、实现代码
package cu.iot.flink;
import com.alibaba.fastjson2.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import window.alert.AlertConfig;
import window.alert.EnrichedSensorData;
import window.alert.SensorData;
import java.time.Duration;
import java.util.Properties;
/**
 * Flink job that enriches a Kafka stream of sensor readings with the latest alert
 * configuration (broadcast from a MySQL-polling source), averages the temperature per sensor
 * over tumbling event-time windows, and emits an alert string when the average exceeds the
 * configured threshold.
 *
 * <p>Alerts are printed and written to {@code KAFKA_PRODUCER_TOPIC}. Readings dropped by the
 * window operator as too late are printed to stderr and written to
 * {@code KAFKA_PRODUCER_SLIDE_TOPIC}.
 *
 * <p>NOTE(review): endpoints and credentials are hard-coded below; consider externalising them.
 */
public class BroadcastDemo {
    private static final String KAFKA_SERVERS = "192.168.213.1:9092,192.168.213.2:9092,192.168.213.3:9092";
    private static final String KAFKA_GROUP_ID = "public-system-group-dev";
    private static final String KAFKA_CONSUMER_TOPIC = "public-system-collect-data-dev";
    private static final String KAFKA_PRODUCER_TOPIC = "public-system-collect-data-dev-output";
    private static final String KAFKA_PRODUCER_SLIDE_TOPIC = "public-system-collect-data-dev-slide-output";
    private static final String MYSQL_URL = "jdbc:mysql://10.20.72.1:8190/alerting?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "xxxxx";

    /**
     * Builds and runs the streaming topology.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is left disabled here; call env.enableCheckpointing(1000) to turn it on.
        env.setParallelism(1);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KAFKA_SERVERS);
        kafkaProps.setProperty("group.id", KAFKA_GROUP_ID);
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(KAFKA_CONSUMER_TOPIC, new SimpleStringSchema(), kafkaProps);
        kafkaConsumer.setStartFromLatest();

        // Side output that receives readings arriving after the allowed lateness.
        OutputTag<EnrichedSensorData> lateData = new OutputTag<>("lateData", Types.GENERIC(EnrichedSensorData.class));

        // Parse each Kafka record; skip payloads that do not yield an object, because a null
        // element would fail with a NullPointerException in the timestamp assigner below.
        DataStream<SensorData> sensorStream = env.addSource(kafkaConsumer)
                .flatMap((String value, Collector<SensorData> collector) -> {
                    SensorData parsed = JSON.parseObject(value, SensorData.class);
                    if (parsed != null) {
                        collector.collect(parsed);
                    }
                })
                // The lambda's Collector type argument is erased, so the output type is given explicitly.
                .returns(SensorData.class);

        Properties dbProps = new Properties();
        dbProps.setProperty("url", MYSQL_URL);
        dbProps.setProperty("username", MYSQL_USERNAME);
        dbProps.setProperty("password", MYSQL_PASSWORD);

        // Drop configs that lack a threshold or timestamp: unboxing those null fields in the
        // timestamp assigner or in the threshold comparison would crash the job.
        SingleOutputStreamOperator<AlertConfig> configStream = env.addSource(new MySQLSourceFunction(dbProps))
                .filter(config -> config != null && config.getThreshold() != null && config.getTimestamp() != null)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<AlertConfig>forMonotonousTimestamps()
                                .withTimestampAssigner((o, timestamp) -> o.getTimestamp()));

        MapStateDescriptor<String, AlertConfig> broadcastStateDescriptor = new MapStateDescriptor<>("alertConfig",
                TypeInformation.of(new TypeHint<String>() {}),
                TypeInformation.of(new TypeHint<AlertConfig>() {}));

        // Assign event time and watermarks, then enrich every reading with the broadcast config.
        SingleOutputStreamOperator<String> alertStream = sensorStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<SensorData>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((o, timestamp) -> o.getTimestamp()))
                .connect(configStream.broadcast(broadcastStateDescriptor))
                .process(new BroadcastProcessFunction<SensorData, AlertConfig, EnrichedSensorData>() {
                    @Override
                    public void processElement(SensorData value, ReadOnlyContext ctx, Collector<EnrichedSensorData> out) throws Exception {
                        ReadOnlyBroadcastState<String, AlertConfig> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);
                        AlertConfig alertConfig = broadcastState.get("alertConfig");
                        // Readings seen before any config has been broadcast are dropped.
                        if (alertConfig != null) {
                            out.collect(new EnrichedSensorData(value, alertConfig));
                        }
                    }

                    @Override
                    public void processBroadcastElement(AlertConfig value, Context ctx, Collector<EnrichedSensorData> collector) throws Exception {
                        // Keep only the most recent config under a single fixed key.
                        BroadcastState<String, AlertConfig> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);
                        broadcastState.put("alertConfig", value);
                    }
                })
                .keyBy(EnrichedSensorData::getSensorId)
                // NOTE(review): the window size is fixed at 10 seconds; the window size carried in
                // AlertConfig (getWindowSizeMillis) is not used here, so only the threshold is
                // actually configurable at runtime.
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2))
                .sideOutputLateData(lateData)
                .apply(new WindowFunction<EnrichedSensorData, String, String, TimeWindow>() {
                    @Override
                    public void apply(String sensorId, TimeWindow window, Iterable<EnrichedSensorData> input, Collector<String> out) {
                        double sumTemp = 0;
                        int count = 0;
                        // The config attached to the last element iterated is the one used for the comparison.
                        AlertConfig alertConfig = null;
                        for (EnrichedSensorData data : input) {
                            sumTemp += data.getTemperature();
                            count++;
                            alertConfig = data.getAlertConfig();
                        }
                        double avgTemp = count > 0 ? sumTemp / count : 0;
                        if (alertConfig != null && avgTemp > alertConfig.getThreshold()) {
                            out.collect("<ALERT> window: [" + window.getStart() + "," + window.getEnd() + ") sensorId:"+sensorId+",均值:"+avgTemp+",阈值:"+alertConfig.getThreshold());
                        }
                    }
                });

        alertStream.print("正常输出>>>");
        SideOutputDataStream<EnrichedSensorData> output = alertStream.getSideOutput(lateData);
        output.printToErr("侧输出>>>");

        // Kafka sinks: one for alerts, one for late readings.
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(KAFKA_SERVERS, KAFKA_PRODUCER_TOPIC, new SimpleStringSchema());
        FlinkKafkaProducer<EnrichedSensorData> kafkaProducer2 = new FlinkKafkaProducer<>(KAFKA_SERVERS, KAFKA_PRODUCER_SLIDE_TOPIC, new JsonSerializationSchema<>());
        alertStream.addSink(kafkaProducer);
        output.addSink(kafkaProducer2);

        env.execute("Sensor Alerts");
    }
}
package cu.iot.flink;
import cn.hutool.core.util.RandomUtil;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import window.alert.AlertConfig;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;
/**
 * Flink source that polls a MySQL table at a fixed interval and emits the alert rule it finds
 * as an {@link AlertConfig}.
 *
 * <p>Connection settings are read from the {@code url}, {@code username} and {@code password}
 * entries of the supplied {@link Properties}.
 */
public class MySQLSourceFunction implements SourceFunction<AlertConfig> {
    /** Pause between two consecutive polls. */
    private static final long POLL_INTERVAL_MILLIS = 10000L;
    private static final String RULE_QUERY =
            "SELECT time_window,upper_limit FROM t_alert_rule WHERE id = 3 AND rule_type = 'timeWindow'";

    private final Properties p;
    // volatile: written by cancel() and read by the polling loop in run().
    private volatile boolean running = true;

    /**
     * @param p JDBC connection settings ({@code url}, {@code username}, {@code password})
     */
    public MySQLSourceFunction(Properties p) {
        this.p = p;
    }

    /**
     * Polls the database until cancelled, emitting one config per successful poll.
     *
     * @param ctx context used to emit configs
     * @throws Exception if the sleep between polls is interrupted
     */
    @Override
    public void run(SourceContext<AlertConfig> ctx) throws Exception {
        while (running) {
            AlertConfig config = fetchAlertConfig();
            // Emit nothing when the lookup failed or found no rule: a config whose fields are
            // all null would break consumers that unbox its timestamp or threshold.
            if (config != null) {
                ctx.collect(config);
            }
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Requests the polling loop to stop after its current iteration. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Reads the alert rule from MySQL.
     *
     * @return the populated config, or {@code null} if the query failed or returned no row
     */
    private AlertConfig fetchAlertConfig() {
        try (Connection conn = DriverManager.getConnection(p.getProperty("url"), p.getProperty("username"), p.getProperty("password"));
             PreparedStatement stmt = conn.prepareStatement(RULE_QUERY);
             ResultSet rs = stmt.executeQuery()) {
            if (!rs.next()) {
                return null;
            }
            AlertConfig config = new AlertConfig();
            config.setWindowSizeMillis(rs.getLong("time_window"));
            config.setThreshold(rs.getDouble("upper_limit"));
            // NOTE(review): the timestamp is "now" minus a random offset from
            // RandomUtil.randomInt(1000, 10000) — presumably to simulate out-of-order config
            // events; confirm this is intended.
            config.setTimestamp(System.currentTimeMillis()- RandomUtil.randomInt(1000, 10000));
            return config;
        } catch (Exception e) {
            // Best effort: report the failure and retry on the next poll instead of failing the job.
            e.printStackTrace();
            return null;
        }
    }
}
package window.alert;
/**
 * Mutable holder for one alert rule: a window size, a threshold and a timestamp.
 *
 * <p>All three fields are boxed and stay {@code null} until explicitly set.
 */
public class AlertConfig {
    private Long windowSizeMillis;
    private Double threshold;
    private Long timestamp;

    /** Creates a config with every field unset ({@code null}). */
    public AlertConfig() {}

    /**
     * Creates a config with the given window size and threshold; the timestamp stays unset.
     *
     * @param windowSize window size (stored as {@code windowSizeMillis})
     * @param threshold  alert threshold
     */
    public AlertConfig(long windowSize, double threshold) {
        this.threshold = threshold;
        this.windowSizeMillis = windowSize;
    }

    /** @return the window size, or {@code null} if unset */
    public Long getWindowSizeMillis() {
        return this.windowSizeMillis;
    }

    /** @param windowSizeMillis the new window size */
    public void setWindowSizeMillis(Long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    /** @return the threshold, or {@code null} if unset */
    public Double getThreshold() {
        return this.threshold;
    }

    /** @param threshold the new threshold */
    public void setThreshold(Double threshold) {
        this.threshold = threshold;
    }

    /** @return the timestamp, or {@code null} if unset */
    public Long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the new timestamp */
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /** Renders all three fields in declaration order. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("AlertConfig{");
        text.append("windowSizeMillis=").append(windowSizeMillis);
        text.append(", threshold=").append(threshold);
        text.append(", timestamp=").append(timestamp);
        return text.append('}').toString();
    }
}
package window.alert;
/**
 * Mutable bean for a single sensor reading: sensor id, temperature, humidity and timestamp.
 */
public class SensorData {
    private String sensorId;
    private double temperature;
    private double humidity;
    private long timestamp;

    /** Creates a reading with default field values. */
    public SensorData() {}

    /**
     * Creates a fully populated reading.
     *
     * @param sensorId    identifier of the sensor
     * @param temperature temperature value
     * @param humidity    humidity value
     * @param timestamp   timestamp of the reading
     */
    public SensorData(String sensorId, double temperature, double humidity, long timestamp) {
        this.timestamp = timestamp;
        this.humidity = humidity;
        this.temperature = temperature;
        this.sensorId = sensorId;
    }

    /** @return the sensor identifier */
    public String getSensorId() {
        return this.sensorId;
    }

    /** @param sensorId the new sensor identifier */
    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }

    /** @return the temperature value */
    public double getTemperature() {
        return this.temperature;
    }

    /** @param temperature the new temperature value */
    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    /** @return the humidity value */
    public double getHumidity() {
        return this.humidity;
    }

    /** @param humidity the new humidity value */
    public void setHumidity(double humidity) {
        this.humidity = humidity;
    }

    /** @return the timestamp of the reading */
    public long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the new timestamp */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /** Renders all four fields, with the sensor id wrapped in single quotes. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SensorData{");
        text.append("sensorId='").append(sensorId).append('\'');
        text.append(", temperature=").append(temperature);
        text.append(", humidity=").append(humidity);
        text.append(", timestamp=").append(timestamp);
        return text.append('}').toString();
    }
}
package window.alert;
/**
 * Pairs a {@link SensorData} reading with the {@link AlertConfig} that was in effect when the
 * reading was enriched, and exposes the reading's sensor id and temperature directly.
 */
public class EnrichedSensorData {
    private SensorData sensorData;
    private AlertConfig alertConfig;

    /**
     * @param sensorData  the wrapped reading
     * @param alertConfig the alert configuration attached to the reading
     */
    public EnrichedSensorData(SensorData sensorData, AlertConfig alertConfig) {
        this.alertConfig = alertConfig;
        this.sensorData = sensorData;
    }

    /** @return the attached alert configuration */
    public AlertConfig getAlertConfig() {
        return this.alertConfig;
    }

    /** @return the wrapped reading */
    public SensorData getSensorData() {
        return this.sensorData;
    }

    /** @param sensorData the reading to wrap from now on */
    public void setSensorData(SensorData sensorData) {
        this.sensorData = sensorData;
    }

    /** @return the sensor id of the wrapped reading */
    public String getSensorId() {
        SensorData reading = this.sensorData;
        return reading.getSensorId();
    }

    /** @return the temperature of the wrapped reading */
    public double getTemperature() {
        SensorData reading = this.sensorData;
        return reading.getTemperature();
    }

    /** Renders the wrapped reading followed by the attached configuration. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EnrichedSensorData{");
        text.append("sensorData=").append(sensorData);
        text.append(", alertConfig=").append(alertConfig);
        return text.append('}').toString();
    }
}