当前位置: 首页 > article >正文

RabbitMQ消息接收类Receiver

Receiver 类会使用 @RabbitListener 注解,来表示该类是一个消息接收器,用于处理来自 RabbitMQ 队列的消息

@Component
public class Receiver {
    @RabbitListener(queues = RabbitMQConstant.QUEUE_HELLO)
    public void hello(String hello) {
        System.out.println ("Receiver(hello) : "  + hello);
    }
 
    @RabbitListener(queues = RabbitMQConstant.QUEUE_HI)
    public void hi(String payload) {
        System.out.println ("Receiver(hi) : "  + payload);
        throw new RuntimeException("手动造的异常");
    }
类的结构

注解 @Component:
表示这个类是一个 Spring 组件,Spring 将自动识别并注册这个类,允许其作为 Spring 上下文中的一个 Bean。

注解 @RabbitListener:
用于标记一个方法为 RabbitMQ 消息监听器。当指定的队列收到消息时,RabbitMQ 会调用这个方法来处理消息。

方法分析

1. hello 方法

@RabbitListener(queues = RabbitMQConstant.QUEUE_HELLO)
public void hello(String hello) {
    System.out.println("Receiver(hello) : " + hello);
}

功能:这个方法用于处理来自 QUEUE_HELLO 队列的消息。
参数:接收一个 String 类型的消息(在这里命名为 hello)。
处理逻辑:打印接收到的消息内容。
监听机制:当 QUEUE_HELLO 队列收到消息时,RabbitMQ 会自动调用这个方法。

2. hi 方法

@RabbitListener(queues = RabbitMQConstant.QUEUE_HI)
public void hi(String payload) {
    System.out.println("Receiver(hi) : " + payload);
    throw new RuntimeException("手动造的异常");
}

功能:这个方法用于处理来自 QUEUE_HI 队列的消息。
参数:接收一个 String 类型的消息(在这里命名为 payload)。
处理逻辑:打印接收到的消息内容,然后手动抛出一个 RuntimeException 异常。
监听机制:当 QUEUE_HI 队列收到消息时,RabbitMQ 会自动调用这个方法。

异常处理

手动抛出异常:在 hi 方法中,咱们手动抛出了一个 RuntimeException。这会导致消息处理失败。
RabbitMQ 默认行为:当 @RabbitListener 方法抛出异常时,RabbitMQ 的默认行为是:
消息将被标记为“未确认”(unacknowledged),即 RabbitMQ 不会认为该消息已成功处理。
根据 RabbitMQ 的配置,可能会对消息进行重试或者将其发送到死信队列(Dead Letter Queue, DLQ)。

处理异常的最佳实践
重试机制:您可以使用 Spring 的重试机制来配置失败的重试次数和间隔。
自定义异常处理:可以通过实现 ErrorHandler 接口或使用 @RabbitListener 的 errorHandler 属性来定义自定义的错误处理策略。
死信队列:将无法成功处理的消息发送到死信队列,以便后续分析和处理。

示例改进

@RabbitListener(queues = RabbitMQConstant.QUEUE_HI, 
               ackMode = "MANUAL", 
               errorHandler = "myErrorHandler")
public void hi(String payload, Channel channel, Message message) {
    try {
        System.out.println("Receiver(hi) : " + payload);
        // 业务逻辑处理
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理异常逻辑
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

1. @RabbitListener 注解
功能:略
属性解释:
queues = RabbitMQConstant.QUEUE_HI:指定该监听器要监听的队列,RabbitMQConstant.QUEUE_HI 是一个常量,表示消息将从这个队列中获取。

ackMode = “MANUAL”:设置消息确认模式为手动确认。这意味着消息的确认(acknowledgment)将由代码中显式调用,而不是自动确认。

errorHandler = “myErrorHandler”:指定一个自定义的错误处理器,用于处理在消息处理过程中发生的异常。

2. 方法参数
String payload:接收的消息内容,类型为 String。
Channel channel:RabbitMQ 的通道对象,允许您手动确认或拒绝消息。
Message message:RabbitMQ 的消息对象,包含了消息的元数据和其他信息。

3. 方法逻辑
try 块:

System.out.println("Receiver(hi) : " + payload);
// 业务逻辑处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

sout:打印接收到的消息内容。
在这里您也可以替换/添加任何业务逻辑来处理接收到的消息。

channel.basicAck(…):手动确认消息的处理。这个调用告诉 RabbitMQ 消息已经成功处理,可以从队列中移除。getDeliveryTag() 方法获取消息的唯一标识符。

catch 块:

catch (Exception e) {
    // 处理异常逻辑
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}

如果在 try 块中发生异常,将执行此部分。
channel.basicNack(…):拒绝消息的处理。该调用告诉 RabbitMQ 该消息处理失败,参数说明如下:

第一个参数是消息的唯一标识符(delivery tag)。
第二个参数设置为 false,表示不批量处理。
第三个参数设置为 true,表示将消息重新放回队列中,以便其他消费者可以尝试处理。

这种模式在需要精细控制消息确认和错误处理时非常有用。


http://www.kler.cn/news/342372.html

相关文章:

  • 细说QT各种线程锁的特点和用法
  • JAVA学习-练习试用Java实现“反转链表 II”
  • C#中,重载(overload) 重写(override)的应用说明
  • Ubuntu安装nvidia显卡驱动
  • [云] DockerCoins 练习笔记
  • HUAWEI_HCIA_实验指南_Lib1.6_配置通过FTP进行文件操作
  • TadTR(TIP 2022)视频动作检测方法详解
  • Spring Boot与事务钩子函数:深度解析与应用实践
  • Python 如何使用 Redis 作为缓存
  • [Algorithm][贪心][可被三整除的最大和][距离相等的条形码][重构字符串]详细讲解
  • 如何利用phpstudy创建mysql数据库
  • FreeSWITCH 分机网关路由
  • 在 Ubuntu 下通过 Docker 部署 FTP 服务器
  • Flutter平台嵌入器
  • 【Linux】线程与线程安全知识总结
  • Python如何创建异步上下文管理器
  • 远程控制 远程桌面 teamview 源代码 定制 贴牌
  • 【万字长文】Word2Vec计算详解(一)CBOW模型
  • opencv的相机标定与姿态解算
  • Visual Studio Code 中通过鼠标滚轮调整字体大小并使用 Ctrl+W 关闭文档窗口【最详细】