当前位置: 首页 > article >正文

Spring Boot 整合 Kafka 详解

前言:

上一篇分享了 Kafka 的一些基本概念及应用场景,本篇我们来分享一下在 Spring Boot 项目中如何使用 Kafka。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 集成 Kafka

引入 Kafka 依赖

在项目的 pom.xml 文件中引入 Kafka 依赖,如下:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
	<version>3.1.6</version>
</dependency>

添加 Kafka 配置

在 application.properties 文件中配置 Kafka 相关配置,如下:

#kafka server 地址
spring.kafka.bootstrap-servers=10.xxx.4.xxx:9092,10.xxx.4.xxx::9092,10.xxx.4.xxx::9092
spring.kafka.producer.acks = 1
spring.kafka.producer.retries = 0
spring.kafka.producer.batch-size = 30720000
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#消费者配置
spring.kafka.consumer.group-id = test-kafka
#是否开启手动提交 默认自动提交
spring.kafka.consumer.enable-auto-commit = true
#如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔
spring.kafka.consumer.auto-commit-interval = 5000
#earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
#kafka 没有创建指定的 topic 下  项目启动是否报错 true  false
spring.kafka.listener.missing-topics-fatal = false
#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
spring.kafka.listener.type = single 
#一次调用 poll() 操作时返回的最大记录数 默认为 500 条
spring.kafka.consumer.max-poll-records = 2
#消息 ACK 模式 有7种
spring.kafka.listener.ack-mode = manual_immediate
#kafka session timeout
spring.kafka.consumer.session.timeout.ms = 300000

配置 Kafka Producer

我们创建一个配置类,并配置生产者工厂,配置 KafkaTemplate。

package com.order.service.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean("myProducerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(4);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }



}

配置 Kafka Cousumer

我们创建一个配置类,配置消费者工厂和监听容器。

package com.order.service.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitIntervalMs;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean("myConsumerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(12);
        //是否自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //kafak 服务器
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        //消费组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //一次调用poll()操作时返回的最大记录数,默认值为500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //自动提交时间间隔 默认 5秒
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        return props;
    }


    /**
     * @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer < java.lang.String, java.lang.String>>
     * @date 2024/10/22 19:41
     * @description kafka 消费者工厂
     */
    @Bean("myContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
        // 并发创建的消费者数量
        factory.setConcurrency(3);
        // 开启批处理
        factory.setBatchListener(true);
        //拉取超时时间
        factory.getContainerProperties().setPollTimeout(1500);
        //是否自动提交 ACK kafka 默认是自动提交
        if (!enableAutoCommit) {
            //共有其中方式
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        }
        return factory;
    }


}

Kafka 消息发送

创建一个 Kafka 的 Producer,注入 KafkaTemplate,完成消息发送。

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @ClassName: KafkaProducer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage( String message) {
        this.kafkaTemplate.send("my-topic", message);
    }

}

Kafka 消息消费

创建一个 Kafka 的 Consumer,使用 @KafkaListener 注解完成消息消费。

package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName: KafkaConsumer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaConsumer {

    @KafkaListener(id = "my-kafka-consumer",
            idIsGroup = false, topics = "my-topic",
            containerFactory = "myContainerFactory")
    public void listen(String message) {
        log.info("消息消费成功,消息内容:{}", message);
    }

}

Kafka 消息发送消费测试

触发消息发送后,得到如下结果:

2024-10-22 20:22:43.041  INFO 36496 --- [-consumer-0-C-1] c.o.s.kafka.consumer.MyKafkaConsumer     : 消息消费成功,消息内容:第一条 kafka 消息

结果符合预期。

以上我们简单的分享了使用 Spring Boot 集成 Kafka 的过程,希望帮助到有需要的朋友。

如有不正确的地方欢迎各位指出纠正。


http://www.kler.cn/news/366991.html

相关文章:

  • Educational Codeforces Round 170 C New Game
  • 【自然语言处理】BERT模型
  • Maven项目管理工具-初始+环境配置
  • 小白直接冲!一区蛇群优化算法+双向深度学习+注意力机制!SO-BiTCN-BiGRU-Attention多输入单输出回归预测
  • 【大模型理论篇】主流大模型的分词器选择及讨论(BPE/BBPE/WordPiece/Unigram)
  • 985研一,转嵌入式好还是后端开发好?
  • springboot-mybatisplus操作集锦(上)
  • 十分钟Linux中的epoll机制
  • 深入理解Linux内核网络(三):内核发送网络包
  • 【读书笔记·VLSI电路设计方法解密】问题25:为什么时钟如此重要
  • 【1024程序员节】MybatisPlus入门(一)MybatisPlus简介
  • jmeter附件上传
  • 便捷之选:微信小程序驱动的停车场管理系统
  • 导出Excel的常用方法:从前端到后端的全面指南
  • 嵌入式软开项目——MIT 6.S081——学习引导和资料网址
  • python psutil 模块概述
  • 从头开始学PHP之数组
  • 计算机网络:网络层 —— IPv4 地址的应用规划
  • 个体化神经调控 Neurolnavigation介绍
  • ElasticSearch备考 -- rollover
  • HarmonyOS NEXT初级案例:网络数据请求
  • 如何在Node.js中执行解压缩文件操作
  • Http 状态码 301 Permanent Rediret 302 Temporary Redirect
  • python爬虫基础篇:BeautifulSoup解析界面
  • 鸿蒙是必经之路
  • OA命令执行漏洞挖掘