当前位置: 首页 > article >正文

消息队列 - RabbitMQ

消息队列 - RabbitMQ

    • 1. 初识 MQ
      • 1.1 同步调用
      • 1.2 异步调用
      • 1.3.技术选型
    • 2. RabbitMQ
      • 2.1 安装
      • 2.2 收发信息
        • 2.2.1 交换机(Exchange)
        • 2.2.2 队列
        • 2.2.3 绑定关系
        • 2.2.4 发送消息
      • 2.3 数据隔离

1. 初识 MQ

微服务一旦拆分,必然涉及到服务之间的相互调用,之前讲到的基于 OpenFeign 的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?

两种方式的区别:

  • 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。
  • 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

1.1 同步调用

我们以注册成功后邮件通知为例

image-20231207225257184

同步调用的业务流程是:

  • 发送注册请求
  • 注册成功将信息存入数据库
  • 发送邮件通知
  • 返回成功信息

这样的业务流程会存在下面三个问题

一、性能低

由于同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:100+100+300+50 = 550ms

二、扩展性差

比如需要添加短信通知,需要在代码中添加短信服务的远程调用代码。

三、级联失效

在这个案例中,注册成功了,但是邮件通知服务出现了问题抛出了错误会让存入数据库的信息回滚导致整个注册业务失败。

需要解决这些问题就需要用异步调用的方式来代替同步调用

1.2 异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

image-20231207230036497

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。
这样,发送消息的人和接收消息的人就完全解耦了。

还是以注册通知为例:

image-20231207230635303

将远程调用逻辑删除,改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。

后续需要添加短信通知也只需要添加一个短信服务订阅消息即可。

而且注册服务的耗时也和其他服务无关。

综上,异步调用的优势包括:

  • 耦合度更低
  • 性能更好
  • 业务拓展性强
  • 故障隔离,避免级联失败

当然,异步通信也并非完美无缺,它存在下列缺点:

  • 完全依赖于Broker的可靠性、安全性和性能
  • 架构复杂,后期维护和调试麻烦

1.3.技术选型

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.
目比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka

2. RabbitMQ

RabbitMQ 是一个开源的消息中间件(Message Broker),用于在分布式系统中传递消息。它实现了高级消息队列协议(AMQP),这是一种网络协议,用于在应用程序之间传递消息。

以下是 RabbitMQ 的一些主要特性和概念:

  1. 消息队列: RabbitMQ 提供了一个消息队列,允许不同的应用程序通过消息进行通信。发送方将消息放入队列,而接收方则从队列中获取消息。
  2. 发布/订阅模型: RabbitMQ 支持发布/订阅模型,其中一个应用程序(发布者)发布消息,而多个应用程序(订阅者)可以订阅并接收这些消息。
  3. 可靠性: RabbitMQ 提供了持久性,确保即使在代理重新启动后,队列和消息也不会丢失。
  4. 灵活的路由: RabbitMQ 具有灵活的路由机制,可以根据不同的条件将消息路由到不同的队列。
  5. 多协议支持: RabbitMQ 支持多种消息传递协议,包括 AMQP、STOMP、MQTT 等。
  6. 插件系统: RabbitMQ 具有丰富的插件系统,可以通过插件扩展其功能,如集群、身份验证、可视化工具等。
  7. 可视化管理界面: RabbitMQ 提供了一个易于使用的管理界面,允许用户监视和管理队列、交换机等组件。
  8. 集群支持: RabbitMQ 支持构建集群,以提高可用性和性能。
  9. 消息确认: 客户端可以向 RabbitMQ 确认已成功处理消息,确保消息不会因为消费者故障而丢失。
  10. 死信队列: RabbitMQ 支持死信队列,用于处理无法被消费的消息。

2.1 安装

这里介绍 Docker 安装 RabbitMQ 的过程。

2.1.1 拉取镜像

docker pull rabbitmq:management #  拉取带有web界面的镜像

image-20231207210107857

2.1.2 启动容器

docker run -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root --name mq --hostname q -p 15672:15672 -p 5672:5672 -d rabbitmq:management
# -e RABBITMQ_DEFAULT_USER=root 用户名
# -e RABBITMQ_DEFAULT_PASS=root 密码

端口介绍:

  • 4369/tcp: Erlang Port Mapper Daemon (EPMD)端口,用于Erlang节点之间的通信。
  • 5671/tcp: AMQP over TLS/SSL端口,用于安全的AMQP连接。
  • 5672/tcp: 默认的AMQP端口,用于客户端连接和消息传递。
  • 15671/tcp: RabbitMQ Management插件的安全AMQP端口。
  • 25672/tcp: 集群端口,用于Erlang分布式节点之间的内部通信。
  • 15672/tcp: RabbitMQ Management插件的Web管理界面端口,通过Web界面进行RabbitMQ的管理。

image-20231207231928650

2.1.3 访问管理页面

http://ip地址:15672

image-20231207232019327

RabbitMQ对应的架构如图:

image-20231208134920751

其中包含几个角色:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

2.2 收发信息

2.2.1 交换机(Exchange)

打开Exchanges选项,可以看到RabbitMQ预定义了7种交换机,这7种交换机有4种类型。
image-20231207232533500

关于交换机类型的简介:

  1. Direct Exchange(直连交换机):

    直连交换机是最简单的交换机类型。

    它将消息路由到与消息中的路由键完全匹配的队列中。

    在消息生产者指定的路由键和队列的绑定键完全相同时,消息将被发送到相应的队列。

  2. Fanout Exchange(扇出交换机):

    • 扇出交换机将消息广播到与交换机绑定的所有队列,忽略路由键。
    • 适用于广播消息给多个消费者的场景,不关心消息的具体内容。
  3. Topic Exchange(主题交换机):

    • 主题交换机通过匹配路由键和模式的方式来路由消息到队列。
    • 路由键可以包含通配符 *(匹配一个单词)和 #(匹配零个或多个单词),允许更复杂的路由规则。
  4. Headers Exchange(头交换机):

    • 头交换机使用消息的头部信息而不是路由键来决定如何路由消息。
    • 在绑定队列时指定一组键值对(headers),只有当消息的头部信息与这些键值对完全匹配时,消息才会被路由到队列。

关于预定义的7种交换机简介:

  1. AMQP default:
    • 这是一个直连交换机,是 RabbitMQ 的默认交换机。
    • 当消息的路由键与队列的绑定键完全匹配时,消息将被发送到相应的队列。
  2. amq.direct:
    • 也是一个直连交换机。
    • 类似于 AMQP 默认交换机,将消息路由到与消息中的路由键完全匹配的队列。
  3. amq.fanout:
    • 是一个扇出交换机。
    • 将消息广播到与交换机绑定的所有队列,忽略路由键。
  4. amq.headers:
    • 是一个头交换机。
    • 使用消息的头部信息而不是路由键来决定如何路由消息。在绑定队列时指定一组键值对(headers)。
  5. amq.match:
    • 是一个主题交换机。
    • 通过匹配路由键和模式的方式来路由消息到队列。支持更复杂的路由规则。
  6. amq.rabbitmq.trace:
    • 这是一个专用于消息追踪的交换机。
    • 用于在 RabbitMQ 中追踪消息的流动,通常不用于常规消息传递。
  7. amq.topic:
    • 也是一个主题交换机。
    • 通过匹配路由键和模式的方式来路由消息到队列。类似于 amq.match,支持更复杂的路由规则。
2.2.2 队列

打开Queues选项卡,新建一个队列:

image-20231208133420881

image-20231208133500631

2.2.3 绑定关系

点击 amq.fanout 交换机

image-20231208133656411

绑定队列 queue1

image-20231208133615746

image-20231208133641084

2.2.4 发送消息

点击 amq.fanout 交换机

image-20231208133925673

发送消息

image-20231208133915372

队列成功收到消息

image-20231208133941568

image-20231208133958825

2.3 数据隔离

RabbitMQ 使用 virtual host 实现数据隔离

新建一个虚拟主机

image-20231208140406859

添加一个用户

image-20231208140459340

现在用户test还没有能访问的虚拟主机

image-20231208140549326

切换登录用户test后无法获取没有访问权限的虚拟主机的队列消息。

image-20231208135723332

点击test为用户添加虚拟主机

image-20231208140634479

image-20231208140658453

右上角 虚拟主机切换为 /test 看不到其他虚拟主机的消息队列,这就是基于virtual host 的隔离效果。

image-20231208140741872


http://www.kler.cn/a/162267.html

相关文章:

  • Mysql数据库里的SSH连接
  • 准确率调整研究中心
  • C++编程技巧与规范-类和对象
  • 代码随想录第二十一天| 669. 修剪二叉搜索树 108.将有序数组转换为二叉搜索树 538.把二叉搜索树转换为累加树
  • 开发语言中,堆区和栈区的区别
  • 山泽光纤HDMI线:铜线的隐藏力量
  • Elasticsearch、Logstash、Kibana(ELK)环境搭建
  • 阿里云通义千问720亿参数模型开源,适配企业级、科研级高性能应用
  • 2023第十二届“认证杯”数学中国数学建模国际赛赛题A完整解析
  • 文本润色工具有哪些,高质量的文本润色软件
  • 57、postgresql 查询流程
  • 聊聊模糊测试,以及几种模糊测试工具的介绍!
  • 解决Flutter运行报错Could not run build/ios/iphoneos/Runner.app
  • win10编译golang程序报病毒错误
  • js中数组对象去重的方法
  • 【mysql】事物与隔离级别
  • CKafka 一站式搭建数据流转链路,助力长城车联网平台降低运维成本
  • 模块式雨水调蓄池施工简单,无需大型机械,可实现当天开挖当天回填
  • Python Appium Selenium 查杀进程的实用方法
  • 微信小程序 - PC端选择ZIP文件
  • 微软 Power Platform 零基础 Power Pages 网页搭建实际案例实践(三)
  • 013 OpenCV copyMakeBorder(padding)
  • vscode如何在没有网络的情况下安装插件
  • Python 中 Thread 线程的用法
  • 非常抱歉的通知
  • 在imx6ull中加入ov5640模块