当前位置: 首页 > article >正文

项目实践----Spring Boot整合Kafka,实现单条消费和批量消费

如何安装Kafka,可以参考docker搭载Kafka集群,一个文件搞定,超简单,亲试可行-CSDN博客

1、在pom.xml中加入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.1.6</version>
        </dependency>

2、配置application.yml文件

在application.yml中加入

spring
  kafka:
    #Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔
    bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095

    producer:
      # 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认
      acks: 1
      # 发送失败时的重试次数,0表示不重试
      retries: 0
      # 批量发送时的批次大小(字节)
      batch-size: 30720000 # 30MB
      # 生产者的内存缓冲区大小(字节)
      buffer-memory: 33554432 # 32MB
      # Key的序列化器类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value的序列化器类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:
      # 消费者所属的组ID
      group-id: test-kafka
      # 禁用自动提交offset,改为手动提交
      enable-auto-commit: false
      # 偏移量重置策略:
      # earliest:从最早的记录开始消费
      # latest:从最新的记录开始消费
      auto-offset-reset: earliest
      # Key的反序列化器类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value的反序列化器类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 每次poll()调用返回的最大消息条数
      max-poll-records: 2
      session:
        # 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)
        timeout:
          ms: 300000 # 5分钟

    listener:
      # 如果指定的主题不存在,是否让应用启动失败,false表示不会报错
      missing-topics-fatal: false
      # 消费模式:single=单条消息,batch=批量消费
      type: single
      # 消费确认模式:
      # manual_immediate:手动确认消息,立即提交offset
      ack-mode: manual_immediate

3、主要示例代码

创建一个目录和四个java文件,可以做测试

3.1、KafkaConfig.java

Kafka监听器配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConfig {

    // 单条消费监听器工厂,手动提交offset
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    // 批量消费监听器工厂,手动提交offset
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // 启用批量消费
        factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

3.2、KafkaProducer.java

生产者

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaProducer {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

    @Bean
    CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            String topic = "test-topic";
            for (int i = 1; i <= 10; i++) {
                String message = "Message " + i;
                kafkaTemplate.send(topic, message);
                System.out.println("Sent: " + message);
                Thread.sleep(500); // 模拟消息发送间隔
            }
        };
    }
}

3.3、SingleConsumer.java

单条消息消费者

autoStartup参数:是是否自动启动;=”true“:自动启动,即生产者启动,该消费者将会开始消费;=”false":不自动启动,不开该模式的消费。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class SingleConsumer {
    @KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("SingleConsumer - Received: " + record.value());
        // 手动提交offset
        acknowledgment.acknowledge();
    }
}

3.4、BatchConsumer.java

批量消息消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class BatchConsumer {

    @KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "batchFactory", autoStartup = "false")
    public void batchListen(List<String> messages, Acknowledgment acknowledgment) {
        System.out.println("BatchConsumer - Received batch: " + messages);
        // 手动提交offset
        acknowledgment.acknowledge();
    }
}

4、测试

4.1、单条信息消费模式

在SingleConsumer.java中设置autoStartup = "true",启动KafkaProducer.java

消费成功

4.2、批量信息消费模式

在BatchConsumer.java中设置autoStartup = "true",启动KafkaProducer.java

配置文件中设置了max-poll-records: 2,所有一次只消费两条

消费成功

如果在BatchConsumer.java和SingleConsumer.java中设置autoStartup = "true",Kafka会随机选择消费者组里的一个消费者进行消费,所有可以会导致其中一个消费者没有消费信息


http://www.kler.cn/a/430357.html

相关文章:

  • 自动驾驶控制与规划——Project 6: A* Route Planning
  • 如何让用户在网页中填写PDF表格?
  • maven之插件调试
  • 【第01阶段-基础必备篇-第二部分--Python之基础】04 函数
  • 汽车信息安全 -- S32K1如何更新BOOT_MAC
  • Swin-Transformer
  • 河南省的教育部科技查新工作站有哪些?
  • SpringBoot篇(缓存层)
  • 《数据结构》(应用题)
  • Android 因为混淆文件配置,打release包提示running R8问题处理
  • 从0开始边做边学,用vue和python做一个博客,非规范化项目,怎么简单怎么弄,跑的起来有啥毛病解决啥毛病(三)
  • Logstash stopped processing because of an error: (SystemExit) exit
  • 【系统设计】俭约架构七大法则
  • 什么是 Merkle 树
  • Redis探秘Sentinel(哨兵模式)
  • 讲讲什么是 JSX ?
  • Linux Ubuntu 安装配置RabbitMQ,springboot使用RabbitMQ
  • 链式设计模式总结
  • 【游戏设计】游戏中复活点系统类型总结
  • SSM虾米音乐项目2--分页查询
  • Python Web 应用:FastAPI 与 SQLAlchemy ORM
  • 单词拼写纠正-03-72.力扣编辑距离 leetcode edit-distance
  • RISC-V架构下OP-TEE 安全系统实践
  • Python 爬虫 (1)基础 | XHR
  • ruoyi-nbcio为安全起见actuator为仅暴露health端点
  • 数据仓库实验二 实现警务数据仓库OLAP应用