当前位置: 首页 > article >正文

消息队列实现 Exactly Once,看 Pulsar 是怎样实现的。

大家好 ,我是君哥。

在使用消息队列时,我们希望消息能够精准推送(Exactly Once),不会丢失、也不会重复。Exactly Once 其实是很难实现的,Pulsar 这款消息中间件使用事务消息实现了 Exactly Once,今天就带大家了解一下。

1.一个场景

为什么需要 Exactly Once 呢?下面我们看一个转账场景。

图片

客户从转账 APP 上操作,从 A 账户向 B 账户转账 100 元,但是 B 账户增加金额后,给 Broker 返回 ACK 失败,导致 Broker 再次给账户 B 推送增加金额的消息,导致账户 B 增加了两次,最终导致金额不一致。

当然,账户 B 通过消费者幂等可以避免这个问题,但如果是生产者重复发送导致 Broker 保存了两条消息呢?

2.Pulsar 去重

通过消息去重可以解决上面的消息重复问题吗?我们看一下 Pulsar 的去重机制。

Producer 发送消息时,消息体带一个 sequenceId 字段,这个字段在同一个 Producer 内是严格递增的。Broker 通过<ProducerName, sequenceId> 来记录每一个 Producer 的最大 sequenceId。如果 Broker 收到 Producer 的消息小于等于保存的当前 Producer 的 sequenceId,说明是重复消息,直接返回失败。

消息去重从一定程度上可以避免消息重复,但是只能保证在 Topic-Partition 这个维度进行去重,如果一个 Topic 对应多个 Partition,如下图:

图片

Producer 发送消息后,Broker1 保存成功,但是没有返回 ack,Producer 把消息重新发送到了 Broker2,最终导致 Consumer 收到 2 条消息。

3.事务消息

Pusar 的事务消息不仅可以解决上面的去重问题,还可以解决一些复杂场景。比如下面这个场景:

图片

Consumer 从 Topic1 的两个 Partition 中各消费一条消息后,做加工计算(重复消费会影响加工结果),然后把结果分别发送到 Topic2 的两个 Partition 中。这个复杂的事务,要保证消息既不会重复也不会丢失,仅仅靠去重,就很难实现了。Pulsar 参考了分布式事务的主流实现,支持了消息的分布式事务。

Pulsar 的事务模型能保证生产和消费都能精确一次,即使 Broker 宕机,也不会处理失败。

同时,Pulsar 事务消息支持更复杂的场景,比如:

  • 生产者在一个事务中分别发送一条消息到不同 Partition,要不同时成功,要不同时失败;

  • 消费者从不同 Partition 消费多条消息,要不全部成功,要不全部失败;

  • 上面两个场景的组合,见上面的图。

那 Pulsar 的事务消息是怎么实现的呢?Pulsar 参考了分布式事务的实现方式,我们再回顾一下分布式事务的三个角色:

  • TC: 事务协调器,管理全局事务和分支事务的状态,Pulsar 会选择 Topic 中 Partition 所在的一个 Broker 作为 TC;

  • TM:管理全局事务,包括开启全局事务,提交/回滚全局事务。Pulsar 使用 pulsarClient.newTransaction()开启一个事务,这会向 TC 注册全局事务并且获得全局事务 ID(TCID);

  • RM:管理分支事务。

下图,我们把上面复杂的事务用分布式事务来实现:

图片

说明几点:

  1. Producer1 既是生产者也是 TM;

  2. Broker1 既是 TC 也是 RM;

  3. Producer 和 Consumer 的事务分开来管理。上图中只是画出了生产者的事务提交,消费者类似;

  4. 我们知道,分布式事务的实现模式一般包括 AT、TCC、SAGA 和 XA,那 Pulsar 的实现模式是哪一种呢?对于 Producer 和 Consumer,情况不一样。 

    对于 Producer 的事务消息,更像是 AT 模式,消息直接发送给 Broker 并持久化,不过持久化之前会在 TopicTransactionBuffer 中记录元数据(类似 AT 模式中的回滚日志),全局事务回滚时可以使用这些元数据回滚消息。当然回滚消息并不是删除消息,而是让消息不被消费到,具体做法是在回滚的事务会被打上 Aborted 标签,根据这个标签来决定消息不会推送给 Consumer。 

    对于 Consumer 的事务消息,我个人觉得有点参考 XA 模式,不过这里没有数据源代理,而是用了消息缓存,这里缓存的不是消息本身,而是消费者的 ack 消息。也就是说消费者消费完成后并没有直接发送 ack 给 Broker,而是先发送到 pendingAckSore 做缓存,在提交全局事务时才会真正地提交 ack 消息。

    图片

  5. 全局事务没有提交之前,消息可能会被消费到吗?不会,每个 Topic 都会记录自己的 maxReadPosition 属性,标识消费者可以从 Broker 拉取消息的最大位置,分布式事务提交全局事务之前,maxReadPosition 是不变的,所有未提交全局事务的消息不可能被消费到。但这里也会有一个隐患,那就是阻塞普通消息的消费,在当前事务提交之前,普通消息即使发送成功了,消费者也拉取不到。

4.总结

Pulsar 使用事务消息实现了 Exactly Once 这个消息投递的最高要求。从上面的讲解看,事务消息的实现还是比较复杂的,不过从 Producer 和 Consumer 端分开实现这个角度看 ,更容易理解一些。

最后,一起思考一个极端场景,如果分布式事务中有两个消费者,一个消费者消费成功并且发送 ack,另一个消费者因为代码问题消费失败并且没有回复 ack,最终全局事务因为超时而做回滚,那第一个消费者已经消费,这还能保证全局一致吗?当然不能,除非消费者消费逻辑也加入这个全局事务。

消息队列的分布式事务一直是一个复杂的话题,分布式事务的设计思想也非常值得我们借鉴学习。但无论使用哪个中间件,消费端幂等是保障业务正确性的底线,最靠谱的方式还是从业务代码层面来保证幂等。


http://www.kler.cn/a/585432.html

相关文章:

  • item_get_pro-获得淘宝商品详情高级版 ||API接口数据采集||附实例
  • 深入理解Tomcat:Java Web服务器的安装与配置
  • 【系统架构设计师】商用构件的标准规范
  • 设计模式之美
  • Python进行深度学习来处理包含900天太阳相关数据的数据集并完成预测,同时开发用户界面的详细示例
  • HDMI高速接口EMI问题改版建议
  • 【eNSP实战】使用ACL实现路由器安全
  • 使用 VLOOKUP 和条件格式在 Excel 中查找并标红匹配的串号
  • 详解SQL数据查询功能
  • 如何在Django中有效地使用Celery进行定时任务?
  • 查找特定的值(信息学奥赛一本通-1110)
  • 17.使用读写包操作Excel文件:pyxlsb 包
  • Interview preparation.md
  • 分享一个工具可以国内无限制访问GitHub(来源于GitHub开源项目)
  • Linux--操作系统/进程
  • golang算法二叉搜索树
  • 游戏引擎学习第158天
  • 图像分割技术深度解析:语义、实例与全景分割,及FCN、U-Net、Mask R-CNN、UPSNet的应用
  • Pycharm实用技巧
  • 泛目录技术:智能缓存提升网站速度与稳定性