RabbitMQ-死信队列
以下单打车为例,用户在下单后,寻找附近的车辆,有一个司机接单了,但是没有在规定的时间到来,导致订单超时了。这时候平台就会再次寻找附近的车辆,下单通知附件的车辆来接你。
这类的场景如果放到MQ上,能实现吗?结合之前的实现已经可以做到订单超时了,但是超时后,目前还没有办法处理,这就需要使用死信队列了。
DLX,全称Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时DLX的队列就称为“死信队列”。
以下几种情况导致消息变为死信:
- 消息被拒绝(Basic.Reject/Basic.Nack),并设置requeue参数为false.
- 消息过期。
- 队列达到最大长度。
对于RabbitMQ来说,DLX是一个非常有用的特性。它可以处理异常您那个况下,消息不能够被消息者正确消费(消费者调用了Basic.Nack或者Basic.Reject)而被置入死信队列中,后续分析程序可以通过消费死信队列中的内容来分析当时所遇到的异常情况。进而可以改善和优化系统。
9.1 被拒绝的消息
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class DlxRejectProduct {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义测试消息过期的队列和交换器
channel.exchangeDeclare("dlx.biz.reject.ex", BuiltinExchangeType.FANOUT, false);
Map<String, Object> argument = new HashMap<>();
// 当消息过期后,放置于死信队列
argument.put("x-dead-letter-exchange", "dlx.dead.ex");
// 设置队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
argument.put("x-dead-letter-routing-key", "rk.dlx.reject");
channel.queueDeclare("dlx.biz.reject.qu", false, false, false, argument);
channel.queueBind("dlx.biz.reject.qu", "dlx.biz.reject.ex", "dlx.biz.reject.rk");
// 定义死信交换器和数据
channel.exchangeDeclare("dlx.dead.ex", BuiltinExchangeType.DIRECT, true);
// 用于接收过期后消息的队列
channel.queueDeclare("dlx.reject.qu", false, false, false, null);
// 将用于接收过期消息队列与交换器相绑定
channel.queueBind("dlx.reject.qu", "dlx.dead.ex", "rk.dlx.reject");
channel.basicConsume(
"dlx.biz.reject.qu",
false,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
// 进行消息的拒绝,并且不进入队列
channel.basicReject(envelope.getDeliveryTag(), false);
}
});
}
}
定义了一个业务交换器和队列,并将其绑定,在消费端将消息拒绝,并且不重新加入队列。在定义业务队列时,设置了死信交换器即消息拒绝后放置的放置的交换器,并设置了死信的路由key。
然后再定义了死信交换器以及对应拒绝后的接收消息的队列。
当消息到达消费端后,由于开启了手动ACK确认,会进入处理,而客户端的处理是拒绝消息,并且不重新放回队列,就会被放入到死信交换器dlx.dead.ex
中,而这个消息的死信路由key为rk.dlx.reject
,而此路由key绑定了队列dlx.reject.qu
,这样就看到了消息进入了最终我们看到的队列 dlx.reject.qu
中。
观察下队列的情况:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.reject.qu │ 0 │ 0 │ 0 │ 1 │ │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.reject.qu │ 1 │ 0 │ 1 │ 0 │ │
└───────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]#
通过观察可以发现,消息已经进入了死信队列后的交换器。
9.2 过期的消息
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class DlxExpireProduct {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); ) {
// 定义测试消息过期的队列和交换器
channel.exchangeDeclare("dlx.biz.expire.ex", BuiltinExchangeType.FANOUT, false);
Map<String, Object> argument = new HashMap<>();
// 消息10秒过期
argument.put("x-message-ttl", 10000);
// 当消息过期后,放置于死信队列
argument.put("x-dead-letter-exchange", "dlx.dead.ex");
// 设置队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
argument.put("x-dead-letter-routing-key", "rk.dlx.expire");
channel.queueDeclare("dlx.biz.expire.qu", false, false, false, argument);
channel.queueBind("dlx.biz.expire.qu", "dlx.biz.expire.ex", "dlx.biz.expire.rk");
// 定义死信交换器和数据
channel.exchangeDeclare("dlx.dead.ex", BuiltinExchangeType.DIRECT, true);
// 用于接收过期后消息的队列
channel.queueDeclare("dlx.expire.qu", false, false, false, null);
// 将用于接收过期消息队列与交换器相绑定
channel.queueBind("dlx.expire.qu", "dlx.dead.ex", "rk.dlx.expire");
// 测试过期消息的发送
String msgExpire = "测试过期消息";
channel.basicPublish(
"dlx.biz.expire.ex", "", null, msgExpire.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}
在队列上设置消息10秒过期。并设置了过期绑定的死信交换器和key。
当消息过期时,就会被放入到dlx.dead.ex
交换器,并且此消息设置了死信的路由key为rk.dlx.expire
,而此路由key绑定了dlx.expire.qu
这个队列,所以消息最终就发送到了dlx.expire.qu
中
观察队列中的消息
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.expire.qu │ 0 │ 0 │ 0 │ 0 │ │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.expire.qu │ 1 │ 0 │ 1 │ 0 │ │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.reject.qu │ 0 │ 0 │ 0 │ 0 │ │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.reject.qu │ 1 │ 0 │ 1 │ 0 │ │
└───────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]#
等待10秒后,再查看队列:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.expire.qu │ 1 │ 0 │ 1 │ 0 │ │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.expire.qu │ 0 │ 0 │ 0 │ 0 │ │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.reject.qu │ 0 │ 0 │ 0 │ 0 │ │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.reject.qu │ 1 │ 0 │ 1 │ 0 │ │
└───────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]#
可以发现,消息已经进入了死信队列了。
9.3 超过队列的长度的消息
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class DlxMaxLengthProduct {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); ) {
// 定义测试消息过期的队列和交换器
channel.exchangeDeclare("dlx.biz.max.length.ex", BuiltinExchangeType.FANOUT, false);
Map<String, Object> argument = new HashMap<>();
// 当消息过期后,放置于死信队列
argument.put("x-dead-letter-exchange", "dlx.dead.ex");
// 设置队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
argument.put("x-dead-letter-routing-key", "rk.dlx.max.length");
// 定义消息的最大长度为2,超过2个,第三个即为死信消息
argument.put("x-max-length", 2);
channel.queueDeclare("dlx.biz.max.length.qu", false, false, false, argument);
channel.queueBind("dlx.biz.max.length.qu", "dlx.biz.max.length.ex", "dlx.biz.max.length.rk");
// 定义死信交换器和数据
channel.exchangeDeclare("dlx.dead.ex", BuiltinExchangeType.DIRECT, true);
// 用于接收过期后消息的队列
channel.queueDeclare("dlx.max.length.qu", false, false, false, null);
// 将用于接收过期消息队列与交换器相绑定
channel.queueBind("dlx.max.length.qu", "dlx.dead.ex", "rk.dlx.max.length");
String push1 = "测试发送消息1";
String push2 = "测试发送消息2";
String push3 = "测试发送消息3";
channel.basicPublish("dlx.biz.max.length.ex","",null,push1.getBytes(StandardCharsets.UTF_8));
channel.basicPublish("dlx.biz.max.length.ex","",null,push2.getBytes(StandardCharsets.UTF_8));
channel.basicPublish("dlx.biz.max.length.ex","",null,push3.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}
通过指定消息队列的长度为2,即第三个消息就会进入死信队列。规则还是同之前的过期和拒绝一样。
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.expire.qu │ 1 │ 0 │ 1 │ 0 │ │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.expire.qu │ 0 │ 0 │ 0 │ 0 │ │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.reject.qu │ 0 │ 0 │ 0 │ 0 │ │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.max.length.qu │ 1 │ 0 │ 1 │ 0 │ │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.max.length.qu │ 2 │ 0 │ 2 │ 0 │ │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.reject.qu │ 1 │ 0 │ 1 │ 0 │ │
└───────────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]#
最终的消息是两个在业务队列,而超过最大长度的消息在死信队列中。
9.4 SpringBoot使用死信队列
maven导入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.2.8.RELEASE</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<version>2.2.7.RELEASE</version>
<scope>test</scope>
</dependency>
连接配制
spring:
application:
name: dlx
rabbitmq:
host: node1
port: 5672
virtual-host: /
username: root
password: 123456
主启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DlxApplication {
public static void main(String[] args) {
SpringApplication.run(DlxApplication.class, args);
}
}
队列配制
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DlxConfig {
/**
* 业务队列,并指定了死信交换器
*
* @return
*/
@Bean
public Queue bizQueue() {
Map<String, Object> argument = new HashMap<>();
// 消息在10秒后过期
argument.put("x-message-ttl", 5000);
// 设置该队列所关联的死信交换器,当消息超过10秒没有消费,则加入死信队列
argument.put("x-dead-letter-exchange", "dlx.springboot.ex");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊的指定,使用原队列的routingKey.
argument.put("x-dead-letter-routing-key", "dlx.springboot.rk");
Queue queue = new Queue("dlx.spring.biz.qu", false, false, false, argument);
return queue;
}
/**
* 业务交换器
*
* @return
*/
@Bean
public Exchange bizExchange() {
return new DirectExchange("dlx.spring.biz.ex", false, false, null);
}
@Bean
public Binding bizBind() {
return BindingBuilder.bind(bizQueue()).to(bizExchange()).with("dlx.spring.biz.rk").noargs();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue queueDlx() {
return new Queue("dlx.springboot.expire.qu", false, false, false);
}
@Bean
public Exchange exchangeDlx() {
return new DirectExchange("dlx.springboot.ex", true, false, null);
}
@Bean
public Binding bindDlx() {
return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("dlx.springboot.rk").noargs();
}
}
控制层处理
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
public class BizController {
@Autowired private AmqpTemplate template;
@RequestMapping("/expire-dlx")
public String expireDlx() {
String msg = "测试发送消息,10秒超时";
template.convertAndSend(
"dlx.spring.biz.ex", "dlx.spring.biz.rk", msg.getBytes(StandardCharsets.UTF_8));
return "expire-dlx";
}
@RequestMapping("/dlx/get")
public String sendDlxMsg() {
byte[] getMsg = (byte[]) (template.receiveAndConvert("dlx.springboot.expire.qu"));
return new String(getMsg, StandardCharsets.UTF_8);
}
}
启动项目
然后在浏览器中输入:http://127.0.0.1:8080/expire-dlx
观察队列信息:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├──────────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.springboot.expire.qu │ 0 │ 0 │ 0 │ 0 │ │
├──────────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.spring.biz.qu │ 1 │ 0 │ 1 │ 0 │ │
└──────────────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]#
此时数据在业务队列中。等待5秒,再观察队列:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├──────────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.springboot.expire.qu │ 1 │ 0 │ 1 │ 0 │ │
├──────────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.spring.biz.qu │ 0 │ 0 │ 0 │ 0 │ │
└──────────────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]#
发现数据已经进入了死信队列中。
在浏览器中访问另外的一个接口, http://127.0.0.1:8080/dlx/get
便能得到发送的数据信息:
测试发送消息,10秒超时