当前位置: 首页 > article >正文

Kafka相关知识点(下)

什么是Kafka的重平衡机制?(rebanlance)

典型回答

Kafka 的重平衡机制是指在消费者组中新增或删除消费者时,为了实现消费者的负载均衡和高可用性,Kafka 集群会重新分配主题分区给各个消费者,以保证每个消费者消费的分区数量尽可能均衡。

重平衡机制的目的是实现消费者的负载均衡和高可用性,以确保每个消费者都能够按照预期的方式消费到消息。

重平衡的 3 个触发条件:

·消费者组成员数量发生变化。

·订阅主题数量发生变化:

·订阅主题的分区数发生变化。

当Kafka 集群要触发重平衡机制时,大致的步骤如下:

1.暂停消费:在重平衡开始之前,Kafka 会暂停所有消费者的拉取操作,以确保不会出现重平衡期间的消息丢失或重复消费。

2.计算分区分配方案:Kafka 集群会根据当前消费者组的消费者数量和主题分区数量,计算出每个消费者应该分配的分区列表,以实现分区的负载均衡。

3.通知消费者:一旦分区分配方案确定,Kafka 集群会将分配方案发送给每个消费者,告诉它们需要消费的分区列表,并请求它们重新加入消费者组。

4.重新分配分区:在消费者重新加入消费者组后,Kafka 集群会将分区分配方案应用到实际的分区分配中,重新分配主题分区给各个消费者。

5**.恢复消费**:最后,Kafka 会恢复所有消费者的拉取操作,允许它们消费分配给自己的分区

Kafka 的重平衡机制能够有效地实现消费者的负载均衡和高可用性,提高消息的处理能力和可靠性。但是,由于重平衡会带来一定的性能开销和不确定性,因此在设计应用时需要考虑到重平衡的影响,并采取一些措施来降低重平衡的频率和影响。

在重平衡过程中,所有 Consumer 实例都会停止消费,等待重平衡完成。但是目前并没有什么好的办法来解决重平衡带来的STW,只能尽量避免它的发生。

扩展知识

消费者的五种状态

Kafka的Consumer实例五种状态,分别是:

状态的流转过程:

Kafka如何实现顺序消费?

典型回答

Kafka的消息是存储在指定的topic中的某个partition中的。并且一个topic是可以有多个partition的。同一个partition中的消息是有序的,但是跨partition,或者跨topic的消息就是无序的了

为什么同一个partition的消息是有序的?

因为当生产者向某个partition发送消息时,消息会被追加到该partition的日志文件(log)中,并且被分配一个唯一的 offset,文件的读写是有顺序的。而消费者在从该分区消费消息时,是利用offset 开始逐个读取消息,保证了消息的顺序性。

基于此,想要实现消息的顺序消费,可以有以下几个办法:

1、在一个topic中,只创建一个partition,这样这个topic下的消息都会按照顺序保存在同一个partition中,这就保证了消息的顺序消费。

2、发送消息的时候指定partition,如果一个topic下有多个partition,那么我们可以把需要保证顺序的消息都发送到同一个partition中,这样也能做到顺序消费,

扩展知识

如何发到同一个partition

当我们发送消息的时候,如果key为nul,那么Kafka 默认采用 Round-robin 策略,也就是轮转,实现类是DefaultPartitioner。那么如果想要指定他发送到某个partition的话,有以下三个方式:

指定partition

我们可以在发送消息的时候,可以直接在ProducerRecord中指定partition

指定key

在没有指定 Partition(nul 值) 时,如果有 Key, Kafka 将依据 Key 做hash来计算出一个 Partition 编号来。如果key相同,那么也能分到同一个partition中:

自定义Partitioner

除了以上两种方式,我们还可以实现自己的分区器(Partitioner)来指定消息发送到特定的分区

我们需要创建一个类实现Partitioner接口,并且重写partition()方法。

在partition()方法中,我们使用了一个简单的逻辑,根据键的哈希值将消息发送到相应的分区。为了在Kafka生产者中使用自定义的分区器,你需要在生产者的配置中指定分区器类:

Kafka 几种选举过程简单介绍一下?

典型回答

Kafka 中常见的选举过程有以下几种:

Partition Leader 选举


Kafka 中的每个 Partition 都有一个 Leader,负责处理该 Partition 的读写请求。在正常情况下,Leader 和 ISR集合中的所有副本保持同步,Leader 接收到的消息也会被 ISR 集合中的副本所接收。当leader 副本宕机或者无法正常工作时,需要选举新的leader副本来接管分区的工作。

Leader 选举的过程如下:

·每个参与选举的副本会尝试向 ZooKeeper 上写入一个临时节点,表示它们正在参与 Leader 选举;

·所有写入成功的副本会在 ZooKeeper 上创建一个序列号节点,并将自己的节点序列号写入该节点;

·节点序列号最小的副本会被选为新的 Leader,并将自己的节点名称写入 ZooKeeper 上的 /broker/…/leader 节点中。

Controller 选举


Kafka 集群中只能有一个 Controller 节点,用于管理分区的副本分配、leader 选举等任务。当一个Broker变成Controller后,会在Zookeeper的/controller节点 中记录下来。然后其他的Broker会实时监听这个节点,主要就是避免当这个controller宕机的话,就需要进行重新选举。

Controller选举的过程如下

·所有可用的 Broker 向 ZooKeeper 注册自己的 ID,并监听 ZooKeeper 中 /controller 节点的变化。

·当 Controller 节点出现故障时,ZooKeeper 会删除 /controller 节点,这时所有的 Broker 都会监听到该事件,并开始争夺 Controler 的位置。

·为了避免出现多个 Broker 同时竞选 Controller 的情况,Kafka 设计了一种基于 ZooKeeper 的 Master-Slave 机制,其中一个 Broker 成为 Master,其它 Broker成为 Slave。Master负责选举 Controller,并将选举结果写入 ZooKeeper 中,而 Slave 则监听 /controller 节点的变化,一旦发现 Master 发生故障,则开始争夺 Master 的位置。

·当一个 Broker 发现 Controller 失效时,它会向 ZooKeeper 写入自己的ID,并尝试竞选 Controller 的位置。如果他创建临时节点成功,则该 Broker 成为新的 Controller,并将选举结果写入 ZooKeeper 中。

·其它的 Broker 会监听到 ZooKeeper 中 /controller 节点的变化,一旦发现选举结果发生变化,则更新自己的元数据信息,然后与新的 Controller 建立连接,进行后续的操作。

为什么Kafka没办法100%保证消息不丢失?

典型回答

Kafka提供的Producer和Consumer之间的消息传递保证语义有三种,所谓消息传递语义,其实就是Kafka的消息交付可靠保障,主要有以下三种:

·At most once-消息可能会丢,但绝不会重复传递·

·At least once-消息绝不会丢,但可能会重复传递,

·Exactly once-每条消息只会被精确地传递一次:既不会多,也不会少;

目前,Kafka 默认提供的交付可靠性保障是第二种,即At least once ,但是,其实依靠Kafka自身,是没有办法100%保证可靠性的。

上面的文档中,介绍了Kafka在保证消息的可靠性中做的一些努力,但是我们提到,Kafka只对已提交的消息做最大限度的持久化保证不丢失,但是没办法保证100%。


那么,整体分析下为什么吧。


生产者

Kafka允许生产者以异步方式发送消息,这意味着生产者在发送消息后不会等待确认。当然,我们可以注册一个回调等待消息的成功回调。

但是,如果生产者在发送消息之后,Kafka的集群发生故障或崩溃,而消息尚未被完全写入Kafka的日志中,那么这些消息可能会丢失。虽然后续有可能会重试,但是,如果重试也失败了呢?如果这个过程中刚好生产者也崩溃了呢?那就可能会导致没有人知道这个消息失败了,就导致不会重试了。

消费者

消费者来说比较简单,只要保证在消息成功时,才提交偏移量就行了,这样就不会导致消息丢失了

Broker

即持久化失败或者副本同步的时候失败

Kafka使用日志来做消息的持久化的,日志文件是存储在磁盘之但是如果Broker在消息尚未完全入日志之上的,前崩溃,那么这些消息可能会丢失了

而且,操作系统在写磁盘之前,会先把数据写入Page Cache中,然后再由操作系统中自己决定什么时候同步到磁盘当中,而在这个过程中,如果还没来得及同步到磁盘中,就直接宕机了,那这个消息也就丢了,

当然,也可以通过配置 1og.flush.interva1.messages=1,来实现类似于同步刷盘的功能,但是又回到了前面说的情况,还没来得及做持久化,就宕机了。

即使Kafka中引入了副本机制来提升消息的可靠性,但是如果发生同步延迟,还没来及的同步,主副本就挂掉了那么消息就可能会发生丢失。

这几种情况,只从Broker的角度分析,Broker自身是没办法保证消息不丢失的,但是如果配合Producer,再配合 request.required.acks=-1 这种ACK策略,可以确保消息持久化成功之后,才会ACK给Producer,那如果我们的Producer在一定时间段内,没有收到ACK,是可以重新发送的。

但是,这种重新发送,就又回到了我们前面介绍生产者的时候的问题,生产者也有可能挂,重新发送也有可能会没有发送依据,导致消息最终丢失。

所以,我们说,只靠Kafka自己,其实是没有办法保证极端情况下的消息100%不丢失的。


但是,我们也可以在做一些机制来保证,比如引入分布式事务,或者引入本地消息表等,保证在Kafka Broker没有保存消息成功时,可以重新投递消息。这样才行。

Kafka 消息的发送过程简单介绍一下?

典型回答

当我们使用Kafka发送消息时,一般有两种方式,分别是同步发送(producer.send(msg).get())及异步发送(producer.send(msg,callback))。

同步发送的时候,可以在发送消息后,通过get方法等待消息结果:这种producer.send(record).get();情况能够准确的拿到消息最终的发送结果,要么是成功,要么是失败。

而异步发送,是采用了calback的方式进行回调的,可以大大的提升消息的吞吐量,也可以根据回调来判断消息是否发送成功。

不管是同步发送还是异步发送,最终都需要在Producer端把消息发送到Broker中,那么这个过程大致如下

Kafka 的 Producer 在发送消息时通常涉及两个线程,主线程(Main)和发送线程(Sender)和一个消息累加器(RecordAccumulator)


Main线程是 Producer 的入口,负责初始化, Producer 的配置、创建 KafkaProducer 实例并执行发送逻辑。它会按照用户定义的发送方式(同步或异步)发送消息,然后等待消息发送完成。

一条消息的发送,在调用send方法后,在主线程中会经过拦截器、序列化器及分区器:

  • ·拦截器主要用于在消息发送之前和之后对消息进行定制化的处理,如对消息进行修改、记录日志、统计信息等。
  • ·序列化器负责将消息的键和值对象转换为字节数组,以便在网络上传输。
  • ·分区器决定了一条消息被发送到哪个 Partition 中。它根据消息的键(如果有)或者特定的分区策略,选择出-个目标 Partition。

RecordAccumulator在 Kafka Producer 中起到了消息积累和批量发送的作用,当 Producer 发送消息时,不会立即将每条消息发送到 Broker,而是将消息添加到 RecordAccumulator 维护的内部缓冲区中RecordAccumulator 会根据配置的条件(如batch.size、linger.ms)对待发送的消息进行批量处理

当满足指定条件时,RecordAccumulator 将缓冲区中的消息组织成一个批次(batch),然后一次性发送给Broker。如果发送失败或发生错误,RecordAccumulator 可以将消息重新分配到新的批次中进行重试。这样可以确保消息不会丢失,同时提高消息的可靠性。

Send线程是负责实际的消息发送和处理的。发送线程会定期从待发送队列中取出消息,并将其发送到对应的Partition的Leader Broker 上 。它主要负责网络通信操作,并处理发送请求的结果,包括确认的接收、错误处理等。

NetworkClient 和 Selector 是两个重要的组件,分别负责网络通信和 IO 多路复用。

发送线程会把消息发送到Kafka集群中对应的Partition的Partition Leader,Partition Leader 接收到消息后,会对消息进行一系列的处理。它会将消息写入本地的日志文件(Log)

为了保证数据的可靠性和高可用性,Kafka 使用了消息复制机制。Leader Broker 接收到消息后,会将消息复制至其他副本(Partition Follower)。副本是通过网络复制数据的,它们会定期从 Leader Broker 同步消息。

每一个Partition Follower在写入本地log之后,会向Leader发送一个ACK。

但是我们的Producer其实也是需要依赖ACK才能知道消息有没有投递成功的,而这个ACK是何时发送的Producer又要不要关心呢?这就涉及到了kafka的ack机制,生产者会根据设置的 request.required.acks 参数不同,选择等待或或直接发送下一条消息:

·request.required.acks=0

表示 Producer 不等待来自 Leader 的 ACK 确认,直接发送下一条消息。在这种情况下,如果 Leader分片所在服务器发生宕机,那么这些已经发送的数据会丢失。

·request.required.acks=1

表示 Producer 等待来自 Leader 的 ACK 确认,当收到确认后才发送下一条消息。在这种情况下,消息一定会被写入到 Leader 服务器,但并不保证 Follow 节点已经同步完成。所以如果在消息已经被写入Leader 分片,但是还未同步到 Follower 节点,此时Leader 分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。

·request.required.acks =-1

Leader会把消息复制到集群中的所有ISR(In-Sync Replicas,同步副本),后,再向Producer发送ACK消息,然后Producer再继续发下一条消息。

Kafka 高水位了解过吗?为什么 Kafka 需要 Leader Epoch?

典型回答

高水位(HW,High Watermark)是Kafka中的一个重要的概念,主要是用于管理消费者的进度和保证数据的可靠性的。

高水位标识了一个特定的消息偏移量(ofset),即一个分区中已提交消息的最高偏移量(offset),消费者只能拉取到这个 offset 之前的消息。消费者可以通过跟踪高水位来确定自己消费的位置。

这里的已提交指的是ISRs中的所有副本都记录了这条消息

在Kafka中,HW主要有两个作用:

  • ·消费进度管理:消费者可以通过记录上一次消费的偏移量,然后将其与分区的高水位进行比较,来确定自己的.消费进度。消费者可以在和高水位对比之后继续消费新的消息,确保不会错过任何已提交的消息。这样,消费者可以按照自己的节奏进行消费,不受其他消费者的影响。
  • ·数据的可靠性:高水位还用于确保数据的可靠性。在Kafka中,只有消息被写入主副本(Leader Replica)并被所有的同步副本(In-Sync Replicas,ISR)确认后,才被认为是已提交的消息。高水位表示已经被提交的消息的边界。只有高水位之前的消息才能被认为是已经被确认的,其他的消息可能会因为副本故障或其他原因而丢失,即高水位的作用是保证副本切换后消息的一致性问题。

还有一个概念,叫做LEO,即 Log End Offset,他是日志最后消息的偏移量。 它标识当前日志文件中下一条待写入消息的 offset。

当消费者消费消息时,它可以使用高水位作为参考点,只消费高水位之前的消息,以确保消费的是已经被确认的消息,从而保证数据的可靠性。如上图,只消费offet为6之前的消息。

Kafka 为什么有 Topic 还要用 Partition?

典型回答

Topic和Partition是kafka中比较重要的概念。

主题:Topic是Kafka中承载消息的逻辑容器。可以理解为一个消息队列。生产者将消息发送到特定的Topic,消费者从Topic中读取消息。Topic可以被认为是逻辑上的消息流。在实际使用中多用来区分具体的业务。分区:Partition。是Topic的物理分区。一个Topic可以被分成多个Partition,每个Partition是一个有序且持久化存储的日志文件。每个Partition都存储了一部分消息,并且有一个唯一的标识符(称为Partition ID)

看上去,这两个都是存储消息的载体,那为啥要分两层呢,有了Topic还需要Partition干什么呢?

在软件领域中,任何问题都可以加一个中间层来解决,而这,就是类似的思想,在Topic的基础上,再细粒度的划分出了一层,主要能带来以下几个好处:

1.提升吞吐量:通过将一个Topic分成多个Partition,可以实现消息的并行处理。每个Partition可以由不同的消费者组进行独立消费,这样就可以提高整个系统的吞吐量。

2.负载均衡:Partition的数量通常比消费者组的数量多,这样可以使每个消费者组中的消费者均匀地消费消息。当有新的消费者加入或离开消费者组时,可以通过重新分配Partition的方式进行负载均衡。

3.扩展性:通过增加Partition的数量,可以实现Kafka集群的扩展性。更多的Partition可以提供更高的并发处理能力和更大的存储容量。

综上,Topic是逻辑上的消息分类,而Partition是物理上的消息分区。通过将Topic分成多个Partition,可以实现提升吞吐量、负载均衡、以及增加可扩展性。

介绍一下Kafka的ISR机制?

典型回答

ISR,是In-Sync Replicas,同步副本的意思

在Kafka中,每个主题分区可以有多个副本(replica)。ISR是与主副本(Leader Replica)保持同步的副本集合。

ISR机制就是用于确保数据的可靠性和一致性的。

当消息被写入Kafka的分区时,它首先会被写入Leader,然后Leader将消息复制给ISR中的所有副本。只有当ISR中的所有副本都成功地接收到并确认了消息后,主副本才会认为消息已成功提交。这种机制确保了数据的可靠性和一致性。

扩展知识

ISR列表维护

在Kafka中,ISR(In-Sync Replicas)列表的维护是通过副本状态和配置参数来进行的。具体的ISR列表维护机制在不同的Kafka版本中有所变化。

before 0.9.x

在0.9.x之前的版本,Kafka有一个核心的参数:replica.lag.max.messages ,表示如果Follower落后Leader的消息数量超过了这个参数值,就认为Follower就会从ISR列表里移除。

但是,基于 replica.lag.max.messages 这种实现,在瞬间高并发访问的情况下会有问题:比如Leader瞬间接收到几万条消息,然后所有Follower还没来得及同步过去,此时所有follower都会被踢出ISR列表。

after 0.9.x

Kafka从0.9.x版本开始,引入了 replica.lag.max.ms 参数,表示如果某个Follower的LEO(latest endoffset)一直落后Leader超过了10秒,那么才会被从ISR列表里移除。

这样的话,即使出现瞬间流量,导致Follower落后很多数据,但是只要在限定的时间内尽快追上来就行了


http://www.kler.cn/a/378019.html

相关文章:

  • 通过不当变更导致 PostgreSQL 翻车的案例分析与防范
  • C++:AVL树
  • Hive自定义函数—剔除周日周六(小时级别)
  • 鸿蒙HarmonyOS开发:给应用添加基础类型通知和进度条类型通知(API 12)
  • Docker 镜像体积优化实践:从基础镜像重建到层压缩的全流程指南
  • 【Java知识】java进阶-一个好用的java应用分析工具arthas
  • 一篇文章入门傅里叶变换
  • 道品智能科技与系统集成:迈向未来的科技之路
  • metasploit/modules/exploits 有哪些模块,以及具体使用案例
  • django自动创建的表
  • 创建 PostgreSQL 函数案例
  • 动态规划应该如何学习?
  • OpenSSL:生成 DER 格式的 RSA 密钥对
  • 多线程之间的通讯
  • 项目复盘:TapTap聚光灯Gamejam
  • 【1】Excel快速入门的核心概念
  • 视频点播系统扩展示例
  • <项目代码>YOLOv8 夜间车辆识别<目标检测>
  • CSPJ2019-1数字游戏 (Number Games)
  • tcp shutdown, fin_wait1, fin_wait2, close_wait, last_ack, 谢特!
  • Spring源码学习(三):finishBeanFactoryInitialization
  • 线程安全的单例模式(Singleton)。
  • 轮廓图【HTML+CSS+JavaScript】
  • Java日志脱敏(二)——fastjson Filter + 注解 + 工具类实现
  • 统信UOS开发环境支持php
  • 使用Vite构建现代化前端应用