当前位置: 首页 > article >正文

【RabbitMQ】消息分发、事务

消息分发

概念

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。

比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。

当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。

应用场景

  1. 限流
  2. 非公平分发(负载均衡)

限流

在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。

限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
    listener:
      simple:
        acknowledge-mode: manual # 消息确认机制为手动确认
        prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {

    @Bean("qosQueue")
    public Queue qosQueue() {
        return QueueBuilder.durable(Constants.QOS_QUEUE).build();
    }

    @Bean("qosExchange")
    public Exchange qosExchange() {
        return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
    }

    @Bean("qosQueueBind")
    public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
    }

}
@RestController
@RequestMapping("/qos")
public class QosController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void qosQueue() {
        for (int i = 0; i < 10; i++) {
            this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);
            System.out.println("第" + i + "次发送消息成功!");
        }
    }

}
@Configuration
public class QosListener {

    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void qosListener(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收的消息为:" + msg);
        // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

 

启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。

负载均衡

在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。

实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。

事务

RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。

@Component
public class RabbitTemplateConfig {

    @Bean("transactionRabbitTemplate")
    public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true); // 开启事务
        return rabbitTemplate;
    }

}
@Configuration
public class TransactionConfig {

    @Bean("transactionQueue")
    public Queue transactionQueue() {
        return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();
    }

    @Bean("transactionExchange")
    public Exchange transactionExchange() {
        return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();
    }

    @Bean("transactionQueueBind")
    public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,
                                       @Qualifier("transactionExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();
    }

}
@RestController
@RequestMapping("/transaction")
public class TransactionController {

    @Resource(name = "transactionRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    @Transactional
    @RequestMapping
    public void transactionQueue() {
        System.out.println("发送成功");
         this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");
         int i = 1 / 0;
        this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");
    }

}

RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。 


http://www.kler.cn/a/318913.html

相关文章:

  • 微服务day08
  • 【IC每日一题:IC常用模块--RR/handshake/gray2bin】
  • Dolby TrueHD和Dolby Digital Plus (E-AC-3)编码介绍
  • npm list @types/node 命令用于列出当前项目中 @types/node 包及其依赖关系
  • 如何在算家云搭建Peach-9B-8k-Roleplay(文本生成)
  • 读数据质量管理:数据可靠性与数据质量问题解决之道03数据目录
  • 二级C语言2023-3易错题
  • 【伺服】Servo入坑学习记录①
  • 02DSP学习-了解syscfg
  • C语言从头学63—学习头文件stdlib.h(二)
  • PyQt5 导入ui文件报错 AttributeError: type object ‘Qt‘ has no attribute
  • Spring Boot在心理辅导领域的创新应用
  • C++(9.24)
  • Golang | Leetcode Golang题解之第420题强密码检验器
  • Android SystemUI组件(07)锁屏KeyguardViewMediator分析
  • echarts图表刷新
  • 与 CESS Network 共探去中心化创新:重塑数据基础设施,驱动未来变革
  • 数电学习基础(逻辑门电路+)
  • 羽毛球场馆预约系统,便捷管理预约
  • 【UE5】将2D切片图渲染为体积纹理,最终实现使用RT实时绘制体积纹理【第二篇-着色器制作】
  • 破解 oklink 网站加密数据(升级版)
  • Python中字典常用方法
  • Go版数据结构 -【序言】
  • 一,初始 MyBatis-Plus
  • 微信小程序公共样式:设计与实现指南
  • 智能听诊器宠物社区的新宠