【RabbitMQ】Spring Boot 结合 RabbitMQ 完成应用间的通信
🔥个人主页: 中草药
🔥专栏:【中间件】企业级中间件剖析
Spring 框架与 RabbitMQ 的整合主要通过 Spring AMQP(Advanced Message Queuing Protocol)模块实现,提供了便捷的消息队列开发能力。
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置
#配置RabbitMQ的基本信息
spring:
rabbitmq:
host: 110.41.51.65
port: 15673 #默认为5672
username: study
password: study
virtual-host: bite #默认值为
#或者以下这种方式
rabbitmq:
addresses: #amqp://username:password@Ip:port/virtual-host
一、工作队列模式(Work Queue)
场景:多个消费者共享一个队列,消息被轮询分发(Round-Robin),用于任务分发和负载均衡(如耗时任务处理)。
producer
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work(){
//使用内置交换机 RoutingKey 和队列名称一致
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"Hello World"+i);
}
return "OK";
}
}
@RabbitListener
是 Spring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息(相当于消费者)。该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息。
以下是一些常用的参数类型:
String
:返回消息的内容。Message
(org.springframework.amqp.core.Message
):Spring AMQP 的 Message 类,返回原始的消息体以及消息的属性,如消息 ID、内容、队列信息等。Channel
(com.rabbitmq.client.Channel
):RabbitMQ 的通道对象,可以用于进行更高级的操作,如手动确认消息 。
@Component
public class WorkListener {
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener1(Message message, Channel channel) {
System.out.println("Listener1:["+Constants.WORK_QUEUE+"]"+message+",channel:"+channel);
}
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener2(String message) {
System.out.println("Listener2:["+Constants.WORK_QUEUE+"]"+message);
}
}
观察控制台
轮询分发:默认按顺序将消息分发给不同消费者。
二、发布订阅模式(Publish/Subscribe)
场景:消息广播到所有绑定的队列(如日志广播、事件通知),使用 Fanout 交换机。
声明使用交换机和队列 并绑定
@Configuration
public class RabbitMQConfig {
@Bean("fanoutQueue1")
public Queue funoutQueue1() {
return QueueBuilder.durable(Constants.FUNOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue funoutQueue2() {
return QueueBuilder.durable(Constants.FUNOUT_QUEUE2).build();
}
@Bean("funoutExchange")
public FanoutExchange funoutExchange(){
return ExchangeBuilder.fanoutExchange(Constants.FUNOUT_EXCHANGE).durable(true).build();
}
@Bean("funoutQueueBinding1")
public Binding funoutQueueBinding1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("funoutExchange") FanoutExchange funoutExchange){
return BindingBuilder.bind(queue).to(funoutExchange);
}
@Bean("funoutQueueBinding2")
public Binding funoutQueueBinding2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("funoutExchange") FanoutExchange funoutExchange){
return BindingBuilder.bind(queue).to(funoutExchange);
}
}
producer
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/funout")
public String funout(){
rabbitTemplate.convertAndSend(Constants.FUNOUT_EXCHANGE,"","Hello Spring funout");
return "OK";
}
}
listener
@Component
public class FunoutListener {
@RabbitListener(queues = Constants.FUNOUT_QUEUE1)
public void queueListener1(String message) {
System.out.println("Listener1:["+Constants.FUNOUT_QUEUE1+"]"+message);
}
@RabbitListener(queues = Constants.FUNOUT_QUEUE2)
public void queueListener2(String message) {
System.out.println("Listener2:["+Constants.FUNOUT_QUEUE2+"]"+message);
}
}
在实际开发之中,一个 listener 监听一个 queue
三、路由模式(Routing)
场景:根据 路由键(Routing Key) 精准匹配,将消息发送到指定队列,使用 Direct 交换机。
声明使用交换机和队列 并绑定
@Configuration
public class RabbitMQConfig {
//direct
@Bean("directQueue1")
public Queue directQueue1() {
return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue directQueue2() {
return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
@Bean("directExchange")
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
}
@Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directQueue1") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
@Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directQueue2") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("black");
}
@Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directQueue2") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
}
producer
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/direct/{routingKey}")
public String direct(@PathVariable String routingKey){
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"Hello Spring direct "+routingKey);
return "direct OK";
}
}
listener
@Component
public class DirectListener {
@RabbitListener(queues = Constants.DIRECT_QUEUE1)
public void queueListener1(String message) {
System.out.println("Listener1:["+Constants.DIRECT_QUEUE1+"]"+message);
}
@RabbitListener(queues = Constants.DIRECT_QUEUE2)
public void queueListener2(String message) {
System.out.println("Listener2:["+Constants.DIRECT_QUEUE2+"]"+message);
}
}
如果某一个队列即绑定了black和orange,将会分别发送到队列
四、通配符模式(Topics)
场景:根据路由键的 通配符规则 匹配队列,使用 Topic 交换机,支持 *
(匹配一个单词)和 #
(匹配多个单词)。
Topics 和 Routing 模式的区别是:
- topics 模式使用的交换机类型为 topic (Routing 模式使用的交换机类型为 direct)
- topic 类型的交换机在匹配规则上进行了扩展,Binding Key 支持通配符匹配
声明使用交换机和队列 并绑定
@Configuration
public class RabbitMQConfig {
//TOPIC
@Bean("topicExchange")
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
}
@Bean("topicQueue1")
public Queue topicQueue1(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
}
@Bean("topicQueue2")
public Queue topicQueue2(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
}
@Bean("topicQueueBinding1")
public Binding topicQueueBinding1(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue1") Queue topicQueue){
return BindingBuilder.bind(topicQueue).to(topicExchange()).with("*.orange.*");
}
@Bean("topicQueueBinding2")
public Binding topicQueueBinding2(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue2") Queue topicQueue){
return BindingBuilder.bind(topicQueue).to(topicExchange).with("*.*.rabbit");
}
@Bean("topicQueueBinding3")
public Binding topicQueueBinding3(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue2") Queue topicQueue){
return BindingBuilder.bind(topicQueue).to(topicExchange).with("lazy.#");
}
}
producer
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/topic/{routingKey}")
public String topic(@PathVariable String routingKey){
rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"Hello Spring topic "+routingKey);
return "topic OK";
}
}
listener
@Component
public class TopicListener {
@RabbitListener(queues = Constants.TOPIC_QUEUE1)
public void queueListener1(String message) {
System.out.println("Listener1:["+Constants.TOPIC_QUEUE1+"]"+message);
}
@RabbitListener(queues = Constants.TOPIC_QUEUE2)
public void queueListener2(String message) {
System.out.println("Listener2:["+Constants.TOPIC_QUEUE2+"]"+message);
}
}
五、完成应用通信
SpringBoot 整合 RabbitMQ 实现应用通信是微服务/分布式系统中常见的异步解耦方案。
以此图为实例
订单系统
@Configuration
public class RabbitMQConfig {
@Bean("orderQueue")
public Queue orderQueue() {
return QueueBuilder.durable("order.create").build();
}
}
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/create")
public String create(){
rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+ UUID.randomUUID());
//这里仅仅是模拟演示,实际的下单操作比较复杂,包括参数的校验,数据库存储等等 业务代码省略
return "下单成功";
}
}
物流系统
@Component
@RabbitListener(queues = "order.create")
public class OrderListener {
@RabbitHandler
//该注解根据所识别的数据类型不同自动分配不同的方法
public void handleOrder(String orderInfo) {
System.out.println("接收到新的订单消息:"+orderInfo);
//接收到订单消息后,进行相应的业务出路 代码省略
}
@RabbitHandler
//在此处为方便演示我们将order-service 让此项目应用
//正确的做法是将OrderInfo抽取出来单独作为一个包,两个service都引用这个包
public void handleOrder2(OrderInfo orderInfo) {
System.out.println("接收到新的订单消息:"+orderInfo);
}
}
@RabbitHandler
注解用于标记方法,这些方法会根据消息的类型来处理接收到的消息。当一个消息监听器容器接收到消息时,它会根据消息的类型选择合适的 @RabbitHandler
注解的方法来处理该消息。
测试结果
当在发送的是一个对象时,为保证对象的可读性,我们要保证对象可被序列化,且通过 Jackson2JsonMessageConverter 将其从原生序列化改为Json格式
@Configuration
public class RabbitMQConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}
对于Listener,为保证同样具有解读json的能力,也应该去加上相同的配置
@Data
public class OrderInfo implements Serializable {//实现序列化接口
private String orderId;
private String name;
}
可观察结果
对时间的慷慨,就等于慢性自杀。——奥斯特洛夫斯基
🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀
以上,就是本期的全部内容啦,若有错误疏忽希望各位大佬及时指出💐
制作不易,希望能对各位提供微小的帮助,可否留下你免费的赞呢🌸