【RabbitMQ】消息分发、事务
消息分发
概念
RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。
在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。
比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。
当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。
应用场景
- 限流
- 非公平分发(负载均衡)
限流
在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。
限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。
spring:
rabbitmq:
host: 43.138.108.125
port: 5672
username: admin
password: admin
virtual-host: mq-springboot-test
listener:
simple:
acknowledge-mode: manual # 消息确认机制为手动确认
prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {
@Bean("qosQueue")
public Queue qosQueue() {
return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public Exchange qosExchange() {
return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
}
@Bean("qosQueueBind")
public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
}
}
@RestController
@RequestMapping("/qos")
public class QosController {
@Resource
private RabbitTemplate rabbitTemplate;
@RequestMapping
public void qosQueue() {
for (int i = 0; i < 10; i++) {
this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);
System.out.println("第" + i + "次发送消息成功!");
}
}
}
@Configuration
public class QosListener {
@RabbitListener(queues = Constants.QOS_QUEUE)
public void qosListener(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收的消息为:" + msg);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。
负载均衡
在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。
实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。
事务
RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。
@Component
public class RabbitTemplateConfig {
@Bean("transactionRabbitTemplate")
public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true); // 开启事务
return rabbitTemplate;
}
}
@Configuration
public class TransactionConfig {
@Bean("transactionQueue")
public Queue transactionQueue() {
return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();
}
@Bean("transactionExchange")
public Exchange transactionExchange() {
return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();
}
@Bean("transactionQueueBind")
public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,
@Qualifier("transactionExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();
}
}
@RestController
@RequestMapping("/transaction")
public class TransactionController {
@Resource(name = "transactionRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@Transactional
@RequestMapping
public void transactionQueue() {
System.out.println("发送成功");
this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");
int i = 1 / 0;
this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");
}
}
RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。