当前位置: 首页 > article >正文

springboot整合rabbitmq(消息确认)

1.pom依赖

        <!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- JSON处理 -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

2.配置信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # 发布确认类型
    publisher-confirm-type: correlated  # 发布确认
    publisher-returns: true             # 发布返回
    listener:
      simple:
        acknowledge-mode: manual        # 手动确认
        # 每次只处理一条消息
        prefetch: 1

3. 生产方确认监听器

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        // 消息发送确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息发送失败");
            }
        });

        // 消息返回回调
        rabbitTemplate.setReturnsCallback(returned -> {
            System.out.println("消息被退回");
        });

4.消费方确认

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void handleMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 处理消息
            int a=5/0;
            System.out.println(new String(message.getBody()));
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            System.out.println("消息接收成功");

        } catch (Exception e) {
            // 消息重新入队
            channel.basicNack(deliveryTag, false, true);
            System.out.println("消息接收异常");
        }
    }


http://www.kler.cn/a/505077.html

相关文章:

  • Android15源码编译问题处理
  • VB.NET 正则表达式完全指南
  • 9.7 visual studio 搭建yolov10的onnx的预测(c++)
  • 【WEB】网络传输中的信息安全 - 加密、签名、数字证书与HTTPS
  • 【源码】Sharding-JDBC源码分析之SQL重写实现原理
  • ElasticSearch|ES|架构介绍|原理浅析
  • AWS上搭建Storage Gateway并创建SMB和NFS服务
  • 一招解决word嵌入图片显示不全问题
  • 【vue3项目使用 animate动画效果】
  • Linux固定ip
  • 借助Claude实现Playwright的自动化(MCP Server)
  • UE5游戏性能优化指南
  • Java 输入输出流(下)
  • 简洁明快git入门及github实践教程
  • 音视频入门基础:RTP专题(3)——SDP简介
  • arcgis三调符号库的使用
  • 《leetcode-runner》如何手搓一个debug调试器——指令系统
  • 【源码】Sharding-JDBC源码分析之SQL重写实现原理
  • IEC103 转 ModbusTCP 网关
  • 【网络协议】ACL(访问控制列表)第二部分
  • Re78 读论文:GPT-4 Technical Report
  • 中台成熟度模型有什么用
  • 2025特种设备安全管理人员免费题库限时练习(判断题)
  • RocketMQ消息发送---源码解析
  • 【FAQ】HarmonyOS SDK 闭源开放能力 —Map Kit(4)
  • 系统架构设计师考点—UML建模和设计模式