当前位置: 首页 > article >正文

RabbitMQ深度探索:五种消息模式

  1. RabbitMQ 工作队列:
    1. 默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳
    2. 采用工作队列模式:
      1. 在通道中只需要设置 baseicQos 的值即可
        1. 表示 MQ 服务器每次只会给消费者推送 n 条消息
        2. 必须手动应答之后才会继续发送
    3. 生产者:
      public class ProducerFanout {
      
          private static final String QUEUE_NAME = "BoyatopMamber";
      
          public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
              // 1.创建新的连接
              Connection connection = RabbitMQConnection.getConnection();
      
              // 2.设置 channel
              Channel channel = connection.createChannel();
      
              for (int i = 0; i < 10; i++) {
                  // 3.发送消息
                  String msg = "Hello my Bro" + i;
                  channel.confirmSelect();
      
                  channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
                  boolean result = channel.waitForConfirms();
              }
              // 4.关闭资源
              channel.close();
              connection.close();
          }
      }
    4. 消费者1:
      public class Consumer1 {
          //定义队列
          private static final String QUEUE_NAME = "BoyatopMamber";
      
      
          public static void main(String[] args) throws IOException, TimeoutException {
              //创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //创建通道
              final Channel channel = connection.createChannel();
              channel.basicQos(1);
      
              DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String msg = new String(body,"UTF-8");
                      System.out.println("用户1:" + msg);
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              };
      
              //监听消息
              channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
      
          }
      }
    5. 消费者2:
      public class Consumer2 {
          //定义队列
          private static final String QUEUE_NAME = "BoyatopMamber";
      
      
          public static void main(String[] args) throws IOException, TimeoutException {
              //创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //创建通道
              final Channel channel = connection.createChannel();
              channel.basicQos(3);
      
              DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String msg = new String(body,"UTF-8");
                      System.out.println("消费者2:" + msg);
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              };
      
      
              //监听消息
              channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
      
      
      
          }
      }
  2. RabbitMQ 交换机类型:
    1. Direct exchange:直连交换机
    2. Fanout exchange:扇形交换机
    3. Topic exchange:主体交换机
    4. Headers exchange:头交换机
    5. Virtual Hostos:区分不同的团队
      1. 队列:存放消息
      2. 交换机:路由消息存放在那个队列中,类似于 Nginx
      3. 路由:key 分发规则
  3. RabbitMQ Fanout 发布订阅:
    1. 生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者
    2. 步骤:
      1. 需要创建两个队列,每个队列都对应一个消费者
      2. 队列需要绑定我们交换机
      3. 生产者投递消息到交换机中,交换机再将消息分配给两个队列中都存放起来
      4. 消费者从队列中获取消息
    3. 生产者:
      public class OrderConsumer {
      
          //定义队列
          private static final String QUEUE_NAME = "BoyatopOrder";
      
          //定义交换机
          private static final String EXCHANGE_NAME = "fanout_exchange";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              ///创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //创建通道
              Channel channel = connection.createChannel();
      
              //关联队列
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
      
              DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String msg = new String(body,"UTF-8");
                      System.out.println("订单接收:" + msg);
                  }
              };
      
              //监听消息
              channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
          }
      }
    4. 数据消费者:
      public class MamConsumer {
          //定义队列
          private static final String QUEUE_NAME = "BoyatopMamber";
      
          //定义交换机的名称
          private static final String EXCHANGE_NAME = "fanout_exchange";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              //创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //创建通道
              final Channel channel = connection.createChannel();
      
              //关联队列
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
      
              DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String msg = new String(body,"UTF-8");
                      System.out.println("消费者接收:" + msg);
                  }
              };
      
              //开启监听
              channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
          }
      }
    5. 订单消费者:
      public class OrderConsumer {
      
          //定义队列
          private static final String QUEUE_NAME = "BoyatopOrder";
      
          //定义交换机
          private static final String EXCHANGE_NAME = "fanout_exchange";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              ///创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //创建通道
              Channel channel = connection.createChannel();
      
              //关联队列
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
      
              DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String msg = new String(body,"UTF-8");
                      System.out.println("订单接收:" + msg);
                  }
              };
      
              //监听消息
              channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
          }
      }
  4. Direct  路由模式:
    1. 当交换机类型为 direct 类型时,根据队列绑定的路由转发到具体的队列中存放消息
    2. 生产者:
      public class Producer {
          //定义交换机
          private static final String EXCHANGE_NAME = "newDirect_exchange";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              //创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //创建 channel
              Channel channel = connection.createChannel();
      
              //通道关联交换机
              channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
      
              //发送消息
              for (int i = 0; i < 10; i++) {
                  String msg = "生产消息 --- 路由模式";
                  System.out.println(msg + i);
                  channel.basicPublish(EXCHANGE_NAME,"mail",null,msg.getBytes());
                  channel.basicPublish(EXCHANGE_NAME,"sms",null,msg.getBytes());
              }
      
              channel.close();
              connection.close();
          }
      }
    3. 邮件消费者:
      public class MailConsumer {
          //定义交换机
          private static final String EXCHANGE_NAME = "newDirect_exchange";
      
          //定义队列
          private static final String QUEUE_NAME = "newDirectqueueOne";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              //创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //创建 channel
              Channel channel = connection.createChannel();
      
              //关联队列
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"mail");
      
              DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String msg = new String(body,"UTF-8");
                      System.out.println("邮件消费者接收信息:" + msg);
                  }
              };
      
              //监听队列
              channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
          }
      }
    4. 短信消费者:
      public class SmsConsumer {
          //定义交换机
          private static final String EXCHANGE_NAME = "newDirect_exchange";
      
          //定义队列
          private static final String QUEUE_NAME = "newDirectqueueTwo";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              //创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //创建 channel
              Channel channel = connection.createChannel();
      
              //关联队列
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"sms");
      
              DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String msg = new String(body,"UTF-8");
                      System.out.println("短信消费者接收消息:" + msg);
                  }
              };
      
              //监听队列
              channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
          }
      }
  5. Topic 主体模式:
    1. 当交换机类型为 topic 类型时,根据队列绑定的路由键模糊转发到具体队列中存放
    2. #:表示支持匹配多个词
    3. *:表示只能匹配一个词

http://www.kler.cn/a/534764.html

相关文章:

  • 【3】高并发导出场景下,服务器性能瓶颈优化方案-文件压缩
  • 生产环境的 MySQL事务隔离级别
  • 中小企业的采购流程,采购管理是如何进行的?
  • Spring Boot 2 快速教程:WebFlux优缺点及性能分析(四)
  • 【华为OD-E卷 - 108 最大矩阵和 100分(python、java、c++、js、c)】
  • 蓝桥杯试题:排序
  • CentOS 7.3编译Rsyslog 8.1903.0
  • 机器学习9-卷积和卷积核2
  • Android_P_Audio_系统(1) — Auido 系统简介
  • 【FPGA】 MIPS 12条整数指令 【3】
  • UE_C++ —— Properties
  • 高手之间的较量,是“想过”和“想透”之间的较量
  • 深入理解小波变换:信号处理的强大工具
  • python代码
  • HELLOCTF反序列化靶场全解
  • langchain教程-2.prompt
  • DeepSeek写的lammps反应势断键动态显示程序
  • 使用requestAnimationFrame减少浏览器重绘
  • 事件驱动架构(EDA)
  • 电路研究9.2.10——合宙Air780EP中文件系统读写命令使用方法研究
  • 达梦利用老备份集和新归档日志进行异机恢复
  • 单硬盘槽笔记本更换硬盘
  • 2025年南软考研复试,进!
  • 【机器学习与数据挖掘实战】案例12:基于决策树算法的水色图像的水质评价
  • 如何使用Webpack构建前端应用?
  • 剑指 Offer II 014. 字符串中的变位词