当前位置: 首页 > article >正文

手写消息队列(基于RabbitMQ)

一、什么是消息队列?

提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途就是实现 生产者消费者模型

我们知道对于生产者消费者模型来说,它具有两个十分亮眼的特点:

  1. 解耦合.
  2. 削峰填谷.

(1)解耦合
在引入生产者消费者模型之前,两台服务器之间通常是直接交互,这种交互模式使得服务器之间的耦合是非常大的。而引入生产者消费者模型之后,两台服务器之间不再进行直接通信,而是借助阻塞队列进行业务处理,起到了解耦的效果。

(2)削峰填谷
在引入生产者消费者模型之前,同样是两台服务器进行直接通信,如果在一个时间点,服务器 A 突然发送一组请求峰值,此刻服务器 B 也会随之感受到峰值,这种情况下很可能造成服务器故障。如果此时使用阻塞队列,A 将收到的请求发给队列,虽然队列中有很多请求,但是服务器 B 仍然和以按照原有的节奏读取请求。

其实正是因为生产者消费者模型具有以上诸多好处,在实际的后端开发中,特别是分布式系统里,跨主机使用生产者消费者模型是非常普遍的需求。因此通常会把阻塞队列单独分离出来,赋予更加丰富的功能,封装成一个独立的服务器程序,这个程序就称为 消息队列

二、需求整理

1、生产者消费者模型核心概念

  1. 生产者 (Producer): 负责将消息发送到消息队列中。

  2. 消费者 (Consumer): 从消息队列中接收和处理消息。。

  3. 中间人 (Broker): 它负责接收发布者发送的消息,并将这些消息存储在队列中,然后将这些消息传递给订阅者。

  4. 发布 (Publish): 生产者将消息投递到中间人的过程。

  5. 订阅 (Subscribe): 消费者在中间人这里注册的过程。只有消费者注册之后,当一个消息发布到消息队列时消息才会被发送给相应的订阅者。

根据以上概念,我们可以大致画出生产者消费者模型概念图:(PS:下面的每个模块均表示服务器)

一个生产者,一个消费者:

N 个生产者,N 个消费者:

2、Broker 设计概要

我们当前的目的是为了实现一个消息队列,其中 Broker 是最核心的部分,它主要负责消息的 存储转发,其中涉及到的核心概念如下:

  1. 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”,是⼀个逻辑上的集合。在实际的开发中一个 BrokerServer 可能会同时管理多组业务线上的数据,此时可以使用不同的 VirtualHost 进行区分。

  2. 交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则,把消息转发给不同的 Queue。

  3. 队列 (Queue): 真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上读取消息(根据订阅的队列)。⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息),一个 Queue 也可以被多个 Exchange 绑定
    (一个 Queue 中的消息可以来自于多个 Exchange)。

  4. 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 “多对多” 关系。使用一个关联表就可以把这两个概念联系起来。

  5. 消息 (Message): 具体来说是服务器之间的请求和响应。一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。

上述概念在 Broker 中的体现如图所示:

补充说明1:数据存储

以上这些概念对应的数据,既需要在内存中存储,也需要在硬盘上存储,以内存为主,硬盘为辅:

  • 内存存储:对于 MQ 来说,能够高效的处理转发数据时非常关键的指标,因此使用内存组织上述数据,能够得到较高的效率。
  • 硬盘存储:主要是为了防止内存中的数据随着进程/主机重启而丢失。

补充说明:2: 交换机类型与转发规则

上面我们提到,在生产者发送消息时,首先会将消息发送到 Broker 的交换机上,再由交换机根据不同的规则转发到相应的队列中。在 MQ 中支持四种类型的交换机,它们分别是: Direct(直接交换机)、Fanout(扇出交换机)、Topic(主题交换机)、Header(头部交换机)。其中 Header 这种方式比较复杂,也比较少见,当前项目中主要实现了前三种,下面分别对他们进行详细介绍:

前要说明:

  • 以下 bindingKey(绑定键)是在创建队列和交换机绑定关系时指定的关键字。
  • 以下 routingKey(路由键)是生产者发送消息时指定的关键字。

(1)Direct(直接交换机)

  1. 生产者发送消息时,会指定一个"目标队列"的名字(此时的 routingKey 就是 队列的名字,bindingKey 无效)
  2. 交换机收到消息后,查看当前交换机对应的绑定里面是否存在队列名字为routingKey的队列
  3. 如果有,就转发过去(把消息塞进对应的“目标队列”中)
  4. 如果没有,消息直接丢弃

(2)Fanout(扇出交换机)

  1. 生产者无需指定routingKey,直接发送消息到指定交换机
  2. 交换机收到消息后,直接将消息转发给当前交换机已绑定的所有队列中。(此时的 bindingKey 和 routingKey 对扇出交换机无效。)

(3)Topic(主题交换机)

  1. 生产者发送消息时,指定一个 routingKey
  2. 交换机收到消息后,查看当前交换机对应的绑定中是否存在一个 bindingKey 通过一定的规则和 routingKey 相匹配
  3. 如果有,就将消息转发到对应的绑定队列中。
  4. 如果没有,则将消息丢弃。

PS:以上所有概念出自 AMQP 协议:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

3、Broker 核心 API

Broker 基于以上概念和功能,需要实现的核心 API 如下:

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

补充说明:

  1. 从上面可以看出来,在进行创建操作时并没有使用 create,而是使用 declare,这是从语义上说明,这里的创建起到的效果是不存在则创建,存在则啥也不做。
  2. 上述并没有创建一个“消费消息”的API,这是因为当前我们使用的工作模式是 push(推),Broker 会将消息主动推送给订阅的消费者。当然也有 pull(拉) 工作模式,需要消费者主动调用 Broker 的 API 获取消息,当前项目不涉及这种模式。
  3. 在MQ中有两种应答方式,一种是自动应答,这种方式下 Broker 将消息推送给订阅的消费者后就算应答完毕。另一种应答方式是手动应答,上述确认消息(basicAck)起到的效果,是可以让消费者 显式 的告诉 Broker,这个消息我处理完毕了,提高整个系统的可靠性。

4、客户端设计概要

生产者、消费者都是客户端程序,broker 则是作为服务器,通过网络进行通信。此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作,这里需要给客户端提供一组 API,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法。

客户端核心API:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

补充说明:

  1. 和 Broker 服务器 API 相比,客户端程序还提供了如下 4 个 API:创建 Connection、关闭 Connection、创建 Channel、关闭 Channel。
  2. 这里的一个Connection对象代表一个TCP连接。
  3. Channel 是 Connection 内部逻辑上的链接,多个Channel复用同一个TCP连接,一个Connection 中可以包含多个Channel,每个Channel负责客户端中不同的模块,其中传输的数据是互不相干的。
  4. 这样的设定主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。
  5. 上述客户端提供的 API 只是给业务代码进行调用,真正的方法执行是交给了BrokerServer。这个过程称为 远程过程调用(Remote Procedure Call,简称RPC)是一种计算机通信协议,它允许程序调用另一个地址空间(通常是不同的计算机)中的过程或函数,而无需程序员显式编写网络代码。通过使用RPC,应用程序可以像调用本地进程一样调用远程服务器上的进程。

5、小结

最后简单总结一下,我们大致需要做的工作,其中涉及到的细节问题,我们后面在进行补充:

  1. 实现生产者、BrokerServer、消费者三个部分
  2. 针对生产者、消费者来说,主要编写客户端和服务器的网络通信部分。
  3. 重点实现BrokerServer,包括内部的基本概念和核心 API。
  4. 数据的持久化存储。

三、具体实现

附上连接:

1、消息队列详细设计与实现思维导图
2、Gitee 完整代码地址


http://www.kler.cn/a/135581.html

相关文章:

  • 物联网:七天构建一个闭环的物联网DEMO
  • 欧科云链研究院:ChatGPT 眼中的 Web3
  • 基于STM32的自动水满报警系统设计
  • 【信号滤波 (补充)】二阶陷波滤波代码推导过程(C++)
  • 后台运行 Python
  • linux系统(ubuntu,uos等)连接鸿蒙next(mate60)设备
  • 【高级程序设计】Week2-4Week3-1 JavaScript
  • Transformer学习资料
  • Codeforces Round 908 (Div. 2)
  • 配置命令别名
  • 【LeetCode】160. 相交链表
  • 从零带你底层实现unordered_map (1)
  • CISP全真模拟测试题(一)
  • 设计模式-责任链-笔记
  • 【Web】Ctfshow SSRF刷题记录1
  • 程序员开发者神器:10个.Net开源项目
  • Leetcode—206.反转链表【简单】
  • java基于RestTemplate的微服务发起http请求
  • k8s运维管理
  • Flutter笔记:桌面应用 窗口定制库 bitsdojo_window
  • WIFI版本云音响设置教程腾讯云平台版本
  • 基于SSM的供电公司安全生产考试系统设计与实现
  • MATLAB 嵌套switch语句||MATLAB while循环
  • C++中只能有一个实例的单例类
  • LeetCode Hot100之十:239.滑动窗口最大值
  • 网络运维与网络安全 学习笔记2023.11.19