当前位置: 首页 > article >正文

Kafka 入门与实战

一、Kafka 基础

1.1 创建topic

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create

1.2 查看消费者偏移量位置

kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test

1.3 消息的生产与发送

#生产者
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

#消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

二、Kafka 集群部署

在 Kafka 集群中,每个节点是一个 broker,broker 中可以创建多个 Topic,每个Topic都可以被分成多个Partition,每个 Partition 都是一个有序的、不可变的消息序列。Partition 是Kafka集群中消息存储的最小单元,也是Kafka集群中消息分发和负载均衡的最小单位,Partition 通过副本机制 Replication 来保证数据的高可用性和容错性,每个 Partition 都有一个 leader 的副本和 多个 follower 副本,leader 副本负责接收和处理消息,follower 副本负责复制leader 副本的数据。

2.1 修改server.properties

#设置 broker 不唯一
broker.id=1
#若是在一台机器上,需要更改端口号,避免冲突
listeners=PLAINTEXT://:9091
#日志目录,选择性更改
log.dirs=D:/kafka-cluster/data/kafka-broker-1

2.2 创建启动脚本文件

#zookeeper 启动脚本
@echo off
cd /d %~dp0
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
pause

#每个Kafka服务器创建启动脚本
@echo off
cd /d %~dp0
.\bin\windows\kafka-server-start.bat .\config\server.properties
pause

2.3 创建批处理启动脚本

@echo off
cd /d %~dp0
cd ./kafka-broker-1
start zookeeper.bat
ping 127.0.0.1 -n 10 >nul
start kafka.bat
cd ../kafka-broker-2
start kafka.bat
cd ../kafka-broker-3
start kafka.bat
pause

2.4 创建批处理清除脚本

再次重新启动会启动失败,需要清除文件夹

@echo off
cd /d %~dp0
rd /s /q D:\kcluster\data
pause

Windows 下使用反斜杠,若使用正斜杠,可能会报错。

三、zookeeper 服务启动 

3.1 kafka 启动,zookeeper节点变化

controller:集群模式下会有多个 broker 节点,而这些broker节点会选举出一个管理者-controller。

zookeeper 中数据存储的方式类似文件:

当有kafka连接 zk 时,就会创建节点,存储相关信息,此时的 controller 是临时节点,当我们的 kafka 服务器关闭时,该节点就会被删除(默认创建临时节点):

 3.2 集群模式

当我们以集群的方式去启动时,controller 的选举策略是 “先入为主”,即哪个 broker 先注册至 zk 中,和 zk 建立连接并创建 controller 节点,它就会被选举为 controller。此时其他 broker 节点在尝试创建 controller 节点时就会失败,但仍会监听 controller 节点,如果 controller 节点挂掉了,此时其他 broker 节点就会去竞争创建 controller 节点,先创建的就会被选举为 controller 节点,而其他的 broker 节点会创建失败,继续去监听新的 controller。

3.3 controller 与 broker 通信

监听  /brokers/ids 节点是为了监听是否有新的 broker 节点,并管理这些 broker 节点。

第二个broker 启动流程

四、主题创建

主题创建时需要指定 name,partition 和 replication,partition 是数据负载均衡的最小单位,partition 数一般小于等于 broker 节点数,replication 是partition的副本节点,replication 一般会和partition 存储到不同的 broker 节点。

五、生产数据

5.1 生产流程

先将信息封装成 ProducerRecord ,然后再经过拦截器得到一个经过拦截处理后的ProducerRecord,之后再通过序列化器针对 key 和 value 进行序列化,通过分区器计算数据发送至哪个partition,即发送到对应的 broker 节点,然后加入到缓冲区,直到缓存刷新或者缓存区满后通过 Sender 发送线程将数据发送至 broker。

分区器会通过 MetadataCache 来获取 topic 的相关信息,并针对 partition 来计算对应的分区。(如果设置分区编号,则会直接使用,不会对编号进行校验,如果没有对应的分区,数据则无法正确存储到对应的 topic 中)

5.2 数据的异步发送和同步发送

数据默认是异步发送的, 将数据发送至数据缓冲区中,之后会由 sender 线程来发送。同步发送:

    @RequestMapping("/send")
    public String send(String msg) throws ExecutionException, InterruptedException {
            CompletableFuture future=kafkaTemplate.send(TOPIC,msg);
            future.get();
            return "success";
    }

5.3 ACKS 应答处理机制

  • 0:sender 线程发送消息后,直接返回 ack,此时数据只是放到了网络当中,不会考虑数据是否真正存到 Kafka 中。
  • 1:当数据保存到磁盘后,即保存到对应的分区后会直接返回 ack。
  • -1(all):当多个副本 replication 都同步消息完成后,才会返回 ack,该级别是最安全的。

5.4 数据重复及乱序的原因

要了解原因首先知道重传机制: 当数据没有正常发送,没有接收到对应的 ack 时,此时就会重新发送,直到发送成功或者超过最大重试次数。

当我们的leader 节点保存数据到磁盘后,在返回ack的时候,由于网络问题,导致连接超时或者ack 丢失,就会导致我们的数据重新发送,此时就会导致数据重复。

由于数据没有正常发送,此时数据就会重新回到缓冲区,sender 线程再重新拉取并发送到对应的 topic 中,而在重新发送成功之前,此时其他消息已经保存到了 topic  中,这时就导致数据乱序。 

5.5 幂等性操作

幂等性可以解决上述数据乱序和重复的问题,但是幂等性开启后有以下几点要求:

  • acks= -1
  • 需要开启重试机制
  • 在图请求缓冲区不能大于5 

如何解决的?

在broker维护了一个保存生产者生产数据的分区状态 ProducerState ,之里面会维护最近五条消息,在新发送消息后,会去验证消息是否相同,若重复则不会继续添加,若没有重复,则会判断顺序号是否是连续的,如果顺序号不是连续的,则会将数据重新返回发送缓冲区,再重新发送,这也是要求为什么要求开启重试机制。

值得注意的是: 幂等性只能保证一个分区的数据不重复和顺序连续,无法保证多个分区的连续。由于我们的幂等性由生产者id + 数据顺序号来决定的,当我们的 broker 重启,生产者 id就会改变,此时相同的数据由于不同的生产者id 仍然会造成上述问题,也就是说无法实现跨会话幂等。

5.6 事务操作及流程

 为了解决跨会话幂等性,可以通过事务来解决。

当我们开启事务后,可以保证broker 节点多次重启,保证生产者 id 不变,这就解决了我们上述的幂等性出现的问题。但是事务仍然只能保证单个分区的幂等性,即开启事务可以保证跨会话的幂等性,但无法保证跨分区的幂等性。引文在 bachMetadata 中保存的只有一个分区的最近五条消息,无法跨会话进行判断数据的重复和顺序。

流程:producer 首先会发送请求事务管理器的所在分区节点,Broker 根据事务 id 的 hash 值并对事务管理器状态分区个数(50)取余,返回对应的事务管理器所在分区。producer 初始化生产者 id,并将数据的分区信息发送给事务管理器。之后 prducer 开始生产数据,并将数据发送对应分区的 leader 节点,当数据保存后,对应的 broker 节点会将数据保存成功后的数据分区信息发送给事务协调器,并向生产者返回 acks 应答响应。producer 接收到应答响应后向事务协调器发送关闭事务,事务协调器接收到请求后,首先会将 __trancsaction_state 中的事务状态修改为 PrepareCommit(准备提交),然后再将事务当前的状态返回给 broker 节点,最后事务协调器会将 __transaction_state 中保存的事务状态改为 CompleteCommit。

六、代码片段

6.1 消费数据偏移量

    @RequestMapping("/send")
    public String send(String msg) throws ExecutionException, InterruptedException {
            CompletableFuture future=kafkaTemplate.send(TOPIC,msg);
            future.get();
            return "success";
    }
    //监听所订阅的主题
    @KafkaListener(topics = {Producer.TOPIC})
    public void onMessage(String data,Acknowledgment ack){
        System.out.println("receive: " + data);
        ack.acknowledge();
    }

kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --group test --topic test --reset-offsets --to-earliest --execute

手动更改偏移量可使消费过的数据从头消费 

spring:
  kafka:
    consumer:
    producer:
      #如果之前用相同的消费者组消费过该主题,并且 offset已经记录在 kafka 中,那么从kafka中读取的offset就是该offset,kafka 只会在找不到偏移量时会使用这个配置,如果想要从头消费,可以使用心得消费者group-id,或者手动提交偏移量.
      auto-offset-reset: earliest
      # 是否自动提交偏移量
      enable-auto-commit: false
  • earliest:自动将偏移量重置为最早的偏移量
  • latest:自动将偏移量重置为最新的偏移量
  • none:如果没有为消费者找到以前的偏移量,则向消费者抛异常

6.2 send() 常用方法

    @RequestMapping("/send2")
    public String sendMessage(){
        Message<String> message= MessageBuilder.withPayload("hello kafka")
                .setHeader(KafkaHeaders.TOPIC,"test")
                .build();
        kafkaTemplate.send(TOPIC,"hello kafka");
        return "success";
    }

    @RequestMapping("/send3")
    public String sendRecord(){
        //Headers 中可以存放一些信息(信息是key-valur 键值对),当消费者接收到消息后,可以拿到这个 headers 里面存放的信息.
        Headers headers=new RecordHeaders();
        headers.add("phone","1234567".getBytes(StandardCharsets.UTF_8));
        headers.add("name","zhangsna".getBytes(StandardCharsets.UTF_8));
        ProducerRecord record=new ProducerRecord<>("test",0,System.currentTimeMillis(),"k1","hello kafka",headers);
        kafkaTemplate.send(record);
        return "success";
    }

    //消费者获取 header 信息
    @KafkaListener(topics = {Producer.TOPIC})
    public void onMessage(String data, Acknowledgment ack,
                          @Header(value = KafkaHeaders.RECEIVED_TOPIC, required = false) String topic,
                          @Header(value = "name", required = false)String name,
                          @Header(value = "phone", required = false)String phone
                          ){
        User user= JsonUtils.fromJson(data, User.class);
        System.out.println("receive: " + data.toString()+" topic: "+topic+" name: "+name+" phone: "+phone);
        ack.acknowledge();
    }


     //同步提交
    @RequestMapping("/send4")
    public String sendSync(){
        for(int i=0;i<10;i++) {
            System.out.println("发送消息: "+i);
            CompletableFuture future=kafkaTemplate.send("test",i+"");
            try {
                SendResult<String,String> result= (SendResult<String, String>) future.get();
                if(result.getRecordMetadata()!=null){
                    System.out.println("消息发送成功: "+result.getRecordMetadata().toString());
                }
                System.out.println(result.getRecordMetadata());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
            System.out.println("成功发送消息: "+i);
        }
        return "success";
    }

    //异步提交
    @RequestMapping("/send5")
    public String sendAsync(){
        for(int i=0;i<10;i++){
            System.out.println("发送消息: "+i);
            CompletableFuture<SendResult<String,String> >future=kafkaTemplate.send("test",i+"");
            future.thenAccept((sendResult)->{
                if(sendResult.getRecordMetadata()!=null){
                    System.out.println("消息发送成功: "+sendResult.getRecordMetadata().toString());
                }
                System.out.println(sendResult.getProducerRecord());
            }).exceptionally((t)->{
                t.printStackTrace();
                //做失败处理
                return null;
            });
        }

        return "success";
    }

6.3 发送引用类型信息

将应用类型转换成 json 对象

package org.aliang.kafkademo.utils;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class JsonUtils {
    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    public static String toJson(Object object) {
        try {
            return OBJECT_MAPPER.writeValueAsString(object);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static <T> T fromJson(String json, Class<T> clazz) {
        try {
            return OBJECT_MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}

生产者消费者代码 

     @Autowired
    private KafkaTemplate<String,User> kafkaTemplate2;
    @RequestMapping("/send6")
    public void sendObject(User user){
        String msg= JsonUtils.toJson(user);
        kafkaTemplate.send("test",msg);
    }

    @KafkaListener(topics = {Producer.TOPIC})
    public void onMessage(String data,Acknowledgment ack){
        User user= JsonUtils.fromJson(data, User.class);
        System.out.println("receive: " + data.toString());
        ack.acknowledge();
    }

6.4 继承 springboot 创建 topic

@Configuration
public class CreateTopic {
    @Bean
    public NewTopic newTopic() {
        return new NewTopic("test", 1, (short) 1);
    }

    @Bean
    public NewTopic updateTopic() {
        return new NewTopic("test", 2, (short) 1);
    }
}

更新 分区时只能增加分区数量,无法减少数量。

副本分区的数量不能大于 broker 节点个数。

6.5 发送消息配置分区策略

    @Value("${kafka.cluster.bootstrap-servers}")
    private  String bootstrapServers;
    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;
    public Map<String, Object> producerConfig(){
        Map<String,Object> props =new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        return props;
    }

    public ProducerFactory<String,Object> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }

但是当我们运行后,发现并不是每个分区都有数据,不符合我们的轮询算法,这是因为在发送消息的过程中,会调用两次我们的 partition 方法,最终就导致不符合我们的预期。

6.6 配置自定义分区策略

public class CustomPartitioner implements Partitioner {
    private AtomicInteger nextPartition = new AtomicInteger(0);
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if(key==null){
            int next=nextPartition.getAndIncrement();
            if(next>=numPartitions){
                nextPartition.set(next,0);
            }
//            System.out.println("分区值"+next);
            return next;
        }else {
            //kafka 默认的分区策略
            return Utils.toPositive(Utils.murmur2(keyBytes))%numPartitions;
        }
    }

}

    public Map<String, Object> producerConfig(){
        Map<String,Object> props =new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);
        //配置自定义分区策略
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
        return props;
    }

6.7 自定义消息拦截器

public class CustomerInterceptor implements ProducerInterceptor{
    /**
     * 发送消息是会调用该方法,可以在拦截器中做一些处理,记录日志操作
     * @param producerRecord
     * @return
     */
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        System.out.println("拦截器拦截到消息:"+producerRecord.value());
        return producerRecord;
    }

    /**
     * @param recordMetadata 服务器返回的元数据信息
     * @param e
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if(recordMetadata!=null){
            System.out.println("消息发送成功,偏移量: "+recordMetadata.offset());
        }else{
            System.out.println("消息发送失败");
        }
    }
    @Override
    public void close() {

    }
    @Override
    public void configure(Map<String, ?> map) {

    }
}

    public Map<String, Object> producerConfig(){
        Map<String,Object> props =new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);
//        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,CustomerInterceptor.class.getName());
        return props;
    }

6.8 消费者同时接受消息体和消息头

    @KafkaListener(topics = {Producer.TOPIC})
    public void onMessage(Acknowledgment ack,
                          ConsumerRecord<String,String> consumerRecord ){
        System.out.println("接收到consumerRecord消息: "+consumerRecord.toString());
        ack.acknowledge();
    }

 6.9 指定 topic-partition-offset 消费

    @KafkaListener(topicPartitions = @TopicPartition(
            //监听 test 主题
            topic = "test",
            //消费 0,1,2 分区的所有消息
            partitions = {"0","1","2"},
            partitionOffsets = {
                    //第三、四个分区的偏移量从 2 开始消费
                    @PartitionOffset(partition = "3",initialOffset = "2"),
                    @PartitionOffset(partition = "4",initialOffset = "2")
            }))
    public void onMessage(ConsumerRecord record, Acknowledgment ack){
        System.out.println("receive: " + record.value()+"partition: "+record.partition()+" offset: "+record.offset());
//        ack.acknowledge();
    }

若配置了监听的分区,但该主题下还有其他分区没配置,例如没有配置 5 分区,则不会消费 partition5 分区的消息。

6.10 批量消费信息

    //批量消费数据
    @KafkaListener(topics = "test")
    public void onMessage(List<ConsumerRecord<String,String>> recordList, Acknowledgment ack){
        System.out.println("receive: " + recordList.size());
        ack.acknowledge();
    }

#设置批量消费

spring.kafka.type=batch

#批量消费每次消费多少条消息

spring.kafka.consummer.max-poll-records

6.11 集成消费拦截器 

1. 实现kafka 的 ConsumerInterceptor 拦截器接口

@Configuration
public class CustomConsumerInterceptor implements ConsumerInterceptor<String,String> {
    //在监听器消费消息之前执行
    @Override
    public ConsumerRecords<String,String> onConsume(ConsumerRecords consumerRecords) {
        System.out.println("拦截器拦截到消息:"+consumerRecords);
        return consumerRecords;
    }
    //在消息提交偏移量之前执行
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> var) {
        System.out.println("拦截器执行 onCommit,提交offset");
    }
    @Override
    public void close() {

    }


    @Override
    public void configure(Map<String, ?> map) {

    }
}

2. 在 kafka 消费者的 ConsumerFactory 配置中注册拦截器 

@Configuration
public class KafkaConfig {
    @Value("${kafka.cluster.bootstrap-servers}")
    private  String bootstrapServers;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    public Map<String, Object> consumerConfig(){
        Map<String,Object> props =new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String,Object> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<?> customListenerContainerFactory(ConsumerFactory<String,Object> consumerFactory){
        ConcurrentKafkaListenerContainerFactory<String,String> factory=new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

 3. 消费者代码

    @KafkaListener(topics = "test",containerFactory = "customListenerContainerFactory", groupId = "test")
    public void onMessage(String data, Acknowledgment ack){
        System.out.println("receive: " + data);
        ack.acknowledge();
    }

总结: 通过配置新的监听器工厂,并配置监听器工厂中的消费者厂,消费者中配置自定义拦截器

上述代码虽然实现了自定义消息拦截器,但在运行过程中,发现我们的消费者和监听器的配置都没有生效,这是怎么回事呢?

    public static void main(String[] args) {

//        SpringApplication.run(KafkaDemoApplication.class, args);
        ApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);
        Map<String, ConsumerFactory> beansOfType = context.getBeansOfType(ConsumerFactory.class);
        beansOfType.forEach((k,v)->{
            System.out.println(k+"-->"+v);
        });
        System.out.println("--------------------------------------------------------------");
        Map<String , KafkaListenerContainerFactory> beansOfType1 = context.getBeansOfType(KafkaListenerContainerFactory.class);
        beansOfType1.forEach((k,v)->{
            System.out.println(k+"-->"+v);
        });
    }

我们进行调试,从打印信息中可以看到,kafka 监听器有两个bean工厂,有一个是我们自定义的的,一个是默认的,默认的 listener工厂中的消费者配置都是我们的配置文件中的,而我们自定义的 listener 工厂中的消费者配置是我们在创建监听器时,传入的 consumerFactory 中的 ConsumerConfig 中的配置信息,由于我们在监听器制定了自定义 listener 工厂,因此我们配置文件中的配置才会失效,如果想要配置生效,就需要把想要配置的选项重新在 ConsumerConfig 中配置。

6.12 消息的转发

    @KafkaListener(topics = "testA", groupId = "test")
//    要转发的 topic
    @SendTo(value = "testB")
    public String onMessageA(String data, Acknowledgment ack){
        System.out.println("TestA receive消息: " + data);
        ack.acknowledge();
        return data;
    }
    @KafkaListener(topics = "testB", groupId = "test2")
    public void onMessageB(String data, Acknowledgment ack){
        System.out.println("TestB receive转发后的消息: " + data);
        ack.acknowledge();
    }

注: 在使用@SendTo 注解后,同时配置了新的分区策略和拦截器后,不知道为何原因,因为重新注入了新的 KafkaTemplate,在项目启动后,会找不到对应的 bean,而不使用@SendTo 注解却可以正常加载。就算定义加载顺序(注入的bean的名字也为更改),仍然找不到对应的 bean。原因位置(埋点)

6.12 配置消费者分区策略

  • RangeAssignor(默认策略):按范围进行分配,如果由8个分区,3个消费者,C1 消费0、1、2;C2消费 3、4、5;C3 消费 6、7
  • RoundRobinAssignor:轮询,如果由8个分区,3个消费者,C1 消费0、3、6;C2消费 1、4、7;C3 消费 2、5
  • StickAssignor:尽量保持现有分区不变,当有新的消费者加入或离开后,只更改少量消费者所消费分区,大部分保持不变,仍然保持现有消费分区
  • CooperativeStickAssignor:优化后的粘性分区策略

代码:

配置消费策略

@Configuration
public class KafkaConfig {
    @Value("${kafka.cluster.bootstrap-servers}")
    private  String bootstrapServers;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    public Map<String, Object> consumerConfig(){
        Map<String,Object> props =new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
//        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        return props;
    }

    @Bean
    public ConsumerFactory<String,Object> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<?> customListenerContainerFactory(ConsumerFactory<String,Object> consumerFactory){
        ConcurrentKafkaListenerContainerFactory<String,String> factory=new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

配置消费者

    //批量消费数据
    @KafkaListener(topics = "test",groupId = "testC",containerFactory = "customListenerContainerFactory",concurrency = "3")
    public void onMessage(ConsumerRecord record, Acknowledgment ack){
        System.out.println("receive: " + record.partition() +"分区"+record.value()+"消费者: "+Thread.currentThread().getId());
        ack.acknowledge();
    }

七、_consmuer_offsets

在每次消费一个消息并且提交后,会保存当前消费的最近的一个 offset;

在 zookeeper 中,有一个 _consumer_offsets主题,消费者提交的 offset 信息会写入到该topic 中,_consumer_offsets 保存了每个 consumer group 某一时刻提交的 offset 信息,_consumer_offsets 默认有 50 个分区,集群模式 zk 会给每个 broker 节点分配分区

consumer_group 保存到哪个分区中的计算公式:

Math.abs("groupid".hashCode())%groupMetadataTopicPartitionCount

八、ISR、HW、LEO

8.1 ISR:

在同步中的副本(In-Sync-Replicas),包含了 Leader 副本和所有与 Leader 副本保持同步的 Follower 副本

写请求首先由 Leader 副本处理,之后 Follwer 副本会从 Leader 上拉取写入的消息,这个过程会有一定延迟,导致 Follower 副本中保存的消息略少于 Leader 副本,但是只要没有超出阈值都可以容忍,但是一旦一个 Follower 副本出现异常,那这是 Leader 就会把他踢出去,Kafka 通过 ISR 集合来维护一个 “ 可用且消息量与 Leader 差不多的副本集合,他是整个副本的一个子集

在 kafka 中,一个副本要成为 ISR 副本,要满足以下条件:

  1.  Leader 副本本身就是一个 ISR 副本
  2.  Follower 副本的最后一条消息的 offset 与 leader 副本的最后一条消息的 offset 之间的差值不能超过指定的阈值,超过阈值则该 Follower 副本将会从 ISR 列表中删除
     
  • replica.lag.time.max.ms:默认是 30 秒;如果该 follower 在此时间间隔内一直没有追上过 Leader 副本就会被 ISR 集合剔除
  • replica.lag.max.messages:落后了多少条消息时,该 Follower 副本就会被剔除 ISR 列表,该配置参数现在在新版本已经过时了。

8.2 LEO

日志末端偏移量(Log End Offset),该消息日志中下一条消息的偏移量

8.3 HW

HW(High Watermark),即高水位,它表示一个偏移量 offset 信息,表示消息的复制进度,也就是消息已经成功复制到哪个位置了。即在 HW 之前的所有消息都已经成功写入副本中,并且可以在所有的副本中找到,因此,消费者可以安全的消费这些消息。而对于消费者而言,它只能拉取 HW之前的消息,对于这之后未同步完成的消息,是不可见的。


http://www.kler.cn/a/536062.html

相关文章:

  • 《手札·开源篇》数字化转型助力永磁电机企业降本增效:快速设计软件如何让研发效率提升40%?
  • Fedora 的 2025 年展望:AI 集成与 HDR 支持打造强大 Linux 桌面体验
  • 哈希(Hashing)在 C++ STL 中的应用
  • Apache Kafka:高吞吐分布式流平台的深度解析
  • CSS(三)less一篇搞定
  • 设计模式——策略模式
  • (2025,推理语言模型 / RLM,deepseek-v3,推理结构,推理策略,强化学习概念,监督学习方法,计算优化技术)
  • OBS::Display
  • GlusterFS源码讲解:如何实现最终一致性
  • 【实用技能】如何将 Web 视图添加到 Compose Multiplatform 应用程序
  • Java项目: 基于SpringBoot+mybatis+maven+mysql实现的智能学习平台管理系(含源码+数据库+毕业论文)
  • Web3 跨链技术:构建互联互通的虚拟世界
  • C++Primer 赋值运算符
  • MyBatis框架详解
  • 通过vLLM部署LLM模型到生产环境中
  • 2502全球无线产品认证新闻资讯|英利检测
  • 计算机组成原理——指令系统(五)
  • 十一、CentOS Stream 9 安装 Docker
  • 【图像处理】-不同的图像存储格式
  • 蓝桥杯生命之树(DP)
  • 学习笔记:机器学习中的数学原理(一)
  • 【数据安全】现代智能手机的数据加密机制
  • Linux ftrace 内核跟踪入门
  • 可计算性与计算复杂性:理论计算机科学的核心领域
  • osclass增加支持webp格式
  • 【CPP】C++后端开发面试:深入理解编程中的锁机制