当前位置: 首页 > article >正文

kafka动态监听主题

简单版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class DynamicKafkaListenerService {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    public void registerListener(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener((MessageListener<String, String>) record -> {
            System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
        });
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
    }
}

手动ack版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;

@Service
public class DynamicKafkaListenerService {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    public void registerListener(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        // 设置消息监听器为 AcknowledgingMessageListener
        containerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
                // 模拟消息处理逻辑
                // 处理完成后手动确认消息
                if (ack != null) {
                    ack.acknowledge();
                }
            } catch (Exception e) {
                // 处理异常情况,例如记录日志或重试等
                System.err.println("消息处理失败: " + e.getMessage());
            }
        });
        // 设置手动确认模式
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
    }
}

批量处理版本 

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    public void registerListener(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        // 设置手动确认模式
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 设置批量消息监听器
        containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {
            try {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
                }
                // 模拟消息处理逻辑
                // 处理完成后手动批量确认消息
                ack.acknowledge();
            } catch (Exception e) {
                // 处理异常情况,例如记录日志或重试等
                System.err.println("消息处理失败: " + e.getMessage());
            }
        });
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
    }

可关闭版本

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class DynamicKafkaListenerService {


    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    // 用于保存每个主题对应的监听器容器
    private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();

    /**
     * 开启一个监听
     */
    public void registerListener(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        // 设置手动确认模式
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 设置批量消息监听器
        containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {
            try {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
                }
                // 模拟消息处理逻辑
                // 处理完成后手动批量确认消息
                ack.acknowledge();
            } catch (Exception e) {
                // 处理异常情况,例如记录日志或重试等
                System.err.println("消息处理失败: " + e.getMessage());
            }
        });
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
        // 将监听器容器保存到 map 中
        containerMap.put(topic, container);
    }

    /**
     * 关闭一个监听
     */
    public void stopListener(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);
        if (container != null && container.isRunning()) {
            container.stop();
            // 从 map 中移除已停止的监听器容器
            containerMap.remove(topic);
        }
    }
}

调用添加监听

    /**
     * 配置详情
     */
    @GetMapping("/getModelZdyConfInfo")
    public String getModelZdyConfInfo(String topic) {
        dynamicKafkaListenerService.registerListener(topic);
        return "添加" + topic + "监听成功";
    }


http://www.kler.cn/a/543069.html

相关文章:

  • Vue3(2)
  • 算法——数学建模中的“上帝掷骰子”:蒙特卡罗算法
  • 力扣算法题:反转字符串中的元音字母
  • 第三届通信网络与机器学习国际学术会议(CNML 2025)
  • 嵌入式硬件篇---原码、补码、反码
  • 力扣1448. 统计二叉树中好节点的数目
  • Conda 虚拟环境与 venv、virtualenv、pipenv 的对比
  • 基于 DeepSeek 的创新点及其在学术研究与论文发表中的应用
  • uniapp国际化不立即生效(带解决方案)
  • ffmpeg学习:ubuntu下编译Android版ffmpeg-kit
  • 元宵节快乐
  • 力扣刷题(数组篇)
  • 全面理解-命名修饰规则(命名倾轧Name Mangling)
  • Redis 常见面试题汇总(持续更新)
  • 2.2 神经网络语言模型:从词向量到上下文感知的进化革命
  • 第三届通信网络与机器学习国际学术会议(CNML 2025)
  • 光耦隔离的作用及其原理 光耦隔离输入输出能共地
  • 从零到一学习c++(基础篇--筑基期六-string)
  • 【iSAID:用于航空影像实例分割的大规模数据集】
  • 嵌入式之详解:startup.S文件
  • Cherry Studio 连接私域deepseek-r1模型搭建私域知识库和智能体(也可使用第三方模型)
  • 图像处理之图像亮度/对比度调整
  • 【AI知识点】Adversarial Validation(对抗验证)
  • Redis核心技术知识点全集
  • 从工匠故事读懂开源软件的特点与价值
  • 物理引擎Box2D