MQ消息队列
一、简介
1.1 同步调用
微服务中基于Feign的调用就属于同步方式。
1.2 异步调用
异步调用常见实现是事件驱动模式。
优势:
(1)服务解耦
(2)性能提升,吞吐量提高
(3)服务没有强依赖,不担心级联失败问题
(4)流量削峰
缺点:
(1)依赖于Broker的可靠性、安全性、吞吐能力
(2)架构复杂了,业务没有明显的流程线,不方便追踪管理
1.3 MQ
MQ(MessageQueue,消息队列),也就是Broker。
MQ的实现包括RabbitMQ,ActiveMQ,RocketMQ,Kafka。
二、安装
2.1 单机部署
采取方式二,上传tar包:
导入镜像:
安装MQ,执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_REFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mql \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
说明:
上述指令开放了两个端口,一个是15672是管理平台的端口,一个5672是发送消息的端口。
-d 是后台运行
“ rabbitmq:3-management”是镜像名称。
访问rabbitmq管理界面:
上图中的Connections表示连接,Channels是通道,消息基于通道发送与接受。
Exchanges是交换机。
Queues是消息队列,来存储交换的信息。
Admin是用户管理,可以新增用户并修改用户权限;创建一个用户有一个VirtualHost虚拟主机,不同用户处于不同的虚拟主机,互相不可见。
三、消息模型
MO官方文档中给出了5个MQ的DEMO实例。
3.1基本消息模型的案例
生产者代码:
消费者代码:
说明:消费者和生产者都要创建通道和队列,因为不知道哪个先执行,实际上两者的创建只会执行一次。
四、SpringAMQP
AMQP(Advanced Message Queuing Protocol)是用于应用程序或之间传递业务消息的开放标准。该协议与平台无关,更符合微服务中独立性的要求。
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包括两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
网址:https://spring.io/projects/spring-amqp
4.1 案例-实现HelloWorld中的基础消息队列
步骤如下。
(一)引入依赖
在父工程中引入依赖
(二)消息发送服务编写
在publisher服务中编写application.yml,添加mq连接消息
编写测试类
运行测试类,在rabbitmq管理界面查看结果:
(三)消息接收服务编写
在consumer中编写消费逻辑,监听simple.queue。
在publisher服务中编写application.yml,添加mq连接消息
编写测试类:
运行:
可以查看rabbitmq管理平台,queues中的消息已经没有了:
4.2 Work Queue工作队列
注意:
若是queue中消息满了,再有消息会被丢弃,所以可以用多个consumer来提高消息处理速度。
Work Queue工作队列中,一个pulisher往队列上发消息,两个consumer接收消息。
在4.1案例的基础上修改,修改publisher代码:
修改consumer代码:
效果如下:
补充:
为防止接收多个消息却处理不过来,consumer服务可以设置如下:
4.3 发布(Publish)订阅(Subscribe)模式
前面的基础消息队列和工作队列中,一个publisher的一个消息只能被一个consumer处理,处理完后消息便从queue上消失。
发布订阅模式允许将同一消息发给多个消费者(适用于一个事件触发多个服务的场景),实现方式是加入了exchange(交换机)。
常见exchange类型包括:
fanout(广播)、Direct(路由)、Topic(话题)
注意:exchange负责消息路由,而不是存储,路由失败则消息丢失。
4.3.1 Fanout Exchange
fanout exchange会将接收到的消息路由到每一个跟其绑定的queue。
(一)在consumer服务中声明exchange、queue即绑定关系binding
在rabbitmq管理平台进行查看:
(二)在消费者中添加监听的方法
(三)修改publisher发送消息的代码
4.3.2 DirectExchange路由模式
Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此称为路由模式。
每一个queue都与exchange设置bindingkey(可以设置多个)。
发布者发布消息时,指定消息的routingkey.
exchange将消息路由到bindingkey与消息routingkey一致的队列。
(一)在consumer里利用@RabbitListener声明Exchange、Queue、RoutingKey
前面声明exchange,queue使用的是配置类加@bean的方式,其实可以省略,之间使用注解。
4.3.3 TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以**.分割。主要特点是routingkey可以使用通配符**。
4.4 消息转换器
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
例如:发送map对象
可以看到message里的数据使用默认方法完成序列化结果如下:
改用JSON方式序列化步骤如下:
(一)引入依赖
(二)publisher声明序列化方法
(三)consumer监听类代码修改