当前位置: 首页 > article >正文

RabbitMQ 持久化与不公平分发

RabbitMQ 持久化与不公平分发

1. RabbitMQ 持久化 (Durability)

概念

持久化是指将消息或队列保存在磁盘上,以确保即使 RabbitMQ 服务器宕机或重启,数据也不会丢失。

持久化的三个层面

  1. 队列持久化
    队列持久化意味着即使 RabbitMQ 重启后,队列依然存在,但它不会保证队列中的消息仍然存在。

    • 队列持久化声明

      channel.queueDeclare("queue_name", true, false, false, null);
      

      其中,true 表示队列持久化。

      image-20241019002748164

  2. 消息持久化
    消息持久化是在生产者发送消息时指定的,确保消息在服务器重启后依然能够保留在队列中。

    • 消息持久化声明
      channel.basicPublish("", "queue_name", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
      

    注意:队列持久化和消息持久化是独立的。即使队列是持久化的,消息也需要单独设置为持久化。

  3. 交换机(Exchange)持久化
    交换机也可以设置为持久化,确保 RabbitMQ 重启后,交换机不会丢失。

    • 交换机持久化声明
      channel.exchangeDeclare("exchange_name", "direct", true);
      
      其中,true 表示交换机持久化。

持久化的注意事项

  • 持久化操作会稍微影响性能,因为需要将数据写入磁盘。
  • 队列持久化并不会让未持久化的消息在 RabbitMQ 重启后恢复,消息也需要设置为持久化。

2. 不公平分发 (Fair Dispatch)

概念

RabbitMQ 默认采用轮询的方式来分发消息给消费者,即每个消费者会按照平等的顺序接收消息。这种机制在某些情况下会导致某些消费者的任务积压过多,而其他消费者的任务处理过快,导致系统资源浪费。这时,我们可以使用不公平分发(也称为预取机制)来优化分发过程。

不公平分发实现

不公平分发的实现方式是通过设置 Qos(Quality of Service) 预取值,使得每个消费者一次只会接收特定数量的消息,直到这些消息处理完并发送应答后,才会接收新的消息。

步骤
  1. 设置 QoS
    在消费者端使用 basicQos 方法来设置每次接收的消息数量。通常设置为 1,表示消费者一次只会接收一条消息,在处理完并应答之后再接收下一条消息。

    // 设置为每次处理一条消息
    channel.basicQos(1);
    
  2. 手动消息应答
    RabbitMQ 的默认行为是自动应答,即消费者接收到消息后立即应答,不论是否处理完成。要实现不公平分发,需要手动确认消息,在消息处理完之后再发送应答。

    • 手动应答代码示例
      boolean autoAck = false;  // 关闭自动应答
      channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
      
      // 消息处理完成后,手动应答
      channel.basicAck(deliveryTag, false);
      

不公平分发的优点

  • 避免消费者压力过大:某些消费者可能任务处理较慢,通过 QoS 设置,可以避免一次性收到大量任务导致压力过大。
  • 优化资源使用:确保任务更合理地分配到消费者,提升整体的任务处理效率。

不公平分发的注意事项

  • basicQos(1) 的设置会稍微降低吞吐量,但可以确保消息公平分配给消费者。
  • 必须确保消息的手动应答,否则未应答的消息可能会再次发送给其他消费者,导致重复消费。

3. RabbitMQ 持久化与不公平分发结合

在实际应用中,RabbitMQ 持久化与不公平分发可以结合使用,以确保消息的可靠性和消费者的负载平衡。

  • 持久化 确保消息和队列在服务重启时不会丢失,保证了系统的可靠性。
  • 不公平分发 则通过手动应答和 QoS 设置,确保消费者可以根据自己的处理能力,公平地获取消息并进行处理,防止某个消费者处理任务过于缓慢。

代码示例

// 创建通道并声明持久化队列
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare("task_queue", true, false, false, null);

// 生产者发送持久化消息
String message = "Hello World";
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

// 消费者设置 QoS 和手动应答
channel.basicQos(1);
boolean autoAck = false;  // 关闭自动应答
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String receivedMessage = new String(delivery.getBody(), "UTF-8");
    System.out.println("Received: " + receivedMessage);
    
    // 模拟任务处理
    doWork(receivedMessage);
    
    // 手动应答
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};

// 消费消息
channel.basicConsume("task_queue", autoAck, deliverCallback, consumerTag -> {});

http://www.kler.cn/news/357473.html

相关文章:

  • sqli-labs less-25a
  • 单片机的寻址方式有哪些?
  • 创建虚拟机并安装操作系统
  • 贪心day4
  • 【人工智能-初级】第9章 神经网络的基础:理解感知器与激活函数
  • qt项目使用其他项目的ui之单继承之成员变量
  • Cookie与Session的区别(特别详细)
  • C++学习路线(十六)
  • [论文阅读]: Detecting Copyrighted Content in Language Models Training Data
  • 【python】OpenCV—Fourier Transform
  • 十一、SQL 优化:提升数据库性能的关键技巧与实例讲解
  • MongoDB 的优点和缺点
  • 探索YOLO v11:3D人工智能的RGB-D视觉革命
  • 深度解析 Redis 存储结构及其高效性背后的机制
  • 放眼全球:在竞争激烈的当下,海外媒体发稿何以备受关注?
  • UDP协议和TCP协议
  • 常见网络钓鱼类型
  • 【深度学习|地学应用】Aerosol——宏观层面分析地震气溶胶异常——采用 HYSPLIT-4模型模拟地震AOD异常的水平和垂直后向轨迹
  • IDEA中的快捷键大全--超详细
  • UltraISO(软碟通)制作U盘制作Ubuntu20.04启动盘