当前位置: 首页 > article >正文

【某东二面】聊聊 Kafka的分区容错设计思想

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO 专家博主

⛪️ 个人社区:个人社区
💞 个人主页:个人主页
🙉 专栏地址: ✅ Java 中级
🙉八股文专题:剑指大厂,手撕 Java 八股文

在这里插入图片描述

文章目录

      • 1. Kafka分区的基本概念
      • 2.分区的高可用性设计
      • 3. 副本同步与 ISR
      • 4. 数据一致性策略
      • 5. 分区再均衡
      • 6. 故障恢复机制
      • 7. 用 java 模拟实现 kafka 的分区设计

1. Kafka分区的基本概念

Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流应用。Kafka 的核心概念之一是分区(Partition),它在 Kafka 中扮演着至关重要的角色。以下是关于 Kafka 分区的基本概念:

Topic 和 Partition

  • Topic:在 Kafka 中,消息被发布到 Topic 中。Topic 可以被视为消息的类别或主题。
  • Partition:每个 Topic 可以分为多个 Partition。Partition 是 Topic 的物理分割,每个 Partition 都是一个有序的、不可变的消息序列。每个 Partition 在一个 Broker 上有副本(Replica),以实现容错和高可用性。

Partition 的特性

  • 顺序性:在一个 Partition 内,消息是按顺序存储的。这意味着如果一个消费者从一个 Partition 中消费消息,它可以保证这些消息是按照它们被写入的顺序来接收的。
  • 不可变性:一旦消息被写入 Partition,就不能被修改或删除(除非根据配置策略进行日志清理)。
  • 独立性:不同的 Partition 之间是独立的,可以分布在不同的 Broker 上。这使得 Kafka 能够水平扩展,增加更多的 Broker 来处理更多的 Partition。

Partition 的作用

  • 负载均衡:通过将 Topic 分割成多个 Partition,并将这些 Partition 分布在不同的 Broker 上,Kafka 能够实现负载均衡,提高系统的吞吐量。
  • 容错性:每个 Partition 可以有多个副本(Replicas)。其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。如果 Leader 宕机,Kafka 会自动从 Follower 中选举一个新的 Leader。
  • 并行处理:多个消费者可以同时从同一个 Topic 的不同 Partition 中消费消息,从而实现并行处理。每个消费者组中的消费者可以独立地消费不同的 Partition,提高处理速度。

Partition 的管理

  • 创建 Topic 时指定 Partition 数量:可以在创建 Topic 时指定 Partition 的数量。例如,使用 kafka-topics.sh 命令:
    kafka-topics.sh --create --topic my-topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
    
  • 动态调整 Partition 数量:Kafka 允许在运行时增加 Partition 的数量,但不能减少。增加 Partition 数量通常需要重新分配 Partition,这可能会导致短暂的服务中断。
  • Leader 选举:当 Leader 宕机时,Kafka 会从 ISR(In-Sync Replicas)列表中选择一个新的 Leader。ISR 列表包含与 Leader 保持同步的所有副本。

Partition 的配置

  • Replication Factor:每个 Partition 可以有多个副本,以提高容错性。复制因子(Replication Factor)指定了每个 Partition 应该有多少个副本。
  • ISR (In-Sync Replicas):ISR 列表包含了所有与 Leader 保持同步的副本。只有 ISR 列表中的副本才有资格被选为新的 Leader。
  • HW (High Watermark):HW 是 Partition 中最小的 LEO(Log End Offset),即 ISR 中所有副本都已同步的数据位置。消费者只能读取 HW 之前的消息。

Partition 的使用场景

  • 日志聚合:Kafka 可以用来收集和处理大规模的日志数据。
  • 消息队列:Kafka 可以用作可靠的消息队列,支持发布/订阅模式。
  • 事件驱动架构:Kafka 可以作为事件驱动架构的核心组件,用于解耦系统和服务之间的通信。
  • 流处理:Kafka 可以与流处理框架(如 Apache Flink、Apache Spark Streaming)集成,实现实时数据处理。

Kafka 的 Partition 机制是其高性能、可扩展性和容错性的关键。通过将 Topic 分割成多个 Partition,并将这些 Partition 分布在不同的 Broker 上,Kafka 能够实现负载均衡和高吞吐量。此外,通过多副本机制,Kafka 提供了高可用性和容错性,确保即使部分 Broker 故障,系统仍能继续运行。理解 Partition 的基本概念对于设计和优化 Kafka 应用至关重要。

2.分区的高可用性设计

Kafka 的分区(Partition)高可用性设计是确保数据可靠性和系统稳定性的关键。通过多副本机制和 Leader 选举,Kafka 能够在单个 Broker 故障时继续提供服务,并且保证数据不丢失。以下是 Kafka 分区高可用性的主要设计要点:

多副本机制

  • Replication Factor:每个 Partition 可以有多个副本(Replicas),通常配置为 2 或 3 个副本。其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。
  • ISR (In-Sync Replicas):ISR 列表包含了所有与 Leader 保持同步的副本。只有 ISR 列表中的副本才有资格被选为新的 Leader。如果某个 Follower 无法跟上 Leader 的速度,它会被从 ISR 列表中移除。

Leader 选举

  • 自动选举:当 Leader 宕机或不可用时,Kafka 会自动从 ISR 列表中选择一个新的 Leader。这个过程由 Controller 负责管理。
  • 最小 LEO 选择:通常情况下,会选择 ISR 列表中 Log End Offset (LEO) 最小的副本作为新的 Leader,以确保数据的一致性。

数据复制

  • 同步复制:Follower 会定期从 Leader 拉取数据并进行同步。一旦 Follower 同步了 Leader 的数据,它就会向 Leader 发送确认消息。
  • 异步复制:为了提高性能,Follower 的同步可以是异步的。但是,这可能会导致短暂的数据不一致。

容错性

  • 故障检测:Controller 通过心跳机制或其他方式检测到某个 Broker 宕机或变得不可用。
  • 重新分配:一旦检测到 Leader 故障,Controller 会从 ISR 列表中选择一个新的 Leader,并更新集群的元数据。

日志清理和保留策略

  • 日志清理:Kafka 支持基于时间或大小的日志清理策略。过期的日志会被删除,但至少会保留一个最小的时间段,以确保数据的持久性。
  • 日志压缩:对于某些需要长期保存的关键数据,Kafka 支持日志压缩功能,只保留每条消息的最新版本。

网络隔离和脑裂

  • 网络分区:在网络分区的情况下,可能会出现多个子网中的 Broker 都认为自己是 Leader 的情况。Kafka 通过 Zookeeper 来协调这种情况,确保只有一个有效的 Leader。
  • 多数原则:在选举新的 Leader 时,Kafka 会遵循“多数原则”,即新的 Leader 必须获得大多数 ISR 副本的支持。

配置参数

  • min.insync.replicas:设置 ISR 列表中必须存在的最小副本数。如果 ISR 中的副本数少于这个值,Broker 将拒绝生产者的写入请求。
  • replica.lag.time.max.ms:设置 Follower 与 Leader 之间允许的最大滞后时间。超过这个时间,Follower 会被从 ISR 列表中移除。
  • unclean.leader.election.enable:控制是否允许从不在 ISR 列表中的副本中选举新的 Leader。默认情况下,这是禁用的,以避免数据丢失。

监控和告警

  • 监控工具:使用监控工具(如 Prometheus、Grafana 等)来监控 Kafka 集群的状态,包括 Broker 的健康状况、ISR 列表的变化、Leader 选举事件等。
  • 告警机制:设置告警规则,当检测到异常情况(如 ISR 列表为空、Leader 选举频繁等)时,及时通知运维人员。

Kafka 的分区高可用性设计通过多副本机制、Leader 选举、数据复制和日志管理等措施,确保了系统的可靠性和稳定性。合理的配置和监控可以帮助你更好地管理和维护 Kafka 集群,确保在故障发生时能够快速恢复服务。理解这些设计要点对于构建高可用的 Kafka 应用至关重要。

3. 副本同步与 ISR

在 Apache Kafka 中,副本同步和 ISR(In-Sync Replicas)是确保数据一致性和高可用性的关键机制。下面是关于副本同步与 ISR 的详细解释:

副本同步

Kafka 通过多副本机制来提高数据的可靠性和容错性。每个 Partition 可以有多个副本,其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。

同步过程

  1. Follower 从 Leader 拉取数据

    • Follower 会定期向 Leader 发送 Fetch 请求,请求最新的消息。
    • Leader 收到 Fetch 请求后,将最新的消息发送给 Follower。
  2. Follower 写入本地日志

    • Follower 将从 Leader 获取的消息写入自己的本地日志文件。
    • Follower 会记录每条消息的偏移量(Offset),并更新其 Log End Offset (LEO)。
  3. Follower 发送确认

    • Follower 在成功写入消息后,会向 Leader 发送一个确认消息(Ack),表示已经同步了这些消息。
    • Leader 会记录哪些 Follower 已经同步了特定的消息。
  4. Leader 更新 HW (High Watermark)

    • Leader 会根据所有 Follower 的确认情况,更新 High Watermark (HW)。
    • HW 是所有 ISR 中最小的 LEO,表示所有 ISR 中都已同步的数据位置。
  5. 生产者确认

    • 生产者可以选择等待所有 ISR 中的副本确认(acks=all)或仅等待 Leader 确认(acks=1)。
    • 如果生产者设置 acks=all,则必须等到所有 ISR 中的副本都确认后才会收到成功响应。

ISR (In-Sync Replicas)

ISR 是一组与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格被选为新的 Leader。如果某个 Follower 无法跟上 Leader 的速度,它会被从 ISR 列表中移除。

ISR 的管理

  1. 初始 ISR

    • 当 Topic 创建时,所有副本都被认为是 ISR 的一部分。
  2. Follower 落后

    • 如果 Follower 无法及时同步 Leader 的数据,超过了配置的滞后时间(replica.lag.time.max.ms),它会被从 ISR 中移除。
    • 这个配置参数决定了 Follower 与 Leader 之间允许的最大滞后时间。
  3. Follower 重新加入 ISR

    • 如果 Follower 能够赶上 Leader 的进度,并且满足一定的条件(如滞后时间小于配置值),它可以重新加入 ISR。
  4. Leader 选举

    • 当 Leader 宕机或不可用时,Controller 会从 ISR 列表中选择一个新的 Leader。
    • 通常会选择 ISR 列表中 LEO 最小的副本作为新的 Leader,以确保数据的一致性。

配置参数

  • min.insync.replicas:设置 ISR 列表中必须存在的最小副本数。如果 ISR 中的副本数少于这个值,Broker 将拒绝生产者的写入请求。

  • replica.lag.time.max.ms:设置 Follower 与 Leader 之间允许的最大滞后时间。超过这个时间,Follower 会被从 ISR 列表中移除。

  • unclean.leader.election.enable:控制是否允许从不在 ISR 列表中的副本中选举新的 Leader。默认情况下,这是禁用的,以避免数据丢失。

  • 副本同步:Follower 通过定期从 Leader 拉取数据并写入本地日志来同步数据。Leader 会根据 Follower 的确认情况更新 High Watermark (HW)。

  • ISR:ISR 是一组与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格被选为新的 Leader。Follower 会根据其同步状态动态地加入或离开 ISR。

  • 配置参数:通过配置 min.insync.replicasreplica.lag.time.max.ms 来控制 ISR 的行为,确保数据的一致性和可靠性。

理解副本同步和 ISR 的工作机制对于设计高可用的 Kafka 应用至关重要。通过合理的配置和监控,可以确保 Kafka 集群在故障发生时能够快速恢复服务,同时保证数据不丢失。

4. 数据一致性策略

在 Apache Kafka 中,数据一致性是通过多种机制来保证的,包括多副本机制、ISR(In-Sync Replicas)、Leader 选举以及生产者的确认策略。以下是一些关键的数据一致性策略:

多副本机制

  • Replication Factor:每个 Partition 可以有多个副本(通常配置为 2 或 3 个)。其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。
  • 高可用性:通过多副本机制,即使某个 Broker 宕机,Kafka 也能从其他副本中选举一个新的 Leader 来继续提供服务。

ISR (In-Sync Replicas)

  • 定义:ISR 是一组与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格被选为新的 Leader。
  • 同步条件:Follower 必须能够及时同步 Leader 的数据,否则会被从 ISR 中移除。同步的条件由 replica.lag.time.max.ms 参数控制。
  • Leader 选举:当 Leader 宕机时,Controller 会从 ISR 列表中选择一个新的 Leader,确保数据的一致性。

生产者的确认策略

  • acks 参数
    • acks=0:生产者发送消息后不等待任何确认,这种方式最快但最不可靠。
    • acks=1:生产者发送消息后等待 Leader 的确认,只要 Leader 写入成功就认为消息已提交。这种方式平衡了性能和可靠性。
    • acks=all(或 -1):生产者发送消息后等待所有 ISR 中的副本确认。这是最可靠的方式,但性能较低。

最小同步副本数

  • min.insync.replicas:这个参数设置了 ISR 列表中必须存在的最小副本数。如果 ISR 中的副本数少于这个值,Broker 将拒绝生产者的写入请求,以避免数据丢失。

不干净的 Leader 选举

  • unclean.leader.election.enable:默认情况下,这个参数是禁用的。如果启用了这个参数,Kafka 允许从不在 ISR 列表中的副本中选举新的 Leader。这可能会导致数据丢失,但在某些情况下可以提高系统的可用性。

High Watermark (HW)

  • 定义:HW 是 ISR 中所有副本都已同步的数据位置。消费者只能读取 HW 之前的消息。
  • 更新:Leader 会根据所有 Follower 的确认情况更新 HW。只有当所有 ISR 中的副本都确认了某条消息,该消息才会被标记为已提交。

日志清理和保留策略

  • 日志清理:Kafka 支持基于时间或大小的日志清理策略。过期的日志会被删除,但至少会保留一个最小的时间段,以确保数据的持久性。
  • 日志压缩:对于某些需要长期保存的关键数据,Kafka 支持日志压缩功能,只保留每条消息的最新版本。

事务支持

  • 幂等性生产者:启用幂等性生产者(enable.idempotence=true)可以防止重复消息,确保每条消息只会被写入一次。
  • 事务:Kafka 支持事务,允许生产者将多个消息作为一个原子操作进行提交。这样可以确保要么所有消息都被提交,要么都不提交。

Kafka 通过多副本机制、ISR、Leader 选举、生产者的确认策略等多种机制来保证数据的一致性和可靠性。合理配置这些参数和策略,可以确保在故障发生时数据不会丢失,并且系统能够快速恢复服务。以下是几个关键点:

  • 多副本:确保数据的高可用性。
  • ISR:确保只有同步的副本才能成为新的 Leader。
  • 确认策略:控制生产者的写入行为,平衡性能和可靠性。
  • 最小同步副本数:确保 ISR 中有足够的副本。
  • 不干净的 Leader 选举:权衡数据一致性和系统可用性。
  • High Watermark:确保消费者只能读取已提交的数据。
  • 日志清理和保留策略:管理数据的生命周期。
  • 事务支持:确保消息的幂等性和原子性。

通过这些机制,Kafka 能够在大规模分布式环境中提供可靠的数据传输和存储。理解并正确配置这些参数对于构建高一致性的 Kafka 应用至关重要。

5. 分区再均衡

在 Apache Kafka 中,分区再均衡(Rebalance)是指消费者组(Consumer Group)中的消费者重新分配 Topic 的 Partition 的过程。这个过程通常发生在以下几种情况下:

  1. 新消费者加入:当一个新的消费者加入到现有的消费者组时。
  2. 消费者离开:当一个消费者离开消费者组时,例如消费者宕机或主动退出。
  3. 订阅变更:当消费者组订阅的 Topic 或者 Topic 的 Partition 数量发生变化时。

分区再均衡的过程

  1. 触发条件
  • 新消费者加入:新的消费者向 Broker 发送心跳,Broker 检测到新的消费者并触发再均衡。
  • 消费者离开:Broker 在一段时间内没有收到某个消费者的心跳,认为该消费者已经离开,并触发再均衡。
  • 订阅变更:消费者组订阅的 Topic 或 Partition 数量发生变化时,Broker 会触发再均衡。
  1. 协调器(Coordinator)的作用
  • 检测变化:Kafka 集群中的每个 Broker 都可以作为消费者组的协调器。协调器负责检测消费者组的变化,并触发再均衡。
  • 发送 Rebalance 请求:当检测到上述变化之一时,协调器会向所有消费者发送 Rebalance 请求。
  1. 消费者的行为
  • 停止消费:收到 Rebalance 请求后,消费者会停止从当前分配的 Partition 中消费消息。
  • 提交偏移量:消费者会提交当前已处理的消息偏移量,以确保在再均衡后可以从正确的位置继续消费。
  • 等待再均衡完成:消费者进入 REBALANCING 状态,等待再均衡完成。
  1. 再均衡策略
  • Range 策略(默认策略):
    • 将 Partition 划分为连续的范围,并将这些范围分配给消费者。
    • 适用于消费者数量少于 Partition 数量的情况。
  • RoundRobin 策略
    • 将 Partition 均匀地分配给消费者,不考虑 Partition 的顺序。
    • 适用于消费者数量多于或等于 Partition 数量的情况。
  • 自定义策略
    • 用户可以实现自定义的再均衡策略,以满足特定的需求。
  1. 再均衡完成
  • 分配 Partition:协调器根据再均衡策略重新分配 Partition 给消费者。
  • 恢复消费:消费者接收到新的 Partition 分配后,从新的位置开始消费消息。

再均衡的影响

  • 短暂的停顿:在再均衡期间,消费者会暂停消费消息,这会导致短暂的服务中断。
  • 重复消费:如果消费者在提交偏移量之前发生再均衡,可能会导致消息被重复消费。
  • 数据丢失:如果消费者在再均衡前没有提交偏移量,可能会导致部分消息未被处理而丢失。

优化再均衡

  • 减少再均衡频率:通过增加消费者的 Session Timeout 和 Heartbeat Interval,可以减少不必要的再均衡。
  • 使用幂等性生产者和事务:确保消息不会被重复处理,提高数据一致性。
  • 选择合适的再均衡策略:根据具体场景选择 Range 或 RoundRobin 策略,或者实现自定义策略。
  • 预热时间:在消费者开始消费之前,可以设置一个预热时间,让消费者有时间进行必要的初始化操作。

分区再均衡是 Kafka 消费者组中非常重要的机制,它确保了消费者能够均匀地分配 Topic 的 Partition,从而实现负载均衡。理解再均衡的过程和影响,以及如何优化再均衡,对于构建高可用和高性能的 Kafka 应用至关重要。通过合理的配置和策略,可以减少再均衡带来的负面影响,提高系统的稳定性和可靠性。

6. 故障恢复机制

Apache Kafka 的故障恢复机制是确保系统高可用性和数据一致性的关键。Kafka 通过多种机制来处理 Broker 和消费者组的故障,确保在发生故障时能够快速恢复服务。以下是 Kafka 故障恢复的主要机制:

  1. Broker 故障恢复

a. 多副本机制

  • Replication Factor:每个 Partition 可以有多个副本(通常配置为 2 或 3 个)。其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。
  • ISR (In-Sync Replicas):ISR 是一组与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格被选为新的 Leader。

b. Leader 选举

  • 自动选举:当 Leader 宕机或不可用时,Controller 会从 ISR 列表中选择一个新的 Leader。这个过程是自动的,无需人工干预。
  • 最小 LEO 选择:通常情况下,会选择 ISR 列表中 Log End Offset (LEO) 最小的副本作为新的 Leader,以确保数据的一致性。

c. Zookeeper 协调

  • 心跳机制:每个 Broker 会定期向 Zookeeper 发送心跳。如果 Controller 在一段时间内没有收到某个 Broker 的心跳,它会认为该 Broker 已经宕机,并触发 Leader 选举。
  • 临时节点:Broker 会在 Zookeeper 中创建一个临时节点 /controller,用于标识当前的 Controller。如果该 Broker 宕机,临时节点会被自动删除,触发新的 Controller 选举。
  1. 消费者组故障恢复

a. 再均衡(Rebalance)

  • 新消费者加入:当一个新的消费者加入到现有的消费者组时,会触发再均衡。
  • 消费者离开:当一个消费者离开消费者组时(例如消费者宕机或主动退出),会触发再均衡。
  • 订阅变更:当消费者组订阅的 Topic 或者 Topic 的 Partition 数量发生变化时,会触发再均衡。

b. 协调器(Coordinator)的作用

  • 检测变化:Kafka 集群中的每个 Broker 都可以作为消费者组的协调器。协调器负责检测消费者组的变化,并触发再均衡。
  • 发送 Rebalance 请求:当检测到上述变化之一时,协调器会向所有消费者发送 Rebalance 请求。

c. 消费者的行为

  • 停止消费:收到 Rebalance 请求后,消费者会停止从当前分配的 Partition 中消费消息。
  • 提交偏移量:消费者会提交当前已处理的消息偏移量,以确保在再均衡后可以从正确的位置继续消费。
  • 等待再均衡完成:消费者进入 REBALANCING 状态,等待再均衡完成。
  • 恢复消费:消费者接收到新的 Partition 分配后,从新的位置开始消费消息。
  1. 日志清理和保留策略
  • 日志清理:Kafka 支持基于时间或大小的日志清理策略。过期的日志会被删除,但至少会保留一个最小的时间段,以确保数据的持久性。
  • 日志压缩:对于某些需要长期保存的关键数据,Kafka 支持日志压缩功能,只保留每条消息的最新版本。
  1. 事务支持
  • 幂等性生产者:启用幂等性生产者(enable.idempotence=true)可以防止重复消息,确保每条消息只会被写入一次。
  • 事务:Kafka 支持事务,允许生产者将多个消息作为一个原子操作进行提交。这样可以确保要么所有消息都被提交,要么都不提交。
  1. 监控和告警
  • 监控工具:使用监控工具(如 Prometheus、Grafana 等)来监控 Kafka 集群的状态,包括 Broker 的健康状况、ISR 列表的变化、Leader 选举事件等。
  • 告警机制:设置告警规则,当检测到异常情况(如 ISR 列表为空、Leader 选举频繁等)时,及时通知运维人员。

Kafka 通过以下几种主要机制来实现故障恢复:

  • 多副本机制:通过多个副本确保数据的高可用性。
  • ISR 和 Leader 选举:确保只有同步的副本才能成为新的 Leader。
  • 再均衡:在消费者组发生变化时重新分配 Partition,确保负载均衡。
  • 日志管理和保留策略:管理数据的生命周期,确保数据的持久性。
  • 事务支持:确保消息的幂等性和原子性。
  • 监控和告警:实时监控集群状态,及时发现和处理问题。

理解这些机制并合理配置相关参数,可以帮助你构建高可用和可靠的 Kafka 应用。通过合理的配置和监控,可以确保 Kafka 集群在故障发生时能够快速恢复服务,同时保证数据不丢失。

7. 用 java 模拟实现 kafka 的分区设计

为了模拟 Kafka 的分区设计,我们可以创建一个简单的 Java 程序来展示如何实现 Topic、Partition 和消息的发布与消费。这个示例将包括以下组件:

  1. Topic:包含多个 Partition。
  2. Partition:存储消息并支持顺序读写。
  3. Broker:管理 Topic 和 Partition。
  4. Producer:向指定的 Partition 发送消息。
  5. Consumer:从指定的 Partition 消费消息。

项目结构

kafka-simulator
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           ├── Broker.java
│   │   │           ├── Consumer.java
│   │   │           ├── Message.java
│   │   │           ├── Partition.java
│   │   │           ├── Producer.java
│   │   │           ├── Topic.java
│   │   │           └── Main.java

代码实现

Message.java

定义消息类。

package com.example;

public class Message {
    private final String key;
    private final String value;

    public Message(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Message{" +
                "key='" + key + '\'' +
                ", value='" + value + '\'' +
                '}';
    }
}

Partition.java

定义 Partition 类,用于存储消息。

package com.example;

import java.util.ArrayList;
import java.util.List;

public class Partition {
    private final int id;
    private final List<Message> messages;

    public Partition(int id) {
        this.id = id;
        this.messages = new ArrayList<>();
    }

    public int getId() {
        return id;
    }

    public synchronized void addMessage(Message message) {
        messages.add(message);
    }

    public synchronized Message consumeMessage() {
        if (messages.isEmpty()) {
            return null;
        }
        return messages.remove(0);
    }

    public synchronized int getMessageCount() {
        return messages.size();
    }
}

Topic.java

定义 Topic 类,包含多个 Partition。

package com.example;

import java.util.ArrayList;
import java.util.List;

public class Topic {
    private final String name;
    private final List<Partition> partitions;

    public Topic(String name, int numPartitions) {
        this.name = name;
        this.partitions = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new Partition(i));
        }
    }

    public String getName() {
        return name;
    }

    public Partition getPartition(int index) {
        if (index < 0 || index >= partitions.size()) {
            throw new IllegalArgumentException("Invalid partition index: " + index);
        }
        return partitions.get(index);
    }

    public int getNumPartitions() {
        return partitions.size();
    }
}

Broker.java

定义 Broker 类,管理 Topic 和 Partition。

package com.example;

import java.util.HashMap;
import java.util.Map;

public class Broker {
    private final Map<String, Topic> topics;

    public Broker() {
        this.topics = new HashMap<>();
    }

    public void createTopic(String name, int numPartitions) {
        if (topics.containsKey(name)) {
            throw new IllegalArgumentException("Topic already exists: " + name);
        }
        topics.put(name, new Topic(name, numPartitions));
    }

    public Topic getTopic(String name) {
        return topics.get(name);
    }

    public boolean topicExists(String name) {
        return topics.containsKey(name);
    }
}

Producer.java

定义 Producer 类,用于发送消息到指定的 Partition。

package com.example;

public class Producer {
    private final Broker broker;

    public Producer(Broker broker) {
        this.broker = broker;
    }

    public void send(String topicName, int partitionIndex, Message message) {
        Topic topic = broker.getTopic(topicName);
        if (topic == null) {
            throw new IllegalArgumentException("Topic does not exist: " + topicName);
        }
        Partition partition = topic.getPartition(partitionIndex);
        partition.addMessage(message);
        System.out.println("Sent to " + topicName + " - Partition " + partitionIndex + ": " + message);
    }
}

Consumer.java

定义 Consumer 类,用于从指定的 Partition 消费消息。

package com.example;

public class Consumer {
    private final Broker broker;

    public Consumer(Broker broker) {
        this.broker = broker;
    }

    public Message consume(String topicName, int partitionIndex) {
        Topic topic = broker.getTopic(topicName);
        if (topic == null) {
            throw new IllegalArgumentException("Topic does not exist: " + topicName);
        }
        Partition partition = topic.getPartition(partitionIndex);
        Message message = partition.consumeMessage();
        if (message != null) {
            System.out.println("Consumed from " + topicName + " - Partition " + partitionIndex + ": " + message);
        }
        return message;
    }
}

Main.java

主程序,演示如何使用上述类。

package com.example;

public class Main {
    public static void main(String[] args) {
        // 创建 Broker
        Broker broker = new Broker();

        // 创建 Topic
        broker.createTopic("my-topic", 3);

        // 创建 Producer
        Producer producer = new Producer(broker);

        // 发送消息
        producer.send("my-topic", 0, new Message("key1", "value1"));
        producer.send("my-topic", 1, new Message("key2", "value2"));
        producer.send("my-topic", 2, new Message("key3", "value3"));

        // 创建 Consumer
        Consumer consumer = new Consumer(broker);

        // 消费消息
        consumer.consume("my-topic", 0);
        consumer.consume("my-topic", 1);
        consumer.consume("my-topic", 2);
    }
}

运行结果

运行 Main.java 后,你会看到类似如下的输出:

Sent to my-topic - Partition 0: Message{key='key1', value='value1'}
Sent to my-topic - Partition 1: Message{key='key2', value='value2'}
Sent to my-topic - Partition 2: Message{key='key3', value='value3'}
Consumed from my-topic - Partition 0: Message{key='key1', value='value1'}
Consumed from my-topic - Partition 1: Message{key='key2', value='value2'}
Consumed from my-topic - Partition 2: Message{key='key3', value='value3'}

通过 BrokerTopicPartitionProducerConsumer 类,我们实现了基本的消息发布和消费功能。这个示例可以帮助你理解 Kafka 分区的基本原理,并为更复杂的应用提供基础。实际的 Kafka 集群会涉及更多的细节,如多副本机制、Leader 选举、ISR 管理等。

精彩专栏推荐订阅:在下方专栏👇🏻
✅ 2023年华为OD机试真题(A卷&B卷)+ 面试指导
✅ 精选100套 Java 项目案例
✅ 面试需要避开的坑(活动)
✅ 你找不到的核心代码
✅ 带你手撕 Spring
✅ Java 初阶

在这里插入图片描述


http://www.kler.cn/news/362916.html

相关文章:

  • R语言绘图——坐标轴及图例
  • 机器学习与神经网络:科技的星辰大海
  • C++侯捷内存管理课程学习笔记汇总
  • 技术成神之路:设计模式(二十二)命令模式
  • JVM、字节码文件介绍
  • 【优选算法】探索双指针之美(一):双指针与单调性的完美邂逅
  • 《性能之巅:洞悉系统、企业与云计算》读书笔记-Part 1
  • 【rabbitmq】为什么使用消息队列?
  • 促进绿色可持续发展 能源环保管理重中之重
  • 【记录】Android|安卓平板 猫游戏(四款,peppy cat,含下载教程和链接)
  • 大数据新视界 -- 大数据大厂之如何降低大数据存储成本:高效存储架构与技术选型
  • 什么是代理模式?
  • 6.mysql安装【Docker】
  • Redis简介及其在NoSQL应用开发中的优化策略
  • blender 批量导入导出obj文件
  • 2024年华为OD机试真题-第k个排列-Python-OD统一考试(E卷)
  • 若依前后分离版集成积木报表
  • perl模式匹配修饰符
  • Linux-shell实例练习
  • 常用Python数据分析开源库:Numpy、Pandas、Matplotlib、Seaborn、Sklearn介绍
  • 六大知名Web安全漏洞靶场
  • AI虚拟主播之面部捕捉与生成!
  • 在linux上部署ollama+open-webu,且局域网访问教程
  • centos 和 Ubuntu 离线安装 lvm
  • 【Android】图片点击放大放小
  • 【数据结构与算法】之链表经典算法大集合