Spring Boot 配置Kafka
1 Kafka
Kafka 是由 Linkedin
公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
2 Maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3 Spring Boot配置
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
batch-size: 16384
buffer-memory: 67108864
acks: 1
compression-type: lz4
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
enable-auto-commit: true
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
4 生产者配置
4.1 KafkaProducerConfig
生产者的相关配置,指定kafka的地址,消息序列化器。
topic的分区数、副本数。
package com.xudongbase.kafka.producer;
import com.xudongbase.kafka.constant.KafkaTopicConstant;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers:}")
private String bootstrapAddress;
/**
* 分区(分区数需要慎重设置,一般分区数为消费者的倍数,要不然在消费高峰时刻会出现消费速度不一样的情况)
*/
private static final int NUM_PARTITIONS = 5;
/**
* 副本
*/
private static final short REPLICATION_FACTOR = (short) 2;
public static final String EFAK_SYSTEM_GROUP = "xudong.system.group";
@Bean(name = "kafkaProducerProperties")
public Properties Properties() {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, EFAK_SYSTEM_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
return new org.springframework.kafka.core.DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public AdminClient adminClient() {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return AdminClient.create(config);
}
@Bean
public NewTopic kafkaTopic() {
return new NewTopic(KafkaTopicConstant.KAFKA_TOPIC, NUM_PARTITIONS, REPLICATION_FACTOR);
}
}
4.2 发送消费
package com.xudongbase.kafka;
import com.xudongbase.XuDongBaseApplication;
import com.xudongbase.kafka.constant.KafkaTopicConstant;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import javax.annotation.Resource;
@SpringBootTest(classes = XuDongBaseApplication.class)
public class KafkaProducerTest {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Test
public void test() {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(KafkaTopicConstant.KAFKA_TOPIC, "xudongmaster" + i);
}
}
}
4.3 发送结果
5 消费者配置
5.1 KafkaConsumerConfig
设置消费组的配置,指定kafka的地址、是否批量获取、并发线程数、消费反序列化器、是否手动提交偏移量。
package com.xudongbase.kafka.consumer;
import com.xudongbase.kafka.constant.KafkaConsumerGroupIdConstant;
import com.xudongbase.kafka.enums.KafkaConsumerResetOffsetEnum;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers:}")
private String bootstrapAddress;
private Map<String, Object> consumerConfig(String groupId, String autoOffsetReset) {
Map<String, Object> consumerConfig = new HashMap<>();
// Kafka集群的初始连接地址配置
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
// 消费者组ID的配置
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// 消费者的偏移量重置策略配置
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
//拉取的最小字节数 200MB,默认值52428800
consumerConfig.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024 * 200);
//拉取的最大字节数 200MB,默认值52428800
consumerConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024 * 1024 * 200);
//分区拉取的最大字节数 200MB,默认值1048576
consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024 * 200);
// 一次fetch操作的最大等待时间,“最大等待时间”与“最大字节”任何一个先满足了就立即返回给消费者
// 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
consumerConfig.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000); // fetch.max.wait.ms, 缺省是500
//一次poll操作最大获取的记录数量,缺省是500。该值越大,则吞吐量也越大,但要求消费者能够在不超时的情况下处理完所有的消息。
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 400000);
// 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”、"拉取条数"任何一个先满足了就立即返回给消费者
//两次poll操作的间隔时间 10分钟
consumerConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
//会话超时时间30秒,默认10秒
consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
// 关闭自动提交
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 消息键的反序列化类配置
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 消息值的反序列化类配置
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return consumerConfig;
}
private ConcurrentKafkaListenerContainerFactory<Integer, String> batchFactory(Map<String, Object> config) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
// 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
// 设置并发消费者数量
factory.setConcurrency(1);
// 设置 Ack 模式为 MANUAL_IMMEDIATE,即手动提交消费偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 设置轮询超时时间为 30 秒
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
@Bean
public KafkaListenerContainerFactory<?> kafkaBatchConsumerFactory() {
return batchFactory(consumerConfig(KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_DEFAULT_GROUP, KafkaConsumerResetOffsetEnum.EARLIEST.getType()));
}
}
5.2 topic消费者
package com.xudongbase.kafka.consumer.listener;
import com.xudongbase.kafka.constant.KafkaConsumerGroupIdConstant;
import com.xudongbase.kafka.constant.KafkaTopicConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Component
public class KafkaTopicListener {
@KafkaListener(groupId = KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_DEFAULT_GROUP,
topics = KafkaTopicConstant.KAFKA_TOPIC,
containerFactory = "kafkaBatchConsumerFactory")
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
String topicName = KafkaTopicConstant.KAFKA_TOPIC;
try {
List<String> valueList = records.stream()
.map(ConsumerRecord::value).collect(Collectors.toList());
for (String value : valueList) {
log.info("topic:{}, value:{}", topicName, value);
}
ack.acknowledge();
} catch (Exception e) {
log.error("kafka消费{}:", topicName, e);
}
}
}
5.3 消费结果
注:
1、部分代码未能在博客内体现,请点击以下链接跳转至Gitee的xudongbase项目的kafka分支。
xudongbase: 主要是项目中可以用到的共通方法,现有easyexcel分支在持续更新中。欢迎大家Star和提交Issues。easyexcel分支:批量设置样式,批量添加批注,批量合并单元格,设置冻结行和列,设置行高列宽,隐藏行和列,绑定下拉框数据,设置水印,插入图片 - Gitee.comhttps://gitee.com/xudong_master/xudongbase/tree/kafka/
2、消费消息时,建议关闭自动提交。
自动提交流程为本次提交上次消费完的偏移量,也就是说自动提交会出现重复消费的情况。
enable.auto.commit=false
AckMode=ContainerProperties.AckMode.MANUAL_IMMEDIATE
3、 消费者偏移量重置策略配置建议设置为earliest,earliest代表消费者消费当前消费组在当前分区最后一次消费之后的消息,当前分区之前从未消费过,那就从当前分区最开始的消息开始消费。
4、不需要指定spring-kafka依赖版本,使用SpringBoot对应的默认 spring-kafka依赖版本即可。