当前位置: 首页 > article >正文

动态订阅kafka mq实现(消费者组动态上下线)

和上篇文章 动态订阅rocket mq实现(消费者组动态上下线) 目的一致,直接上代码

    /**
     * Kafka topic container集合
     */
    private static final Map<String, ConcurrentMessageListenerContainer<String, String>> topics = new HashMap<>();

	public void registerKafkaListeners(BinlogPortDatabaseConfig binlogPortDatabaseConfig) {
	/*
		BinlogPortDatabaseConfig是自定义的数据结构,即需要动态注册的kafka配置
		包含topic、sever、client,自定义即可
	*/
        ConsumerFactory<String, String> consumerFactory = binlogPortDatabaseConfig.createConsumerFactory();
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setBatchListener(true);
        if (consumerFactory == null) {
            return;
        }
        factory.setConsumerFactory(consumerFactory);
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(binlogPortDatabaseConfig.getTopic());
        //设置为false,解决client后自动加-0的问题
        container.setAlwaysClientIdSuffix(false);
        container.setupMessageListener((MessageListener<String, String>) record -> {
           //TODO:你的消费逻辑,record即为消息体
                }
            } catch (IllegalArgumentException e) {
                log.error("registerKafkaListeners JSON解析失败", e);
            } catch (NullPointerException e) {
                log.error("registerKafkaListeners 消息为空或部分字段缺失", e);
            } catch (Exception e) {
                log.error("registerKafkaListeners 注册异常", e);
            }
        });
        container.start();
        topics.put(binlogPortDatabaseConfig.getTopic(), container);
    }


    public void factoryDel(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = topics.get(topic);
        if (!topic.isEmpty()) {
            container.stop();
            topics.remove(topic);
        }
    }

    public ConsumerFactory<String, String> createConsumerFactory() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, /*你的kafka server*/);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, /*你的kafka client*/);
        if (SystemEnvUtil.isTest()) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, Constant.consumerGroupIdOffline + topic);
        }
        if (SystemEnvUtil.isProd() || SystemEnvUtil.isSandbox()) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG,/*你的group id*/);
        }
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(100));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));

        Map<String, Object> configMap = new java.util.HashMap<>();
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            configMap.put((String) entry.getKey(), entry.getValue());
        }
        return new DefaultKafkaConsumerFactory<>(configMap);
    }




http://www.kler.cn/a/558002.html

相关文章:

  • 在windows10上基于Python部署marker,实现PDF转markdown文件(保姆级)
  • ue5地面上出现preview字样
  • 小程序(物流、快递),接入GPS北斗获取路线以及当前车辆位置
  • 【后端】gitHub访问速度太慢解决办法
  • UE5.3 C++ TArray系列(一)
  • 【Python爬虫(43)】云端探秘:Python分布式爬虫部署攻略
  • Jenkins 自动构建Job
  • 14.5 基于LangChain重构Auto-GPT:新一代自主智能体架构设计实践
  • 当前 Java Web 开发的最新实现方式
  • 【复习】计算机网络
  • Spring Boot Validation 接口校验:从零到掌握
  • STM32 HAL库I2C函数使用详解:以MPU6050传感器为例
  • Windows 系统下,使用 PyTorch 的 DataLoader 时,如果 num_workers 参数设置为大于 0 的值,报错
  • Apache-CC6链审计笔记
  • PWR电源控制详解教程文章 ~内置初始化驱动代码!!!
  • 网络安全风险事件排名 网络安全事件划分
  • 网络运维学习笔记 012网工初级(HCIA-Datacom与CCNA-EI)某机构新增:GRE隧道与EBGP实施
  • 如何查询网站是否被百度蜘蛛收录?
  • CSS中块级格式化上下文(BFC)详解
  • windwos与linux环境下Iperf3带宽测试工具的安装、使用