SpringBoot集成多个rabbitmq
1、pom文件
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.9</version>
</dependency>
2、rabbitmq的连接配置文件
spring:
  rabbitmq:
    mq1:
      host: xxx.xxx.xxx.xxx
      port: 5672
      username: xxxx
      password: xxxxx
      enable: true
    mq2:
      host: xxx.xxx.xxx.xxx
      port: 5672
      username: xxxxx
      password: xxxxx
      enable: true
3、mq1的相关代码 MQ1RabbitConfiguration.java
package com.pojo.config;
import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
/**
 * Connection settings and AMQP beans for the first RabbitMQ broker ("mq1").
 *
 * <p>The {@code host}, {@code port}, {@code username} and {@code password} fields are bound from
 * the {@code spring.rabbitmq.mq1} prefix. The class is only registered when
 * {@code spring.rabbitmq.mq1.enable} is {@code true}. Its connection factory, template and
 * listener container factory are marked {@code @Primary}.
 *
 * <p>NOTE(review): the {@code @Bean} methods live in a {@code @Component}, not a
 * {@code @Configuration} class. That works here because no bean method calls another one
 * directly, but confirm this is intentional.
 *
 * <p>NOTE(review): {@code subQueue02} is declared here, while the listener and producer shown
 * elsewhere in this document use it through the mq2 beans. Confirm which broker is expected to
 * own that queue.
 */
@Data
@Component("mq1RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") // bind the mq1 connection settings
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") // on/off switch
public class MQ1RabbitConfiguration {

    private String host;
    private Integer port;
    private String username;
    private String password;

    @Autowired
    private ReturnCallBack1 returnCallBack1;

    @Autowired
    private ConfirmCallBack1 confirmCallBack1;

    /**
     * Builds the connection factory for broker mq1.
     *
     * <p>The bean is named explicitly and marked primary because the application defines more
     * than one {@code ConnectionFactory}; with a single broker neither would be needed.
     *
     * @return a caching connection factory with publisher confirms and returns enabled
     */
    @Bean(name = "mq1ConnectionFactory")
    @Primary
    public ConnectionFactory createConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        // The port is optional: when it is not configured keep the client default instead of
        // failing with a NullPointerException while unboxing.
        if (port != null) {
            connectionFactory.setPort(port);
        }
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        // Enable the "reached the exchange" callback (publisher confirms) ...
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // ... and the "could not be routed to a queue" callback (publisher returns).
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Builds the template used to publish messages to broker mq1.
     *
     * @param connectionFactory the mq1 connection factory
     * @return a template with the mandatory flag set and both callbacks registered
     */
    @Bean(name = "mq1RabbitTemplate")
    @Primary
    public RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // The mandatory flag only has an effect when a returns callback is registered.
        rabbitTemplate.setMandatory(true);
        // Invoked for every publish, on both ack and nack from the broker.
        rabbitTemplate.setConfirmCallback(confirmCallBack1);
        // Invoked only when a message could not be routed to any queue.
        rabbitTemplate.setReturnsCallback(returnCallBack1);
        return rabbitTemplate;
    }

    /**
     * Builds the listener container factory that {@code @RabbitListener} classes reference to
     * consume from broker mq1.
     *
     * @param configurer        Spring Boot configurer that applies the standard listener settings
     * @param connectionFactory the mq1 connection factory
     * @return the configured listener container factory
     */
    @Bean(name = "simpleRabbitListenerContainerFactory1")
    @Primary
    public SimpleRabbitListenerContainerFactory firstFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /** Declares queue {@code subQueue01}. */
    @Bean(name = "subQueue01")
    public Queue firstQueue() {
        return new Queue("subQueue01");
    }

    /** Declares queue {@code subQueue02}. */
    @Bean(name = "subQueue02")
    public Queue secondQueue() {
        return new Queue("subQueue02");
    }

    /** Declares queue {@code subQueue03} with the durable flag set explicitly. */
    @Bean(name = "subQueue03")
    public Queue thirdQueue() {
        return new Queue("subQueue03", true);
    }

    /** Declares queue {@code subQueue04} with the durable flag set explicitly. */
    @Bean(name = "subQueue04")
    public Queue fourQueue() {
        return new Queue("subQueue04", true);
    }

    /**
     * Declares the topic exchange {@code topicExchangeOne}.
     *
     * <p>Exchange kinds listed by the original author for reference: direct, fanout, topic,
     * headers and dead-letter.
     */
    @Bean(name = "topicExchangeOne")
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchangeOne");
    }

    /**
     * Binds {@code subQueue01} to the topic exchange, using the queue name as the routing key.
     *
     * @param queue    the {@code subQueue01} bean
     * @param exchange the topic exchange bean
     * @return the binding
     */
    @Bean(name = "binding1")
    public Binding binding1(@Qualifier("subQueue01") Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("subQueue01");
    }

    /**
     * Declares the fanout exchange {@code fanoutExchangeOne}.
     *
     * <p>Exchange kinds listed by the original author for reference: direct, fanout, topic,
     * headers and dead-letter.
     */
    @Bean(name = "fanoutExchangeOne")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchangeOne");
    }

    /**
     * Binds {@code subQueue03} to the fanout exchange; {@code subQueue04} is bound to the same
     * exchange by {@link #binding4}, so both queues receive what is published to it.
     *
     * @param queue    the {@code subQueue03} bean
     * @param exchange the fanout exchange bean
     * @return the binding
     */
    @Bean(name = "binding3")
    public Binding binding3(@Qualifier("subQueue03") Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Binds {@code subQueue04} to the fanout exchange; {@code subQueue03} is bound to the same
     * exchange by {@link #binding3}, so both queues receive what is published to it.
     *
     * @param queue    the {@code subQueue04} bean
     * @param exchange the fanout exchange bean
     * @return the binding
     */
    @Bean(name = "binding4")
    public Binding binding4(@Qualifier("subQueue04") Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}
ConfirmCallBack1.java
package com.pojo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
 * Publisher-confirm callback registered on the mq1 {@code RabbitTemplate}.
 *
 * <p>It only logs the outcome; no retry or compensation is performed here.
 */
@Component
@Slf4j
public class ConfirmCallBack1 implements RabbitTemplate.ConfirmCallback {

    /**
     * Logs whether the broker acknowledged a published message.
     *
     * @param correlationData correlation data supplied when the message was sent (not used here)
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           the failure reason passed in by the template; logged on nack
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("ConfirmCallBack1消息发送交换机成功");
            return;
        }
        // A nack is a delivery failure, so surface it above INFO level.
        log.warn("ConfirmCallBack1消息发送交换机失败:{}", cause);
    }
}
ReturnCallBack1.java
package com.pojo.config;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
 * Returns callback registered on the mq1 {@code RabbitTemplate}.
 *
 * <p>It only logs the returned message; no retry or compensation is performed here.
 */
@Component
@Slf4j
public class ReturnCallBack1 implements RabbitTemplate.ReturnsCallback {

    /**
     * Logs a message that the broker handed back to the publisher.
     *
     * @param returnedMessage the returned message and its return metadata
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // A returned message is a delivery failure, so surface it above INFO level.
        log.warn("ReturnCallBack1消息发送队列失败:{}", JSON.toJSON(returnedMessage));
    }
}
4、mq2的相关代码
MQ2RabbitConfiguration.java
package com.pojo.config;
import lombok.Data;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Connection settings and AMQP beans for the second RabbitMQ broker ("mq2").
 *
 * <p>The {@code host}, {@code port}, {@code username} and {@code password} fields are bound from
 * the {@code spring.rabbitmq.mq2} prefix. The class is only registered when
 * {@code spring.rabbitmq.mq2.enable} is {@code true}. None of the beans declared here is
 * primary, so they must be injected by name or qualifier.
 *
 * <p>NOTE(review): this class declares no queues, exchanges or bindings. Any queue consumed or
 * published to through the mq2 beans is presumably expected to exist on that broker already;
 * confirm.
 *
 * <p>NOTE(review): the {@code @Bean} methods live in a {@code @Component}, not a
 * {@code @Configuration} class. That works here because no bean method calls another one
 * directly, but confirm this is intentional.
 */
@Data
@Component("mq2RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq2") // bind the mq2 connection settings
@ConditionalOnProperty(name = "spring.rabbitmq.mq2.enable", havingValue = "true") // on/off switch
public class MQ2RabbitConfiguration {

    private String host;
    private Integer port;
    private String username;
    private String password;

    @Autowired
    private ReturnCallBack2 returnCallBack2;

    @Autowired
    private ConfirmCallBack2 confirmCallBack2;

    /**
     * Builds the connection factory for broker mq2.
     *
     * <p>The bean is named explicitly because the application defines more than one
     * {@code ConnectionFactory}; with a single broker that would not be needed.
     *
     * @return a caching connection factory with publisher confirms and returns enabled
     */
    @Bean(name = "mq2ConnectionFactory")
    public ConnectionFactory createConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        // The port is optional: when it is not configured keep the client default instead of
        // failing with a NullPointerException while unboxing.
        if (port != null) {
            connectionFactory.setPort(port);
        }
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        // Enable the "reached the exchange" callback (publisher confirms) ...
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // ... and the "could not be routed to a queue" callback (publisher returns).
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Builds the template used to publish messages to broker mq2.
     *
     * @param connectionFactory the mq2 connection factory
     * @return a template with the mandatory flag set and both callbacks registered
     */
    @Bean(name = "mq2RabbitTemplate")
    public RabbitTemplate brainRabbitTemplate(@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // The mandatory flag only has an effect when a returns callback is registered.
        rabbitTemplate.setMandatory(true);
        // Invoked for every publish, on both ack and nack from the broker.
        rabbitTemplate.setConfirmCallback(confirmCallBack2);
        // Invoked only when a message could not be routed to any queue.
        rabbitTemplate.setReturnsCallback(returnCallBack2);
        return rabbitTemplate;
    }

    /**
     * Builds the listener container factory that {@code @RabbitListener} classes reference to
     * consume from broker mq2.
     *
     * @param configurer        Spring Boot configurer that applies the standard listener settings
     * @param connectionFactory the mq2 connection factory
     * @return the configured listener container factory
     */
    @Bean(name = "simpleRabbitListenerContainerFactory2")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}
ConfirmCallBack2.java
package com.pojo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
 * Publisher-confirm callback registered on the mq2 {@code RabbitTemplate}.
 *
 * <p>It only logs the outcome; no retry or compensation is performed here.
 */
@Component
@Slf4j
public class ConfirmCallBack2 implements RabbitTemplate.ConfirmCallback {

    /**
     * Logs whether the broker acknowledged a published message.
     *
     * @param correlationData correlation data supplied when the message was sent (not used here)
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           the failure reason passed in by the template; logged on nack
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("ConfirmCallBack2消息发送交换机成功");
            return;
        }
        // A nack is a delivery failure, so surface it above INFO level.
        log.warn("ConfirmCallBack2消息发送交换机失败:{}", cause);
    }
}
ReturnCallBack2.java
package com.pojo.config;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
 * Returns callback registered on the mq2 {@code RabbitTemplate}.
 *
 * <p>It only logs the returned message; no retry or compensation is performed here.
 */
@Component
@Slf4j
public class ReturnCallBack2 implements RabbitTemplate.ReturnsCallback {

    /**
     * Logs a message that the broker handed back to the publisher.
     *
     * @param returnedMessage the returned message and its return metadata
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // A returned message is a delivery failure, so surface it above INFO level.
        log.warn("ReturnCallBack2消息发送队列失败:{}", JSON.toJSON(returnedMessage));
    }
}
5、消息生产者
package com.pojo.prj.controller;
import com.pojo.common.anno.NoNeedLogin;
import com.pojo.common.base.ApplicationContextUtils;
import com.pojo.common.base.BaseController;
import com.pojo.util.ResponseResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
 * Project REST controller. In this document it doubles as the message producer that publishes
 * test messages through the mq1 and mq2 templates.
 *
 * @author zhushangjin
 * @menu 项目管理
 * @since 2022-11-14
 */
@RestController
@Slf4j
public class ProjectController extends BaseController {

    /** Template bound to broker mq1, injected by bean name. */
    @Resource(name = "mq1RabbitTemplate")
    private RabbitTemplate mq1RabbitTemplate;

    /** Template bound to broker mq2, injected by bean name. */
    @Resource(name = "mq2RabbitTemplate")
    private RabbitTemplate mq2RabbitTemplate;

    /**
     * Endpoint labelled as the project drop-down list. As written it logs the active profile
     * and returns a fixed string payload.
     *
     * @return a successful response wrapping the fixed payload
     * @status done
     */
    @GetMapping("/prj/project/list")
    @NoNeedLogin
    public ResponseResult<String> list() {
        // Look the profile up once and log it as information, not as an error.
        String active = ApplicationContextUtils.getActiveProfile();
        log.info("Active profile: {}", active);
        return ResponseResult.ok("ReturnCallBack2");
    }

    /**
     * Publishes a test message to the topic exchange {@code topicExchangeOne} on mq1 with
     * routing key {@code subQueue01}.
     *
     * @return the result built by {@code buildResponseResult(true)}
     */
    @GetMapping("/prj/project/test1")
    public ResponseResult test1() {
        String str = "test1test1test1test1test1";
        mq1RabbitTemplate.convertAndSend("topicExchangeOne", "subQueue01", str);
        return buildResponseResult(true);
    }

    /**
     * Publishes a test message through the mq2 template with routing key {@code subQueue02}.
     * No exchange is named, so the template's default exchange is used.
     *
     * @return the result built by {@code buildResponseResult(true)}
     */
    @GetMapping("/prj/project/test2")
    public ResponseResult test2() {
        mq2RabbitTemplate.convertAndSend("subQueue02", "test2test2test2test2test2");
        return buildResponseResult(true);
    }

    /**
     * Publishes a test message to the fanout exchange {@code fanoutExchangeOne} on mq1.
     *
     * @return the result built by {@code buildResponseResult(true)}
     */
    @GetMapping("/prj/project/test3")
    public ResponseResult test3() {
        // A fanout exchange ignores the routing key, so none is supplied.
        mq1RabbitTemplate.convertAndSend("fanoutExchangeOne", null, "test3test3test3test3test3");
        return buildResponseResult(true);
    }
}
6、消息消费者
Receiver1.java
package com.pojo.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for queue {@code subQueue01}, attached through the listener container factory
 * {@code simpleRabbitListenerContainerFactory1}.
 */
@Component
@RabbitListener(queues = "subQueue01", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver1 {

    /**
     * Default handler: prints the received text to standard output.
     *
     * @param hello the message payload
     */
    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        final String line = "Receiver1: " + hello;
        System.out.println(line);
    }
}
Receiver2.java
package com.pojo.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for queue {@code subQueue02}, attached through the listener container factory
 * {@code simpleRabbitListenerContainerFactory2}.
 */
@Component
@RabbitListener(queues = "subQueue02", containerFactory = "simpleRabbitListenerContainerFactory2")
public class Receiver2 {

    /**
     * Default handler: prints the received text to standard output.
     *
     * @param hello the message payload
     */
    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        final String line = "Receiver2: " + hello;
        System.out.println(line);
    }
}
Receiver3.java
package com.pojo.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for queue {@code subQueue03}, attached through the listener container factory
 * {@code simpleRabbitListenerContainerFactory1}.
 */
@Component
@RabbitListener(queues = "subQueue03", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver3 {

    /**
     * Default handler: prints the received text to standard output.
     *
     * @param hello the message payload
     */
    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        final String line = "Receiver3 : " + hello;
        System.out.println(line);
    }
}
Receiver4.java
package com.pojo.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for queue {@code subQueue04}, attached through the listener container factory
 * {@code simpleRabbitListenerContainerFactory1}.
 */
@Component
@RabbitListener(queues = "subQueue04", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver4 {

    /**
     * Default handler: prints the received text to standard output.
     *
     * @param hello the message payload
     */
    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        final String line = "Receiver4 : " + hello;
        System.out.println(line);
    }
}
创建队列
/**
 * Declares the queue named by {@code UAV_QUEUE} with the {@code x-max-priority} argument set
 * to 5.
 *
 * @return the queue definition, registered as bean {@code uavTopicQueue}
 */
@Bean(name = "uavTopicQueue")
public Queue topicQueue() {
    final Map<String, Object> queueArguments = new HashMap<>();
    queueArguments.put("x-max-priority", 5);
    // Constructor arguments: name, durable, exclusive, autoDelete, arguments.
    return new Queue(UAV_QUEUE, true, false, false, queueArguments);
}