当前位置: 首页 > article >正文

RabbitMQ通过代码创建交换机和队列

常见交换机

RabbitMQ提供的交换机一共的四种,分别是:

1. Fanout:采用广播形式来发送消息,会将消息路由到所有绑定了的队列之中。

2. Direct:通过Binding Key与队列绑定,生产者在发送信息的时候会通过Routing Key来告诉交换机消息要发送给谁

3. Topic:与Direct类似,但是Topic支持通配符,能够使用通配符实现多匹配,一般情况下Binding Key是由多个单词组成,而Topic能够通过#和*来模糊匹配,#代表多个单词,*代表一个单词

4. Headers :根据消息头进行匹配,基本上很少使用,限制较多,而且麻烦

Fanout 扇形交换机

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    // 定义队列名称
    public static final String FANOUT_QUEUE_1 = "fanout.queue.1";
    public static final String FANOUT_QUEUE_2 = "fanout.queue.2";
    // 定义交换机名称
    public static final String FANOUT_EXCHANGE = "fanout.exchange";

    /**
     * 声明第一个队列
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE_1);
    }

    /**
     * 声明第二个队列
     */
    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE_2);
    }

    /**
     * 声明一个扇出交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /**
     * 将第一个队列绑定到扇出交换机
     */
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    /**
     * 将第二个队列绑定到扇出交换机
     */
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}

Direct 直连交换机

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    // 定义队列名称
    public static final String DIRECT_QUEUE = "direct.queue";
    // 定义交换机名称
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    // 定义路由键
    public static final String DIRECT_ROUTING_KEY = "direct.routing.key";

    /**
     * 声明一个队列
     */
    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE);
    }

    /**
     * 声明一个直连交换机
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    /**
     * 将队列绑定到交换机,并指定路由键
     */
    @Bean
    public Binding directBinding() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY);
    }
}

Topic 主题交换机

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // 定义队列名称
    public static final String TOPIC_QUEUE_1 = "topic.queue.1";
    public static final String TOPIC_QUEUE_2 = "topic.queue.2";
    // 定义交换机名称
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    // 定义路由键模式
    public static final String TOPIC_ROUTING_KEY_1 = "topic.routing.key.1.*";
    public static final String TOPIC_ROUTING_KEY_2 = "topic.routing.key.2.#";

    /**
     * 声明第一个队列
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE_1);
    }

    /**
     * 声明第二个队列
     */
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE_2);
    }

    /**
     * 声明一个主题交换机
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /**
     * 将第一个队列绑定到主题交换机,并指定路由键模式
     */
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY_1);
    }

    /**
     * 将第二个队列绑定到主题交换机,并指定路由键模式
     */
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY_2);
    }
}

Headers 头交换机

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class HeadersConfig {

    // 定义队列名称
    public static final String HEADERS_QUEUE_1 = "headers.queue.1";
    public static final String HEADERS_QUEUE_2 = "headers.queue.2";
    // 定义交换机名称
    public static final String HEADERS_EXCHANGE = "headers.exchange";

    /**
     * 声明第一个队列
     */
    @Bean
    public Queue headersQueue1() {
        return new Queue(HEADERS_QUEUE_1);
    }

    /**
     * 声明第二个队列
     */
    @Bean
    public Queue headersQueue2() {
        return new Queue(HEADERS_QUEUE_2);
    }

    /**
     * 声明一个头交换机
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE);
    }

    /**
     * 将第一个队列绑定到头交换机,并指定头部属性
     */
    @Bean
    public Binding headersBinding1() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("header1", "value1");
        return BindingBuilder.bind(headersQueue1()).to(headersExchange()).whereAny(headers).match();
    }

    /**
     * 将第二个队列绑定到头交换机,并指定头部属性
     */
   

注意点:

在我测试的过程中,发现在创建的过程中并不会直接去创建,而是在你读取队列后才会被创建,哪怕这个队列不存在,报错了,也会创建交换机和队列并绑定


http://www.kler.cn/a/469620.html

相关文章:

  • 【银河麒麟高级服务器操作系统实例】tcp半链接数溢出分析及处理全过程
  • 如何快速上手一个鸿蒙工程
  • uniapp打包到宝塔并发布
  • 最好用的图文识别OCR -- PaddleOCR(2) 提高推理效率(PPOCR模型转ONNX模型进行推理)
  • 20250106面试
  • Element-UI:如何实现表格组件el-table多选场景下根据数据对某一行进行禁止被选中?
  • 分布式任务调度xxl-job入门案例
  • Chapter 1 Understanding Large Language Models
  • BUG解决:postman可以请求成功,但Python requests请求报403
  • kafka Broker专题
  • pytdx,取市场股票列表,get_security_list,start参数为8000时,数据获取失败,导致无法获取全量数据的BUG修正
  • vue3-diff算法-最长递增子序列
  • 数据结构C语言描述8(图文结合)--哈希、哈希冲突、开放地址法、链地址法等实现
  • AndroidStudio环境版本管理
  • XIAO Esp32 S3 网络摄像头——3音视频监控
  • 2.1.7-1 io_uring的使用
  • 2025年,AI时代下的前端职业思考
  • 【网络安全 | 漏洞挖掘】绕过电子邮件确认实现预账户接管
  • 01——python (mac)安装
  • Redis两种主要的持久化方式是什么?
  • pytorch梯度上下文管理器介绍
  • 新疆乡镇界面图层arcgis格式shp数据有乡镇名称和编码2020年wgs84坐标无偏移数据内容测评
  • MySQL 04 章——运算符
  • 100万并发用户的分布式频道聊天系统
  • CSP初赛知识学习计划(第三天)
  • 【无线传感网】无线传感器网络安全