当前位置: 首页 > article >正文

RabbitMQ-死信队列

以下单打车为例,用户在下单后,寻找附近的车辆,有一个司机接单了,但是没有在规定的时间到来,导致订单超时了。这时候平台就会再次寻找附近的车辆,下单通知附件的车辆来接你。

这类的场景如果放到MQ上,能实现吗?结合之前的实现已经可以做到订单超时了,但是超时后,目前还没有办法处理,这就需要使用死信队列了。

DLX,全称Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时DLX的队列就称为“死信队列”。

以下几种情况导致消息变为死信:

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并设置requeue参数为false.
  2. 消息过期。
  3. 队列达到最大长度。

对于RabbitMQ来说,DLX是一个非常有用的特性。它可以处理异常您那个况下,消息不能够被消息者正确消费(消费者调用了Basic.Nack或者Basic.Reject)而被置入死信队列中,后续分析程序可以通过消费死信队列中的内容来分析当时所遇到的异常情况。进而可以改善和优化系统。

9.1 被拒绝的消息

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class DlxRejectProduct {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 定义测试消息过期的队列和交换器
    channel.exchangeDeclare("dlx.biz.reject.ex", BuiltinExchangeType.FANOUT, false);
    Map<String, Object> argument = new HashMap<>();
    // 当消息过期后,放置于死信队列
    argument.put("x-dead-letter-exchange", "dlx.dead.ex");
    // 设置队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
    argument.put("x-dead-letter-routing-key", "rk.dlx.reject");
    channel.queueDeclare("dlx.biz.reject.qu", false, false, false, argument);
    channel.queueBind("dlx.biz.reject.qu", "dlx.biz.reject.ex", "dlx.biz.reject.rk");

    // 定义死信交换器和数据
    channel.exchangeDeclare("dlx.dead.ex", BuiltinExchangeType.DIRECT, true);
    // 用于接收过期后消息的队列
    channel.queueDeclare("dlx.reject.qu", false, false, false, null);
    // 将用于接收过期消息队列与交换器相绑定
    channel.queueBind("dlx.reject.qu", "dlx.dead.ex", "rk.dlx.reject");

    channel.basicConsume(
        "dlx.biz.reject.qu",
        false,
        new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(
              String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
              throws IOException {
            // 进行消息的拒绝,并且不进入队列
            channel.basicReject(envelope.getDeliveryTag(), false);
          }
        });
  }
}

定义了一个业务交换器和队列,并将其绑定,在消费端将消息拒绝,并且不重新加入队列。在定义业务队列时,设置了死信交换器即消息拒绝后放置的放置的交换器,并设置了死信的路由key。

然后再定义了死信交换器以及对应拒绝后的接收消息的队列。

当消息到达消费端后,由于开启了手动ACK确认,会进入处理,而客户端的处理是拒绝消息,并且不重新放回队列,就会被放入到死信交换器dlx.dead.ex中,而这个消息的死信路由key为rk.dlx.reject,而此路由key绑定了队列dlx.reject.qu,这样就看到了消息进入了最终我们看到的队列 dlx.reject.qu中。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

观察下队列的情况:

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name              │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.reject.qu │ 0001         │        │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.reject.qu     │ 1010         │        │
└───────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]# 

通过观察可以发现,消息已经进入了死信队列后的交换器。

9.2 过期的消息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class DlxExpireProduct {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); ) {

      // 定义测试消息过期的队列和交换器
      channel.exchangeDeclare("dlx.biz.expire.ex", BuiltinExchangeType.FANOUT, false);
      Map<String, Object> argument = new HashMap<>();
      // 消息10秒过期
      argument.put("x-message-ttl", 10000);
      // 当消息过期后,放置于死信队列
      argument.put("x-dead-letter-exchange", "dlx.dead.ex");
      // 设置队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
      argument.put("x-dead-letter-routing-key", "rk.dlx.expire");
      channel.queueDeclare("dlx.biz.expire.qu", false, false, false, argument);
      channel.queueBind("dlx.biz.expire.qu", "dlx.biz.expire.ex", "dlx.biz.expire.rk");

      // 定义死信交换器和数据
      channel.exchangeDeclare("dlx.dead.ex", BuiltinExchangeType.DIRECT, true);
      // 用于接收过期后消息的队列
      channel.queueDeclare("dlx.expire.qu", false, false, false, null);
      // 将用于接收过期消息队列与交换器相绑定
      channel.queueBind("dlx.expire.qu", "dlx.dead.ex", "rk.dlx.expire");

      // 测试过期消息的发送
      String msgExpire = "测试过期消息";
      channel.basicPublish(
          "dlx.biz.expire.ex", "", null, msgExpire.getBytes(StandardCharsets.UTF_8));
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

在队列上设置消息10秒过期。并设置了过期绑定的死信交换器和key。

当消息过期时,就会被放入到dlx.dead.ex交换器,并且此消息设置了死信的路由key为rk.dlx.expire,而此路由key绑定了dlx.expire.qu这个队列,所以消息最终就发送到了dlx.expire.qu

观察队列中的消息

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name              │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.expire.qu     │ 0000         │        │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.expire.qu │ 1010         │        │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.reject.qu │ 0000         │        │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.reject.qu     │ 1010         │        │
└───────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]# 

等待10秒后,再查看队列:

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name              │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.expire.qu     │ 1010         │        │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.expire.qu │ 0000         │        │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.reject.qu │ 0000         │        │
├───────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.reject.qu     │ 1010         │        │
└───────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]# 

可以发现,消息已经进入了死信队列了。

9.3 超过队列的长度的消息

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class DlxMaxLengthProduct {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); ) {

      // 定义测试消息过期的队列和交换器
      channel.exchangeDeclare("dlx.biz.max.length.ex", BuiltinExchangeType.FANOUT, false);
      Map<String, Object> argument = new HashMap<>();
      // 当消息过期后,放置于死信队列
      argument.put("x-dead-letter-exchange", "dlx.dead.ex");
      // 设置队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
      argument.put("x-dead-letter-routing-key", "rk.dlx.max.length");
      // 定义消息的最大长度为2,超过2个,第三个即为死信消息
      argument.put("x-max-length", 2);
      channel.queueDeclare("dlx.biz.max.length.qu", false, false, false, argument);
      channel.queueBind("dlx.biz.max.length.qu", "dlx.biz.max.length.ex", "dlx.biz.max.length.rk");

      // 定义死信交换器和数据
      channel.exchangeDeclare("dlx.dead.ex", BuiltinExchangeType.DIRECT, true);
      // 用于接收过期后消息的队列
      channel.queueDeclare("dlx.max.length.qu", false, false, false, null);
      // 将用于接收过期消息队列与交换器相绑定
      channel.queueBind("dlx.max.length.qu", "dlx.dead.ex", "rk.dlx.max.length");


      String push1 = "测试发送消息1";
      String push2 = "测试发送消息2";
      String push3 = "测试发送消息3";
      channel.basicPublish("dlx.biz.max.length.ex","",null,push1.getBytes(StandardCharsets.UTF_8));
      channel.basicPublish("dlx.biz.max.length.ex","",null,push2.getBytes(StandardCharsets.UTF_8));
      channel.basicPublish("dlx.biz.max.length.ex","",null,push3.getBytes(StandardCharsets.UTF_8));

    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

通过指定消息队列的长度为2,即第三个消息就会进入死信队列。规则还是同之前的过期和拒绝一样。

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name                  │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.expire.qu         │ 1010         │        │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.expire.qu     │ 0000         │        │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.reject.qu     │ 0000         │        │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.max.length.qu     │ 1010         │        │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.biz.max.length.qu │ 2020         │        │
├───────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.reject.qu         │ 1010         │        │
└───────────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]# 

最终的消息是两个在业务队列,而超过最大长度的消息在死信队列中。

9.4 SpringBoot使用死信队列

maven导入

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.2.8.RELEASE</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <version>2.2.7.RELEASE</version>
            <scope>test</scope>
        </dependency>

连接配制

spring:
  application:
    name: dlx
  rabbitmq:
    host: node1
    port: 5672
    virtual-host: /
    username: root
    password: 123456

主启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DlxApplication {
  public static void main(String[] args) {
    SpringApplication.run(DlxApplication.class, args);
  }
}

队列配制

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DlxConfig {

  /**
   * 业务队列,并指定了死信交换器
   *
   * @return
   */
  @Bean
  public Queue bizQueue() {
    Map<String, Object> argument = new HashMap<>();

    // 消息在10秒后过期
    argument.put("x-message-ttl", 5000);
    // 设置该队列所关联的死信交换器,当消息超过10秒没有消费,则加入死信队列
    argument.put("x-dead-letter-exchange", "dlx.springboot.ex");
    // 设置该队列所关联的死信交换器的routingKey,如果没有特殊的指定,使用原队列的routingKey.
    argument.put("x-dead-letter-routing-key", "dlx.springboot.rk");
    Queue queue = new Queue("dlx.spring.biz.qu", false, false, false, argument);
    return queue;
  }

  /**
   * 业务交换器
   *
   * @return
   */
  @Bean
  public Exchange bizExchange() {
    return new DirectExchange("dlx.spring.biz.ex", false, false, null);
  }

  @Bean
  public Binding bizBind() {
    return BindingBuilder.bind(bizQueue()).to(bizExchange()).with("dlx.spring.biz.rk").noargs();
  }

  /**
   * 死信队列
   *
   * @return
   */
  @Bean
  public Queue queueDlx() {
    return new Queue("dlx.springboot.expire.qu", false, false, false);
  }

  @Bean
  public Exchange exchangeDlx() {
    return new DirectExchange("dlx.springboot.ex", true, false, null);
  }

  @Bean
  public Binding bindDlx() {
    return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("dlx.springboot.rk").noargs();
  }
}

控制层处理

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;


@RestController
public class BizController {

  @Autowired private AmqpTemplate template;

  @RequestMapping("/expire-dlx")
  public String expireDlx() {
    String msg = "测试发送消息,10秒超时";
    template.convertAndSend(
        "dlx.spring.biz.ex", "dlx.spring.biz.rk", msg.getBytes(StandardCharsets.UTF_8));

    return "expire-dlx";
  }

  @RequestMapping("/dlx/get")
  public String sendDlxMsg() {
    byte[] getMsg = (byte[]) (template.receiveAndConvert("dlx.springboot.expire.qu"));

    return new String(getMsg, StandardCharsets.UTF_8);
  }
}

启动项目

然后在浏览器中输入:http://127.0.0.1:8080/expire-dlx

观察队列信息:

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name                     │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├──────────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.springboot.expire.qu │ 0000         │        │
├──────────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.spring.biz.qu        │ 1010         │        │
└──────────────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]# 

此时数据在业务队列中。等待5秒,再观察队列:

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers,policy  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────────────────┬────────────────┬─────────────────────────┬──────────┬───────────┬────────┐
│ name                     │ messages_ready │ messages_unacknowledged │ messages │ consumers │ policy │
├──────────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.springboot.expire.qu │ 1010         │        │
├──────────────────────────┼────────────────┼─────────────────────────┼──────────┼───────────┼────────┤
│ dlx.spring.biz.qu        │ 0000         │        │
└──────────────────────────┴────────────────┴─────────────────────────┴──────────┴───────────┴────────┘
[root@nullnull-os rabbitmq]#

发现数据已经进入了死信队列中。

在浏览器中访问另外的一个接口, http://127.0.0.1:8080/dlx/get

便能得到发送的数据信息:

测试发送消息,10秒超时


http://www.kler.cn/a/471273.html

相关文章:

  • 使用JMeter玩转tidb压测
  • nginx-灰度发布策略(基于cookie)
  • 解密序列建模:理解 RNN、LSTM 和 Seq2Seq
  • 微信小程序获取图片使用session(上篇)
  • 统计字符【2】(PTA)C语言
  • unity学习8:unity的基础操作 和对应shortcut
  • android系统的一键编译与非一键编译 拆包 刷机方法
  • 【数据可视化】数据可视化看板需求梳理模板(含示例)
  • Linux 系统清理命令大全
  • 深度学习中的正则化方法
  • React 深入学习理解
  • Redis Zset有序集合
  • python学opencv|读取图像(二十七)使用cv2.warpAffine()函数平移图像
  • Unity中 Xlua使用整理(一)
  • nginx 配置 本地启动
  • 移动应用安全基础:深入理解Hooking框架
  • cuda实现flash_attn_mma_share_kv源码分析
  • vue el table 不出滚动条样式显示 is_scrolling-none,如何修改?
  • Airflow:SQL Sensor 监控数据库业务变化
  • 力扣--283.移动零
  • presto权限管理
  • 计算机网络之---无线网络的传输介质
  • 使用 Flink CDC 构建 Streaming ETL
  • C++ 提升编译速度的利器:前向声明
  • 【ABAP开发环境】(三)ABAP GIT
  • 根据python代码自动生成类图的实现方法[附带python源码]