【RabbitMQ】消息堆积、推拉模式
消息堆积
原因
消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因:
消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。
消费者处理能力不足:消费者消费消息的速度跟不上消息生产的速度,也会导致消息在队列中积压。可能的原因有:
- 消费端业务逻辑复杂、耗时长。
- 消费端代码性能低。
- 系统资源限制,如CPU、内存、磁盘等也会限制消费者处理消息的速率。
- 异常处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认。
网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压。
RabbitMQ服务器配置偏低
解决方案
消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃。因此,及时发现消息积压并解决对于维护系统稳定性至关重要。
遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。通常从以下几个方面考虑:
提高消费者效率
- 增加消费者实例数量,比如新增机器。
- 优化业务逻辑,比如使用多线程来处理业务。
- 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者。
- 消息发生异常时,设置合适的重试策略,或者转入到死信队列。
限制生产者效率:比如流量控制、限流算法等。
- 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送效率。
- 限流:使用限流工具,为消息发送效率设置一个上限。
- 设置过期时间:如果消息过期未被消费,可以配置死信队列,以避免消息丢失,同时减少对主队列的压力。
资源与配置优化:比如省级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等。
推拉模式
概述
RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull)。
推模式:消息中间件主动将消息推送给消费者(对消息的获取更加实时,适合对数据实时性要求较高的业务,例如实时数据处理:监控系统、报表系统等)。
拉模式:消费者主动从消息中间件拉取消息(消费端可以按照自己的处理速度来消费,避免消息积压,适合需要流量控制,或者需要大量计算资源的任务,拉取模式允许消费者准备好之后再请求消息,避免资源浪费)。
RabbitMQ主要就是基于推模式工作的,例如最开始谈到的几种工作模式,都是基于推模式来进行实现。它的设计核心是让消息队列中的消费者接收到由生产者发送的消息,使用channel.basicConsume方式订阅队列,MQ就会把消息推送到订阅该队列的消费者。如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方式来进行消费消息。
SpringBoot方式
@Configuration
public class PushAndPull {
@Bean("pushQueue")
public Queue pushQueue() {
return QueueBuilder.durable(Constants.PUSH_QUEUE).build();
}
@Bean("pushExchange")
public Exchange pushExchange() {
return ExchangeBuilder.directExchange(Constants.PUSH_EXCHANGE).durable(true).build();
}
@Bean("pushQueueBind")
public Binding pushQueueBind(@Qualifier("pushExchange") Exchange exchange,
@Qualifier("pushQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("push").noargs();
}
@Bean("pullQueue")
public Queue pullQueue() {
return QueueBuilder.durable(Constants.PULL_QUEUE).build();
}
@Bean("pullExchange")
public Exchange pullExchange() {
return ExchangeBuilder.directExchange(Constants.PULL_EXCHANGE).durable(true).build();
}
@Bean("pullQueueBind")
public Binding pullQueueBind(@Qualifier("pullExchange") Exchange exchange,
@Qualifier("pullQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("pull").noargs();
}
}
@Slf4j
@RestController
@RequestMapping("/pushAndPull")
public class PushAndPullController {
@Resource
private RabbitTemplate rabbitTemplate;
// 推模式
@RequestMapping("/push")
public void push() {
this.rabbitTemplate.convertAndSend(Constants.PUSH_EXCHANGE, "push", "hello push");
System.out.println("推模式消息发送成功!");
}
// 拉模式
@RequestMapping("/pull")
public void pull() {
this.rabbitTemplate.convertAndSend(Constants.PULL_EXCHANGE, "pull", "hello pull");
System.out.println("拉模式消息发送成功!");
}
}
@Configuration
@RestController
public class PushAndPullListener {
// 推模式
@RabbitListener(queues = Constants.PUSH_QUEUE)
public void push(String message) {
System.out.println("推模式: " + message);
}
@Resource
private RabbitTemplate rabbitTemplate;
// 拉模式
@RequestMapping("/pullConsumer")
public void pull(String message) {
Message receive = this.rabbitTemplate.receive(Constants.PULL_QUEUE);
System.out.println("拉模式: " + receive);
}
}
SDK方式
// 推模式生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("113.45.220.15"); // IP
connectionFactory.setPort(5672); // PORT
connectionFactory.setUsername("admin"); // 用户名
connectionFactory.setPassword("admin"); // 密码
connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机
// TODO 创建连接
Connection connection = connectionFactory.newConnection();
// TODO 获取信道
Channel channel = connection.createChannel();
// TODO 声明交换机
channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);
// TODO 声明队列
channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);
// TODO 绑定交换机和队列
channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");
// TODO 发送消息
channel.basicPublish(Constants.PUSH_EXCHANGE, "push", null, "推模式".getBytes());
System.out.println("推模式发送消息成功!");
// TODO 释放资源
channel.close();
connection.close();
}
}
// 推模式消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("113.45.220.15"); // IP
connectionFactory.setPort(5672); // PORT
connectionFactory.setUsername("admin"); // 用户名
connectionFactory.setPassword("admin"); // 密码
connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机
// TODO 创建连接
Connection connection = connectionFactory.newConnection();
// TODO 获取信道
Channel channel = connection.createChannel();
// TODO 声明交换机
channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);
// TODO 声明队列
channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);
// TODO 绑定交换机和队列
channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");
// TODO 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("推模式消费者接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.PUSH_QUEUE, true, consumer);
// TODO 释放资源
}
}
// 拉模式生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("113.45.220.15"); // IP
connectionFactory.setPort(5672); // PORT
connectionFactory.setUsername("admin"); // 用户名
connectionFactory.setPassword("admin"); // 密码
connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机
// TODO 创建连接
Connection connection = connectionFactory.newConnection();
// TODO 获取信道
Channel channel = connection.createChannel();
// TODO 声明交换机
channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);
// TODO 声明队列
channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);
// TODO 绑定交换机和队列
channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");
// TODO 发送消息
channel.basicPublish(Constants.PULL_EXCHANGE, "pull", null, "拉模式".getBytes());
System.out.println("拉模式发送消息成功!");
// TODO 释放资源
channel.close();
connection.close();
}
}
// 拉模式消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("113.45.220.15"); // IP
connectionFactory.setPort(5672); // PORT
connectionFactory.setUsername("admin"); // 用户名
connectionFactory.setPassword("admin"); // 密码
connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机
// TODO 创建连接
Connection connection = connectionFactory.newConnection();
// TODO 获取信道
Channel channel = connection.createChannel();
// TODO 声明交换机
channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);
// TODO 声明队列
channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);
// TODO 绑定交换机和队列
channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");
// TODO 接收消息
GetResponse getResponse = channel.basicGet(Constants.PULL_QUEUE, true);
if (getResponse != null) {
System.out.println("拉模式收到消息:" + new String(getResponse.getBody()));
}
// TODO 释放资源
}
}