当前位置: 首页 > article >正文

RabbitMQ消息模型之发布订阅Publish-Subscribe

发布订阅模型 Publish/Subscribe

发布订阅模型也称为广播模型,交换机类型需要指定为Fanout,正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。每个消费者都监听自己的队列,所以同一个消息,会被所有的消费者共同消费。Fanout 这种交换类型并不能给我们带来很大的灵活性,它只能进行无意识的广播。
image-20220526173046882

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者。
  • 每个消费者有自己的Queue。
  • 每个队列都要绑定到Exchange。
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列。
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费。

创建生产者

public class MyProducer {

    @Test
    public void test() throws Exception {
        // Fanout模式不需要指定队列
        String queue = "";
        // 交换机
        String exchange = "logs";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(exchange, "fanout");
        for (int i = 0; i < 3; i++) {
            // 发布消息
            channel.basicPublish(exchange, queue, null, ("DEBUG LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("INFO LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("WARN LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("ERROR LOG -> " + i).getBytes());
        }
    }
}

创建消费者1

public class MyConsumer1 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将临时队列绑定exchange
        channel.queueBind(queue, "logs", "");
        //处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

创建消费者2

public class MyConsumer2 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将临时队列绑定exchange
        channel.queueBind(queue, "logs", "");
        //处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
            }
        });
    }
}

image-20220526175729612

两个消费者同时都收到了消息。


http://www.kler.cn/news/150034.html

相关文章:

  • docker中安装mysql,远程连接
  • 基于docker的onlyoffice使用--运行JavaSpringExample
  • 你了解vue的diff算法吗?
  • go学习之文件操作与命令行参数
  • leetcode 283. 移动零
  • JavaScript基础知识总结
  • Java 关于批量插入遇到的问题 -sqlserver
  • 配置阿里云的yum仓库
  • qt-C++笔记之主线程中使用异步逻辑来处理ROS事件循环和Qt事件循环解决相互阻塞的问题
  • 数学加速器:Python numpy.add函数全解读
  • 基于单片机的肺活量检测系统(论文+源码)
  • 树与二叉树堆:堆的意义
  • SpringBoot整合EasyExcel实现复杂Excel表格的导入导出功能
  • DNS/ICMP协议、NAT技术
  • goweb入门教程
  • Python dns域名解析(dns.resolver)
  • 【LeetCode:1670. 设计前中后队列 | 数据结构设计】
  • spring-webflux的一些概念的理解
  • 【Rust】基本的语法概念
  • 唯创知音WT588F02B-8S语音芯片:灵活更换语音内容,降低开发成本与备货压力
  • python每日一题——12最小覆盖子串
  • GoLang切片
  • Leetcode算法系列| 1. 两数之和(四种解法)
  • OpenLDAP配置web管理界面PhpLDAPAdmin服务-centos9stream
  • Spring---对象的存储和读取
  • PyQt6库和工具库QTDesigner安装与配置
  • Lazada详情API接口:一键获取商品信息的深度实践
  • 喜报 | 再获影响力产品奖!擎创科技实力亮相GOPS全球运维大会
  • Java-多线程基本知识学习总结
  • 二叉树OJ题之二