rabbitmq高级特性(2)TTL、死信/延迟队列、事务与消息分发
目录
1.TTL
1.1.设置消息过期时间
1.2.设置队列过期时间
2.死信队列
2.1.介绍
2.2.演示
3.延迟队列
3.1.模拟实现延迟队列
3.2.延迟队列插件
4.事务与消息分发
4.1.事务
4.2.消息分发
1.TTL
所谓的ttl,就是过期时间。对于rabbitmq,可以设置队列和消息的过期时间
1.1.设置消息过期时间
(1)相关的交换机和队列
//ttl
@Bean("ttlQueue")
public Queue ttlQueue() {
return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
@Bean("ttlDirectExchange")
public DirectExchange ttlDirectExchange() {
return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).build();
}
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlDirectExchange") Exchange exchange ) {
return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}
(2)生产者 -- 生产消息的时候设置ttl
给消息设置过期时间,也就是设置消息对象的属性
第一种写法:
//ttl
@RequestMapping("/ttl")
public String ttl() {
MessagePostProcessor postProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");//设置10秒后过期
return message;
}
};
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl","a ttl test",postProcessor);
return "ttl";
}
第二种写法:
//ttl
@RequestMapping("/ttl")
public String ttl() {
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl","a ttl test",message -> {
message.getMessageProperties().setExpiration("10000");//设置10秒后过期
return message;
});
return "ttl";
}
(3)效果展示
消息在10s后就会自动删除 (本人承诺没有做任何的处理)
(4)设置消息ttl注意事项
设置消息TTL,即时消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前才会进行判定。如果每次都要扫描整个队列就会很低效。
给A消息设置30s过期,B消息设置10s过期,先将消息A存入队列再存B消息,此时B消息30s后才会被删除。
以上的两条消息是同时消失。
1.2.设置队列过期时间
给队列设置TTL,指的是队列中的消息在TTL后就会被删除,而非队列被删除。
(1)设置交换机和队列
代码给队列设置ttl,在创建队列中调用 ttl() 方法即可
//队列ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() {
return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();//队列中的消息20s后被删除
}
@Bean("ttlDirectExchange2")
public DirectExchange ttlDirectExchange2() {
return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).build();
}
@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlDirectExchange") Exchange exchange ) {
return BindingBuilder.bind(queue).to(exchange).with("ttl2").noargs();
}
(2)生产者与效果
@RequestMapping("/ttl2")
public String ttl2() {
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl2","a ttl2");
return "ttl2";
}
给队列设置ttl,在web界面中就会有改效果出现。
如果把设置有ttl的消息发送到设置有ttl的队列中,那么过期时间取值小的一个。
2.死信队列
- 所谓死信,就是因为种种原因无法被消费的消息。
- 所以死信队列,就是用来存放死信的队列。
- 死信到达死信队列的交换机称为DLX(Dead Letter Exchange)
2.1.介绍
(1)死信队列的图解
正常队列中的消息因为一些原因就会变成死信,然后经过一个特定的路由交换,最后到达一个指定的死信队列中,然后再投递给消费者。
(2)消息称为死信的原因
- 消息被拒绝,且设置了无法入队
- 消息过期
- 队列达到最大长度
2.2.演示
要演示死信队列的情况,就需要有两种队列和两种交换机。
(1)声明交换机和队列
@Configuration
public class DlConfig {
//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DL_EXCHANGE)//绑定死信交换机
.deadLetterRoutingKey("dlx")//死信交换机的路由规则
.ttl(10000)//10秒后消息过期
.maxLength(10L)//队列最大长度为10
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalQueue")Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal");
}
//死信交换机和队列
@Bean("dlxQueue")
public Queue dlxQueue() {
return QueueBuilder.durable(Constant.DL_QUEUE).build();
}
@Bean("dlxExchange")
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(Constant.DL_EXCHANGE).build();
}
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange dlxExchange, @Qualifier("dlxQueue")Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx");
}
}
上面我们知道了消息称为死信的条件,其中消息过期和超过队列最大长度可以在声明队列时实现。所以,有如下的代码改进
//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DL_EXCHANGE)//绑定死信交换机
.deadLetterRoutingKey("dlx")//死信交换机的路由规则
.ttl(10000)//10秒后消息过期
.maxLength(10L)//队列最大长度为10
.build();
}
对于消息被拒,我们在消费者部分进行修改就好。
(2)生产者和消费者
生产者:可以选择模拟超出队列最大长度的情况
//dlx死信队列
@RequestMapping("/dlx")
public String dlx() {
int maxLen = 12;
for(int i=0;i<maxLen;i++) {
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","normal and dlx");
}
return "normal";
}
消费者:
@Component
public class DlListener {
//监听正常队列,模拟拒绝消息
//@RabbitListener(queues = Constant.NORMAL_QUEUE)
public void normalListener() {
System.out.println();
}
//监听死信队列
@RabbitListener(queues = Constant.DL_QUEUE)
public void dlxListener(Message message) {
System.out.println("死信队列:"+ new String(message.getBody()));
}
}
队列就变成了这样子:
(3)过期消息变成死信
(4)其他情况
比如拒绝消息和长度超过最大队列长度
3.延迟队列
延迟队列,指消息发送到队列后,消费者并不能马上拿到消息,而是等待指定的时间后才能消息该消息。
应用场景:
(1)智能家居:比如通过手机下达命令控制家里的家居,达到一定时间段就自动开启。
(2)日常管理:预定一个会议,在会议开始前15分钟就会通知参加人员
比如,我们经常使用的手机闹钟,就是类似于延迟队列的效果。
3.1.模拟实现延迟队列
对于原生的rabbitmq,并没有实现延迟队列的功能,但是我们可以通过TTL+死信队列来模拟实现。
(1)如何模拟实现
消费者需要订阅死信队列,生产者把延迟的消息放入正常队列中,当消息过期就会自动进入死信队列,消费者进而可以拿到消息。
对于TTL,我们是设置消息的TTL,也可以设置队列的过期时间。
(2)模拟实现
死信队列:
@Configuration
public class DlConfig {
//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DL_EXCHANGE)//绑定死信交换机
.deadLetterRoutingKey("dlx")//死信交换机的路由规则
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalQueue")Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal");
}
//死信交换机和队列
@Bean("dlxQueue")
public Queue dlxQueue() {
return QueueBuilder.durable(Constant.DL_QUEUE).build();
}
@Bean("dlxExchange")
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(Constant.DL_EXCHANGE).build();
}
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange dlxExchange, @Qualifier("dlxQueue")Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx");
}
}
生产者:生产者在生产消息的时候加上过期时间,也就是TTL
@RequestMapping("/delay")
public String delay() {
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","这是一条延迟消息",message -> {
message.getMessageProperties().setExpiration("10000");//设置10秒后过期
return message;
});
System.out.println("消息发送时间:"+new Date());
return "delay";
}
消费者:消费者订阅死信队列
//监听死信队列
@RabbitListener(queues = Constant.DL_QUEUE)
public void dlxListener(Message message) {
System.out.println("消费时间: "+new Date() +",死信队列:"+ new String(message.getBody()));
}
演示结果:
结果恰好是10秒后。
(3)存在的缺陷
前面我们知道,当多个携带TTL的消息进入队列中,并且前面消息的TTL大于后面的;那么就会出现,只有前面的消息过期,后面的消息才会跟着过期,这就是TTL+私信队列存在的问题。
所以我们使用一个插件,使用插件带来的延迟队列进行操作。
3.2.延迟队列插件
(1)下载插件并启用
下载地址:这个页面如果点不开,可以使用加速软件加速
Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
找到.ez文件:点击就会下载插件文件
确定安装目录:
像ubunto的就使用这两个目录的其中一个即可:
将文件复制到目录下:没有目录就创建
找到目录
安装插件前:
安装插件:直接将文件拖拽进来即可
安装插件后:多出来的插件
启动插件:
最后在rabbitmq客户端查看交换机类型
这就说明延迟插件启动成功,后续使用该交换机即可。
(2)定义交换机和队列
@Configuration
public class DelayConfig {
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(Constant.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public Exchange delayExchange() {
return ExchangeBuilder.directExchange(Constant.DELAY_EXCHANGE).delayed().build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();
}
}
生产者:
@RequestMapping("/delay2")
public String delay2() {
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","这是一条延迟消息",message -> {
message.getMessageProperties().setExpiration("10000");//设置10秒后过期
return message;
});
System.out.println("消息发送时间:"+new Date());
return "delay";
}
消费者:
@Component
public class DelayListener {
@RabbitListener(queues = Constant.DELAY_QUEUE)
public void delayListener() {
System.out.println("消息时间:"+new Date());
}
}
(3)演示
使用延迟插件就不会出现像上述TTL+死信队列的问题
如果需要关闭插件,执行下面的命令即可:
rabbitmq delayed message exchange
4.事务与消息分发
4.1.事务
事务,就是保证发送消息和接收消息是原子性的,要么全部成功,要么全部失败
(1)配置事务管理器
这里就是需要对AMQP客户端进行设置属性
//3.返回事务
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);//true为开启事务
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);//事务管理器
}
后续使用该对象就可以完成事务的操作
(2)准备队列和交换机
这里使用系统默认的交换机即可
//事务
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable(Constant.TRANS_QUEUE).build();
}
(3)消费者
@RequestMapping("/trans")
public String trans() {
rabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一条消息");
System.out.println("异常前");
int a=9/0;//模拟发送异常
rabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二条消息");
return "trans";
}
上面的消费者是没有使用事物的
(4)没有采取事务
这里指的是既没有开启事务,也没有在方法上加上@Transactional注解
运行结果:
异常前成功发送消息,异常后的消息没有进行发送成功。
(5)使用事务
@Transactional
@RequestMapping("/trans")
public String trans() {
transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一条消息");
System.out.println("异常前");
int a=9/0;//模拟发送异常
transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二条消息");
return "trans";
}
这个时候发送了异常,队列中也是一条消息都没有的。
(6)事务小结
要完成一个事务的操作,这三个操作都不能少
配置对象和事务管理器:
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
加上@Transactional注解:
@Transactional
@RequestMapping("/trans")
public String trans() {
transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一条消息");
//int a=9/0;//模拟发送异常
transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二条消息");
return "trans";
}
还有一个注意事项,使用事务,最好把消息发送确认模式关闭
4.2.消息分发
(1)定义
多个消费者订阅同一个队列时,队列会轮询给消费者分发消息,每个消费者平均每分钟拿到的消息数目是一样的,这种情况看似挺好的,但是容易出现问题。
当每个消费者的消费能力不一样时,消费速度慢的,消息就会积压;而消费速度快的消费者,就会空闲,进而影响整体的吞吐量。
所以就有了消息分发,按照一定的规则,平均每分钟给不同的消费者分发不同数量的消息。
对于消息分发,有两个应用场景 -- 限流和非公平分发
(2)限流
消费者每次只能拿到一定数量的消息,只有消费并且确认后,才能继续拿到消息。所以需要配置成手动确认模式和限流参数
1)配置
2)相应代码
交换机和队列:
@Configuration
public class QosConfig {
@Bean("qosQueue")
public Queue qosQueue() {
return QueueBuilder.durable(Constant.QOS_QUEUE).build();
}
@Bean("qosExchange")
public Exchange qosExchange() {
return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE).delayed().build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosQueue") Queue delayQueue, @Qualifier("qosExchange") Exchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("qos").noargs();
}
}
生产者:
@RequestMapping("/qos")
public String Qos() {
for(int i=0;i<20;i++) {
rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","a qos test"+i);
}
return "qos";
}
消费者:
@Component
public class QosListener {
@RabbitListener(queues = Constant.QOS_QUEUE)
public void qosListener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("消费:"+new String(message.getBody()));
//channel.basicAck(deliveryTag, false); //不进行消息确认
}catch (Exception e){
channel.basicNack(deliveryTag, false, false);
}
}
}
3)演示
一下子往队列中发送20条消息,但是消费者一下子只能拿到5条消息
但是没有确认,就只有五条消息,也拿不到后续的消息。
(3)负载均衡
模拟实现负载均衡,可以把限流参数修改成1,消费确认完成一条消息才能继续拿。
后续代码跟上述是差不多了