当前位置: 首页 > article >正文

使用[KafkaStreams流计算框架实时计算产生报警(升级报警)

使用KafkaStream(Apache Kafka)实时计算报警,官方文档非常完善。

对Kafka不太了解的,可以看下我的博客Kafka集群部署和调优实践_offsets.topic.replication.factor-CSDN博客

需求背景很简单,每秒钟采集一次设备数据,流计算框架需要对数据做处理,判断采集值超过100就产生报警,如果持续5分钟产生高报,持续10分钟产生高高报。流计算服务只负责产出报警到topic,下游服务负责监听topic后续处理。需要注意,当报警被处置后会向接收数据的主题发送处置信号,处置后需要重置这个设备的时间窗口,它对应的报警从新开始计算。每个设备在报警未被处置前只会升级报警,不会重复报警

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Properties;

public class SensorAlarmApp {

    public static void main(String[] args) {
        // 配置 Kafka Streams
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sensor-alarm-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 从 sensor-readings 主题读取传感器数据
        KStream<String, Double> readings = builder.stream("sensor-readings");
        
        // 从 sensor-reset-topic 主题读取报警处置信号
        KStream<String, Double> resetStream = builder.stream("sensor-reset-topic");
        
        // 合并传感器数据流和重置信号流
        KStream<String, Double> filteredReadings = readings
            .merge(resetStream) // 合并数据流
            .filter((k, v) -> true); // 可以添加更多的过滤逻辑,如果需要的话

        // 使用 SessionWindows 来处理数据流的窗口
        SessionWindows sessionWindows = SessionWindows.with(Duration.ofMinutes(10)).grace(Duration.ofMinutes(5));

        // 将数据流转换为 KTable,并使用 SessionWindows 计算报警次数
        KTable<SessionWindowed<String>, Long> alarmCounts = filteredReadings
            .filter((k, v) -> v > 100) // 只处理值大于100的记录
            .groupBy((k, v) -> k) // 按照传感器ID分组
            .windowedBy(sessionWindows) // 使用 SessionWindows 窗口
            .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("alarm-count-store").withValueSerde(Serdes.Long()));

        // 创建一个状态存储,用于跟踪报警状态
        final String alarmStateStoreName = "alarm-state-store";
        final KeyValueStore<String, AlarmStatus> alarmStateStore = builder
            .store(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(alarmStateStoreName),
                    Serdes.String(),
                    AlarmStatus.serde()
                ).withCachingEnabled()
            );

        // 处理报警
        KTable<Windowed<String>, String> lowAlarms = alarmCounts
            .toStream()
            .filter((k, v) -> v == 1) // 第一次超过100
            .filter((k, v) -> shouldTriggerAlarm(k.key(), "low", alarmStateStore))
            .mapValues((k, v) -> updateAlarmStatus(k.key(), "low", "ALARM: Sensor value over 100", alarmStateStore));

        // 处理高报
        KTable<Windowed<String>, String> highAlarms = alarmCounts
            .toStream()
            .filter((k, v) -> k.window().end() - k.window().start() >= Duration.ofMinutes(5).toMillis()) // 窗口持续时间 >= 5分钟
            .filter((k, v) -> shouldTriggerAlarm(k.key(), "high", alarmStateStore))
            .mapValues((k, v) -> updateAlarmStatus(k.key(), "high", "HIGH ALARM: Sensor value over 100 for more than 5 minutes", alarmStateStore));

        // 处理高高报
        KTable<Windowed<String>, String> highHighAlarms = alarmCounts
            .toStream()
            .filter((k, v) -> k.window().end() - k.window().start() >= Duration.ofMinutes(10).toMillis()) // 窗口持续时间 >= 10分钟
            .filter((k, v) -> shouldTriggerAlarm(k.key(), "high-high", alarmStateStore))
            .mapValues((k, v) -> updateAlarmStatus(k.key(), "high-high", "HIGH HIGH ALARM: Sensor value over 100 for more than 10 minutes", alarmStateStore));

        // 处置报警
        filteredReadings.foreach((k, v) -> handleAlarmDisposal(k, v, alarmStateStore, filteredReadings));

        // 输出报警通知
        lowAlarms.toStream().to("low-alarm-notifications", Produced.with(Serdes.String(), Serdes.String()));
        highAlarms.toStream().to("high-alarm-notifications", Produced.with(Serdes.String(), Serdes.String()));
        highHighAlarms.toStream().to("high-high-alarm-notifications", Produced.with(Serdes.String(), Serdes.String()));

        // 启动 Kafka Streams 实例
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

    // 更新报警状态的方法
    private static String updateAlarmStatus(String sensorId, String alarmType, String message, KeyValueStore<String, AlarmStatus> store) {
        AlarmStatus status = store.get(sensorId);
        if (status == null) {
            status = new AlarmStatus(); // 创建新的报警状态
        }
        status.setAlarmType(alarmType);
        status.setAlarmMessage(message);
        status.setLastUpdated(System.currentTimeMillis());
        store.put(sensorId, status); // 保存报警状态
        return message;
    }

    // 决定是否触发报警的方法
    private static boolean shouldTriggerAlarm(String sensorId, String alarmType, KeyValueStore<String, AlarmStatus> store) {
        AlarmStatus status = store.get(sensorId);
        if (status == null) {
            return true; // 初始状态,可以触发报警
        } else {
            if (alarmType.equals(status.getAlarmType())) {
                return false; // 报警类型相同,不触发
            }
            if ("low".equals(status.getAlarmType()) && "high".equals(alarmType)) {
                return true; // 升级到高报
            }
            if ("high".equals(status.getAlarmType()) && "high-high".equals(alarmType)) {
                return true; // 升级到高高报
            }
            return false; // 其他情况不触发
        }
    }

    // 处置报警的方法
    private static void handleAlarmDisposal(String sensorId, Double value, KeyValueStore<String, AlarmStatus> store, KStream<String, Double> readings) {
        if (value < 100) {
            store.remove(sensorId); // 清除报警状态
            // 发送设备的重置信号到 sensor-reset-topic
            readings.filter((k, v) -> k.equals(sensorId))
                    .to("sensor-reset-topic", Produced.with(Serdes.String(), Serdes.Double()));
        }
    }

    // 报警状态类
    static class AlarmStatus {
        private String alarmType;
        private String alarmMessage;
        private long lastUpdated;

        public String getAlarmType() {
            return alarmType;
        }

        public void setAlarmType(String alarmType) {
            this.alarmType = alarmType;
        }

        public String getAlarmMessage() {
            return alarmMessage;
        }

        public void setAlarmMessage(String alarmMessage) {
            this.alarmMessage = alarmMessage;
        }

        public long getLastUpdated() {
            return lastUpdated;
        }

        public void setLastUpdated(long lastUpdated) {
            this.lastUpdated = lastUpdated;
        }

        public static Serde<AlarmStatus> serde() {
            return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(AlarmStatus.class));
        }
    }
}


http://www.kler.cn/a/284017.html

相关文章:

  • 从华为到创业公司
  • Elasticsearch基本概念及使用
  • vscode远程连接服务器并启用tmux挂载进程
  • 将Excel文件的两个表格经过验证后分别读取到Excel表和数据库
  • 开发语言中,堆区和栈区的区别
  • 2024-11-13 学习人工智能的Day26 sklearn(2)
  • 深入解析Nginx的Fair调度算法:实现请求的智能分配
  • 中国各地区数字经济发展对环境污染的影响数据(2011-2021年)
  • 坐牢第三十天(c++)
  • 【微信小程序】分包--独立分包
  • 请问基于规则和强化学习的各自的优缺点有哪些?应用场景有哪些?今天的北京机器人展上的技术专家的技术
  • 学习日志30
  • 鸿蒙( Beta5.0版)开发实战:自定义TabBar页签
  • 新加坡服务器解决访问慢的方法有哪些?
  • LSTM唐诗生成
  • SX_Git Graph图形化案例详解_20
  • 网易校招面试题 - 如何给 10 台机器安装 Nginx
  • 设备智能语音交互控制,乐鑫ESP32-S3智能AI方案,助力产品个性化交互
  • 计算机毕业设计选题推荐-在线培训平台-Java/Python项目实战
  • 浅析事件驱动Reactor
  • Pyqt5高级技巧:多线程任务、窗体交互、常用控件介绍(含基础Demo)
  • Apache Tomcat与反向代理
  • Elastic Stack(三):Logstash介绍及安装
  • JDBC中的execute, executeQuery, 和 executeUpdate方法区别
  • 如何构建小学至大学素质评价档案系统 —— php Vue 实践指南
  • 【 html+css 绚丽Loading 】 000027 旋风破云扇