当前位置: 首页 > article >正文

深入浅出消息队列 (MQ)

深入浅出消息队列 (MQ)

1. 前言

在分布式系统中,消息队列 (Message Queue, MQ) 经常被用来实现异步解耦、流量削峰等功能。它能够让系统在高并发环境下更具弹性,并且在各个模块之间实现“松耦合”的交互。

  • 什么是消息队列?
    消息队列是指在消息传输过程中保存消息的容器,可以帮助不同进程/应用之间异步通信。
  • 你可以把消息队列想象成一个“菜鸟驿站”。就像你网购时,快递先送到驿站,你再去取货一样,消息队列里先存储消息,等消费者来“取走”处理。这样,生产消息的部分(就像快递员)和处理消息的部分(就像取快递的人)就不会因为时间不匹配而互相拖累。简单来说,菜鸟驿站帮助解决快递和取件的时差问题,消息队列则解决了系统中生产和消费的速度差异问题。
  • image-20250314212026844
  • 为什么要使用消息队列?
    • 系统解耦:生产者和消费者不必直接调用。
    • 异步处理:生产者把任务丢到队列即可,后续由消费者异步执行。
    • 流量削峰:将突发的请求平滑化处理,防止服务被压垮。
    • 数据分发:在分布式环境中,能快速地把数据分发给不同的消费者。

2. MQ 的基本概念

2.1 Producer / Consumer / Broker

  • 生产者(Producer):负责生产消息并发送到消息队列。
  • 消费者(Consumer):负责从队列中获取消息并进行处理。
  • 服务器(Broker):消息队列的核心组件,管理和存储消息(也可称为“消息中间件”)。

2.2 Queue / Topic

  • 队列(Queue):点对点(Point to Point, P2P)模式常用的数据结构,消息只能被一个消费者消费。
  • 主题(Topic):发布订阅(Pub/Sub)模式中,多个消费者可同时订阅并消费同一条消息。

3. 常见的消息队列实现

特性KafkaRocketMQRabbitMQActiveMQ
单机吞吐量10 万级10 万级万级10 万级
开发语言ScalaJavaErlangJava
高可用分布式分布式主从分布式
消息延迟ms 级ms 级us 级ms 级
消息丢失理论上不会丢失理论上不会丢失
消费模式拉取推拉推拉推拉
持久化文件文件内存、文件内存、文件、数据库
支持协议自定义协议自定义协议AMQP、XMPP、SMTP、STOMPAMQP、MQTT、OpenWire、STOMP
社区活跃度
管理界面(无/一般)web console一般
部署难度(未标注)(未标注)
部署方式独立独立独立独立、嵌入
成熟度成熟比较成熟成熟成熟
综合评价优点:拥有强大的性能及吞吐量,兼容性很好。
缺点:由于支持消息堆积,导致延迟较高。
优点:性能好,稳定可靠,活跃的中文社区,特高的响应能力。
缺点:随着规模扩大,扩展性问题需改善。
优点:产品成熟,容易部署和使用,路由配置灵活。
缺点:性能和吞吐量较差,不易进行二次开发。
优点:产品成熟,支持多种协议,支持多种语言客户端。
缺点:社区不够活跃,可能存在消息丢失风险。

4. 消息队列的使用场景

4.1异步处理

image-20250314223318585

如果没有引入MQ进行架构改造,每次支付成功后的大量同步接口调用,耗时大,通过引入MQ,当更新完订单状态和扣减成功库存后,就发一条“支付成功”消息到MQ中,然后立即返回。而信息、积分、优惠券系统会订阅MQ中的该类消息,它们收到通知后就会去异步处理。整个系统的响应时间可以大大缩短,

4.2 削峰/限流

先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。

image-20250314222512088

像之前黑马点评项目中的异步秒杀下单,判断库存,检验一人一单,只要满足这两个条件就一定可以下单成功,不用等数据真的写进数据库中,可以直接告诉用户下单成功,只需要将订单id等信息引入异步队列记录,后台再开一个线程慢慢去执行队列中的消息就行,有效提高效率。

4.3 解耦

解耦和异步是同生同源的,我们经过上述的异步化改造后,自然而然的就已经将各个系统解耦出去了.如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。

5. 示例:如何使用 RabbitMQ

安装与配置

  1. RabbitMQ 官方下载
  2. 安装后可通过管理控制台查看队列状态。

第一步,配置pom包。

创建Spring Boot项目并在pom.xml文件中添加spring-bootstarter-amqp等相关组件依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在上面的示例中,引入Spring Boot自带的amqp组件spring-bootstarter-amqp。

第二步,修改配置文件。

修改application.properties配置文件,配置rabbitmq的host地址、端口以及账户信息。

spring.rabbitmq.host=10.2.1.231
spring.rabbitmq.port=5672
spring.rabbitmq.username=zhangweizhong
spring.rabbitmq.password=weizhong1988
spring.rabbitmq.virtualHost=order

在上面的示例中,主要配置RabbitMQ服务的地址。RabbitMQ配置由spring.rabbitmq.*配置属性控制。virtual-host配置项指定RabbitMQ服务创建的虚拟主机,不过这个配置项不是必需的。

第三步,创建消费者

消费者可以消费生产者发送的消息。接下来创建消费者类Consumer,并使用@RabbitListener注解来指定消息的处理方法。示例代码如下:

@Component
public class Consumer {
    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("rabbitmq_queue"))
    public void process(String message) {
        System.out.println("消费者消费消息111=====" + message);
    }
}

在上面的示例中,Consumer消费者通过@RabbitListener注解创建侦听器端点,绑定rabbitmq_queue队列。

(1)@RabbitListener注解提供了@QueueBinding、@Queue、@Exchange等对象,通过这个组合注解配置交换机、绑定路由并且配置监听功能等。

(2)@RabbitHandler注解为具体接收的方法。

第四步,创建生产者

生产者用来产生消息并进行发送,需要用到RabbitTemplate类。与之前的RedisTemplate类似,RabbitTemplate是实现发送消息的关键类。示例代码如下:

@Component
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void produce() {
        String message = new Date() + "Beijing";
        System.out.println("生产者产生消息=====" + message);
        rabbitTemplate.convertAndSend("rabbitmq_queue", message);
    }
}

如上面的示例所示,RabbitTemplate提供了 convertAndSend方法发送消息。convertAndSend方法有routingKey和message两个参数:

(1)routingKey为要发送的路由地址。

(2)message为具体的消息内容。发送者和接收者的queuename必须一致,不然无法接收。


http://www.kler.cn/a/585260.html

相关文章:

  • 提升开发效率的FPGA/IC小工具
  • CSS3学习教程,从入门到精通,CSS3 选择器语法知识点及案例代码(3)
  • 区块链技术与 DICT(数字化信息与通信技术)的结合
  • 江苏无锡一家汽车零部件企业终止,拓展氢燃料电池存不确定性
  • 网易爆米花 1.8.2| 免费无广告,智能刮削,聚合6大网盘,全端无缝看片
  • ArcGIS助力水文分析:数据处理、地图制作与流域特征提取
  • uni-app打包h5并部署到nginx,路由模式history
  • QKV矩阵:优维大模型自注意力机制的数学之美
  • TCP 采用三次握手建立连接的原因
  • 30天学习Java第六天——Object类
  • C++ const 使用
  • Android源码学习之Overlay
  • 实现手机手势签字功能
  • 在线教育网站项目第四步 :学习Vue3 + Nuxt3+springcloud,服务器为ubuntu24.04
  • 日志Python安全之SSTI——Flask/Jinja2
  • go中实现子模块调用main包中函数的方法
  • 外星人入侵-Python-二
  • 12 DHCP的内容和HTTP的改良
  • 车载以太网测试-11【网络层-ICMP协议】
  • 02-Canvas-fabric.BaseBrush绘图工具