当前位置: 首页 > article >正文

2.rabbitMQ之交换机

1.交换机的作用

1.默认交换机会自动指定队列
2.之前一个信息必须被消费1次,现在的是一个消息可以被消费多次(发送到不同队列的前提下,正常情况下一个队列只能消费一次)
3.消息先发给交换机,然后交换机发给多个队列,可以达到多次消费的效果

如图mq3
在这里插入图片描述

2.交换机的类型

  1. 默认交换机 无名 ""指定
    1.直接类型 direct(一个routingKey绑定一个队列,一个交换机绑定一个队列)
    2.主题 topic(一个交换机绑定多个队列,主要是通过表达式来实现)
    3.标题 headers(不常用)
    4.扇出 fanout(一个信息被交换机全部队列接收,相当于QQ聊天)

3.临时队列 没有D的队列 一旦断开连接,队列会被自动删除

   //获得临时队列,features有 AD和 Excl
    String QName=channel.quequeDeclare().getQueue();

4.交换机和队列绑定binding

   //在界面, 添加queue然后添加exchange,然后在交换机 添加队列,
       rountingkey代表想要发给哪个队列,后面可以指定哪个可以接收特定的信息

5.fanout(相当于qq群的广播,一条消息被多台计算机 接收) ,队列名可以写为空
1. 2个消费者 声明交换机的名字和类型 主要代码如下

  channel.exchangeDeclare(name,"fanout");
     String QName=channel.queueDeclare().getQueue();
      //队列名,交换机名,routingKey
      channel.queueBind(QName,exName,"");
             -------完整代码-------
public class exchangeConsumer1 {
    public static final String EXCHANGE_NAME="log";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //得到临时队列
        String QName=channel.queueDeclare().getQueue();
        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.queueBind(QName,EXCHANGE_NAME,"");
        DeliverCallback deliverCallback=(tag, delivery)->{

            System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
        };
        CancelCallback nCallback=(tag)->{

            System.out.println("失败应答");
        };

        boolean IsAck=false;
        channel.basicConsume(QName,IsAck,deliverCallback,(tag)->{});



    }
}
   2.生产者 也要不用再次声明交换机,不用队列名了,用交换机名就可以接收消息
 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
           -------完整代码-------
public class exchangeProducer {
    public static final String EXCHANGE_NAME="log";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //交换机的名字和类型
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);
		//模拟生产者不停发消息
        while (scanner.hasNext()){
            String next = scanner.next();
            //交换机
            //队列名
            //设置消息持久化
            //二进制
            channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
//            boolean flag=channel.waitForConfirms();
//            if(flag){
//                System.out.println("消息已经写入磁盘的确认");
//
//            }

        }

    }
}

6.直接交换机 direct(rountingKey相等就是fanout交换机,不相等就是direct)(路由)
//可以绑定队列
1.提供者修改交换机类型和routingKey

      channel.exchangeDeclare(EXCHANGE_NAME,"direct");
      channel.basicPublish(EXCHANGE_NAME,"wrong", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
2.消费者修改交换机类型和routingKey
 channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.queueBind(QName,EXCHANGE_NAME,"error");

7.topic主题交换机,direct只能一个交换机绑定一个队列,这个可以路由多个队列(生产者不声明队列,只声明交换机,消费者声明队列和交换机)
1.routingKey的写法单词不能全部是字母(就成direct交换机了) 如aa.bb.mq *代替1个单词 #代替0个或多个单词

如 lazy.#可以匹配 lazy.ngs.me
*.*.rabbit 匹配 ngs.me.rabbit

2.注意 绑定是 #相当于fanout交换机   #和*都没有出现就是direct交换机
3.代码 交换机为topic

#消费者.queueBind()可以写多次

     -------代码------
       public class TopicProducer {
    public static final String EXCHANGE_NAME="log2";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        channel.confirmSelect();
        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//        channel.queueDeclare("Q1",false,false,false,null);
        Scanner scanner = new Scanner(System.in);



        while (scanner.hasNext()){
            String next = scanner.next();
            //交换机
            //队列名
            //设置消息持久化
            //二进制
            //fanout交换机模式
//            channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
            //direct交换机,这里发送的routingKey可以自行修改测试
			            channel.basicPublish(EXCHANGE_NAME,"quick.orange.rabbit", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
//            boolean flag=channel.waitForConfirms();
//            if(flag){
//                System.out.println("消息已经写入磁盘的确认");
//
//            }

        }

    }
} 
       ----消费者-----
  public class TopicConsumer1 {
    public static final String EXCHANGE_NAME="log2";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //得到临时队列

        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String queueName="Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        //!!!核心代码,就是发送到的rountingKey和队列绑定
        channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.*");
        channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.aa");
        DeliverCallback deliverCallback=(tag, delivery)->{
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
        };
        CancelCallback nCallback=(tag)->{

            System.out.println("失败应答");
        };

        boolean IsAck=false;
        //消费信息
        channel.basicConsume("Q1",IsAck,deliverCallback,(tag)->{});



    }
}

8.死信队列(不能被消费的信息放到死信队列,防止消息丢失) 管理界面队列有DLK代表开启了死信

1.来源
—1.消息TTL过期(信息指定时间会过期)
—2.队列达到最大长度(队列满了)
—3.信息被拒绝(basic.reject或basic.nack不应答) requeue=false(信息不放回队列,丢失?)

------2.实现(绑定普通队列和死信队列) 普通和死信交换机都为 dirrect
如图mq4
请添加图片描述

   //普通队列要声明的时候加入arg才会转发到死信队列 !!!注意设置的普通队列的args,不是死信的
    -----消费者-----
  Map<String,Object> args=new HashMap();
  args.put("x-dead-letter-exchange","DEAD_EXCHANGE_NAME");
  //设置死信 routingKey
args.put("x-dead-letter-routing-key","lisi");
     channe.queueDeclare(...,args);
//过期时间这里可以设置,也可producer声明
//声明队列
--------生产者--------  
//设置ttl time to live过期时间
   channel.basicPublish(...,prop,...);             

9.死信队列之队列达到最大长度

   //消费者设置最大正常队列的长度

10.死信队列之信息被拒绝

 //消费者
     channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);//不放回队列
//一定要开启手动应答

11.延迟队列.是死信队列的过期时间(企业上班案例)
//整合springboot 选2.3.1,复制依赖,复制配置
//整合后不用自己声明队列和交换机,由专门的配置类写
1.配置文件类


  @Configuration //配置类上面写
     //声明交换机
     @Bean("xExchange")
     public DirectExchange xExchange(){
return new DirectExchange("name");
    }
    //声明队列
    @Bean("queueA")
     public Queue xExchange(){
             //指定死信 xxx参数
return QueueBuilder.durable(Queue_A).withArguments(xxx).build();
    }
   //死信队列
    @Bean("queueA")
     public Queue xExchange(){
             //没有参数
return QueueBuilder.durable(DEAD_QUEUE).build();
    }
   //绑定,名字必须要有语义化 semantic
      @Bean
     public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
             //,队列和交换机绑定,并指定routingKey
return BindingBuilder.bind(queueA).to(xExchange).with("XB");
    }    
    2.写controller接收消息
@Autowired
 private RabbitTemplate rabbitTemplate;
log.info("xxxx{},xxx{}",new Date().toString(),msg);
//发送消息来自spring公司的工具,转发到交换机和发送
  rabbitTemplate.convertAndSend("X","routingKey","xxmsg")
   3.死信消费者接收消息 需要监听器
    @Slf4j
     @Component
           xx class
                    @RabbitListener(queues="QD") 
                //注意!!!msg是spring的类,Channel是mq的,刚开始导错包
           public void xx(Message msg,Channel channel){
sout(new String(msg.getBody()));
       }

12.延迟优化 不在队列声明写延迟时间,而在生产者的声明,就不用一直更新队列代码
图mq5

请添加图片描述
----1.增加一个不设置时间的队列

//controller当生产者
rabbitTemplate.convertAndSend("exchange_x", "XB", "消息来自 ttl 为 xS 的队列: "+message, correlationData ->{
        correlationData.getMessageProperties().setExpiration(ttl);
        return correlationData;
    });

2.死信做延迟的缺陷,因为是排队的,所以发送多条消息不同延迟时间,按第一条的时间延迟(mq只会检查第一条消息是否过期)(导致先发送时间长的数据一直等待,其他后发送的数据在等待完成用同一时间送达)

3.解决方法(使用插件) 将我们的插件 复制到 mq的plugin的文件夹下(到exchange页面会多一个x-delayed-messeage选项)由交换机延迟了

//文件夹的路径
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
//插件放到文件夹后,enable开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 //只能配置类写自定义交换机
 @Bean
 public CustomExchange delayedExchange(){

 
  }   
 //绑定交换机和队列
//生产者设置时间,注意之前是 setExpiration现在是setDelay交换机延迟  
    //代码如下
    -----配置类-------
@Configuration
public class TtlQueueConfig{
    private  static String Exchange_X= "exchange_x";
    private  static String Exchange_Y_DEAD= "exchange_y_dead";
    private  static String QUEUE_A= "queue_a";
    private  static String QUEUE_B= "queue_b";
    private  static String QUEUE_D= "queue_dead";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";


    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(Exchange_X);
    }
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Exchange_Y_DEAD);
    }
    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型 !!!
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
                args);
    }


    @Bean("queueA")
    public Queue queueA(){
//        Map<String, Object> args = new HashMap<>();
//        //声明当前队列绑定的死信交换机
//        args.put("x-dead-letter-exchange",Exchange_Y_DEAD);
//        //声明当前队列的死信路由 key
//        args.put("x-dead-letter-routing-key", "YD");
//        //声明队列的 TTL
//        args.put("x-message-ttl", 10000);
//        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
        return QueueBuilder.durable(QUEUE_A).build();
    }
    @Bean("queueB")
    public Queue queueB(){
//        Map<String, Object> args = new HashMap<>();
//        //声明当前队列绑定的死信交换机
//        args.put("x-dead-letter-exchange",Exchange_Y_DEAD);
//        //声明当前队列的死信路由 key
//        args.put("x-dead-letter-routing-key", "YD");
//        //声明队列的 TTL
//        args.put("x-message-ttl", 40000);
//        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
        return QueueBuilder.durable(QUEUE_B).build();
    }
    @Bean("queueDead")
    public Queue queueDead(){
        return QueueBuilder.durable(QUEUE_D).build();
    }

    @Bean("xBindingQueueA")
    public Binding xBindingQueueA(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean("xBindingQueueB")
    public Binding xBindingQueueB(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean("yBindingQueueDead")
    public Binding yBindingQueueDead(@Qualifier("queueDead") Queue queueDead,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueDead).to(yExchange).with("YD");
    }



    @Bean
    public Binding bindingDelayedQueue(@Qualifier("queueDead") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange
                                               delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }





} 
    ---------死信消费者-------
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues="queue_dead")
    public void deadMeg(Message msg, Channel channel){
        String s = new String(msg.getBody());

        log.info("时间,{}消息{}",new Date(),s);
    }


}

http://www.kler.cn/a/16821.html

相关文章:

  • 【C语言】值传递和地址传递
  • 准确率调整研究中心
  • ubuntu-desktop-24.04上手指南(更新阿里源、安装ssh、安装chrome、设置固定IP、安装搜狗输入法)
  • 【MySQL】MySQL函数之JSON_EXTRACT
  • 【练习案例】30个 CSS Javascript 加载器动画效果
  • 【STM32F1】——无线收发模块RF200与串口通信
  • 【SCI征稿】中科院2区SCI,CCF推荐,评职代表作首选快刊
  • LC正弦波振荡器【高频电子线路】【Multisim】
  • thread-最佳时间
  • BetaFlight统一硬件配置文件研读
  • PAT A1012 The Best Rank
  • iOS---iOS10适配iOS当前所有系统的远程推送
  • Ansible的基础了解
  • docker容器:本地私有仓库、harbor私有仓库部署与管理
  • 【Unity入门】24.碰撞检测
  • 滴水逆向三期笔记与作业——02C语言——02数据类型
  • JDBC详解(四):操作BLOB类型字段(超详解)
  • Python做个猫狗识别系统,给人美心善的邻居
  • matlab for循环详解
  • 其他自定义代码片段
  • 设计模式--装饰者模式
  • JSP+Struct+MySql基于BBS管理系统设计与实现(源代码+论文+中英资料+开题报告+答辩PPT)
  • Java 怎样实现代理模式,有什么优缺点
  • ospf扩展配置—认证、沉默接口、加快收敛、缺省路由
  • 2023-04-23 学习记录--C/C++-邂逅C/C++(中)
  • hive 清空分区表 多姿势对比