当前位置: 首页 > article >正文

当kafka消费的数据滞后1000条时,打印告警信息

要在 Kafka
消费者中实现当数据滞后1000条时打印告警信息,你需要在消费循环中添加逻辑来检查当前消费者的偏移量与主题中的最新偏移量之间的差异。如果这个差异大于1000,就打印告警信息。以下是修改后的代码示例:

package com.mita.web.core.config.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author sunpeiyang
 * @date 2024/11/12 14:54
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        int numConsumers = 5; // 增加消费者的数量
        for (int i = 0; i < numConsumers; i++) {
            new Thread(new KafkaConsumerThread()).start();
        }
    }

    static class KafkaConsumerThread implements Runnable {
        private static final int ALERT_THRESHOLD = 1000; // 设置告警阈值

        @Override
        public void run() {
            // 配置消费者属性
            Properties props = new Properties();
            props.put("bootstrap.servers", "4.15.18.14:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 调整消费者配置
            props.put("fetch.min.bytes", "1"); // 减少最小获取字节数
            props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间
            props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数

            // 创建消费者实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // 订阅主题
            consumer.subscribe(Collections.singletonList("test-topic"));

            // 消费消息
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    processRecords(records); // 异步处理消息
                    checkLag(ALERT_THRESHOLD, consumer, "test-topic"); // 检查滞后并告警
                    consumer.commitAsync(); // 异步提交偏移量
                }
            }
        }

        private void processRecords(ConsumerRecords<String, String> records) {
            // 异步处理消息的逻辑
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                // 这里可以添加消息处理逻辑,例如使用线程池并行处理
            }
        }

        private void checkLag(int threshold, KafkaConsumer<String, String> consumer, String topic) {
            for (TopicPartition partition : consumer.assignment()) {
                long currentOffset = consumer.position(partition);
                long endOffset = consumer.endOffsets(Collections.singleton(partition)).values().iterator().next();
                long lag = endOffset - currentOffset;

                if (lag > threshold) {
                    System.out.printf("Alert: Consumer lag for partition %s is %d, which exceeds the threshold of %d%n", partition, lag, threshold);
                }
            }
        }
    }
}

这里你可以发送钉钉消息等告警信息

在这里插入图片描述
在这里插入图片描述

其实我的积压很多,哈哈

在这里插入图片描述
积压的数据还有400多万,怎么快速的处理积压数据,当前代码也有做处理哈


http://www.kler.cn/a/393291.html

相关文章:

  • androidstudio下载gradle慢
  • 准确率调整研究中心
  • 【操作系统】守护进程
  • 第一个 Flutter 项目(1)共46节
  • 文件输入输出——NOI
  • 时间管理的三个痛点
  • 在 Jupyter Notebook 中使用 Matplotlib 进行交互式可视化的教程
  • 第23节 arkts 如何实现多语言
  • 蓝桥杯每日真题 - 第7天
  • 重构代码之移动字段
  • AutoSAR CP DoIP规范导读
  • 网页直播/点播播放器EasyPlayer.js无插件H5播放器关于其后地址不带协议后缀的判断逻辑
  • 方法论-利益驱动模型与系列选择模型(说服他人)
  • acwing算法基础02一高精度,前缀和,差分
  • 引入第三方jar包部署服务器后找不到jar处理方法
  • Ansible剧本检测Windows防火墙状态
  • Redis - 哨兵(Sentinel)
  • 【MySQL】summary
  • 传奇996_21——龙岭事件
  • 丹摩征文活动 |通过Pycharm复现命名实体识别模型--MECT模型
  • JS 实现SSE通讯和了解SSE通讯
  • AI大模型识别多人发音的实时语音交互理论研究
  • Logback 日志介绍及与Spring Boot 的整合 【保姆级教程】
  • 数据库基础(11) . SQL脚本
  • 在arm64架构下, Ubuntu 18.04.5 LTS 用命令安装和卸载qt4、qt5
  • golang将word、excel转换为pdf