Kafka系列——详解如何使用和配置生产者实现可靠的消息发送
在可靠的系统里使用生产者
即使我们尽可能把 broker 配置得很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。
比如下面的两个例子:
(一)为 broker 配置了 3 个副本,并且禁用了不完全首领选举,这样应该可以保证万无一失。我们把生产者发送消息的** acks 设为 1**(只要首领接收到消息就可以认为消息写入成功)。
- 生产者发送一个消息给首领,首领成功写入,但跟随者副本还没有接收到这个消息。
- 首领向生产者发送了一个响应,告诉它“消息写入成功”,然后它崩溃了,而此时消息还没有被其他副本复制过去。
- **此时另外两个副本此时仍然被认为是同步的(**毕竟判定一个副本不同步需要一小段时间),而且其中的一个副本成了新的首领。
- 因为消息还没有被写入这个副本,所以就丢失了,但发送消息的客户端却认为消息已成功写入。因为消费者看不到丢失的消息,所以此时的系统(从消费者角度来看)仍然是一致的(因为副本没有收到这个消息,所以消息不算已提交),但从生产者角度来看,它丢失了一个消息。
(二)为 broker 配置了 3 个副本,并且禁用了不完全首领选举。把生产者的 acks 设为 all。
- 假设现在往 Kafka 发送消息,分区的首领刚好崩溃,新的首领正在选举当中,Kafka 会向生产者返回“首领不可用”的响应。
- 在这个时候,如果生产者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失。
虽然这算不上是 broker 的可靠性问题,因为 broker 并没有收到这个消息。这也不是一致性问题,因为消费者并没有读到这个消息。
但如果生产者没能正确处理这些错误,就将丢失掉这些消息。
那么,我们该如何避免这些问题呢?从上面两个例子可以看出,每个使用 Kafka的开发人员都要注意两件事情。
- 根据可靠性需求配置恰当的 acks 值。
- 在参数配置和代码里正确处理错误。
发送确认
生产者可以选择以下 3 种不同的确认模式。
acks=0
意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka。
在这种情况下还是有可能发生错误,比如发送的对象无法被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。
即使是在发生完全首领选举的情况下,这种模式仍然会丢失消息,因为在新首领选举过程中它并不知道首领已经不可用了。
在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,一定会丢失一些消息。
acks=1
意味着首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。
在这个模式下,如果发生正常的首领选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送消息,最终消息会安全到达新的首领那里。
不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入首领,但在消息被复制到跟随者副本之前首领发生崩溃。
acks=all
意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到消息。
如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到消息。
这是最保险的做法——生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
可以通过使用异步模式和更大的批次来加快速度,但这样做通常会降低吞吐量。
配置生产者的重试参数
生产者需要处理的错误包括两部分:一部分是生产者可以自动处理的错误,还有一部分是需要开发者手动处理的错误。
如果 broker 返回的错误可以通过重试来解决,那么生产者会自动处理这些错误。
生产者向 broker 发送消息时,broker 可以返回一个成功响应码或者一个错误响应码。
错误响应码可以 分为两种,一种是在重试之后可以解决的,还有一种是无法通过重试解决的。
例如,如果 broker 返回的是 LEADER_NOT_AVAILABLE 错误,生产者可以尝试重新发送消息。也许在这个时 候一个新的首领被选举出来了,那么这次发送就会成功。也就是说,LEADER_NOT_AVAILABLE 是一个可重试错误。
另一方面,如果 broker 返回的是 INVALID_CONFIG 错误,即使通过重试 也无法改变配置选项,所以这样的重试是没有意义的。这种错误是不可重试错误。
一般情况下,如果你的目标是不丢失任何消息,那么最好让生产者在遇到可重试错误时能够保持重试。为什么要这样?因为像首领选举或网络连接这类问题都可以在几秒钟之内得到解决,如果让生产者保持重试,你就不需要额外去处理这些问题了。
经常会有人 问:“为生产者配置多少重试次数比较好?”
这个要看你在生产者放弃重试并抛出异常之后想做些什么。
如果你想抓住异常并再多重试几次,那么就可以把重试次数设置得多一 点,让生产者继续重试;如果你想直接丢弃消息,多次重试造成的延迟已经失去发送消息1的意义;
如果你想把消息保存到某个地方然后回过头来再继续处理,那就可以停止重试。
Kafka 的跨数据中心复制工具默认会进行无限制的重试(例如 retries=MAX_INT)。作为一个具有高可靠性的复制工具,它决不会丢失消息。
要注意,重试发送一个已经失败的消息会带来一些风险,如果两个消息都写入成功,会导致消息重复。
例如,生产者因为网络问题没有收到 broker 的确认,但实际上消息已经写入 成功,生产者会认为网络出现了临时故障,就重试发送该消息(因为它不知道消息已经写 入成功)。在这种情况下,broker 会收到两个相同的消息。
重试和恰当的错误处理可以保 证每个消息“至少被保存一次”,但0.10.0版本Kafka无法保证每个消息“只被保存一次”。
现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时可以对它们进行清理。还要一些应用程序可以做到消息的“幂等”。
额外的错误处理
使用生产者内置的重试机制可以在不造成消息丢失的情况下轻松地处理大部分错误,不过对于开发人员来说,仍然需要处理其他类型的错误,包括:
- 不可重试的 broker 错误,例如消息大小错误、认证错误等;
- 在消息发送之前发生的错误,例如序列化错误;
- 在生产者达到重试次数上限时或者在消息占用的内存达到上限时发生的错误。
这些错误处理 器的代码逻辑与具体的应用程序及其目标有关。
- 丢弃“不合法的消息”?
- 把错误记录下来?
- 把这些消息保存在本地磁盘上?
- 回调另一个应用程序?
具体使用哪一种逻辑要根据具体的架构来决定。只要记住,如果错误处理只是为了重试发送消息,那么最好还是使用生产者内置的重试机制。
参考这里的