消息队列实现 Exactly Once,看 Pulsar 是怎样实现的。
大家好 ,我是君哥。
在使用消息队列时,我们希望消息能够精准推送(Exactly Once),不会丢失、也不会重复。Exactly Once 其实是很难实现的,Pulsar 这款消息中间件使用事务消息实现了 Exactly Once,今天就带大家了解一下。
1.一个场景
为什么需要 Exactly Once 呢?下面我们看一个转账场景。
客户从转账 APP 上操作,从 A 账户向 B 账户转账 100 元,但是 B 账户增加金额后,给 Broker 返回 ACK 失败,导致 Broker 再次给账户 B 推送增加金额的消息,导致账户 B 增加了两次,最终导致金额不一致。
当然,账户 B 通过消费者幂等可以避免这个问题,但如果是生产者重复发送导致 Broker 保存了两条消息呢?
2.Pulsar 去重
通过消息去重可以解决上面的消息重复问题吗?我们看一下 Pulsar 的去重机制。
Producer 发送消息时,消息体带一个 sequenceId 字段,这个字段在同一个 Producer 内是严格递增的。Broker 通过<ProducerName, sequenceId> 来记录每一个 Producer 的最大 sequenceId。如果 Broker 收到 Producer 的消息小于等于保存的当前 Producer 的 sequenceId,说明是重复消息,直接返回失败。
消息去重从一定程度上可以避免消息重复,但是只能保证在 Topic-Partition 这个维度进行去重,如果一个 Topic 对应多个 Partition,如下图:
Producer 发送消息后,Broker1 保存成功,但是没有返回 ack,Producer 把消息重新发送到了 Broker2,最终导致 Consumer 收到 2 条消息。
3.事务消息
Pusar 的事务消息不仅可以解决上面的去重问题,还可以解决一些复杂场景。比如下面这个场景:
Consumer 从 Topic1 的两个 Partition 中各消费一条消息后,做加工计算(重复消费会影响加工结果),然后把结果分别发送到 Topic2 的两个 Partition 中。这个复杂的事务,要保证消息既不会重复也不会丢失,仅仅靠去重,就很难实现了。Pulsar 参考了分布式事务的主流实现,支持了消息的分布式事务。
Pulsar 的事务模型能保证生产和消费都能精确一次,即使 Broker 宕机,也不会处理失败。
同时,Pulsar 事务消息支持更复杂的场景,比如:
-
生产者在一个事务中分别发送一条消息到不同 Partition,要不同时成功,要不同时失败;
-
消费者从不同 Partition 消费多条消息,要不全部成功,要不全部失败;
-
上面两个场景的组合,见上面的图。
那 Pulsar 的事务消息是怎么实现的呢?Pulsar 参考了分布式事务的实现方式,我们再回顾一下分布式事务的三个角色:
-
TC: 事务协调器,管理全局事务和分支事务的状态,Pulsar 会选择 Topic 中 Partition 所在的一个 Broker 作为 TC;
-
TM:管理全局事务,包括开启全局事务,提交/回滚全局事务。Pulsar 使用
pulsarClient.newTransaction()
开启一个事务,这会向 TC 注册全局事务并且获得全局事务 ID(TCID); -
RM:管理分支事务。
下图,我们把上面复杂的事务用分布式事务来实现:
说明几点:
-
Producer1 既是生产者也是 TM;
-
Broker1 既是 TC 也是 RM;
-
Producer 和 Consumer 的事务分开来管理。上图中只是画出了生产者的事务提交,消费者类似;
-
我们知道,分布式事务的实现模式一般包括 AT、TCC、SAGA 和 XA,那 Pulsar 的实现模式是哪一种呢?对于 Producer 和 Consumer,情况不一样。
对于 Producer 的事务消息,更像是 AT 模式,消息直接发送给 Broker 并持久化,不过持久化之前会在 TopicTransactionBuffer 中记录元数据(类似 AT 模式中的回滚日志),全局事务回滚时可以使用这些元数据回滚消息。当然回滚消息并不是删除消息,而是让消息不被消费到,具体做法是在回滚的事务会被打上 Aborted 标签,根据这个标签来决定消息不会推送给 Consumer。
对于 Consumer 的事务消息,我个人觉得有点参考 XA 模式,不过这里没有数据源代理,而是用了消息缓存,这里缓存的不是消息本身,而是消费者的 ack 消息。也就是说消费者消费完成后并没有直接发送 ack 给 Broker,而是先发送到 pendingAckSore 做缓存,在提交全局事务时才会真正地提交 ack 消息。
-
全局事务没有提交之前,消息可能会被消费到吗?不会,每个 Topic 都会记录自己的 maxReadPosition 属性,标识消费者可以从 Broker 拉取消息的最大位置,分布式事务提交全局事务之前,maxReadPosition 是不变的,所有未提交全局事务的消息不可能被消费到。但这里也会有一个隐患,那就是阻塞普通消息的消费,在当前事务提交之前,普通消息即使发送成功了,消费者也拉取不到。
4.总结
Pulsar 使用事务消息实现了 Exactly Once 这个消息投递的最高要求。从上面的讲解看,事务消息的实现还是比较复杂的,不过从 Producer 和 Consumer 端分开实现这个角度看 ,更容易理解一些。
最后,一起思考一个极端场景,如果分布式事务中有两个消费者,一个消费者消费成功并且发送 ack,另一个消费者因为代码问题消费失败并且没有回复 ack,最终全局事务因为超时而做回滚,那第一个消费者已经消费,这还能保证全局一致吗?当然不能,除非消费者消费逻辑也加入这个全局事务。
消息队列的分布式事务一直是一个复杂的话题,分布式事务的设计思想也非常值得我们借鉴学习。但无论使用哪个中间件,消费端幂等是保障业务正确性的底线,最靠谱的方式还是从业务代码层面来保证幂等。