当前位置: 首页 > article >正文

5.RabbitMQ交换机详解

五、RabbitMQ交换机详解

1、交换机类型

Exchange(X) 可翻译成交换机/交换器/路由器

RabbitMQ交换器 (Exchange)类型

  • Fanout Exchange(扇形交换机)
  • Direct Exchange(直连交换机)
  • Topic Exchange(主题交换机)
  • Headers Exchange(头部交换机)
  • x-local-random Exchange
  • 自定义交换机

5.1.1、Fanout Exchange

(1)、交换机简介

扇形交换机会将消息投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;

在这里插入图片描述

(2)、使用示例

rabbitmq-05-fanout

添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: fanout-learn

  rabbitmq:
    host: 192.168.0.101 # rabbitmq 的主机
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi # 虚拟主机

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    public static final String EXCHANGE_FANOUT = "exchange.fanout";
    public static final String QUEUE_FANOUT_A = "queue.fanout.a";
    public static final String QUEUE_FANOUT_B = "queue.fanout.B";
}

定义MQ

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    //rabbitmq 三部曲
    // 1、定义交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitMQConstant.EXCHANGE_FANOUT);
    }

    // 2 定义队列a
    @Bean
    public Queue queueA() {
        return new Queue(RabbitMQConstant.QUEUE_FANOUT_A);
    }

    // 2 定义队列b
    @Bean
    public Queue queueB() {
        return new Queue(RabbitMQConstant.QUEUE_FANOUT_B);
    }

    // 3 绑定交换机和队列a
    @Bean
    public Binding bingingA(FanoutExchange fanoutExchange, Queue queueA) {
        //将队列A绑定到扇形交换机
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }

    // 3 绑定交换机和队列b
    @Bean
    public Binding bingingB(FanoutExchange fanoutExchange, Queue queueB) {
        //将队列B绑定到扇形交换机
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }
}

定义生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 生产者发送消息
     */
    public void sendMsg() {
        //定义要发送的消息
        String msg = "hello world";
        // 消息封装成Message对象
        Message message = new Message(msg.getBytes());
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_FANOUT, "", message);
        log.info("消息发送完毕,发送时间为:{}", new Date());
    }
}

定义消费者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiveMessageService {
    //接收两个队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_FANOUT_A,RabbitMQConstant.QUEUE_FANOUT_B})
    public void receiveMsg(Message message){
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到的消息为:{}",msg);
    }
}

发送消息

这里改造启动类,实现ApplicationRunner接口,重写里面的run()方法,在run()方法中发送消息(实际上应该在业务处理完毕后发送消息)

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq05FanoutApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq05FanoutApplication.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }

}

测试结果

在这里插入图片描述

5.1.2、Direct Exchange

(1)、交换机简介

路由键与队列名完全匹配的交换机

在这里插入图片描述

(2)、使用示例

rabbitmq-05-direct

添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: direct-exchange

  rabbitmq:
    host: 192.168.0.101 # rabbitmq 的主机
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi # 虚拟主机

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    public static final String EXCHANGE_DIRECT = "exchange.direct";
    public static final String QUEUE_DIRECT_A = "queue.direct.a";
    public static final String QUEUE_DIRECT_B = "queue.direct.b";
    public static final String ROUTING_KEY = "info";
    public static final String ROUTING_WARNING_KEY = "warn";
}

定义MQ

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    //三部曲

    /**
     * 1、定义交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        //使用建造者模式创建
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DIRECT).build();
    }

    /**
     * 2、定义队列
     *
     * @return
     */
    @Bean
    public Queue queueA() {
        //使用建造者模式创建
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT_A).build();
    }

    @Bean
    public Queue queueB() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT_B).build();
    }

    /**
     * 3、交换机和队列进行绑定
     *
     * @return
     */
    // 队列A绑定info
    @Bean
    public Binding bindingA(DirectExchange directExchange, Queue queueA) {
        //使用建造者模式创建
        return BindingBuilder.bind(queueA).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
    }

    // 队列B绑定info
    @Bean
    public Binding bindingB1(DirectExchange directExchange, Queue queueB) {
        return BindingBuilder.bind(queueB).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
    }

    // 队列B绑定warn
    @Bean
    public Binding bindingB2(DirectExchange directExchange, Queue queueB) {
        return BindingBuilder.bind(queueB).to(directExchange).with(RabbitMQConstant.ROUTING_WARNING_KEY);
    }
}

定义生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        // 发送一个两个队列都能接收的消息
        //使用建造者模式创建消息
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        //参数1 交换机,参数2 路由key,参数3 消息
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, RabbitMQConstant.ROUTING_KEY, message);

        // 发送一个只有队列B能接收的消息
        //使用建造者模式创建消息
        Message error_message = MessageBuilder.withBody("error world".getBytes()).build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, RabbitMQConstant.ROUTING_WARNING_KEY, error_message);
        log.info("消息发送完毕,发送时间为:{}", new Date());
    }
}

定义消费者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiveMessageService {
    //接收A队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT_A})
    public void receiveMsgA(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到A的消息为:{}", msg);
    }

    //接收B队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT_B})
    public void receiveMsgB(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到B的消息为:{}", msg);
    }
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq05DirectApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq05DirectApplication.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }

}

这里改造启动类,实现ApplicationRunner接口,重写里面的run()方法,在run()方法中发送消息(实际上应该在业务处理完毕后发送消息)

测试结果

在这里插入图片描述

5.1.3、Topic Exchange

与direct交换机类似,也是需要路由键匹配分发

区别在于direct是完全匹配,topic模式是模糊匹配

(1)、交换机简介

通配符匹配(相当于模糊匹配)

在Topic模式中,将路由键通过"."来分为多个部分

  • #匹配多个单词

    用来表示任意数量(零个或多个)单词

    beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx
    
  • *匹配一个单词

    必须有一个而且只有一个

    用.隔开的为一个单词

    beijing.* == beijing.queue, beijing.xyz
    

在这里插入图片描述

(2)、使用示例

rabbitmq-05-topic

添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: fanout-learn

  rabbitmq:
    host: 192.168.0.101 # rabbitmq 的主机
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi # 虚拟主机

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    public static final String EXCHANGE_TOPIC = "exchange.topic";
    public static final String QUEUE_TOPIC_A = "queue.topic.a";
    public static final String QUEUE_TOPIC_B = "queue.topic.b";
}

定义MQ

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    //三部曲

    /**
     * 1、定义交换机
     *
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange(RabbitMQConstant.EXCHANGE_TOPIC).build();
    }

    @Bean
    public Queue queueA() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_TOPIC_A).build();
    }

    @Bean
    public Queue queueB() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_TOPIC_B).build();
    }

    @Bean
    public Binding bindingA(TopicExchange topicExchange, Queue queueA) {
        return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");
    }

    @Bean
    public Binding bindingB1(TopicExchange topicExchange, Queue queueB) {
        return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");
    }

    @Bean
    public Binding bindingB2(TopicExchange topicExchange, Queue queueB) {
        return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");
    }
}

定义生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        //参数1 交换机,参数2 发送路由key,参数3 消息
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_TOPIC, "lazy.orange.rabbit", message);
    }
}

定义消费者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiveMessageService {
    //接收A队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_TOPIC_A})
    public void receiveMsgA(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到A的消息为:{}", msg);
    }

    //接收B队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_TOPIC_B})
    public void receiveMsgB(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到B的消息为:{}", msg);
    }
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq05TopicApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq05TopicApplication.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }

}

这里改造启动类,实现ApplicationRunner接口,重写里面的run()方法,在run()方法中发送消息(实际上应该在业务处理完毕后发送消息)

测试结果

在这里插入图片描述

5.1.4、Headers Exchange

(1)、交换机简介

基于消息内容中的headers属性进行匹配,不是根据路由键匹配

headers交换器和direct交换器完全一致,但是性能差很多,几乎不用了

消费方指定的headers中必须包含一个"x-match"键

"x-match"键的值有两个

  • x-match=all:表示所有的键值对都匹配才能接受消息
  • x-match=any:表示只要有键值对匹配就能接受消息

发消息时可以指定消息属性(MessageProperties)

如果heanders中包含多个消息属性,则所有属性都匹配上才算匹配上

在这里插入图片描述

(2)、使用示例

rabbitmq-05-headers

添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: fanout-learn

  rabbitmq:
    host: 192.168.0.101 # rabbitmq 的主机
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi # 虚拟主机

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    public static final String EXCHANGE_HEADERS = "exchange.headers";
    public static final String QUEUE_HEADERS_A = "queue.headers.a";
    public static final String QUEUE_HEADERS_B = "queue.headers.b";
}

定义MQ

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 1、定义交换机
     *
     * @return
     */
    @Bean
    public HeadersExchange headersExchange() {
        return ExchangeBuilder.headersExchange(RabbitMQConstant.EXCHANGE_HEADERS).build();
    }

    /**
     * 2、定义两个队列
     *
     * @return
     */
    @Bean
    public Queue queueA() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_HEADERS_A).build();
    }

    @Bean
    public Queue queueB() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_HEADERS_B).build();
    }

    /**
     * 3、绑定队列与交换机
     *
     * @param headersExchange
     * @param queueA
     * @return
     */
    @Bean
    public Binding bindingA(HeadersExchange headersExchange, Queue queueA) {
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("type", "m");
        headerValues.put("status", 1);
        return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();
    }

    @Bean
    public Binding bindingB(HeadersExchange headersExchange, Queue queueB) {
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("type", "s");
        headerValues.put("status", 0);
        return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();
    }
}

定义生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.MessageProperties;

import java.util.HashMap;
import java.util.Map;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 生产者发送消息
     */
    public void sendMsg() {
        //消息属性
        MessageProperties messageProperties = new MessageProperties();
        Map<String, Object> headers = new HashMap<>();
        headers.put("type", "s");
        headers.put("status", 0);
        //设置消息头
        messageProperties.setHeaders(headers);
        //添加了消息属性
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .andProperties(messageProperties).build();
        //头部交换机,路由key无所谓
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_HEADERS, "", message);
        log.info("消息发送完毕!!!");
    }
}

定义消费者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiveMessageService {
    //接收A队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_HEADERS_A})
    public void receiveMsgA(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到A的消息为:{}", msg);
    }

    //接收B队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_HEADERS_B})
    public void receiveMsgB(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到B的消息为:{}", msg);
    }
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq05HeadersApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq05HeadersApplication.class, args);
    }
    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }

}

这里改造启动类,实现ApplicationRunner接口,重写里面的run()方法,在run()方法中发送消息(实际上应该在业务处理完毕后发送消息)

测试结果

在这里插入图片描述

5.1.5、自定义交换机

参考:延时交换机使用方式

2、交换机属性

(1)、交换机属性

1、name:交换机名称,就是一个字符串

2、Type:交换机类型(direct、topic、fanout、headers、x-local-random)五种

3、durable:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在

4、autoDelete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机

5、internal:内部使用的,如果是yes表示客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定

6、arguments:只有一个取值alternate-exchange,表示备用交换机

7、alternate:设置备用交换机

(3)、使用示例

rabbitmq-05-properties-01

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: fanout-learn

  rabbitmq:
    host: 192.168.0.101 # rabbitmq 的主机
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi # 虚拟主机

定义MQ

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 1、定义交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder // 默认为持久化的
                .directExchange(RabbitMQConstant.EXCHANGE_DIRECT) // 交换机的名字
                .autoDelete()//是否自动删除,默认不自动删除,调用该方法则表示自动删除
                .durable(false) //false表示不持久化,服务器重启会丢失;true表示持久化
                .internal()//是否是内部使用(如果是yes表示客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定),调用此方法表示设置为true
                .build();
    }

    /**
     * 2、定义队列
     *
     * @return
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT).build();
    }

    /**
     * 3、绑定交换机与队列
     *
     * @param directExchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(DirectExchange directExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("info");
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    public static final String EXCHANGE_DIRECT = "exchange.direct";
    public static final String QUEUE_DIRECT = "queue.direct";
}

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 生产者发送消息
     */
    public void sendMsg() {
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, "info", message);
        log.info("消息发送完毕,发送时间为:{}", new Date());

        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, "", message);
        log.info("消息发送完毕!!!");
    }
}

消费者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiveMessageService {
    //接收队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT})
    public void receiveMsgA(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到的消息为:{}", msg);
    }
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq05Properties01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq05Properties01Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试过程

  1. 不加条件时

在这里插入图片描述

  1. 当配置属性internal时报错

在这里插入图片描述

  1. 测试持久化按上

    设置持久化为false ,重启rabbitmq-server,则交换机丢失

在这里插入图片描述

重启后交换机丢失

  1. 测试配置autoDelete

    自动删除为 true ,从控制台上手动解绑,会发现自动删除

在这里插入图片描述

解绑后队列自动删除

在这里插入图片描述

3、备用交换机

使用场景

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息。如果想要监测哪些消息被投递到没有对应的队列,可以用备用交换机来实现,然后接收备用交换机的消息从而记录日志或发送报警信息。

使用示例

注意:备用交换机一般使用fanout交换机

测试时:指定一个错误路由

重点:普通交换机设置参数绑定到备用交换机

测试模块:rabbitmq-05-backup-01

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: exchange-backup

  rabbitmq:
    host: 192.168.0.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

定义MQ

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 定义正常交换机
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        Map<String, Object> arguments = new HashMap<>();
        /*//指定当前正常的交换机的备用交换机是谁
        arguments.put("alternate-exchange", RabbitMQConstant.EXCHANGE_BACKUP_NAME);
        return new DirectExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME, true, false, arguments);*/

        return ExchangeBuilder // 默认为持久化的,默认不自动删除
                .directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME) // 交换机的名字
                .alternate(RabbitMQConstant.EXCHANGE_BACKUP_NAME) //设置备用交换机 alternate-exchange
                .build();
    }

    /**
     * 定义正常队列
     *
     * @return
     */
    @Bean
    public Queue queueNormal() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).build();
    }

    /**
     * 绑定正常交换机和队列
     *
     * @param normalExchange
     * @param queueNormal
     * @return
     */
    @Bean
    public Binding binding(DirectExchange normalExchange, Queue queueNormal) {
        return BindingBuilder.bind(queueNormal).to(normalExchange).with(RabbitMQConstant.ROUTING_WARNING_KEY);
    }

    /**
     * 定义备用交换机
     *
     * @return
     */

    @Bean
    public FanoutExchange alternateExchange() {
        return ExchangeBuilder.fanoutExchange(RabbitMQConstant.EXCHANGE_BACKUP_NAME).build();
    }

    /**
     * 定义备用队列
     *
     * @return
     */
    @Bean
    public Queue alternateQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_BACKUP_NAME).build();
    }

    /**
     * 绑定备用交换机和队列
     *
     * @param alternateExchange
     * @param alternateQueue
     * @return
     */
    @Bean
    public Binding bindingAlternate(FanoutExchange alternateExchange, Queue alternateQueue) {
        return BindingBuilder.bind(alternateQueue).to(alternateExchange);
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机
    public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.backup";
    //备用交换机
    public static final String EXCHANGE_BACKUP_NAME = "exchange.backup";
    //正常队列
    public static final String QUEUE_NORMAL_NAME = "queue.normal.backup";
    //备用队列
    public static final String QUEUE_BACKUP_NAME = "queue.backup";

    public static final String ROUTING_WARNING_KEY = "info";
}

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        Message message= MessageBuilder.withBody("hello world".getBytes())
                .build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,"test",message);
        log.info("消息发送完毕,发送时间为:{}",new Date());
    }
}

消费者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiveMessageService {

    //接收备用队列的消息
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_BACKUP_NAME})
    public void receiveMsg(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到备用的消息为:{}", msg);
    }
}

发送消息

故意写错路由key,由于正常交换机设置了备用交换机,所以该消息就会进入备用交换机从而进入备用对列

可以写一个程序接收备用对列的消息,接收到后通知相关人员进行处理
如果正常交换机没有设置备用交换机,则该消息会被抛弃

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq05Backup01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq05Backup01Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }

}

测试结果

在这里插入图片描述


http://www.kler.cn/a/573881.html

相关文章:

  • 迷你世界脚本方块接口:Block
  • 地下井室可燃气体监测装置:守护地下安全,防患于未“燃”!
  • 《基于WebGL的matplotlib三维可视化性能调优》——让大规模3D数据流畅运行在浏览器端!
  • 【动手学深度学习】基于Python动手实现线性神经网络
  • c++为什么支持simd,而java不支持
  • PHP Error处理指南
  • 分布式存储学习——1.HBase的安装和配置
  • Deepseek×ComfyUI革命性工作流:AI图像3倍速精修实战指南
  • JVM 的组成部分有什么
  • svn 通过127.0.01能访问 但通过公网IP不能访问,这是什么原因?
  • 什么是 JVM? JVM (Java Virtual Machine)
  • 【Elasticsearch】Rollover 操作与Skip Rollover
  • go语言数据类型
  • ESP32-P4 支持哪些 RISC-V 汇编指令?
  • 将 PHP 函数转换为 Python
  • 低空经济-飞行数据平台 搭建可行方案
  • H5DS编辑器是如何让企业快速构建动态页面
  • 【JAVA架构师成长之路】【JDK源码】第5集:Map集合框架与泛型
  • leetcode日记(74)扰乱字符串
  • 科技解密丨赫思嘉·惟愿:深耕医美领域,以创新科技引领行业新篇章