Kafka知识点总结
一、概述
¥1. 推拉模式
- pull(拉)模式:consumer采用从broker中主动拉取数据,不足之处是如果没有数据,消费者可能会陷入循环中,一直返回空数据。
- push(推)模式:由broker主动向消费者主动推送消息,缺点是由broker决定消息发送速率,很难适应所有消费者的消费速率
- Kafka采用 pull(拉)模式!
pull模式的优点:
- 控制消费速率:消费者可以根据自己的处理能力决定拉取数据的频率和数量,从而避免被过多的数据淹没。
- 简化系统复杂性:推模式需要复杂的流量控制机制来防止生产者发送数据过快导致消费者无法处理。而拉模式将这种控制交给消费者,简化了系统设计。
- 灵活性:消费者可以灵活地选择何时以及从哪个分区拉取数据,这使得 Kafka 能够支持多种消费模式,如实时处理、批量处理等
二、生产者
1. 线程模型
生产者在发送消息时主要存在两个线程:主线程 和 Sender线程
- 主线程即调用KafkaProducer.send方法的线程。当send方法被调用时,消息并没有真正被发送,而是暂存到RecordAccumulator。通过暂存机制可以提升吞吐量,可以将多条消息通过一个ProduceRequest批量发送出去,并提高数据压缩效率
- Sender线程在满足一定条件后,会去RecordAccumulator中取消息并发送到Kafka Server端。
2. 消息生产流程
- 当接受外部传过来的数据的时候,会先创建一个main线程,在main线程中创建producer对象,然后调用send方法,将数据进行发送
- 消息会经过拦截器,对发送的数据进行处理、加工,再经过序列化器,对传输的数据进行序列化
- 根据分区器的分区策略决定传输的数据发送至哪个分区
- 通过消息暂存器将数据写入暂存区
- 消息暂存器维护了一个内存池(默认大小32M),并且为每一个分区维护了一个双端队列,队列中是一个个批次
- 写入数据时,会从内存池中取出内存,创建批次(默认大小16k)
- 当缓存空间耗尽,其他发送调用将被阻塞当缓存空间耗尽,其他发送调用将被阻塞
- 当一个批次的数据大小积累到 batch.size 或者到达了延迟时间 linger.ms,唤醒sender线程
- Sender线程从分区中拉取数据。拉取数据的方式是以brokerId为key,所有分区的请求为value放到队列中
- Sender线程通过selector发送数据,数据发送成功之后,会有应答机制,返回acks,应答级别有3种。如果反馈回来的请求是成功,则会删除发送数据成功的请求以及清理分区中请求中拉取的数据(释放批次的内存,放回到内存池中)。如果失败会进行重试,重试的次数(默认是Int的最大值,可以进行修改,一般是3-5次)
- 如果发送数据的第一个请求到达集群中的某一个broker没有应答,允许继续发送请求,默认每个broker节点最多缓存5个请求
3. 分区选择策略
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
- 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法。
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值;在Producer往Kafka插入数据时,控制同一Key分发到同一Partition
¥4. ACK级别
生产者数据发送成功之后,会有应答机制,返回acks,ack有三种级别:
- 0:生产者发送过来的数据,不需要等数据落盘就应答。有可能在落盘的过程中,leader服务器挂了,但是已经返回成功的信息,所以则会导致丢失数据。
- 1:生产者发送过来的数据,Leader收到数据后再去应答。有可能在应答完成之后,还没有开始同步副本数据,Leader挂了,然后会在剩余的follower中重新选择出一个Leader,但是新的Leader不会接受到之前发送的数据,因为之前的Leader已经返回发送成功的消息了,所以则会导致数据丢失。
- -1:生产者发送过来的数据,Leader和ISR队列中的所有节点都收齐数据之后才去应答。
数据重复问题:Leader和ISR队列里面的所有节点收齐数据后应答,接受完数据之后,在未进行应答反馈之前,leader挂掉,会在follower中重新选举leader,producer未接受到成功的反馈,所以会向leader再次发送数据,就造成了消息重复的问题。需要考虑幂等解决方案
三、消费者
1. 消息消费流程
- 消费者创建连接客户端(ConsumerNetworkClient),调用 sendFetches 方法向kafka集群发送消费请求
- Server收到请求后,向消费者按批次发送消息数据
- 消费者端将收到的消息按照批次大小放到一个队列中(completedFetches)
- 然后消费者会从队列(FetchedRecords)中抓取数据,Max.poll.records一次拉取数据返回消息的最大条数默认500条,然后经过反序列化、拦截器等进行数据处理,最终进行消费。
2. 消费者配置
- Fetch.min.bytes:每批次最小抓取大小(默认1字节), 当一批次数据不满足最小的抓取大小(1字节),等待到达超时时间,也会将这些数据返回。
- fetch.max.wait.ms:一批数据最小值未达到的超时时间(默认500ms)
- Fetch.max.bytes:每批次最大抓取大小(默认50m)
- Max.poll.records:一次拉取数据返回消息的最大条数(默认500条)
3. 分区分配策略
Kafka将一个Topic下的数据横向分成了多个"分区"(Partition),而每个分区内的数据只会被同一消费者组中的一个消费者消费。有以下几种分区分配策略:
- Range:通过partitions/consumer数来决定每个消费者应该消费几个分区,按顺序依次分配,如果除不尽则前面几个消费者多消费1个分区
再平衡策略:如果一个消费者挂了,那么分配给该消费者的分区会被整体分配到某个其他消费者
数据倾斜问题:如果有多个 topic都除不尽,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。
- RoundRobin:先将所有分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者
再平衡策略:某个消费者挂掉之后,所有的分区会重新进行一次再分配(可能会造成资源的浪费)
- Sticky:分区分配尽可能均匀(冲突时该原则优先),分区分配尽可能与上次分配保持相同
再平衡策略:0 号消费者挂掉之后,0 号消费者的分区以轮询的方式尽可能均匀地分配到1号消费者或者 2 号消费者。而1号消费者或者 2 号消费者依旧保持原有的分区,即重分配后尽可能和上次分配保持相同,使分配策略具备一定“黏性”,从而减少系统资源的消耗和异常情况的发生
分区分配原理:全部消费者组被分为多个子集,每个消费者组子集在服务端都对应一个 GroupCoordinator 对其进行管理。而 GroupCoordinator 最重要的职责就是负责执行消费者再均衡的操作
如果多个消费者,彼此配置的分配策略并不完全相同:
- GroupCoordinator会收集各个消费者支持的所有分配策略,组成候选集candidates
- 每个消费者从候选集candidates中找出第一个自身支持的策略,为这个策略投上一票
- 选票数最多的策略即为当前消费者组的分配策略
¥4. 偏移量
kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的唯一标示。也表示消费者在分区的位置。已提交的位置是已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。
偏移量的提交方式分为两种:
- 自动提交:消费者每间隔一段时间会自动提交消费的offset到系统主题中。(auto.commit.interval.ms:每隔多长时间自动提交一次,默认5s)。可能导致消息丢失,没有确认已经对消息作了处理,就标记为已消费
- 手动提交:可能导致重复消费,如果在插入数据库之后,手动提交之前失败
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据,如果失败则会发起重试
- commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了
5. 重置策略
auto.offset.reset参数决定了在消费者启动时如何处理未知的偏移量(offset)。具体来说,当消费者的组(consumer group)没有已提交的偏移量,或者消费者的偏移量超出了可用的日志范围时,auto.offset.reset 策略会决定从哪里开始读取消息。
- earliest:从最早的偏移量开始读取。确保消费者从头开始读取所有消息,适用于需要处理所有历史数据的场景。
- latest(默认):从最新的偏移量开始读取。确保消费者只读取新产生的消息,适用于只需要处理最新数据的场景。
- none:抛出异常
三、服务端
1. 消息存储结构
- Topic内部又划分成多个Partition,Partition所对应的一个或多个Replica,Replica由Log文件存储
- 为了防止Log过大,增加消息过期和数据检索的成本,Log又会按一定大小划分成"段",即LogSegment。
- 为了提升查询效率,还为Log生成了索引文件.index和.timeindex
¥2. 高性能IO
顺序I/O
- LogSegment创建时,一口气申请LogSegment最大size的磁盘空间,这样一个文件内部尽可能分布在一个连续的磁盘空间内
- .log文件也好,.index和.timeindex也罢,在设计上都是只追加写入,不做更新操作,这样避免了随机IO的场景
不同Partition在磁盘上的存储位置不保证连续,当以不同Partition为读写目标并发地向Kafka发送请求时,Server端近似于随机IO。因此不要创建太多的Partition
PageCache
Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。
零拷贝
消费者非零拷贝流程:
- 从磁盘文件到linux内核页缓存(DMA拷贝)
- 从内核页缓存到kafka(cpu拷贝)
- 接着从kafka到内核的socket缓存(cpu拷贝)
- 从socket缓存到网卡(DMA拷贝)
CPU拷贝:需要在用户态和内核态之间来回切换,会消耗大量的资源和时间。
DMA拷贝:DMA是一种硬件技术,允许外设(如网卡)直接访问内存,绕过CPU的参与,从而实现高速数据传输。
零拷贝:broker应用层不操作数据,所以不需要将数据拷贝到broker应用层,可以直接由页面缓存通过网卡将数据发送到消费者。
四、高可用
1. 副本
副本是针对分区而言的,即副本是分区的副本。一个分区有一个 leader 副本,多个 follower 副本,只有 leader副本对外提供服务,follower 副本只负责数据同步
分区所有副本统称 AR,ISR 指与 leader副本保持同步状态的副本集合。相关参数:
- replication.factor:指定了一个分区(partition)的副本数量
- min.insync.replicas:指定了一个分区的最小同步副本数(in-sync replicas, ISR)。ISR 是指那些与领导者节点保持同步的副本集合。等到所有 ISR 集合中的副本都确认收到后才能被认为已提交,在副本稳定的情况下,不会出现数据不一致的情况
2.CAP模型
通过不同配置实现不同的CAP模型
- CP模型:replication.factor = 3,min.insync.replicas = 3,acks = all
- AP模型:replication.factor = 3,min.insync.replicas = 3,acks = 1。对于任意写入一条数据,当主节点commmit了之后就返回ack;如果主节点在数据被replicate到从节点之前就宕机,这时,重新选举之后,消费端就读不到这条数据。
- 兼顾:replication.factor = 3,min.insync.replicas = 2,acks = all。可以容忍一个节点宕机,同时也可以容忍这个节点和其它节点产生网络分区,它们都可以看成是Kafka的容错(Fault tolerance)机制
3. 宕机恢复
leader epoch 代表 leader 的纪元信息,初始值为0,每当 leader 变更一次, leader epoch 的值就会加1。 当某一节点从宕机状态恢复时,会去查找该节点当前 leader epoch 的 LEO 作为截断依据。
四、消息队列常见问题
1. 线程安全问题
生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。
消费者是非线程安全的,KafkaConsumer的每个公用方法在执行操作前都会调用 acquire() 方法,该方法用来检测当前是否只有一个线程在操作(CAS),若有其他线程正在操作则会抛出 ConcurrentModifcationException 异常
解决方案:
- 线程封闭:为每个线程实例化一个 KafkaConsumer对象
- Reactor 模型:消费者线程专门用来接收消息,接收到消息后采用多线程的方式(线程池)来处理消息
¥2. 如何保证消息有序性
原理:
- 写入同一个partion分区中的数据是一定有顺序的
- kafka中一个消费者消费一个partion的数据,消费者取出数据时,也是有顺序的
实现:
- 在生产者端,应保证消息被写入同一分区。可以在构造消息时指定消息的key或指定分区。并且设置参数max.in.flight.requests.per.connection=1,也即同一个链接只能发送一条消息,如此便可严格保证Kafka消息的顺序
- max.in.flight.requests.per.connection 是 Kafka 生产者配置中的一个重要参数,它控制了在等待确认(acks)之前可以发送的最多请求数量。当 acks 设置为 all 或 -1 时,或生产者配置了重试机制(retries > 0),如果 max.in.flight.requests.per.connection 设置为大于 1 的值,可能会导致消息乱序。
- 设置enable.idempotence=true,开启生产者的幂等生产,允许max.in.flight.requests.per.connection>1提升吞吐量。如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息
- 在消费者端,需要被顺序处理的消息让同一线程顺序处理。可以在消费者中,消息分发至不同的线程时,加一个队列,消费者去做hash分发,将需要顺序处理的数据分发至同一个队列中,最后多个线程从各自的队列中取数据
- 在进行分区扩容时,发送至同一个partion分区的方案可能会有问题。可以在消息中指明消息顺序,在消费者的代码逻辑中处理顺序
¥3. 如何保证消息可靠性
- 在生产者端需确保生产者生产的消息被接收,acks 设置为-1,通过重试机制来确保消息写入Kafka
- 在消费者端需确保消费者正确的消费消息,应手动提交偏移量
- 在Broker端需保证数据高可用,区副本大于等于 2 (–replication-factor 2),ISR 里应答的最小副本数量大于等于 2(min.insync.replicas = 2)
¥4. 如何保证消息消费幂等性
- 生产者幂等性:enable.idempotence参数,可保证单个生产者会话(session)中单分区的消息幂等
原理:
- 每个新的生产者实例在初始化时会被分配一个PID(producer id)
- 对于每个PID,消息发送到的每一个分区都有对应的序列号,序列号从0开始单调递增,生产者每发送一条消息就会将<PID,分区>对应的序列号值加1
- broker端会在内存中为每一对<PID,分区>维护一个序列号,对于收到的每一条消息,只有当它的序列号的值(SN_new)正好比broker端中维护的对应序列号的值(SN_old)大1,broker才会接收该消息。如果 SN_new < SN_old + 1,说明消息被重复写入,broker会将该消息丢弃。否则,说明中间有数据尚未写入,暗示可能有消息丢失,对应生产者会抛出 OutOfOrderSequenceException 异常
- 消费者幂等性:
- 唯一Id:在数据库中发现某个id的消息已被消费过,则放弃消费
- 数据库乐观锁:生产消息时先查询版本号,并将版本号连同消息一起发给消息队列;消费端收到消息和版本号后,在执行SQL时带上版本号
- 事务处理:消息的消费和入库操作在一个事务中
5. 事务消息
- 原子性:确保一组消息在多个分区和主题之间保持原子性。这意味着如果事务提交成功,所有消息都会被写入;如果事务失败,所有消息都不会被写入。
- 幂等性:重复发送相同的消息不会导致重复处理,包括多分区。这通过 idempotent 生产者实现,确保每个消息只被处理一次。
- 隔离性:提供了类似于数据库的事务隔离级别,确保在事务进行期间,其他消费者看不到未提交的消息。通过设置消费端的参数 isolation.level 确定,默认值为 read_uncommitted
6. 消息积压问题
- kafka消费能力不足:同时提升分区数量和消费者组的消费者数量,保持 消费者数 = 分区数量
- 下游消息处理不及时:
- 如果没有充分利用下游消费者的处理能力,可以提高每批次拉取的消息数量,如默认一次拉取500条可以修改为1000条;也可以一次多拉一点,调整 fetch.max.bytes 大小,默认是 50m
- 优化消费者的处理逻辑,检查消费者的代码是否存在性能瓶颈或是复杂的处理逻辑
- 扩展硬件资源
- 下游接口阻塞,无法成功消费:先将堆积的消息存储到数据库,待接口恢复正常后,再将消息推送到MQ