当前位置: 首页 > article >正文

RabbitMQ基础篇之Java客户端 Work Queues

文章目录

    • 模型概述
    • 需求
    • 实现步骤
      • 创建队列
      • 定义消费者
      • 定义消息发送
      • 测试执行
      • 观察结论
      • 多消费者的作用
      • 性能差异
      • 生产环境中的应用
      • 处理速度差异的情况
      • 优化示例
      • 总结

模型概述

  • Work Queues 模型也称为任务模型,多个消费者绑定到同一个队列,共同消费队列中的消息。
  • 特点
    • 每条消息只会被一个消费者处理,消息不会被多个消费者同时消费。
    • 消息的处理是分配给绑定到队列的消费者,多个消费者可以加速消息的处理。



需求

模拟 WorkQueue,实现一个队列绑定多个消费者

  • 在 RabbitMQ 的控制台创建一个队列,名为 work.queue
  • 在 publisher 服务中定义测试方法,发送 50 条消息到 work.queue
  • 在 consumer 服务中定义两个消息监听者,都监听 work.queue 队列


实现步骤

创建队列

  • 在控制台创建队列,命名为 work_queue




定义消费者

  • Consumer 服务中定义两个消息监听者(即消费者)。
  • 监听的队列改为 work_queue,分别打印消息处理情况,并加上时间戳。
  • 为区分不同消费者,使用不同的打印样式(例如,Consumer 1 用黑色,Consumer 2 用红色打印)。
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) {
    System.out.println("消费者11111111111111111接收到的消息: " +
            message +
            ", " +
            LocalTime.now());
}

 @RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) {
    System.err.println("消费者22222222222222222接收到的消息: " +
            message +
            ", " +
            LocalTime.now());
}



定义消息发送

  • 在发送者服务中,编写发送 50 条消息的代码。
  • 使用循环发送消息,并在消息中附加编号,以便消费者知道自己接收的是第几条消息。
@Test
public void testWorkQueue() {
    // 队列名称
    String queueName = "work.queue";
    for (int i = 1; i <= 50; i++) {
        // 消息
        String message = "Hello, Spring AMQP_" + i;
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}



测试执行

  • 启动消费者后,开始发送 50 条消息。
  • 观察消费者的消息接收情况:
    • Consumer 1 接收到偶数编号的消息,Consumer 2 接收到奇数编号的消息。
    • 每个消费者分别接收 25 条消息,分配是均匀的(轮询机制)。




观察结论

  • 结论 1: 同一个消息只会被一个消费者处理,不会被多个消费者处理。
  • 结论 2: 消息的分配是均匀的,即使有多个消费者,每个消费者会轮流处理消息。例如,消息1分给消费者2,消息2分给消费者1,依此类推。


多消费者的作用

  • 增加消费者数目可以提高消息的处理速度。
  • 如果只有一个消费者,则所有消息由一个消费者处理。如果有多个消费者,消息会被分配给不同的消费者,提高处理效率。



性能差异

  • 情况 1: 一个消费者处理所有消息时,消息处理的速度较慢。
  • 情况 2: 多个消费者处理消息时,可以大大提高消息的处理速度。
    • 例如,2 个消费者分别处理 25 条消息,处理速度快。
    • 3 个消费者则每人处理约 16 条,4 个消费者则每人处理约 12 条消息。


生产环境中的应用

  • 在实际生产中,通常不会手动创建多个消费者方法。一般是写一个消费者方法,并通过部署多个实例(多台机器或多个服务实例)来实现多个消费者。
  • 这样可以使用不同机器的资源来处理消息,进一步提升消息处理速度。


处理速度差异的情况

  • 如果不同消费者的处理速度不同,默认的轮询分配可能不够合理。为了解决这个问题,可以使用 prefetch 配置来优化消息的分配。
    • 默认情况下,RabbitMQ 使用轮询方式来分配消息。
    • 如果某个消费者处理消息较慢,默认的分配方式会导致它处理大量消息,处理速度慢。
    • 可以通过配置 spring.rabbitmq.listener.simple.prefetch=1 来优化。这样每个消费者只能提前获取一条消息,直到当前消息处理完毕,才能获取下一条。
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息  



优化示例

  • 在消费者 1 中设置每条消息处理后休眠 25 毫秒,每秒处理 40 条消息。
  • 在消费者 2 中设置每条消息处理后休眠 200 毫秒,每秒处理 5 条消息。
  • 通过这种方式模拟消费者处理速度不同。
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
    System.out.println("消费者11111111111111111接收到的消息: " + message + ", " + LocalTime.now());
    Thread.sleep(25);
}

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
    System.err.println("消费者22222222222222222接收到的消息: " + message + ", " + LocalTime.now());
    Thread.sleep(200);
}




总结

  • Work Queues 模型通过多个消费者共同消费同一队列的消息,提高了处理速度,解决了单个消费者处理过慢的问题。
  • 生产环境中通过多实例部署,可以有效地扩展消费者数量,提升系统处理能力。

http://www.kler.cn/a/460533.html

相关文章:

  • 回归预测 | MATLAB实现CNN-SVM多输入单输出回归预测
  • springboot集成qq邮箱服务
  • 【AI】最近有款毛茸茸AI生成图片圈粉了,博主也尝试使用风格转换生成可爱的小兔子,一起来探索下是如何实现的
  • ELK入门教程(超详细)
  • 深入理解Java中的Set集合:特性、用法与常见操作指南
  • Datawhale AI冬令营(第二期)动手学AI Agent--Task3:学Agent工作流搭建,创作进阶Agent
  • 电子电气架构 --- 什么是自动驾驶技术中的域控制单元(DCU)?
  • 【Paper Tips】随记1-word版打印公式
  • Mesh网格数据结构2-半边结构
  • 32. 找最小数
  • 集成开发环境——keil c51 和 keil mdk的安装及融合
  • 【详解】AndroidWebView的加载超时处理
  • 【YashanDB知识库】sys登录提示账户被锁,怎么处理?
  • 【AUTOSAR 基础软件】Can模块详解(Can栈之驱动模块)
  • 深入了解 Zookeeper:原理与应用
  • 小白投资理财 - 看懂 EPS 每股收益
  • Windows11 的开发
  • 【SpringBoot】深度解析 Spring Boot 拦截器:实现统一功能处理的关键路径
  • debian安装Nginx
  • [羊城杯 2024]不一样的数据库_2
  • QGIS移动图元功能
  • 纯血鸿蒙ArkUI相对布局详解
  • 简易内存池(中)
  • Kubernetes: NetworkPolicy 的实践应用
  • 航顺芯片推出HK32A040方案,赋能汽车矩阵大灯安全与智能化升级
  • Linux postgresql-15部署文档