当前位置: 首页 > article >正文

【RabbitMQ】消息堆积、推拉模式

消息堆积

原因

消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因:

消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。

消费者处理能力不足:消费者消费消息的速度跟不上消息生产的速度,也会导致消息在队列中积压。可能的原因有:

  • 消费端业务逻辑复杂、耗时长。
  • 消费端代码性能低。
  • 系统资源限制,如CPU、内存、磁盘等也会限制消费者处理消息的速率。
  • 异常处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认。

网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压。

RabbitMQ服务器配置偏低

解决方案

消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃。因此,及时发现消息积压并解决对于维护系统稳定性至关重要。

遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。通常从以下几个方面考虑:

提高消费者效率

  • 增加消费者实例数量,比如新增机器。
  • 优化业务逻辑,比如使用多线程来处理业务。
  • 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者。
  • 消息发生异常时,设置合适的重试策略,或者转入到死信队列。

限制生产者效率:比如流量控制、限流算法等。

  • 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送效率。
  • 限流:使用限流工具,为消息发送效率设置一个上限。
  • 设置过期时间:如果消息过期未被消费,可以配置死信队列,以避免消息丢失,同时减少对主队列的压力。

资源与配置优化:比如省级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等。

推拉模式

概述

RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull)。

推模式:消息中间件主动将消息推送给消费者(对消息的获取更加实时,适合对数据实时性要求较高的业务,例如实时数据处理:监控系统、报表系统等)。

拉模式:消费者主动从消息中间件拉取消息(消费端可以按照自己的处理速度来消费,避免消息积压,适合需要流量控制,或者需要大量计算资源的任务,拉取模式允许消费者准备好之后再请求消息,避免资源浪费)。

RabbitMQ主要就是基于推模式工作的,例如最开始谈到的几种工作模式,都是基于推模式来进行实现。它的设计核心是让消息队列中的消费者接收到由生产者发送的消息,使用channel.basicConsume方式订阅队列,MQ就会把消息推送到订阅该队列的消费者。如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方式来进行消费消息。

SpringBoot方式

@Configuration
public class PushAndPull {

    @Bean("pushQueue")
    public Queue pushQueue() {
        return QueueBuilder.durable(Constants.PUSH_QUEUE).build();
    }

    @Bean("pushExchange")
    public Exchange pushExchange() {
        return ExchangeBuilder.directExchange(Constants.PUSH_EXCHANGE).durable(true).build();
    }

    @Bean("pushQueueBind")
    public Binding pushQueueBind(@Qualifier("pushExchange") Exchange exchange,
                                 @Qualifier("pushQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("push").noargs();
    }

    @Bean("pullQueue")
    public Queue pullQueue() {
        return QueueBuilder.durable(Constants.PULL_QUEUE).build();
    }

    @Bean("pullExchange")
    public Exchange pullExchange() {
        return ExchangeBuilder.directExchange(Constants.PULL_EXCHANGE).durable(true).build();
    }

    @Bean("pullQueueBind")
    public Binding pullQueueBind(@Qualifier("pullExchange") Exchange exchange,
                                 @Qualifier("pullQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("pull").noargs();
    }
}
@Slf4j
@RestController
@RequestMapping("/pushAndPull")
public class PushAndPullController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    // 推模式
    @RequestMapping("/push")
    public void push() {
        this.rabbitTemplate.convertAndSend(Constants.PUSH_EXCHANGE, "push", "hello push");
        System.out.println("推模式消息发送成功!");
    }

    // 拉模式
    @RequestMapping("/pull")
    public void pull() {
        this.rabbitTemplate.convertAndSend(Constants.PULL_EXCHANGE, "pull", "hello pull");
        System.out.println("拉模式消息发送成功!");
    }

}
@Configuration
@RestController
public class PushAndPullListener {

    // 推模式
    @RabbitListener(queues = Constants.PUSH_QUEUE)
    public void push(String message) {
        System.out.println("推模式: " + message);
    }

    @Resource
    private RabbitTemplate rabbitTemplate;

    // 拉模式
    @RequestMapping("/pullConsumer")
    public void pull(String message) {
        Message receive = this.rabbitTemplate.receive(Constants.PULL_QUEUE);
        System.out.println("拉模式: " + receive);
    }

}

SDK方式

// 推模式生产者
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("113.45.220.15"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");

        // TODO 发送消息
        channel.basicPublish(Constants.PUSH_EXCHANGE, "push", null, "推模式".getBytes());
        System.out.println("推模式发送消息成功!");

        // TODO 释放资源
        channel.close();
        connection.close();
    }

}
// 推模式消费者
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("113.45.220.15"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("推模式消费者接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.PUSH_QUEUE, true, consumer);


        // TODO 释放资源

    }

}
// 拉模式生产者
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("113.45.220.15"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");

        // TODO 发送消息
        channel.basicPublish(Constants.PULL_EXCHANGE, "pull", null, "拉模式".getBytes());
        System.out.println("拉模式发送消息成功!");

        // TODO 释放资源
        channel.close();
        connection.close();
    }

}
// 拉模式消费者
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("113.45.220.15"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");

        // TODO 接收消息
        GetResponse getResponse = channel.basicGet(Constants.PULL_QUEUE, true);
        if (getResponse != null) {
            System.out.println("拉模式收到消息:" + new String(getResponse.getBody()));
        }


        // TODO 释放资源

    }

}

http://www.kler.cn/a/321206.html

相关文章:

  • 麒麟系统下docker搭建jenkins
  • web与网络编程
  • androidstudio入门到放弃配置
  • Python期末复习 | 列表、元组、字典、集合与字符串 | 代码演示
  • 林曦词典|养生
  • nVisual自定义工单内容
  • java调用opencv部署到centos7
  • 个人行政复议在线预约系统开发+ssm论文源码调试讲解
  • TypeScript 中的接口、泛型与自定义类型
  • 基于Es和智普AI实现的语义检索
  • <script>中的为什么需要转义?
  • 【python qdrant 向量数据库 完整示例代码】
  • Centos7 docker 自动补全命令
  • js 接力导出
  • 双token无感刷新
  • AI大语言模型的全面解读
  • 828华为云征文|使用Flexus X实例安装宝塔面板教学
  • 1 elasticsearch安装
  • 什么是开放式耳机?具有什么特色?非常值得入手的蓝牙耳机推荐
  • 【C++位图】构建灵活的空间效率工具
  • 计算机毕业设计选题推荐-基于python的养老院数据可视化分析
  • R18 NES 之SSB-less SCell operation for inter-band CA
  • 基于vue框架的宠物寻回小程序8g7el(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。
  • MATLAB系列09:图形句柄
  • 论文解读《Object-Centric Learning with Slot Attention》
  • 网络模型的保存与读取