当前位置: 首页 > article >正文

Spring Boot 配置Kafka

1 Kafka

        Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。

2 Maven依赖


            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

3 Spring Boot配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      batch-size: 16384
      buffer-memory: 67108864
      acks: 1
      compression-type: lz4
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      enable-auto-commit: true
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer

4 生产者配置

4.1 KafkaProducerConfig

        生产者的相关配置,指定kafka的地址,消息序列化器。

        topic的分区数、副本数。

package com.xudongbase.kafka.producer;

import com.xudongbase.kafka.constant.KafkaTopicConstant;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers:}")
    private String bootstrapAddress;
    /**
     * 分区(分区数需要慎重设置,一般分区数为消费者的倍数,要不然在消费高峰时刻会出现消费速度不一样的情况)
     */
    private static final int NUM_PARTITIONS = 5;
    /**
     * 副本
     */
    private static final short REPLICATION_FACTOR = (short) 2;

    public static final String EFAK_SYSTEM_GROUP = "xudong.system.group";

    @Bean(name = "kafkaProducerProperties")
    public Properties Properties() {
        Properties props = new Properties();

        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, EFAK_SYSTEM_GROUP);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        return new org.springframework.kafka.core.DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public AdminClient adminClient() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return AdminClient.create(config);
    }

    @Bean
    public NewTopic kafkaTopic() {
        return new NewTopic(KafkaTopicConstant.KAFKA_TOPIC, NUM_PARTITIONS, REPLICATION_FACTOR);
    }


}

4.2 发送消费

package com.xudongbase.kafka;

import com.xudongbase.XuDongBaseApplication;
import com.xudongbase.kafka.constant.KafkaTopicConstant;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.Resource;

@SpringBootTest(classes = XuDongBaseApplication.class)
public class KafkaProducerTest {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send(KafkaTopicConstant.KAFKA_TOPIC, "xudongmaster" + i);
        }
    }
}

4.3 发送结果

5 消费者配置

 5.1 KafkaConsumerConfig

        设置消费组的配置,指定kafka的地址、是否批量获取、并发线程数、消费反序列化器、是否手动提交偏移量。

package com.xudongbase.kafka.consumer;

import com.xudongbase.kafka.constant.KafkaConsumerGroupIdConstant;
import com.xudongbase.kafka.enums.KafkaConsumerResetOffsetEnum;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers:}")
    private String bootstrapAddress;


    private Map<String, Object> consumerConfig(String groupId, String autoOffsetReset) {
        Map<String, Object> consumerConfig = new HashMap<>();
        // Kafka集群的初始连接地址配置
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // 消费者组ID的配置
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // 消费者的偏移量重置策略配置
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //拉取的最小字节数 200MB,默认值52428800
        consumerConfig.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024 * 200);
        //拉取的最大字节数 200MB,默认值52428800
        consumerConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024 * 1024 * 200);
        //分区拉取的最大字节数 200MB,默认值1048576
        consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024 * 200);
        // 一次fetch操作的最大等待时间,“最大等待时间”与“最大字节”任何一个先满足了就立即返回给消费者
        // 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
        consumerConfig.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000); // fetch.max.wait.ms, 缺省是500
        //一次poll操作最大获取的记录数量,缺省是500。该值越大,则吞吐量也越大,但要求消费者能够在不超时的情况下处理完所有的消息。
        consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 400000);
        // 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”、"拉取条数"任何一个先满足了就立即返回给消费者
        //两次poll操作的间隔时间 10分钟
        consumerConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
        //会话超时时间30秒,默认10秒
        consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
        // 关闭自动提交
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 消息键的反序列化类配置
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 消息值的反序列化类配置
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return consumerConfig;
    }

    private ConcurrentKafkaListenerContainerFactory<Integer, String> batchFactory(Map<String, Object> config) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
        // 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        // 设置并发消费者数量
        factory.setConcurrency(1);
        // 设置 Ack 模式为 MANUAL_IMMEDIATE,即手动提交消费偏移量
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 设置轮询超时时间为 30 秒
        factory.getContainerProperties().setPollTimeout(30000);
        return factory;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaBatchConsumerFactory() {
        return batchFactory(consumerConfig(KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_DEFAULT_GROUP, KafkaConsumerResetOffsetEnum.EARLIEST.getType()));
    }


}

5.2 topic消费者

package com.xudongbase.kafka.consumer.listener;

import com.xudongbase.kafka.constant.KafkaConsumerGroupIdConstant;
import com.xudongbase.kafka.constant.KafkaTopicConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;


@Slf4j
@Component
public class KafkaTopicListener {


    @KafkaListener(groupId = KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_DEFAULT_GROUP,
            topics = KafkaTopicConstant.KAFKA_TOPIC,
            containerFactory = "kafkaBatchConsumerFactory")
    public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        String topicName = KafkaTopicConstant.KAFKA_TOPIC;
        try {
            List<String> valueList = records.stream()
                    .map(ConsumerRecord::value).collect(Collectors.toList());
            for (String value : valueList) {
                log.info("topic:{}, value:{}", topicName, value);
            }
            ack.acknowledge();
        } catch (Exception e) {
            log.error("kafka消费{}:", topicName, e);
        }
    }
}

5.3 消费结果

注:

1、部分代码未能在博客内体现,请点击以下链接跳转至Gitee的xudongbase项目的kafka分支。

xudongbase: 主要是项目中可以用到的共通方法,现有easyexcel分支在持续更新中。欢迎大家Star和提交Issues。easyexcel分支:批量设置样式,批量添加批注,批量合并单元格,设置冻结行和列,设置行高列宽,隐藏行和列,绑定下拉框数据,设置水印,插入图片 - Gitee.comicon-default.png?t=O83Ahttps://gitee.com/xudong_master/xudongbase/tree/kafka/

2、消费消息时,建议关闭自动提交。

自动提交流程为本次提交上次消费完的偏移量,也就是说自动提交会出现重复消费的情况。

enable.auto.commit=false
AckMode=ContainerProperties.AckMode.MANUAL_IMMEDIATE

3、 消费者偏移量重置策略配置建议设置为earliest,earliest代表消费者消费当前消费组在当前分区最后一次消费之后的消息,当前分区之前从未消费过,那就从当前分区最开始的消息开始消费。

4、不需要指定spring-kafka依赖版本,使用SpringBoot对应的默认 spring-kafka依赖版本即可。


http://www.kler.cn/a/443403.html

相关文章:

  • 【MySQL 保姆级教学】用户管理和数据库权限(16)
  • KCP解读:C#库类图
  • 和为0的四元组-蛮力枚举(C语言实现)
  • nginx 日志规范化意义及实现!
  • BGP的local_preference本地优先级属性
  • 28、使用StreamPark管理作业中,关于默认环境变量设置和默认动态参数设置的修改
  • clearvoice 语音降噪、语音分离库
  • 初学stm32 --- 时钟配置
  • SQL进阶技巧:如何计算先进先出的收支平衡问题?
  • Firewalld 防火墙全面解析与配置指南
  • Hadoop yarn安装
  • Java设计模式及示例
  • LeetCode:3376. 破解锁的最少时间 I(DFS回溯 Java)
  • uboot 打开log 的 方法
  • 题海拾贝:P8772 [蓝桥杯 2022 省 A] 求和
  • 在Visual Studio Code (VSCode) 中将终端输出重定向到一个文本文件中
  • 如何在Playwright中操作窗口的变化
  • 【SH】Ubuntu Server 24搭建Web服务器访问Python程序研发笔记
  • 在Rocky Linux中安装【Jenkins】的详细指南
  • Python MySQL 进阶用法详解
  • TRELLIS,一键生成3D模型,图像转3D,微软开源
  • MYSQL语法
  • 【人工智能】从TF-IDF到BERT:Python实现文本分类的全面指南
  • 12.7深度学习_经典神经网络_VGG
  • 八、Hbase
  • 数字设计工程师学习路线:从基础到高阶的全面指南