当前位置: 首页 > article >正文

RabbitMQ深度探索:创建消息队列

快速入门:实现 RabbitMQ 简单队列:

  1. 在 RabbitMQ 平台创建 Virtual Hosts 和一个队列
  2. /boyaVirtualHosts
    1. 订单队列
    2. 支付队列
  3. 导入依赖:
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.6.5 </version>
    </dependency>
  4. 编写连接类:
    public class RabbitMQConnection {
        /**
         * 获取连接
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            // 1.创建连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            // 2.设置连接地址
            connectionFactory.setHost("127.0.0.1");
    
            // 3.设置端口号
            connectionFactory.setPort(5672);
    
            // 4.设置账号和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
    
            // 5.设置 VirtualHost
            connectionFactory.setVirtualHost("/boyaVirtualHostsR");
    
            return connectionFactory.newConnection();
        }
    }
  5. 编写生产者代码:
    public class Producer {
        private static final String QUEUE_NAME = "BoyatopMamber";
        /**
         * 获取连接
         */
        public static void main(String[] args) throws IOException, TimeoutException {
    
    
            while (true){
                // 1.创建连接
                Connection connection = RabbitMQConnection.getConnection();
    
                // 2.设置通道
                Channel channel = connection.createChannel();
    
                // 3.设置消息
                String msg = "Hello World";
                System.out.println("msg:" + msg);
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    
    
                channel.close();
                connection.close();
            }
        }
    }
  6. 编写消费者代码:
    public class Comsumer {
        private static final String QUEUE_NAME = "BoyatopMamber";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.创建链接
            Connection connection = RabbitMQConnection.getConnection();
    
            // 2.设置通道
            Channel channel = connection.createChannel();
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body,"UTF-8");
                    System.out.println("消费者获取消息:" + msg);
                }
            };
    
            // 3.监听队列
            channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
        }
    }
  7. RabbitMQ 如何保证消息不丢失:
    1. 生产者角色:
      1. 确保生产者角色投递到 MQ 服务器端成功
      2. Ack 消息确认机制
      3. 同步或异步的形式:
        1. Confirms
        2. 事务消息
    2. 消费者角色:
      1. 在 RabbitMQ 情况下:
        1. 必须要将消息消费成功之后,才会将消息从 MQ 服务器端移除
      2. 在 kafka 中的情况下:
        1. 不管是消费成功还是消费失败,该消息都不会立即从 MQ 服务器移除
    3. MQ 服务器端:
      1. 在默认的情况下,都会对队列中的消息持久化,持久化硬盘
  8. 使用消息确认机制 + 持久化技术实现:
    1. A 消费者确认收到消息机制
      channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
      1. 第二个参数值为 false,代表关闭 RabbitMQ 的自动应答机制,改为手动应答
      2. 在处理完消息时,返回应答状态,true 表示为自动应答模式
        channel.basicAck(envelope.getDeliveryTag(),false);
    2. B 生产者确认投递消息成功,使用 Confirm 机制或者事务消息
    3. Confirm 机制,同步或异步的形式
  9. RabbitMQ 默认创建是持久化的形式:
    1. 将代码中的 durable 设为 true
    2. 参数详解:
      1. Durability:是否持久化
        1. durable:持久化
        2. Transient:不持久化
      2. Auto delete:是否自动删除
        1. 当最后一个消费者断开连接之后队列是否自动被删除
        2. 可以通过 RabbitMQ Management 查看某个队列的消费者数量
        3. 当 consumers = 0 时,队列就会自动删除
  10. 使用 RabbitMQ 事务:
    //设置事务
    channel.txSelect();
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    channel.txCommit();
    1. 生产者:
      
      public class producer {
          private static final String QUEUE_NAME = "BoyatopMamber";
      
          public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
              // 1.创建新的连接
              Connection connection = RabbitMQConnection.getConnection();
      
              // 2.设置 channel
              Channel channel = connection.createChannel();
      
              // 3.发送消息
              String msg = "Hello my Bro";
              channel.confirmSelect();
      
              channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
              boolean result = channel.waitForConfirms();
      
              if(result){
                  System.out.println("消息投递成功");
              }else {
                  System.out.println("消息投递失败");
              }
      
              // 4.关闭资源
              channel.close();
              connection.close();
          }
      }
    2. 消费者:
      public class Consumer {
          private static final String QUEUE_ANME = "BoyatopMamber";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              // 1.创建连接
              Connection connection = RabbitMQConnection.getConnection();
      
              //2.设置通道
              Channel channel = connection.createChannel();
              DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String msg = new String(body,"UTF-8");
                      System.out.println("消费者获取消息:" + msg);
      
                      //消费者完成 消费该消息
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              };
      
              // 3.监听队列
              channel.basicConsume(QUEUE_ANME,false,defaultConsumer);
          }
      }

http://www.kler.cn/a/535921.html

相关文章:

  • 【LeetCode】152、乘积最大子数组
  • Windows 中学习Docker环境准备2、Docker Desktop中安装ubuntu
  • 设计模式Python版 享元模式
  • IDEA 中集成 Maven,配置环境、创建以及导入项目
  • 排序算法与查找算法
  • 【算法篇】贪心算法
  • 【梦想终会实现】Linux驱动学习4
  • Nginx与frp结合实现局域网和公网的双重https服务
  • 网站打开提示不安全
  • 深度剖析FFmpeg视频解码后的帧处理到Qt显示 从AVFrame到QImage的转换(二)
  • DeepSeek 和 ChatGPT-4o
  • K8s 常见面试题(K8s Common Interview Questions)
  • 如何正确配置您的WordPress邮件设置
  • 『python爬虫』获取免费IP代理 搭建自己的ip代理池(保姆级图文)
  • Redis数据变化监听:使用Spring Boot实现实时数据监控
  • 【2】高并发导出场景下,服务器性能瓶颈优化方案-异步导出
  • AI大模型评测对比2—ChatGPT对比DeepSeek
  • DeepSeek-VL2论文解读:用于高级多模态理解的专家混合视觉语言模型
  • 图论 - 临接矩阵与临接表介绍与分析对比
  • Linux进阶——远程连接服务器
  • salesforce SF CLI 数据运维经验分享
  • 2025模仿游戏 别人怎么做就什么做 做的过程中再加入自己的元素 模仿与创新
  • 深度学习中模型训练的过拟合与欠拟合问题
  • 计算机毕业设计Python+大模型疲劳驾驶检测系统 自动驾驶 面部多信息特征融合的疲劳驾驶检测系统 驾驶员疲劳驾驶风险检测 深度学习 机器学习 大数据
  • innoDB 如何解决幻读
  • 动手学图神经网络(10):利用 PyTorch Geometric 进行图分类