kafka服务端之副本
文章目录
- 概述
- 副本剖析
- 失效副本
- ISR的伸缩
- LW
- LEO与HW的关联
- LeaderEpoch的介入
- 数据丢失的问题
- 数据不一致问题
- Leader Epoch
- 数据丢失
- 数据不一致
- kafka为何不支持读写分离
- 日志同步机制
- 可靠性分析
概述
Kafka中采用了多副本的机制,这是大多数分布式系统中惯用的手法,以此来实现提供容灾能力、提升可用性和可靠性等。我们对此可以引申出一系列的疑问:
- Kafka多副本之间如何进行数据同步?
- 在发生异常时候的处理机制又是什么?
- 多副本间的数据一致性如何解决?
- 基于的一致性协议又是什么?
- 如何确保Kafka的可靠性?
- Kafka中的可靠性和可用性之间的关系是如何协调的?
本篇从副本的角度来回答上面的问题,主要包括副本剖析、日志同步机制和可靠性分析等内容。
副本剖析
副本(Replica )是分布式系统中常见的概念之一,指的是分布式系统对数据和服务提供的一种元余方式。在常见的分布式系统中,为了对外提供可用的服务,我们往往会对数据和服务进行副本处理。数据副本是指在不同的节点上持久化同一份数据,当某一个节点上存储的数据丢失时,可以从副本上读取该数据,这是解决分布式系统数据丢失问题最有效的手段。另一类副本是服务副本,指多个节点提供同样的服务,每个节点都有能力接收来自外部的请求并进行相应的处理。
Kafka从0.8版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka通过多副本机制实现故障自动转移,在Kafka集群中某个broker节点失效的情况下仍然保证服务可用。这里介绍下与副本相关的AR、ISR、HW和LEO的概念,这里简要地复习一下相关的概念:
- 副本是相对于分区而言的,即副本是特定分区的副本。
- 一个分区中包含一个或多个副本,其中一个为leader副本,埃为followe副本,各个副本位于不同的broker节点中。只有leader副本对外提供服务followe副本只负责数据同步。
- 分区中的所有副本统称为AR,而ISR是指与leaer副本保持同步状态的副本集合,当然leade副本本身也是这个集合中的一员。
- LEO标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的LEO,ISR中最小的LEO即为HW,俗称高水位,消费者只能拉取到HW之前的消息。
从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有follower副本都同步完之后才能被认为已经提交,之后才会更新分区的HW,进而消费者可以消费到这条消息。
失效副本
正常情况下,分区的所有副本都处于ISR集合中,但是难免会有异常情况发生,从而某些副本被剥离出ISR集合中。在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区。
失效副本不仅是指处于功能失效状态的副本,处于同步失效状态的副本也可以看
作失效副本。怎么判定一个分区是否有副本处于同步失效的状态呢?Kafka从0.9.x版本开始就通过唯一的broker端参数replica.lag.time.max.ms来扶择,当ISR集合中的一个follower副本带后leader副本的时间超过此参数指定的值时则判定为同步失败,需要将此follower副本剔除出ISR集合。replica.1ag.time.max.ms参数的默认值为10000。
具体的实现原理也很容易理解,当follower副本将leader副本LEO(LogEndOffset)之前的日志全部同步时,则认为该follower副本已经追赶上leader副本,此时更新该副本的lastCaughtUpTimeMs标识。Kafka的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的lastCaughtUpTimeMs差值是否大于参数replica.lag.time.max.ms 指定的值。十万不要错误地认为folower副本只要拉取leader副本的数据就会更新lastCaughtUpTimeMs。试想一下,当leader副本中消息的流入速度大于follower副本中拉取的速度时,就算follower副本一直不断地拉取leader副本的消息也不能与leader副本同步。如果还将此follower副本置于ISR集合中,那么当leader副本下线而选取此follower副本为新的leader副本时就会造成消息的严重丢失。
Kafka源码注释中说明了一般有两种情况会导致副本失效:
- follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的FullGC。
- follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如I/O开销过 大。
简单概括就是一个是不工作,一个是工作太慢都会导致副本变为失效副本。
在这里再补充一点,如果通过工具增加了副本因子(参考4.3.4节),那么新增加的副本在赶上leader副本之前也都是处于失效状态的。如果一个follower副本由于某些原因(比如容机)而下线,之后又上线,在追赶上leader副本之前也处于失效状态。
在0.9.x版本之前,Kafka中还有另一个参数replica.lag.max.messages(默认值为4000),它也是用来判定失效副本的,当一个follower副本带后leader副本的消息数超过replica.lag.max.messages的大小时,则判定它处于同步失效的状态。它与replica.lag.time.max.ms参数判定出的失效副本取并集组成一个失效副本的集合,从而进一步剥离出分区的ISR集合。
不过这个replica.1ag.max.messages参数很难给定一个合适的值,若设置得太大,则这个参数本身就没有太多意义,若设置得太小则会让follower副本反复处于同步、未同步、同步的死循环中,进而又造成ISR集合的频繁伸缩。而且这个参数是broker级别的,也就是说,对broker中的所有主题都生效。以默认的值4000为例,对于消息流入速度很低的主题(比如TPS为10),这个参数并无用武之地;而对于消息流入速度很高的主题(比如TPS为20000),这个参数的取值又会引入ISR的频繁变动。所以从0.9.x版本开始,Kafka就彻底移除了这一参数。
具有失效副本的分区可以从侧面反映出Kafka集群的很多问题,毫不夸张地说:如果只用一个指标来衡量Kafka,那么同步失效分区(具有失效副本的分区)的个数必然是首选。
ISR的伸缩
Kafka在启动的时候会开启两个与ISR相关的定时任务,名称分别为“isr-expiration”和“isr-change-propagation”。isr-expiration任务会周期性地检测每个分区是否需要缩减其ISR集合。这个周期和replica.lag.time.max.ms参数有关,大小是这个参数值的一半,默认值为5000ms。当检测到ISR集合中有失效副本时,就会收缩ISR集合。如果某个分区的ISR集合发生变更,则会将变更后的数据记录到ZooKeeper)对应的/brokers/topics//partition//state节点中。
当ISR集合发生变更时还会将变更后的记录缓存到isrChangeSet 中,**isr-change-propagation任务会周期性(固定值为2500ms)地检查isrChangeSet,如果发现isrChangeSet中有ISR集合的变更记录,那么它会在ZooKeepe的/isrchangenotification路径下创建一个以isr_change_开头的持久顺序节点(比如/isr_change_notification/isr_change_0000000000),并将isrChangeSet中的信息保存到这个节点中。Kafka控制器为/isr_change_notification 添加了一个Watcher,当这个节点中有子节点发生变化时会触发Watcher的动作,以此通知控制器更新相关元数据信息并向它管理的broker节点发送更新元数据的请求,最后删除/isr_change_notification足路径下已经处理过的节点。**频繁地触发Watcher会影响Kafka控制器、ZooKeeper甚至其他broker节点的性能。为了避免这种情况,Kafka添加了限定条件,当检测到分区的ISR集合发生变化时,还需要检查以下两个条件:
- (1)上一次ISR集合发生变化距离现在已经超过5s。
- (2)上一次写入ZooKeeper的时间距离现在已经超过60s。
满足以上两个条件之一才可以将ISR集合的变化写入目标节点。
有缩减对应就会有扩充,那么Kafka又是何时扩充ISR的呢?
随着follower副本不断与leader副本进行消息同步,follower副本的LEO也会逐渐后移,并最终追赶上leader副本,此时该follower副本就有资格进入ISR集合。追赶上leader副本的判定准则是此副本的LEO是否不小于leader副本的HW,注意这里并不是和leader副本的LEO相比。ISR扩充之后同样会更新ZooKeeper中的/brokers/topics//partition//state节点和isrChangeSet之后的步骤就和ISR收缩时的相同。
当ISR集合发生增减时,或者ISR集合中任一副本的LEO发生变化时,都可能会影响整个分区的HW。
LW
很多读者对Kafka中的HW的概念并不陌生,但是却并不知道还有一个LW的概念。LW是LOWWatermark的缩写,俗称“低水位”,代表AR集合中最小的logStartOffset值。副本的拉取请求(FetchRequest,它有可能触发新建日志分段而旧的被清理,进而导致logStartOffset的增加)和删除消息请求(DeleteRecordRequest)都有可能促使LW的增长。
LEO与HW的关联
如下图1所示,生产者一直在往leader副本(带阴影的方框)中写入消息。某一时刻,leader副本的LEO增加至5,并且所有副本的HW还都为0。之后follower副本(不带阴影的方框)向leader副本拉取消息,在拉取的请求中会带有自身的LEO信息,这个LEO信息对应的是FetchRequesti请求中的fetchoffset。leader副本返回给follower副本相应的消息,并且还带有自身的HW信息,如图2所示,这个HW信息对应的是FetchResponse中的highwatermark。
此时两个follower副本各自拉取到了消息,并更新各自的LEO为3和4。与此同时,follower副本还会更新自己的HW,更新HW的算法是比较当前LEO和leader副本中传送过来的HW的值,取较小值作为自己的HW值。当前两个follower副本的HW都等于0(min(0,0)-0)。
接下来follower副本再次请求拉取leader副本中的消息,如图3所示。
此时leader副本收到来自follower副本的FetchRequest请求,其中带有LEO的相关信息,选取其中的最小值作为新的HW,即min(15,3,4)-3。然后连同消息和IW一起返回FetchResponse给follower副本,如图3所示。注意leader副本的HW是一个很重要的东西,因为它直接影响了分区数据对消费者的可见性。
两个follower副本在收到新的消息之后更新LEO并且更新自己的HW为3(min(LEO,3)=3)。
在一个分区中,leader副本所在的节点会记录所有副本的LEO,而follower副本所在的节点只会记录自身的LEO,而不会记录其他副本的LEO。对HW而言,各个副本所在的节点都只记录它自身的HW。如图4所示。leader副本中带有其他follower副本的LEO,那么它们是什么时候更新的呢?leader副本收到follower副本的FetchRequest请求之后,它首先会从自己的日志文件中读取数据,然后在返回给follower副本数据前先更新follower副本的LEO。
Kafka的根目录下有cleaner-offset-checkpoint、log-start-offset-checkpoint
recovery-point-offset-checkpoint和replication-offset-checkpoint四个检查点文件。
recovery-point-offset-checkpoint和replication-offset-checkpoint这两个文件分别对应了LEO和HW。Kafka中会有一个定时任务负责将所有分区的LEO刷写到恢复点文件recovery-point-offset-checkpoint中,定时周期由broker端参数log.flush.offset.checkpoint·interval.ms来配置,默认值为60000。还有一个定时任务负责将所有分区的HW刷写到复制点文件replication-offset-checkpoint中,定时周期由broker端参数replica.high.watermark.checkpoint.interval.ms来配置,默认值为5000。
log-start-offset-checkpoint文件对应logStartOffset(注意不能缩写为LSO,因为在Kafka中LSO是LastStableoffset的缩写)它用来标识日志的起始偏移量。各个副本在变动LEO和HW的过程中,logStartofset也有可能随之而动。Kafka也有一个定时任务来负责将所有分区的logStartoffset书写到起始点文件log-start-offset-checkpoint中,定时周期由broker端参数log.flush.start.offset.checkpoint.interval.ms来配置,默认值为60000。
LeaderEpoch的介入
上面介绍的是在正常情况下的leader副本与follower副本之间的同步过程,如果leader副本发生切换,那么同步过程又该如何处理呢?在0.11.0.0版本之前,Kafka使用的是基于HW的同步机制,但这样有可能出现数据丢失或leader副本和follower副本数据不一致的问题。
数据丢失的问题
首先我们来看一下数据丢失的问题,如图5所示,ReplicaB是当前的leader副本(用L标记),ReplicaA是follower副本。参照之前的过程来进行分析:在某
一时刻,B中有2条消息m1和m2,A从B中同步了这两条消息,此时A和B的LEO都为2,同时HW都为1;之后A再向B中发送请求以拉取消息,FetchRequest请求中带上了A的LEO信息,B在收到请求之后更新了自己的HW为2;B中虽然没有更多的消息,但还是在延时一段时间之后(参考延时拉取)返回FetchResponse,并在其中包含了HW信息:最后A根据FetchResponse中的HW信息更新自己的HW为2。
可以看到整个过程中两者之间的HW同步有一个间隙,在A写入消息m2之后(LEO更新为2)需要再一轮的FetchRequest/FetchResponse才能更新自身的HW为2。如图所示,如果在这个时候A岩机了,那么在A重启之后会根据之前HW位置(这个值会存入本地的复制点文件replication-offset-checkpoint)进行日志截断,这样便会将m2这条消息删除,此时A只剩下m1这一条消息,之后A再向B发送FetchRequest请求拉取消息。
此时若B再岩机,那么A就会被选举为新的leader,如图所示。B恢复之后会成为follower,由于follower副本HW不能比leader副本的HW高,所以还会做一次日志截断,以此将HW调整为1。这样一来m2这条消息就丢失了(就算B不能恢复,这条消息也同样丢失)。
对于这种情况,也有一些解决方法,比如等待所有follower副本都更新完自身的HW之后再更新leader副本的HW,这样会增加多一轮的FetchRequest/FetchResponse延迟,自然不够妥
当。还有一种方法就是follower副本恢复之后,在收到leader副本的FetchResponse前不要截断follower副本(follower副本恢复之后会做两件事情:截断自身和向leader发送FetchRequest请求),不过这样也避免不了数据不一致的问题。
数据不一致问题
如图所示,当前leader副本为A,ffollower副本为B,A中有2条消息ml和m2,并
且HW和LEO都为2,B中有1条消息m1,并且HW和LEO都为1。假设A和B同时“挂掉”,然后B第一个恢复并成为为leader。
之后B写入消息m3,并将LEO和HW更新至2(假设所有场景中的min.insync.replicas参数配置为1)。此时A也恢复过来了,根据前面数据丢失场景中的介绍可知它会被赋予follower的角色,并且需要根据HW截断日志及发送FetchRequest至B,不过此时A的HW正好也为2,那么就可以不做任何调整了,如图所示。
如此一来A中保留了m2而B中没有,B中新增了m3而A也同步不到,这样A和B就出现了数据不一致的情形。
Leader Epoch
为了解决上述两种问题,Kafka从0.11.0.0开始引入了leader epoch的概念,在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。leader epoch代表leader的纪元信息(epoch),初始值为0。每当leader变更一次,leader epoch的值就会加1,相当于为leader增设了一个版本号。与此同时,每个副本中还会增设一个失量<LeaderEpoch=>Startoffset>,其中StartOffset表示当前LeaderEpoch下写入的第一条消息的偏移量。每个副本的Log下都有一个leader-epoch-checkpoint文件,在发生leaderepoch变更时,会将对应的失量对追加到这个文件中。
数据丢失
下面我们再来看一下引入leader epoch之后如何应付前面所说的数据丢失和数据不一致的场景。首先讲述应对数据丢失的问题,如图所示,这里多了LE(LeaderEpoch的缩写,当前A和B中的LE都为0)
同样A发生重启,之后A不是先忙着截断日志而是先发送OffsetsForLeaderEpochRequest请求给B,B作为目前的leader在收到请求之后会返回当前的LEO(LogEndoffset,注意图中LEO和LEO的不同)。
A在收到2之后发现和目前的LEO相同,也就不需要截断日志了。之后
B发生了容机,A成为新的leader,那么对应的LE=0也变成了LE=1,
对应的消息m2此时就得到了保留,这是原本不能的,如图所示。之后不管B
有没有恢复,后续的消息都可以以LE1为LeaderEpoch陆续追加到A中。
数据不一致
下面我们再来看一下leaderepoch如何应对数据不一致的场景。如图所示,当前A为leader,B为follower,A中有2条消息m1和m2,而B中有1条消息m1。假设A和B同时“挂掉”,然后B第一个恢复过来并成为新的leader。
之后B写入消息m3,并将LEO和HW更新至2,如图所示。注意此时的Leader Epoch已经从LE0增至LE1了。
紧接着A也恢复过来成为follower并向B发送OffsetsForLeaderEpochRequest请求,此时A的LeaderEpoch为LE0。B根据LE0查询到对应的ofset为1并返回给A,A就截断日志并删除了消息m2,如图所示。之后A发送FetchRequest至B请求来同步数据,最终A和B中都有两条消息m1和m3,HW和LEO都为2,并且LeaderEpoch都为LE1,如此便解决了数据不一致的问题。
kafka为何不支持读写分离
总的来说,Kafka只支持主写主读有几个优点:可以简化代码的实现逻辑,减少出错的可能;将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而
且对用户可控;没有延时的影响;在副本稳定的情况下,不会出现数据不一致的情况。为此,Kafka又何必再去实现对它而言毫无收益的主写从读的功能呢?这一切都得益于Kafka优秀的架构设计,从某种意义上来说,主写从读是由于设计上的缺陷而形成的权宜之计。
日志同步机制
我们上面提到了副本的相关理论,那么在kafka中如何保障日志数据同步的?如何在leader失效后选举出的新leader能拥有最全的日志数据?这就是我们要说的日志同步机制。那么在这种集群环境下,一条日志写入leader副本后需要成功同步到多少个follower才能保障数据安全?它采用的会不会是raft这种少数服从多数的算法?如果使用少数服从多数的算法那么为了保障集群的高可用需要保障半数以上的节点需要存活,这种算法常用在共享集群配置中(如:注册中心),但很少用于这种数据存储中心中。其实kafka采用的更像是微软的PacificA算法。
在Kafka中动态维护着一个ISR集合,处于ISR集合内的节点保持与leader相同的高水位(HW),只有位列其中的副本(unclean.leader.election.enable配置为false)才有资格被选为新的leader。写入消息时只有等到所有ISR集合中的副本都确认收到之后才能被认为已经提交。位于ISR中的任何副本节点都有资格成为leader,选举过程简单、开销低,这也是Kafka选用此模型的重要因素。Kafka中包含大量的分区,leader副本的均衡保障了整体负载的均衡,所以这一因素也极大地影响Kafka的性能指标。
在采用ISR模型和(f+I)个副本数的配置下,一个Kafka分区能够容忍最大f个节点失败,相比于“少数服从多数”的方式所需的节点数大幅减少。实际上,为了能够容忍f个节点失败,“少数服从多数”的方式和ISR的方式都需要相同数量副本的确认信息才能提交消息。比如,为了容忍1个节点失败,“少数服从多数”需要3个副本和1个follower的确认信息,采用ISR的方式需要2个副本和1个follower的确认信息。在需要相同确认信息数的情况下,采用ISR的方式所需要的副本总数变少,复制带来的集群开销也就更低,“少数服从多数”的优势在于它可以绕开最慢副本的确认信息,降低提交的延迟,而对Kafka而言,这种能力可以交由客户端自己去选择。
另外,一般的同步策略依赖于稳定的存储系统来做数据恢复,也就是说,在数据恢复时日志文件不可丢失且不能有数据上的冲突。不过它们忽视了两个问题:首先,磁盘故障是会经常发生的,在持久化数据的过程中并不能完全保证数据的完整性;其次,即使不存在硬件级别的故障,我们也不希望在每次写入数据时执行同步刷盘(fsync)的动作来保证数据的完整性,这样会极大地影响性能。而Kafka不需要岩机节点必须从本地数据日志中进行恢复,Kafka的同步方式允许岩机副本重新加入ISR集合,但在进入ISR之前必须保证自己能够重新同步完leader
中的所有数据。
可靠性分析
对于可靠性我们无法说某个系统完全可靠,只能用几个9来衡量。下面就考虑kafka本身使用方式的前提下如何最大程度的提高可靠性。
1、越多的副本数越能够保证数据的可靠性,副本数可以在创建主题时配置
也可以在后期修改,不过副本数越多也会引起磁盘、网络带宽的浪费,同时会引起性能的下降。一般而言,设置副本数为3即可满足绝大多数场景对可靠性的要求,而对可靠性要求更高的场景下,可以适当增大这个数值,比如国内部分银行在使用Kafka时就会设置副本数为5。与此同时,如果能够在分配分区副本的时候引入基架信息(broker.rack参数),那么还要应对机架整体岩机的风险。
与可靠性和ISR集合有关的有一个参数unclean·leader.election.enables这个参数的默认值为false,如果设置为true就意味着当leader下线时候可以从非ISR集合中选举出新的leader,这样有可能造成数据的丢失。如果这个参数设置为false,那么也会影响可用性,非ISR集合中的副本虽然没能及时同步所有的消息,但最起码还是存活的可用副本。随着Kafka版本的变更,有的参数被淘汰,也有新的参数加入进来,而传承下来的参数一般都很少会修改既定的默认值,而unclean.leader.election.enable 就是这样一个反例,从0.11.0.0版本开始unclean.leader.election.enable E的默认值由原来的true改为了false,可以看出Kafka的设计者愈发地偏向于可靠性的提升。
2、生产者客户端参数acks。相比于0和1,acks=-1(客户端还可以配置为all,它的含
义与-1一样,以下只以-1来进行陈述)可以最大程度地提高消息的可靠性。acks=-1的情形,它要求ISR中所有的副本都收到相关的消息之后才能够告知生产者已经成功提交。试想一下这样的情形,leader副本的消息流入速度很快,而follower副本的同步速度很慢,在某个临界点时所有的follower副本都被剔除出了ISR集合,那么ISR中只有一个leader副本,最终acks=-1演变为acks=1白的情形,如此也就加大了消息去失的风险。Kafka也考虑到了这种情况,并为此提供了min.insync.replicas参数(默认值为1)来作为辅助(配合acks=-1来使用),这个参数指定了ISR集合中最小的副本数,如果不满足条件就会抛出NotEnoughReplicasException或NtEnoughReplicasAfterAppendExceptions在正常的配置下,需要满足副本数>min.insync.replicas参数的值。一个典型的配置方案为:副本数配置为3,min.insync.replicas参数值配置为2。注意min.insync.replicas参数在提升可靠性的时候会从侧面影响可用性。试想如果ISR中只有一个leader副本,那么最起码还可以使用,而此时如果配置min,insync.replicas>l,则会使消息无法写入。
3、消息发送模式,即发后即忘、同步和异步。对于发后即忘的模式,不管消息有没有被成功写入,生产者都不会收到通知,那么即使消息写入失败也无从
得知,因此发后即忘的模式不适合高可靠性要求的场景。如果要提升可靠性,那么生产者可以采用同步或异步的模式,在出现异常情况时可以及时获得通知,以便可以做相应的补救措施比如选择重试发送(可能会引起消息重复)。
4、消息重试。有些发送异常属于可重试异常,比如NetworkException,这个可能是由瞬时的网络故障而导致的,一般通过重试就可以解决。对于这类异常,如果直接抛给客户端的使用方也未免过于兴师动众,客户端内部本身提供了重试机制来应对这种类型的异常,通过retries参数即可配置。默认情况下,retries参数设置为0,即不进行重试,对于高可靠性要求的场景,需要将这个值设置为大于0的值,在2.3节中也谈到了与retries 参数相关的还有一个retry.backoff.ms参数,它用来设定两次重试之间的时间间隔,以此避免无效的频繁重试。在配置retries和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。如果不知道retries参数应该配置为多少,则可以参KafkaAdminclient,在KafkaAdminClient中retries参数的默认值为5。
注意如果配置的retries参数值大于0,则可能引起一些负面的影响。首先同2.3节中谈及的一样,由于默认的max.in.flight.requests.per.connection参数值为5,这样可能会影响消息的顺序性,对此要么放弃客户端内部的重试功能,要么将
max.in.flight.requests.per.connection参数设置为1,这样也就放弃了吞吐。其次,有些应用对于时延的要求很高,很多时候都是需要快速失败的,设置retries>0会增加客户端对于异常的反馈时延,如此可能会对应用造成不良的影响。
5、消费端可靠性保障。enable.auto.commit参数的默认值为true,即开启自动位移提交的功能,虽然这种方式非常简便,但它会带来重复消费和消息去失的问题,对于高可靠性要求的应用来说显然不可取,所以需要将enable.auto.commit参数设置为false来执行手动位移提交。在执行手动位移提交的时候也要遵循一个原则:如果消息没有被成功消费,那么就不能提交所对应的消费位移。对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失。有时候,由于应用解析消息的异常,可能导致部分消息一直不能够成功被消费,那么这个时候为了不影响整体消费的进度,可以将这类消息暂存到死信队列中,以便后续的故障排除。
对于消费端,Kafka还提供了一个可以兜底的功能,即回湖消费,通过这个功能可以让我们能够有机会对漏掉的消息相应地进行回补,进而可以进一步提高可靠性。