当前位置: 首页 > article >正文

微服务学习(3):Work Queues的作用与测试

Work Queues在消息队列(MQ)中扮演着基础且重要的角色,它主要用于实现任务的异步处理和负载均衡。其应用范围广泛,涵盖了从简单的任务分发到复杂的微服务架构间的通信。无论是处理耗时的任务、平滑流量峰值,还是增强系统的可扩展性和可靠性,Work Queues都能提供有效的解决方案。它是构建高效率、松耦合分布式系统的关键模式之一,适用于需要确保消息传递顺序及消费者间负载均衡的场景。

目录

Work Queues是什么

为什么采用Work Queues

Work Queues测试

开始测试

问题解决

总结


Work Queues是什么

Work Queues(工作队列)模型是一种消息队列模式,用于在生产者和多个工作者(消费者)之间分配耗时任务。在此模型中,每个任务作为一个消息发送到队列,然后由多个工作者之一接收并处理。这样可以并行处理多个任务,提高处理速度和系统效率,特别适合于需要执行耗时操作的任务分发场景。通过负载均衡的方式,Work Queues能够确保任务被均匀地分配给空闲的工作者,从而优化资源利用。(多个消费者绑定到一个队列,共同消费队列中的消息

如下图所示(未加入交换机):

为什么采用Work Queues

任务分配与负载均衡:在没有工作队列的情况下,如果多个消费者直接从同一个任务源获取任务,可能会导致某些消费者过载,而其他消费者闲置的问题。通过使用Work Queues,任务可以被均匀地分发给所有空闲的工作者,实现自动化的负载均衡。

提高处理效率:对于耗时的任务,单个消费者可能无法及时处理所有的请求,导致延迟增加。使用Work Queues可以让多个消费者同时处理队列中的任务,从而显著提高系统的整体处理能力和响应速度。

解耦生产者和消费者:生产者只需将任务发送到队列中,无需关心任务实际由谁处理、如何处理等细节。这种机制使得生产者和消费者之间实现了松耦合,方便了系统的扩展和维护。

增强系统可靠性:当一个消费者失败或需要维护时,其他消费者可以继续处理队列中的任务,保证了系统的高可用性和可靠性。此外,消息队列通常会提供持久化功能,确保即使系统崩溃,未完成的任务也不会丢失。

简而言之,使用work 模型,多个消费者共同处理消息,消息处理的速度就提高了,提高了系统的灵活性、可靠性

Work Queues测试

开始测试

首先,新建一个队列work.queue

然后在publisher服务中的SpringAmqpTest类中添加一个测试方法:

    /**
     * 模拟消息堆积
     * @throws InterruptedException
     */
    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "work.queue";
        // 消息
        String message = "hello rabbitmq";
        // 发送消息
        for (int i = 0; i < 50; i++) {
            // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        log.info("消费者1收到消息:{}", msg);
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        log.info("消费者1收到消息:{}", msg);
        Thread.sleep(200);
    }

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

消费者1 sleep了20毫秒,相当于每秒钟处理50个消息.

消费者2 sleep了200毫秒,相当于每秒处理5个消息

运行后控制台结果如下:

结果描述:可以看到消费者1和消费者2竟然每人消费了25条消息,消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

问题解决

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:

消费者1接收到消息:【hello rabbitmq0】16:28:04.400892
消费者2----------接收到消息:【hello rabbitmq1】16:28:04.412429500
消费者1接收到消息:【hello rabbitmq2】16:28:04.443088500
消费者1接收到消息:【hello rabbitmq3】16:28:04.474896500
消费者1接收到消息:【hello rabbitmq4】16:28:04.505468900
消费者1接收到消息:【hello rabbitmq5】16:28:04.537058900
消费者1接收到消息:【hello rabbitmq6】16:28:04.568139200
消费者1接收到消息:【hello rabbitmq7】16:28:04.599120700
消费者1接收到消息:【hello rabbitmq8】16:28:04.629452700
消费者2----------接收到消息:【hello rabbitmq9】16:28:04.660120800
消费者1接收到消息:【hello rabbitmq10】16:28:04.691873500
消费者1接收到消息:【hello rabbitmq11】16:28:04.722122900
消费者1接收到消息:【hello rabbitmq12】16:28:04.754236300
消费者1接收到消息:【hello rabbitmq13】16:28:04.783756100
消费者1接收到消息:【hello rabbitmq14】16:28:04.814155600
消费者1接收到消息:【hello rabbitmq15】16:28:04.846354400
消费者1接收到消息:【hello rabbitmq16】16:28:04.877538200
消费者2----------接收到消息:【hello rabbitmq17】16:28:04.908657700
消费者1接收到消息:【hello rabbitmq18】16:28:04.939493800
消费者1接收到消息:【hello rabbitmq19】16:28:04.970395700
消费者1接收到消息:【hello rabbitmq20】16:28:05.000535100
消费者1接收到消息:【hello rabbitmq21】16:28:05.031434700
消费者1接收到消息:【hello rabbitmq22】16:28:05.061435400
消费者1接收到消息:【hello rabbitmq23】16:28:05.092037600
消费者1接收到消息:【hello rabbitmq24】16:28:05.124013300
消费者2----------接收到消息:【hello rabbitmq25】16:28:05.153791500
消费者1接收到消息:【hello rabbitmq26】16:28:05.184977800
消费者1接收到消息:【hello rabbitmq27】16:28:05.216572900
消费者1接收到消息:【hello rabbitmq28】16:28:05.247382300
消费者1接收到消息:【hello rabbitmq29】16:28:05.279566600
消费者1接收到消息:【hello rabbitmq30】16:28:05.309735600
消费者1接收到消息:【hello rabbitmq31】16:28:05.340732900
消费者1接收到消息:【hello rabbitmq32】16:28:05.370816400
消费者2----------接收到消息:【hello rabbitmq33】16:28:05.401774
消费者1接收到消息:【hello rabbitmq34】16:28:05.431728
消费者1接收到消息:【hello rabbitmq35】16:28:05.463604200
消费者1接收到消息:【hello rabbitmq36】16:28:05.494062400
消费者1接收到消息:【hello rabbitmq37】16:28:05.524519700
消费者1接收到消息:【hello rabbitmq38】16:28:05.555449200
消费者1接收到消息:【hello rabbitmq39】16:28:05.586951
消费者1接收到消息:【hello rabbitmq40】16:28:05.618628100
消费者2----------接收到消息:【hello rabbitmq41】16:28:05.648597700
消费者1接收到消息:【hello rabbitmq42】16:28:05.679561400
消费者1接收到消息:【hello rabbitmq43】16:28:05.710259900
消费者1接收到消息:【hello rabbitmq44】16:28:05.740510700
消费者1接收到消息:【hello rabbitmq45】16:28:05.771589
消费者1接收到消息:【hello rabbitmq46】16:28:05.801896700
消费者1接收到消息:【hello rabbitmq47】16:28:05.833378300
消费者1接收到消息:【hello rabbitmq48】16:28:05.863505700
消费者2----------接收到消息:【hello rabbitmq49】16:28:05.894284

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了7条消息。而最终总的执行耗时也在1秒左右,大大提升。

总结

Work模型的使用:

多个消费者绑定到一个队列,同一条消息只会被一个消费者处理,通过设置prefetch来控制消费者预取的消息数量。

Work Queues成为优化分布式系统中任务处理流程的理想选择,尤其适用于需要异步处理、负载均衡和系统扩展的应用场景。


http://www.kler.cn/a/569481.html

相关文章:

  • 《白帽子讲 Web 安全:点击劫持》
  • 计算机毕业设计SpringBoot+Vue.js林业产品推荐系统 农产品推荐系统 (源码+文档+PPT+讲解)
  • 【Git】Ubuntu 安装 Git Large File Storage(LFS)以及使用 Git LFS 下载
  • AI 时代下,操作系统如何进化与重构?
  • 打开 Windows Docker Desktop 出现 Docker Engine Stopped 问题
  • 2.1 第一个程序:从 Hello World 开始
  • 量子计算在材料科学中的应用:开辟新技术前沿
  • 极简RabbitMQ快速学习
  • Python 如何实现烟花效果的完整代码
  • 【区块链 + 智慧政务】 伽罗华域:区块链数据溯源系统 | FISCO BCOS 应用案例
  • Linux 下使用mtr命令来进行网络诊断
  • Docker数据卷操作实战
  • 【Java分布式】Nacos注册中心
  • 最大子数组和力扣--53
  • 深入解析数据倾斜:原因、影响与优化方案
  • XGMII(10 Gigabit Media Independent Interface)详解
  • LLMR: Real-time Prompting of Interactive Worldsusing Large Language Models
  • 2016年蓝桥杯第七届CC++大学B组真题及代码
  • spring boot 2.7 + seata +微服务 降级失败问题修复
  • 【愚公系列】《Python网络爬虫从入门到精通》037-文件的存取