深入浅出消息队列 (MQ)
深入浅出消息队列 (MQ)
1. 前言
在分布式系统中,消息队列 (Message Queue, MQ) 经常被用来实现异步解耦、流量削峰等功能。它能够让系统在高并发环境下更具弹性,并且在各个模块之间实现“松耦合”的交互。
- 什么是消息队列?
消息队列是指在消息传输过程中保存消息的容器,可以帮助不同进程/应用之间异步通信。 - 你可以把消息队列想象成一个“菜鸟驿站”。就像你网购时,快递先送到驿站,你再去取货一样,消息队列里先存储消息,等消费者来“取走”处理。这样,生产消息的部分(就像快递员)和处理消息的部分(就像取快递的人)就不会因为时间不匹配而互相拖累。简单来说,菜鸟驿站帮助解决快递和取件的时差问题,消息队列则解决了系统中生产和消费的速度差异问题。
- 为什么要使用消息队列?
- 系统解耦:生产者和消费者不必直接调用。
- 异步处理:生产者把任务丢到队列即可,后续由消费者异步执行。
- 流量削峰:将突发的请求平滑化处理,防止服务被压垮。
- 数据分发:在分布式环境中,能快速地把数据分发给不同的消费者。
2. MQ 的基本概念
2.1 Producer / Consumer / Broker
- 生产者(Producer):负责生产消息并发送到消息队列。
- 消费者(Consumer):负责从队列中获取消息并进行处理。
- 服务器(Broker):消息队列的核心组件,管理和存储消息(也可称为“消息中间件”)。
2.2 Queue / Topic
- 队列(Queue):点对点(Point to Point, P2P)模式常用的数据结构,消息只能被一个消费者消费。
- 主题(Topic):发布订阅(Pub/Sub)模式中,多个消费者可同时订阅并消费同一条消息。
3. 常见的消息队列实现
特性 | Kafka | RocketMQ | RabbitMQ | ActiveMQ |
---|---|---|---|---|
单机吞吐量 | 10 万级 | 10 万级 | 万级 | 10 万级 |
开发语言 | Scala | Java | Erlang | Java |
高可用 | 分布式 | 分布式 | 主从 | 分布式 |
消息延迟 | ms 级 | ms 级 | us 级 | ms 级 |
消息丢失 | 理论上不会丢失 | 理论上不会丢失 | 低 | 低 |
消费模式 | 拉取 | 推拉 | 推拉 | 推拉 |
持久化 | 文件 | 文件 | 内存、文件 | 内存、文件、数据库 |
支持协议 | 自定义协议 | 自定义协议 | AMQP、XMPP、SMTP、STOMP | AMQP、MQTT、OpenWire、STOMP |
社区活跃度 | 高 | 中 | 高 | 高 |
管理界面 | (无/一般) | web console | 好 | 一般 |
部署难度 | 中 | (未标注) | 低 | (未标注) |
部署方式 | 独立 | 独立 | 独立 | 独立、嵌入 |
成熟度 | 成熟 | 比较成熟 | 成熟 | 成熟 |
综合评价 | 优点:拥有强大的性能及吞吐量,兼容性很好。 缺点:由于支持消息堆积,导致延迟较高。 | 优点:性能好,稳定可靠,活跃的中文社区,特高的响应能力。 缺点:随着规模扩大,扩展性问题需改善。 | 优点:产品成熟,容易部署和使用,路由配置灵活。 缺点:性能和吞吐量较差,不易进行二次开发。 | 优点:产品成熟,支持多种协议,支持多种语言客户端。 缺点:社区不够活跃,可能存在消息丢失风险。 |
4. 消息队列的使用场景
4.1异步处理
如果没有引入MQ进行架构改造,每次支付成功后的大量同步接口调用,耗时大,通过引入MQ,当更新完订单状态和扣减成功库存后,就发一条“支付成功”消息到MQ中,然后立即返回。而信息、积分、优惠券系统会订阅MQ中的该类消息,它们收到通知后就会去异步处理。整个系统的响应时间可以大大缩短,
4.2 削峰/限流。
先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
像之前黑马点评项目中的异步秒杀下单,判断库存,检验一人一单,只要满足这两个条件就一定可以下单成功,不用等数据真的写进数据库中,可以直接告诉用户下单成功,只需要将订单id等信息引入异步队列记录,后台再开一个线程慢慢去执行队列中的消息就行,有效提高效率。
4.3 解耦
解耦和异步是同生同源的,我们经过上述的异步化改造后,自然而然的就已经将各个系统解耦出去了.如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。
5. 示例:如何使用 RabbitMQ
安装与配置
- RabbitMQ 官方下载
- 安装后可通过管理控制台查看队列状态。
第一步,配置pom包。
创建Spring Boot项目并在pom.xml文件中添加spring-bootstarter-amqp等相关组件依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在上面的示例中,引入Spring Boot自带的amqp组件spring-bootstarter-amqp。
第二步,修改配置文件。
修改application.properties配置文件,配置rabbitmq的host地址、端口以及账户信息。
spring.rabbitmq.host=10.2.1.231
spring.rabbitmq.port=5672
spring.rabbitmq.username=zhangweizhong
spring.rabbitmq.password=weizhong1988
spring.rabbitmq.virtualHost=order
在上面的示例中,主要配置RabbitMQ服务的地址。RabbitMQ配置由spring.rabbitmq.*配置属性控制。virtual-host配置项指定RabbitMQ服务创建的虚拟主机,不过这个配置项不是必需的。
第三步,创建消费者
消费者可以消费生产者发送的消息。接下来创建消费者类Consumer,并使用@RabbitListener注解来指定消息的处理方法。示例代码如下:
@Component
public class Consumer {
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("rabbitmq_queue"))
public void process(String message) {
System.out.println("消费者消费消息111=====" + message);
}
}
在上面的示例中,Consumer消费者通过@RabbitListener注解创建侦听器端点,绑定rabbitmq_queue队列。
(1)@RabbitListener注解提供了@QueueBinding、@Queue、@Exchange等对象,通过这个组合注解配置交换机、绑定路由并且配置监听功能等。
(2)@RabbitHandler注解为具体接收的方法。
第四步,创建生产者
生产者用来产生消息并进行发送,需要用到RabbitTemplate类。与之前的RedisTemplate类似,RabbitTemplate是实现发送消息的关键类。示例代码如下:
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void produce() {
String message = new Date() + "Beijing";
System.out.println("生产者产生消息=====" + message);
rabbitTemplate.convertAndSend("rabbitmq_queue", message);
}
}
如上面的示例所示,RabbitTemplate提供了 convertAndSend方法发送消息。convertAndSend方法有routingKey和message两个参数:
(1)routingKey为要发送的路由地址。
(2)message为具体的消息内容。发送者和接收者的queuename必须一致,不然无法接收。