当前位置: 首页 > article >正文

SpringBoot集成多个rabbitmq

1、pom文件

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.4.9</version>
</dependency>

2、rabbitmq的连接配置文件

spring:
  rabbitmq:
    mq1:
      host: xxx.xxx.xxx.xxx
      port: 5672
      username: xxxx
      password: xxxxx
      enable: true
    mq2:
      host: xxx.xxx.xxx.xxx
      port: 5672
      username: xxxxx
      password: xxxxx
      enable: true

3、mq1的相关代码  MQ1RabbitConfiguration.java

package com.pojo.config;

import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

@Data
@Component("mq1RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class MQ1RabbitConfiguration {
    private String host;
    private Integer port;
    private String username;
    private String password;

    @Autowired
    private ReturnCallBack1 returnCallBack1;
    @Autowired
    private ConfirmCallBack1 confirmCallBack1;

    @Bean(name = "mq1ConnectionFactory")
    //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此
    @Primary
    public ConnectionFactory createConnectionFactory() {
        //消息队列1的连接
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        //开启发送到交换机和队列的回调
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        return connectionFactory;
    }

    @Bean(name = "mq1RabbitTemplate")
    //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此
    @Primary
    public RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
        //消息生产
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //发送消息时设置强制标志,仅当提供了returnCallback时才适用
        rabbitTemplate.setMandatory(true);
        //确保消息是否发送到交换机,成功与失败都会触发
        rabbitTemplate.setConfirmCallback(confirmCallBack1);
        //确保消息是否发送到队列,成功发送不触发,失败触发
        rabbitTemplate.setReturnsCallback(returnCallBack1);
        return rabbitTemplate;
    }

    @Bean(name = "simpleRabbitListenerContainerFactory1")
    @Primary
    public SimpleRabbitListenerContainerFactory firstFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }


    @Bean(name = "subQueue01")
    public Queue firstQueue() {
        return new Queue("subQueue01");
    }

    @Bean(name = "subQueue02")
    public Queue secondQueue() {
        return new Queue("subQueue02");
    }

    @Bean(name = "subQueue03")
    public Queue thirdQueue() {
        return new Queue("subQueue03", true);
    }

    @Bean(name = "subQueue04")
    public Queue fourQueue() {
        return new Queue("subQueue04", true);
    }

    @Bean(name = "topicExchangeOne")
    public TopicExchange topicExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)
        return new TopicExchange("topicExchangeOne");
    }
    @Bean(name = "binding1")
    public Binding binding1(@Qualifier("subQueue01") Queue queue, TopicExchange exchange) {
        //绑定队列1到TopicExchange  routingKey是队列1的队列名
        return BindingBuilder.bind(queue).to(exchange).with("subQueue01");
    }

    @Bean(name = "fanoutExchangeOne")
    public FanoutExchange fanoutExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)
        return new FanoutExchange("fanoutExchangeOne");
    }

    @Bean(name = "binding3")
    public Binding binding3(@Qualifier("subQueue03") Queue queue, FanoutExchange exchange) {
        //绑定队列3到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean(name = "binding4")
    public Binding binding4(@Qualifier("subQueue04") Queue queue, FanoutExchange exchange) {
        //绑定队列4到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息
        return BindingBuilder.bind(queue).to(exchange);
    }

}

ConfirmCallBack1 .java

package com.pojo.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmCallBack1 implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (!ack) {
            log.info("ConfirmCallBack1消息发送交换机失败:{}", s);
        } else {
            log.info("ConfirmCallBack1消息发送交换机成功");
        }
    }
}


ReturnCallBack1.java
package com.pojo.config;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReturnCallBack1 implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("ReturnCallBack1消息发送队列失败:{}", JSON.toJSON(returnedMessage));
    }
}

4、mq2的相关代码

  MQ2RabbitConfiguration.java

package com.pojo.config;

import lombok.Data;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Data
@Component("mq2RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq2") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq2.enable", havingValue = "true") //是否启用
public class MQ2RabbitConfiguration {
    private String host;
    private Integer port;
    private String username;
    private String password;

    @Autowired
    private ReturnCallBack2 returnCallBack2;
    @Autowired
    private ConfirmCallBack2 confirmCallBack2;

    @Bean(name = "mq2ConnectionFactory")   //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此
    public ConnectionFactory createConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        //开启发送到交换机和队列的回调
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        return connectionFactory;
    }

    @Bean(name = "mq2RabbitTemplate") //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此
    public RabbitTemplate brainRabbitTemplate(@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //发送消息时设置强制标志,仅当提供了returnCallback时才适用
        rabbitTemplate.setMandatory(true);
        //确保消息是否发送到交换机,成功与失败都会触发
        rabbitTemplate.setConfirmCallback(confirmCallBack2);
        //确保消息是否发送到队列,成功发送不触发,失败触发
        rabbitTemplate.setReturnsCallback(returnCallBack2);
        return rabbitTemplate;
    }

    @Bean(name = "simpleRabbitListenerContainerFactory2")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }



}

ConfirmCallBack2.java

package com.pojo.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmCallBack2 implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (!ack) {
            log.info("ConfirmCallBack2消息发送交换机失败:{}", s);
        } else {
            log.info("ConfirmCallBack2消息发送交换机成功");
        }
    }
}


ReturnCallBack2.java

package com.pojo.config;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReturnCallBack2 implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("ReturnCallBack2消息发送队列失败:{}", JSON.toJSON(returnedMessage));
    }
}

5、消息生产者

package com.pojo.prj.controller;


import com.pojo.common.anno.NoNeedLogin;
import com.pojo.common.base.ApplicationContextUtils;
import com.pojo.common.base.BaseController;
import com.pojo.util.ResponseResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;



/**
 * <p>
 * 项目表 前端控制器
 * </p>
 *
 * @author zhushangjin
 * @menu 项目管理
 * @since 2022-11-14
 */
@RestController
@Slf4j
public class ProjectController extends BaseController {

    @Resource(name = "mq1RabbitTemplate")
    //初始化mq1的RabbitTemplate对象
    private RabbitTemplate mq1RabbitTemplate;

    @Resource(name = "mq2RabbitTemplate")
    //初始化mq1的RabbitTemplate对象
    private RabbitTemplate mq2RabbitTemplate;


    /**
     * 获取项目下拉列表
     *
     * @return
     * @status done
     */
    @GetMapping("/prj/project/list")
    @NoNeedLogin
    public ResponseResult<String> list() {
        String active = ApplicationContextUtils.getActiveProfile();
        logger.error(ApplicationContextUtils.getActiveProfile());
        return ResponseResult.ok("ReturnCallBack2");
    }


    @GetMapping("/prj/project/test1")
    public ResponseResult test1() {

        //发送到topicExchangeOne类型的交换机,根据routekey去找发送到哪个队列里,
        // 只有这一个队列才能收到这条消息
        String str = "test1test1test1test1test1";
        mq1RabbitTemplate.convertAndSend("topicExchangeOne","subQueue01", str);
        return buildResponseResult(true);

    }

    @GetMapping("/prj/project/test2")
    public ResponseResult test2() {
        //发送到direct类型的交换机,根据routekey去找发送到哪个队列里,
        //只有这一个队列才能收到这条消息
        mq2RabbitTemplate.convertAndSend("subQueue02", "test2test2test2test2test2");
        return buildResponseResult(true);

    }

    @GetMapping("/prj/project/test3")
    public ResponseResult test3() {
        //发送到fanout类型的交换机,跟这个交换机绑定的队列都会收到这一条消息,
        // 故第二个参数routekey无需填写
        mq1RabbitTemplate.convertAndSend("fanoutExchangeOne", null, "test3test3test3test3test3");
        return buildResponseResult(true);

    }


}

6、消息消费者

Receiver1.java

package com.pojo.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "subQueue01", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver1 {

    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        System.out.println("Receiver1: " + hello);
    }

}

Receiver2.java

package com.pojo.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "subQueue02", containerFactory = "simpleRabbitListenerContainerFactory2")
public class Receiver2 {

     @RabbitHandler(isDefault = true)
    public void process(String hello) {
        System.out.println("Receiver2: " + hello);
    }

}

Receiver3.java

package com.pojo.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "subQueue03", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver3 {

     @RabbitHandler(isDefault = true)
    public void process(String hello) {
        System.out.println("Receiver3 : " + hello);
    }

}

Receiver4.java

package com.pojo.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "subQueue04", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver4 {

    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        System.out.println("Receiver4 : " + hello);
    }

}

创建队列

@Bean(name = "uavTopicQueue")
    public Queue topicQueue() {
        Map<String, Object> argsMap = new HashMap<String, Object>();
        argsMap.put("x-max-priority", 5);
        Queue queue = new Queue(UAV_QUEUE, true, false, false, argsMap);
        return queue;
    }


http://www.kler.cn/a/409161.html

相关文章:

  • 8、深入剖析PyTorch的state_dict、parameters、modules源码
  • Linux网络——网络层
  • 计算机网络socket编程(6)_TCP实网络编程现 Command_server
  • Vue (一)
  • Excel的图表使用和导出准备
  • 【H2O2|全栈】JS进阶知识(八)ES6(4)
  • 安宝特方案 | AR助力紧急救援,科技守卫生命每一刻!
  • C++结构型设计模式之桥接模式
  • C# 数据结构之【树】C#树
  • 显示类控件
  • 深度学习中的长短期记忆网络(LSTM)与自然语言处理
  • [AutoSar]BSW_Diagnostic_007 BootLoader 跳转及APP OR boot response 实现
  • 数据结构 ——— 直接选择排序算法的实现
  • springboot 使用笔记
  • selinux及防火墙
  • 力扣11.22
  • 【SSMS】【数据库】还原数据库
  • Scala的Array和ArrayBuffer集合及多维数组
  • 数据库、数据仓库、数据湖、数据中台、湖仓一体的概念和区别
  • Mac下的vscode远程ssh免密码登录
  • 【CVE-2024-9413】SCP-Firmware漏洞:安全通告
  • 【LLM训练】从零训练一个大模型有哪几个核心步骤?
  • 重装系统后ip地址错误,网络无法接通怎么办
  • C++设计模式-享元模式
  • C#13新特性介绍:LINQ 的优化设计
  • OpenMM的安装与使用