RabbitMQ解决消息积压的方法
目录
减少发送mq的消息体内容
增加消费者数量
批量消费消息
临时队列转移
监控和预警机制
分阶段实施
最后还有一个方法就是开启队列的懒加载
这篇文章总结一下自己知道的解决消息积压得方法。
减少发送mq的消息体内容
像我们没有必要知道一个的中间状态,只需知道一个最终状态就可以了。
发送的消息体只用包含:id和状态等关键信息,不用发送一个完整的对象内容。
消费者收到消息之后,通过id调用原服务再将完整的消息对象内容查询出来即可,最后再进行消费处理。
增加消费者数量
采用动态增加消费者的数量
@Configuration
public class RabbitMQConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 设置并发消费者数量
factory.setConcurrentConsumers(5); // 初始消费者数量
factory.setMaxConcurrentConsumers(20); // 最大消费者数量
// 动态调整消费者数量
factory.setConsumerTagStrategy(queue -> "consumer-" + UUID.randomUUID());
return factory;
}
}
@Service
public class ConsumerManagerService {
@Autowired
private RabbitListenerEndpointRegistry registry;
public void adjustConsumerCount(String queueName, int count) {
MessageListenerContainer container =
registry.getListenerContainer(queueName);
if (container instanceof SimpleMessageListenerContainer) {
SimpleMessageListenerContainer simpleContainer =
(SimpleMessageListenerContainer) container;
simpleContainer.setConcurrentConsumers(count);
}
}
}
批量消费消息
@Service
public class BatchMessageConsumer {
@RabbitListener(queues = "myQueue", containerFactory = "batchFactory")
public void processBatch(List<Message> messages, Channel channel) {
try {
// 批量处理消息
List<MessageDTO> dtos = messages.stream()
.map(this::convertToDTO)
.collect(Collectors.toList());
// 批量保存到数据库
batchSaveToDatabase(dtos);
// 获取最后一条消息的deliveryTag
long lastDeliveryTag = messages.get(messages.size() - 1)
.getMessageProperties()
.getDeliveryTag();
// 批量确认
channel.basicAck(lastDeliveryTag, true);
} catch (Exception e) {
// 批量拒绝
handleBatchError(messages, channel);
}
}
}
// 配置批量消费
@Configuration
public class BatchConsumerConfig {
@Bean
public SimpleRabbitListenerContainerFactory batchFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 启用批量消费
factory.setBatchListener(true);
// 批量大小
factory.setBatchSize(100);
// 批量超时时间
factory.setReceiveTimeout(1000L);
return factory;
}
}
临时队列转移
@Service
public class MessageTransferService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void transferMessages(String sourceQueue, String tempQueue, int batchSize) {
while (true) {
// 从源队列批量获取消息
List<Message> messages = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
Message message = rabbitTemplate.receive(sourceQueue);
if (message == null) break;
messages.add(message);
}
if (messages.isEmpty()) break;
// 转移到临时队列
messages.forEach(msg ->
rabbitTemplate.send(tempQueue, msg));
}
}
}
// 临时队列的消费者
@Component
public class TempQueueConsumer {
@RabbitListener(queues = "#{tempQueue.name}")
public void processMessage(Message message) {
// 使用更高效的处理方式
fastProcessMessage(message);
}
@Bean
public Queue tempQueue() {
return new Queue("temp-queue-" + UUID.randomUUID(), false, false, true);
}
}
监控和预警机制
@Service
@Slf4j
public class QueueMonitorService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void monitorQueueSize() {
String queueName = "myQueue";
// 获取队列信息
Properties properties = rabbitTemplate.execute(channel ->
channel.queueDeclarePassive(queueName));
// 获取消息数量
int messageCount = properties.getMessageCount();
// 检查消息堆积
if (messageCount > threshold) {
// 发送告警
sendAlert(queueName, messageCount);
// 动态调整消费者
adjustConsumers(queueName, messageCount);
}
}
private void adjustConsumers(String queueName, int messageCount) {
// 根据消息数量动态调整消费者数量
int newConsumerCount = calculateConsumerCount(messageCount);
consumerManagerService.adjustConsumerCount(queueName, newConsumerCount);
}
}
分阶段实施
@Service
public class MessageHandlingStrategy {
public void handleMessageBacklog() {
// 1. 首先增加消费者数量
adjustConsumerCount();
// 2. 如果仍然堆积,启用批量处理
if (isStillBacklogged()) {
enableBatchProcessing();
}
// 3. 如果问题持续,使用临时队列
if (isEmergency()) {
transferToTemporaryQueue();
}
}
}