当前位置: 首页 > article >正文

深入理解RocketMQ延迟消息机制原理

深入理解RocketMQ延迟消息机制原理

延迟消息介绍

  • 基本概念:延迟消息是指生产者发送消息发送消息后,不能立刻被消费者消费,需要等待指定的时间后才可以被消费。
  • 场景案例:用户下了一个订单之后,需要在指定时间内(例如30分钟)进行支付,在到期之前可以发送一个消息提醒用户进行支付。

一些消息中间件的Broker端内置了延迟消息支持的能力,如:

  • NSQ:这是一个go语言的消息中间件,其通过内存中的优先级队列来保存延迟消息,支持秒级精度,最多2个小时延迟。Java中也有对应的实现,如ScheduledThreadPoolExecutor内部实际上也是使用了优先级队列。

  • QMQ:采用双重时间轮实现。

  • RabbitMQ:需要安装一个rabbitmq_delayed_message_exchange插件。

  • RocketMQ:RocketMQ 开源版本延迟消息临时存储在一个内部主题中,不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。

  • Broker端内置延迟消息处理能力,核心实现思路都是一样:将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中。如下图所示:

步骤说明如下:

  1. producer要将一个延迟消息发送到某个Topic中

  2. Broker判断这是一个延迟消息后,将其通过临时存储进行暂存。

  3. Broker内部通过一个延迟服务(delay service)检查消息是否到期,将到期的消息投递到目标Topic中。这个的延迟服务名字为delay service,不同消息中间件的延迟服务模块名称可能不同。

  4. 消费者消费目标topic中的延迟投递的消息

显然,临时存储模块和延迟服务模块,是延迟消息实现的关键。上图中,临时存储和延迟服务都是在Broker内部实现,对业务透明。

此外, 还有一些消息中间件原生并不支持延迟消息,如Kafka。在这种情况下,可以选择对Kafka进行改造,但是成本较大。另外一种方式是使用第三方临时存储,并加一层代理。

第三方存储选型要求:

对于第三方临时存储,其需要满足以下几个特点:

  • 高性能:写入延迟要低,MQ的一个重要作用是削峰填谷,在选择临时存储时,写入性能必须要高,关系型数据库(如Mysql)通常不满足需求。

  • 高可靠:延迟消息写入后,不能丢失,需要进行持久化,并进行备份

  • 支持排序:支持按照某个字段对消息进行排序,对于延迟消息需要按照时间进行排序。普通消息通常先发送的会被先消费,延迟消息与普通消息不同,需要进行排序。例如先发一条延迟10s的消息,再发一条延迟5s的消息,那么后发送的消息需要被先消费。

  • 支持长时间保存:一些业务的延迟消息,需要延迟几个月,甚至更长,所以延迟消息必须能长时间保留。不过通常不建议延迟太长时间,存储成本比较大,且业务逻辑可能已经发生变化,已经不需要消费这些消息。

RocketMQ中的延迟消息

开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的,如下:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。

延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持2天的延迟,修改最后一个level的值为2d,这个时候依然是18个level;也可以增加一个2d,这个时候总共就有19个level。

由于日常开发中遇到几次使用延时消息的场景,而且目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。

  • rabbitmq 延时消息是通过 设置队列ttl+死信exchange实现
    • 缺点嘛:每次都得设置两个队列,一个用来实现延时,过期后经死信exchange转到对应的业务队列提供消费。
    • 另:rabbitmq有提供延时插件,但缺点较多,如:1. 启动插件要么重启,要么引入一个新的集群;2. 不支持高可用,延时消息发送前只存储在当前broker节点的内部数据库Mnesia中,不会被镜像复制;3. 延时不可靠,存在消息数量较大或使用很久后延迟不准确;4:不支持大规模消息,同3;5:只支持ram节点(Mnesia数据库存在磁盘中)
  • kafka 延时消息通过在消费端判断消息是否达到消费时间,决定是否进行消费实现。未达到延时时间则暂停消费。
    • 缺点:针对单个topic固定的延时时间。 需要额外在消费端进行开发 (实际上这种在消费端控制延时的方式大部分消息队列都能做到)

无论是rabbitmq的死信还是kafka消费端控制,基本上都是每个topic只能使用固定的延时时间。但现实中,也存在一些同一个业务场景使用不同延时时间的消息的场景:

  • 考试结束后强制交卷。不同的考试规定的时间是不一样的,不可能每个考试都创建一个新的队列。
  • 一些异常的重试操作。执行某个操作失败后,需要多次不同等级的延时重试。(虽说这个用一个本地线程也可以,但是在同一台机器上延时重试,仍然存在较大可能失败。所以比较关键场景可以使用延时消息,分发到其他机器上执行。)

rocketmq 的架构

在这里插入图片描述

整个架构如图,简单描述一下:

  • nameServer 提供注册中心的服务,负责broker的管理,以及topic路由信息的管理。
  • brokerServer 则主要负责消息的存储、投递和查询及高可用。
  • Producer 连接nameServer获取到broker信息后,发送信息到对应的broker。
  • Consumer 同样先连接 nameServer,查询topic路由信息,然后连接broker消费消息。

消息的存储

在这里插入图片描述
如图,rocketmq 的所有消息都存储在 commitlog 中,然后ConsumerQueue作为逻辑消费队列,维护一个topic消息的索引,记录topic内消息在commitlog中的一些信息。其中 ConsumeQueue的存储单元为8字节的offset+4字节的size+8字节的tags hashcode, 对于延时消息,最后8字节则用于存储消息计划投递时间。

然后关于rocketmq的延时消息的使用
rocketmq 只支持固定18个等级的延时消息:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

发送延时消息只需要 setDelayTimeLevel 就可以。

延时消息的实现

在这里插入图片描述

总结

rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。

通过固定延时等级的方式,同一个队列中的消息都是相同的延时等级,不需要对消息进行排序,只需要按顺序拉取消息判断是否可以投递就行了。但也限制了延时时间。

另外,因为只要延时消息存入延时队列中,就会写入commitlog文件中,然后rocketmq的高可用(同步复制或异步复制)就会将消息复制到slave中,从而保证延时消息的可靠性。

虽然rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍然提供了18个延时等级,基本也能覆盖很多场景了。


http://www.kler.cn/a/384924.html

相关文章:

  • printf影响单片机中断速度
  • wx.openSetting未调起小程序设置界面的坑
  • GIT GUI和 GIT bash区别
  • SQL 数据结构查询
  • 秃姐学AI系列之:GRU——门控循环单元 | LSTM——长短期记忆网络
  • RNN中的梯度消失与梯度爆炸问题
  • 2-143 基于matlab-GUI的脉冲响应不变法实现音频滤波功能
  • LabVIEW编程过程中为什么会出现bug?
  • 算法训练(leetcode)二刷第十九天 | *39. 组合总和、*40. 组合总和 II、*131. 分割回文串
  • [沫忘录]Redis 持久化
  • 分割回文串(DFS)
  • 技术分享 | 大语言模型赋能软件测试:开启智能软件安全新时代
  • explain执行计划分析 ref_
  • 【数据结构】Java 集合 Set 接口及其实现类的定义简介
  • 测试-正交表与工具pairs的介绍使用(1)
  • Qt字符编码
  • Matlab实现海马优化算法(SHO)求解路径规划问题
  • 倒计时3天 | 2024 CCF中国开源大会仪式解读
  • 高级AI记录笔记(一)
  • [卷积神经网络]使用YOLOv11训练自己的模型
  • SQL,力扣题目1709,访问日期之间最大的空档期
  • Oceanbase学习之一迁移mysql数据到oceanbase
  • 基于SSM的校园美食交流系统【附源码】
  • 缓存-基础概念
  • (蓝桥杯C/C++)——基础算法(下)
  • 【大模型推理加速技术】SIMD 与SIMT