当前位置: 首页 > article >正文

【kafka实战】05 Kafka消费者消费消息过程源码剖析

1. 概述

Kafka消费者(Consumer)是Kafka系统中负责从Kafka集群中拉取消息的客户端组件。消费者消费消息的过程涉及多个步骤,包括消费者组的协调、分区分配、消息拉取、消息处理等。本文将深入剖析Kafka消费者消费消息的源码,并结合相关原理图进行讲解。

以下是一个使用 Java 编写的 KafkaConsumer 的示例。在这个示例中,我们将创建一个简单的 Kafka 消费者,连接到 Kafka 集群,订阅一个主题,并消费该主题中的消息。

1.1 消费者代码使用示例

  • 已经安装并启动了 Kafka 集群。
  • 你已经添加了 Kafka 客户端依赖到你的项目中。如果你使用 Maven,可以在 pom.xml 中添加以下依赖:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

示例代码

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "test-group";

    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // 自动提交偏移量
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // 键和值的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建 Kafka 消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        try {
            // 订阅主题
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));

            // 持续消费消息
            while (true) {
                // 从 Kafka 拉取消息
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭消费者
            consumer.close();
        }
    }
}

代码解释

  1. 配置消费者属性

    • BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址。
    • GROUP_ID_CONFIG:指定消费者所属的消费组。
    • ENABLE_AUTO_COMMIT_CONFIG:设置是否自动提交偏移量。
    • AUTO_COMMIT_INTERVAL_MS_CONFIG:设置自动提交偏移量的时间间隔。
    • KEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG:指定键和值的反序列化器。
  2. 创建 Kafka 消费者实例:使用配置好的属性创建 KafkaConsumer 实例。

  3. 订阅主题:使用 subscribe 方法订阅指定的主题。

  4. 持续消费消息:使用 poll 方法从 Kafka 拉取消息,并遍历消费记录,打印消息的偏移量、键和值。

  5. 关闭消费者:在消费完成后,使用 close 方法关闭消费者。

注意事项

  • 确保 Kafka 集群的地址和主题名称正确。
  • 如果需要手动提交偏移量,可以将 ENABLE_AUTO_COMMIT_CONFIG 设置为 false,并使用 commitSync()commitAsync() 方法手动提交偏移量。

2. Kafka消费者消费消息的核心流程

Kafka消费者消费消息的核心流程可以分为以下几个步骤:

  1. 消费者组协调:消费者加入消费者组,并与组协调器(GroupCoordinator)进行协调。
  2. 分区分配:组协调器为消费者分配分区。
  3. 消息拉取:消费者从分配的分区中拉取消息。
  4. 消息处理:消费者处理拉取到的消息。
  5. 提交偏移量:消费者提交已处理消息的偏移量。

下面我们将结合源码详细分析每个步骤。
在这里插入图片描述

3. 源码剖析


关键组件说明

  1. ConsumerCoordinator

    • 负责消费者组的协调和分区分配。
    • 管理消费者的心跳和重平衡。
  2. Fetcher

    • 负责从 Kafka Broker 拉取消息。
    • 管理拉取请求和响应的处理。
  3. SubscriptionState

    • 管理消费者订阅的主题和分区。
    • 记录消费者的消费偏移量。
  4. PartitionAssignor

    • 负责分区分配策略的实现。
  5. OffsetCommitCallback

    • 处理偏移量提交的回调逻辑。

3.1 消费者组协调

消费者在启动时,首先需要加入消费者组,并与组协调器进行协调。组协调器负责管理消费者组的成员和分区分配。

// org.apache.kafka.clients.consumer.KafkaConsumer#subscribe
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    // 1. 订阅主题
    this.subscriptions.subscribe(new HashSet<>(topics), listener);
    
    // 2. 加入消费者组
    coordinator.subscribe(subscriptions);
}

subscribe方法中,消费者首先订阅指定的主题,然后通过coordinator.subscribe方法加入消费者组。组协调器会为消费者分配一个唯一的memberId,并将其加入到消费者组中。

3.2 分区分配

组协调器在消费者加入消费者组后,会为消费者分配分区。分区分配策略由PartitionAssignor决定,Kafka提供了多种内置的分区分配策略,如RangeAssignorRoundRobinAssignor等。

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensurePartitionAssignment
private void ensurePartitionAssignment() {
    // 1. 获取分区分配结果
    Map<String, List<TopicPartition>> assignments = partitionAssignor.assign(metadata.fetch(), subscriptions.subscription());
    
    // 2. 更新消费者的分区分配
    subscriptions.assignFromSubscribed(assignments.get(consumerId));
}

ensurePartitionAssignment方法中,组协调器通过partitionAssignor.assign方法为消费者分配分区,并将分配结果更新到消费者的订阅信息中。

3.3 消息拉取

消费者在分配到分区后,会从分配的分区中拉取消息。Kafka消费者采用拉取模式(Pull Model),即消费者主动从Kafka集群中拉取消息。

// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {
    // 1. 拉取消息
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
    
    // 2. 返回拉取到的消息
    return new ConsumerRecords<>(records);
}

poll方法中,消费者通过fetcher.fetchRecords方法从Kafka集群中拉取消息。fetcher是Kafka消费者中的一个重要组件,负责管理消息的拉取和偏移量的提交。

3.4 消息处理

消费者在拉取到消息后,会对消息进行处理。消息处理的具体逻辑由用户自定义,通常包括消息的反序列化、业务逻辑处理等。

// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {
    // 1. 拉取消息
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
    
    // 2. 处理消息
    for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
        for (ConsumerRecord<K, V> record : entry.getValue()) {
            // 用户自定义的消息处理逻辑
            processRecord(record);
        }
    }
    
    // 3. 返回拉取到的消息
    return new ConsumerRecords<>(records);
}

poll方法中,消费者通过processRecord方法处理每条消息。processRecord方法的具体实现由用户自定义。

3.5 提交偏移量

消费者在处理完消息后,需要提交已处理消息的偏移量。偏移量的提交可以手动或自动进行,Kafka提供了多种偏移量提交策略,如自动提交、同步提交、异步提交等。

// org.apache.kafka.clients.consumer.KafkaConsumer#commitSync
public void commitSync() {
    // 1. 提交偏移量
    coordinator.commitOffsetsSync(subscriptions.allConsumed());
}

commitSync方法中,消费者通过coordinator.commitOffsetsSync方法同步提交偏移量。同步提交会阻塞当前线程,直到偏移量提交成功。

4. 原理图

以下是Kafka消费者消费消息的核心流程示意图:

+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|  消费者组协调      | ----> |     分区分配       | ----> |     消息拉取       |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+
                                                                      |
                                                                      v
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|     消息处理       | <---- |     提交偏移量     | <---- |     网络传输       |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+

5. 总结

Kafka消费者消费消息的过程涉及多个步骤,包括消费者组的协调、分区分配、消息拉取、消息处理和偏移量提交。通过源码剖析,我们可以更深入地理解Kafka消费者的工作原理。希望本文能够帮助你更好地理解Kafka消费者的内部机制。

6. 参考

  • Kafka官方文档
  • Kafka源码

http://www.kler.cn/a/539444.html

相关文章:

  • AMD 8845HS 780M核显部署本地deepseek大模型的性能
  • Gitlab中如何进行仓库迁移
  • 如何通过PHP接入DeepSeek的API
  • mysql8.0使用MHA实现高可用
  • Git的使用
  • 练习题(2025.2.9)
  • Kotlin 2.1.0 入门教程(十一)for、while、return、break、continue
  • 捕获一只比特币挖矿木马
  • vllm 部署 qwen2.5 报错2.5 报错404 已解决
  • java基础语法中阶
  • Docker Compose 容器卷映射:是否需要提前将文件拷贝到宿主机?
  • 【论文阅读笔记】HiDDeN:Hiding Data With Deep Networks
  • vue3中使用print-js组件实现打印操作
  • 蓝桥杯51单片机练习(国信长天比赛用)
  • c/c++蓝桥杯经典编程题100道(18)括号匹配
  • Win10+Ollama+AnythingLLM+DeepSeek构建本地多人访问知识库
  • 大数据示例:改变业务的 6 种方式
  • 【虚幻引擎UE】AOI算法介绍与实现案例
  • 【C++八股】std::atomic —— 原子操作
  • ASP.NET Core 如何使用 C# 向端点发出 POST 请求
  • openAI官方prompt技巧(二)
  • 基于springboot+vue的文物管理系统的设计与实现
  • android手机安装deepseek-r1:1.5b
  • DeepSeek开源多模态大模型Janus-Pro部署
  • 在 Linux 系统下,解压 `.tar.gz`
  • 14vue3实战-----获取用户信息和用户的菜单树信息