当前位置: 首页 > article >正文

036 RabbitMQ消息确认 死信队列 延时队列

文章目录

  • 生产者确认模式
    • application.properties
    • MessageController.java
    • MessageConfirmRallback.java
  • 生产者回退模式
    • application.properties
    • MessageConfirmRallback.java
    • MessageController.java
  • 消费者手动确认
    • application.properties
    • ConsumerAckQueueListener.java
  • 死信队列
  • 延时队列

测试链接 http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingKey=order.A&msg=aaa

思考问题: 生产者能百分之百将消息发送给消息队列吗?
不确定的
1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。找不到交换机
2.交换机路由到队列,也存在丢失消息的可能性

问题解决方案:
1.生产者确认模式
2.生产者回退模式

目标: 演示生产者确认的效果,消息百分百进入交换机
实现步骤:
1.配置开启生产者确认模式
2.编写生产者确认回调方法,处理业务逻辑
3.在RabbitMQ模板对象中,设置回调逻辑
4.测试请求一下
目标2: 消息能够从交换机百分百进入到队列
实现步骤:
1.配置开启生产者回退模式
2.编写生产者回退的回调方法
3.设置回退回调方法
4.测试

生产者确认模式

application.properties

# 配置开启生产者确认模式
spring.rabbitmq.publisher-confirms=true

MessageController.java

package com.cubemall.controller;

/*
目标: 搭建RabbitMQ高级特性演示环境
1.搭建消费者工程[复用之前工程]
2.搭建提供者工程[复用之前工程]
3.编写MessageController: 用来发送消息
交换机
路由键
消息内容
4.RabbitMQ配置交换机和队列,及路由键
5.编写消费者监听

思考问题: 生产者能百分之百将消息发送给消息队列吗?
不确定的
1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。找不到交换机
2.交换机路由到队列,也存在丢失消息的可能性

问题解决方案:
1.生产者确认模式
2.生产者回退模式

目标: 演示生产者确认的效果,消息百分百进入交换机
实现步骤:
1.配置开启生产者确认模式
2.编写生产者确认回调方法,处理业务逻辑
3.在RabbitMQ模板对象中,设置回调逻辑
4.测试请求一下
 */

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {
    //发送消息接口
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //定义发送消息的接口
    @RequestMapping("/direct/sendMsg")
    public String sendMsgtoMQ(String exchange,String routingKey,String msg){
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
        return "已投递";

    }
}

MessageConfirmRallback.java

package com.cubemall.controller;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/*
发送消息回调确认类:消息如果没有进入交换机,会回调当前类中的confirm
 */
@Component
public class MessageConfirmRallback implements RabbitTemplate.ConfirmCallback {
    //配置回调的方法
    @Autowired
    private RabbitTemplate rabbitTemplate;


    //配置在当前对象注入之后,再设置当前对象到RabbitTemplate对象中
    @PostConstruct//注解作用: 在当前对象初始化完毕之后执行的方法
    public void initRabbittemplate(){
        rabbitTemplate.setConfirmCallback(this::confirm);
    }


    /**
     * 不论是否进入交换机,都会回调当前方法
     * @param correlationData 消息投递封装对象
     * @param ack 是否投递成功
     * @param exception 如果错误,错误原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String exception) {
        if (ack) {
            System.out.println("消息进入了交换机成功{}");
        }else {
            System.out.println("消息进入了交换机失败{} 原因:"+exception);
        }
    }
}


生产者回退模式

application.properties

# 配置开启生产者的回退模式
spring.rabbitmq.publisher-returns=true

MessageConfirmRallback.java

package com.cubemall.controller;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/*
发送消息回调确认类:消息如果没有进入交换机,会回调当前类中的confirm
 */
@Component
public class MessageConfirmRallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    //配置回调的方法
    @Autowired
    private RabbitTemplate rabbitTemplate;


    //配置在当前对象注入之后,再设置当前对象到RabbitTemplate对象中
    @PostConstruct//注解作用: 在当前对象初始化完毕之后执行的方法
    public void initRabbittemplate(){
        rabbitTemplate.setConfirmCallback(this::confirm);
        rabbitTemplate.setReturnCallback(this::returnedMessage);
    }


    /**
     * 不论是否进入交换机,都会回调当前方法
     * @param correlationData 消息投递封装对象
     * @param ack 是否投递成功
     * @param exception 如果错误,错误原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String exception) {
        if (ack) {
            System.out.println("消息进入了交换机成功{}");
        }else {
            System.out.println("消息进入了交换机失败{} 原因:"+exception);
        }
    }


    /**
     * 消息从交换机进入队列失败回调方法:只会在失败的情况下
     * @param message the returned message.
     * @param replyCode the reply code.
     * @param replyText the reply text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息从交换机进入队列失败: >>>>>>>");
        System.out.println("exchange = " + exchange);
        System.out.println("replyCode = " + replyCode);
        System.out.println("replyText = " + replyText);
        System.out.println("routingKey = " + routingKey);

    }
}

MessageController.java

package com.cubemall.controller;

/*
目标: 搭建RabbitMQ高级特性演示环境
1.搭建消费者工程[复用之前工程]
2.搭建提供者工程[复用之前工程]
3.编写MessageController: 用来发送消息
交换机
路由键
消息内容
4.RabbitMQ配置交换机和队列,及路由键
5.编写消费者监听

思考问题: 生产者能百分之百将消息发送给消息队列吗?
不确定的
1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。找不到交换机
2.交换机路由到队列,也存在丢失消息的可能性

问题解决方案:
1.生产者确认模式
2.生产者回退模式

目标: 演示生产者确认的效果,消息百分百进入交换机
实现步骤:
1.配置开启生产者确认模式
2.编写生产者确认回调方法,处理业务逻辑
3.在RabbitMQ模板对象中,设置回调逻辑
4.测试请求一下

目标2: 消息能够从交换机百分百进入到队列
实现步骤:
1.配置开启生产者回退模式
2.编写生产者回退的回调方法
3.设置回退回调方法
4.测试
 */

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {
    //发送消息接口
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //定义发送消息的接口
    @RequestMapping("/direct/sendMsg")
    public String sendMsgtoMQ(String exchange,String routingKey,String msg){
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
        return "已投递";

    }
}

消费者手动确认

application.properties

# 配置开启消费端手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

# 配置开启重试
spring.rabbitmq.listener.direct.retry.enabled=true

ConsumerAckQueueListener.java

package com.cubemall.listeners;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/*
消费者消息队列监听器
问题: 消费者能不能百分百接收到请求,而且业务逻辑处理出现异常,消息还能不能算接收到呢?

目标: 演示消费者手动确认的过程
实现步骤:
1.编写监听器类 和对于监听的方法,编写手动签收的业务逻辑
2.配置开启手动签收
3.测试
 */
@Component
@RabbitListener(queues = "order.A")
public class ConsumerAckQueueListener {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg, Message message, Channel channel) throws IOException {

        System.out.println("下单消息{},内容为: " + msg);
        //获取消息的投递标签
        long deliveryTag = message.getMessageProperties().getDeliveryTag();


        try {
            if (msg.contains("苹果")) {
                throw new RuntimeException("不允许售卖苹果手机");
            }

            //签收消息
            /**
             * 参数1: 投递标签
             * 参数2: 是否是批量签收,true一次性签收所有消息,如果是false则只签收当前消息
             */
            channel.basicAck(deliveryTag,false);
            System.out.println("签收成功{}");
        } catch (IOException e) {
            //e.printStackTrace();
            //参数1: 投递标签
            //参数2: 是否批量
            //参数3: 是否重回队列
            channel.basicNack(deliveryTag,false,true);
            System.out.println("签收失败{}");
        }

        //拒绝签收消息: 出现异常了,拒绝签收
    }
}

死信队列

死信队列

延时队列

延时队列

消费者消息队列监听器
问题1: 消费者能不能百分百接收到请求,而且业务逻辑处理出现异常,消息还能不能算接收到呢?

目标: 演示消费者手动确认的过程
实现步骤:
1.编写监听器类 和对于监听的方法,编写手动签收的业务逻辑
2.配置开启手动签收
3.测试

问题2: 消息在队列中,如果没有被消费者消费?
TTL–> Time to Live (存活时间/有效期)
目标: 演示消息队列中消息失效超时过程
步骤:
1.配置新的队列order.B,设置队列内消息的超时时间5s x-message-ttl
2.将队列绑定order_exchange交换机上
3.发送消息,测试

问题3: 消息发送失败了,消息丢失了?消息有效期到了
死信队列: 当消息失效了,统一进入的一个队列,这个队列称之为死信队列
主要有三种情况:
1.到达了消息队列容量上限!
2.消费者如果拒绝签收,不重回队列!
3.消息超时了!

目标: 演示成为死信的过程
步骤:
1.建立死信队列deadQueue
2.建立死信交换机deadExchange
3.死信队列绑定死信交换机:order.dead
4.队列order.B绑定死信交换机 x-dead-letter-exchange x-dead-letter-routing-key
5.向队列发送消息,测试死信交换机

需求: 1.新用户注册成功7天后,发送消息问候?
2.下单后,30分钟未支付,取消订单,回滚票
延迟队列: 消息进入队列后不会被消费,只有到达指定的时间后才会被消费!


http://www.kler.cn/a/392025.html

相关文章:

  • OpenPCDet从环境配置到模型训练
  • MATLAB算法实战应用案例精讲-【数模应用】图像边缘检测(附MATLAB和python代码实现)(二)
  • STM32F4分别驱动SN65HVD230和TJA1050进行CAN通信
  • 大模型搜索引擎增强问答demo-纯python实现
  • 点击底部的 tabBar 属于 wx.switchTab 跳转方式,目标页面的 onLoad 不会触发(除非是第一次加载)
  • DAY15 神经网络的参数和变量
  • 脑机接口、嵌入式 AI 、工业级 MR、空间视频和下一代 XR 浏览器丨RTE2024 空间计算和新硬件专场回顾
  • 接口测试框架+数据驱动
  • Fish Agent V0.13B:Fish Audio的语音处理新突破,AI语音助手的未来已来!
  • 集合类源码浅析のJDK1.8ConcurrentHashMap(上篇)
  • Go语言的并发安全与互斥锁
  • 使用cloudflare搭建私人docker镜像站
  • 【深圳大学/大学物理实验2】弗兰克-赫兹实验预习题参考
  • PL/SQL执行.sql文件
  • Go语言实现用户登录Web应用
  • 解析“ChatGPT网络错误”:从网络专线到IP地址的根源与解决方案
  • 券商隔夜单自动下单交易接口
  • 基于表格滚动截屏(表格全部展开,没有滚动条)
  • 【LeetCode】【算法】240. 搜索二维矩阵II
  • 鸿蒙next版开发:ArkTS组件通用属性(布局约束)
  • QT常用控件
  • 127.WEB渗透测试-信息收集-ARL(18)
  • 自动化爬虫Selenium
  • 【启程Golang之旅】从零开始构建可扩展的微服务架构
  • 算法魅力-二分查找实战
  • 服务号消息折叠折射出的腾讯傲慢:上云会不会也一样?