当前位置: 首页 > article >正文

springboot 配置Kafka 关闭自启动连接

这里写自定义目录标题

  • springboot 配置Kafka 关闭自启动连接
      • 方法一:使用 @ConditionalOnProperty
      • 方法二:手动管理Kafka监听器容器
      • 方法三:使用 autoStartup=false
      • 结语

springboot 配置Kafka 关闭自启动连接

在Spring Boot应用程序中,默认情况下,Kafka监听器容器会在应用程序启动时自动开始连接到Kafka broker。如果你希望禁用这种自动启动行为,可以通过配置来实现。以下是几种常见的方法:

方法一:使用 @ConditionalOnProperty

你可以使用条件注解来控制Kafka监听器容器的启动。通过设置一个属性来决定是否启用Kafka监听器。
步骤:

  1. 定义配置属性: 在你的application.yml或application.properties文件中添加一个自定义属性,用于控制Kafka监听器的启用状态。
   spring:
     kafka:
       enabled: false
  1. 使用 @ConditionalOnProperty 注解: 在你的Kafka监听器类上使用@ConditionalOnProperty注解,根据配置属性来决定是否启用该监听器。
   import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
   import org.springframework.kafka.annotation.KafkaListener;
   import org.springframework.stereotype.Component;

   @Component
   @ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true")
   public class MyKafkaListener {

       @KafkaListener(topics = "your-topic-name", groupId = "your-group-id")
       public void listen(String message) {
           System.out.println("Received Message: " + message);
       }
   }
   

方法二:手动管理Kafka监听器容器

另一种方法是手动管理Kafka监听器容器的生命周期,而不是依赖于Spring Boot的自动配置。
步骤:

  1. 禁用自动配置: 在你的主应用程序类或配置类上排除KafkaAutoConfiguration。
   import org.springframework.boot.SpringApplication;
   import org.springframework.boot.autoconfigure.SpringBootApplication;
   import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

   @SpringBootApplication(exclude = KafkaAutoConfiguration.class)
   public class DeviceExchangeApplication {

       public static void main(String[] args) {
           long startTime = System.currentTimeMillis();
           System.out.println("-----------> 数据交换链[device-exchange]启动...");
           SpringApplication.run(DeviceExchangeApplication.class, args);
           System.out.println("-----------> 数据交换链[device-exchange]启动成功,耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");
       }
   }
   
  1. 手动创建和管理Kafka监听器容器: 创建并管理Kafka监听器容器,以便在需要的时候手动启动它们。
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.context.annotation.Bean;
   import org.springframework.context.annotation.Configuration;
   import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
   import org.springframework.kafka.core.ConsumerFactory;
   import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
   import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
   import org.springframework.kafka.listener.MessageListenerContainer;

   import java.util.HashMap;
   import java.util.Map;

   @Configuration
   public class KafkaConfig {

       @Autowired
       private MyKafkaListener myKafkaListener;

       @Bean
       public Map<String, Object> consumerConfigs() {
           Map<String, Object> props = new HashMap<>();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
           return props;
       }

       @Bean
       public ConsumerFactory<String, String> consumerFactory() {
           return new DefaultKafkaConsumerFactory<>(consumerConfigs());
       }

       @Bean
       public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
           ConcurrentKafkaListenerContainerFactory<String, String> factory =
                   new ConcurrentKafkaListenerContainerFactory<>();
           factory.setConsumerFactory(consumerFactory());
           return factory;
       }

       @Bean
       public MessageListenerContainer kafkaListenerContainer() {
           ConcurrentMessageListenerContainer<String, String> container =
                   kafkaListenerContainerFactory()
                           .createContainer("your-topic-name");
           container.setupMessageListener(myKafkaListener::listen);
           return container;
       }
   }
   
  1. 手动启动Kafka监听器容器: 在需要的时候手动启动Kafka监听器容器。
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.boot.CommandLineRunner;
   import org.springframework.kafka.listener.MessageListenerContainer;
   import org.springframework.stereotype.Component;

   @Component
   public class KafkaStarter implements CommandLineRunner {

       @Autowired
       private MessageListenerContainer kafkaListenerContainer;

       @Override
       public void run(String... args) throws Exception {
           // 手动启动Kafka监听器容器
           kafkaListenerContainer.start();
       }
   }
   

方法三:使用 autoStartup=false

你可以在Kafka监听器容器的配置中设置autoStartup=false,这样它就不会在应用程序启动时自动启动。
步骤:

  1. 配置 autoStartup=false: 在你的Kafka监听器配置中设置autoStartup=false。
   import org.springframework.kafka.annotation.KafkaListener;
   import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
   import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
   import org.springframework.kafka.listener.MessageListenerContainer;
   import org.springframework.stereotype.Component;

   @Component
   public class MyKafkaListener {

       @KafkaListener(id = "myListener", topics = "your-topic-name", autoStartup = "false")
       public void listen(String message) {
           System.out.println("Received Message: " + message);
       }
   }
   
  1. 手动启动Kafka监听器容器: 使用MessageListenerContainer接口的手动启动方法。
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.kafka.listener.MessageListenerContainer;
   import org.springframework.stereotype.Component;

   @Component
   public class KafkaStarter {

       @Autowired
       private MessageListenerContainer myListenerContainer;

       public void startKafkaListener() {
           myListenerContainer.start();
       }
   }
   

总结
通过上述三种方法,你可以有效地控制Kafka监听器容器的自动启动行为。选择适合你项目需求的方法来实现即可。通常情况下,使用@ConditionalOnProperty是最简单和灵活的方式。

结语

以上答案来自大模型,第二种和第三种都比较麻烦,最后采用了第一种方式在所有的消费类上加了@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true"),启动就很快了,KafkaAdmin 和 KafkaConsumer就没有自动启动了。用kafkaTemplate发送消息还是会去连接Kafka服务器,不影响正常使用。
注意:必须是所有的消费类必须加,不然就不会起作用。
主要场景:一般线上部署环境才会去连接kafka,本地开发的时候 不一定要去连,所以想暂时关闭一下


http://www.kler.cn/a/444759.html

相关文章:

  • 音视频入门基础:MPEG2-TS专题(21)——FFmpeg源码中,获取TS流的视频信息的实现
  • 腾讯云云开发 Copilot 深度探索与实战分享
  • Spring Boot中Bean的 构造器注入、字段注入和方法注入
  • js常用方法之: 预览大图(uniapp原生方法封装)
  • 【Axure高保真原型】伸缩表单
  • WEB开发: 全栈工程师起步 - Python Flask +SQLite的管理系统实现
  • 大数据-255 离线数仓 - Atlas 数据仓库元数据管理 数据血缘关系 元数据
  • 如何更好的对WebSocket的理解?应用场景?
  • 【自动化部署】Ansible Playbook 基础应用
  • 百度面试手撕 go context channel部分学习
  • 自动呼入机器人如何实现自动化学习?
  • 代码随想录-笔记-其七
  • 【C语言程序设计——选择结构程序设计】求阶跃函数的值(头歌实践教学平台习题)【合集】
  • 深度学习基础--自定义函数对数据集进行图像分类,以车牌号识别为例
  • MCU驱动使用
  • MFC 应用程序语言切换
  • #Java篇:java项目init和写接口流程步骤详细
  • UG NX二次开发(C#)-如何设置UGOpen的UF_CAM_geom_type_e枚举类型
  • Go语言封装Cron定时任务
  • 【c++丨STL】set/multiset的使用
  • 2025年NISP考试时间是什么时候?NISP要多少钱?NISP考试时间及费用超全解说!
  • tryhackme-Pre Security-HTTP in Detail(HTTP的详细内容)
  • 2024159读书笔记|《南山册页:齐白石果蔬册鱼虫册》节选
  • 【Rust自学】4.3. 所有权与函数
  • WPF+MVVM案例实战与特效(四十三)- 打造动态炫酷彩虹字控件,让你的界面动起来
  • SQLite 命令