当前位置: 首页 > article >正文

RabbitMQ 发布订阅模式,routing路由模式,topic模式

发布订阅模式

一个消息可以由多个消费者消费同一个消息

 消费者1和2同时消费了该消息

举例

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("03-pubsub1", "fanout");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("03-pubsub1","", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

 消费者1和2同时消费了该消息,比如说消息是发短信,发邮件,  那么1和发短息  2可以发邮件

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("03-pubsub1", "fanout");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    channel.queueBind(queue, "03-pubsub1", "");
    channel.basicConsume(queue, false,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

routing路由模式

就是说哪些让谁干

哪些让谁干区分出来

也可以让所有消费者都消费

选择性的让某个消费者消费,或者都消费

 生产者

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key  routing模式需要路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("04-routing1","info", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

 消费者1

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "04-routing1", "info");
    channel.queueBind(queue, "04-routing1", "error");
    channel.queueBind(queue, "04-routing1", "waring");
    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

消费者2

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个

    channel.queueBind(queue, "04-routing1", "trace");


    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 2  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

上面的只有消费者1消费了消息 

可以根据channel.queueBind(queue, "04-routing1", "trace"); 绑定消息  也可以让1和2都消费,

 

topic模式和Routing模式高度相识,用通配符的形式指定让谁消费,或者都消费

 生产者

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key  routing模式需要路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("05-topic1","employee.save", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

消费者1

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "05-topic1", "employee.*");

    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

消费者2

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "05-topic1", "user.*");

    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 2  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

结果就是消费者1消费了消息

所有工作模式总结

 


http://www.kler.cn/news/16498.html

相关文章:

  • 2023-5-2面试题学习
  • 746. 使用最小花费爬楼梯
  • Cell:癌症研究的下一个问题是什么?
  • 大学生学java编程的就业前景怎么样?我来聊聊自己的见解
  • 通过Python的PIL库给图片添加文本水印
  • 【网络协议详解】——GNS3的使用(学习笔记)
  • 计算机网络笔记:TCP协议 和UDP协议(传输层)
  • ChatGPT调教指南(中文)
  • 回到大学时光,我想对当时的自己说些什么
  • DDD系列:四、领域层设计规范
  • 存储资源调优技术——SmartThin智能精简配置技术
  • C++动态规划模板汇总大全
  • STM32物联网实战开发(4)——基本定时器
  • 32k*16 薪,3年自动化测试历经3轮面试成功拿下华为Offer....
  • 【Java笔试强训 7】
  • GEE:MODIS计算遥感指数(NDVI、BSI、NDSI、EVI、LSWI、SIPI、EBI等)
  • 吉布斯采样方法
  • 设计模式-单例模式
  • 一文搞懂PMP挣值管理那些让你头疼的公式
  • mockjs学习笔记
  • maven中的 type ,scope的作用
  • 2335. 装满杯子需要的最短总时长
  • 83. map函数()-通过函数实现对可迭代对象的操作(适合零基础)
  • 「SQL面试题库」 No_55 销售分析 I
  • ramfs, rootfsinitramfs
  • HTML(四) -- 多媒体设计
  • CCD视觉检测设备如何选择光源
  • 【面试长文】HashMap的数据结构和底层原理以及在JDK1.6、1.7和JDK8中的演变差异
  • Blender启动场景的修改
  • 资讯汇总230503