当前位置: 首页 > article >正文

【RabbitMQ】消息队列消息确认机制

文章目录

    • 消息确认机制
      • 事务机制
      • Confirm模式

消息确认机制

生产者将消息发送出去之后,AMQP 协议中实现了 事务机制Confirm 模式 两种方式确认消息有没有到达 RabbitMQ 服务器

事务机制

channel.txSelect() 声明启动事务模式;

channel.txCommit() 提交事务;

channel.txRollback() 回滚事务;

Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String str = "hello rabbitmq";
channel.txSelect();//开启事务
try {
    channel.basicPublish("",QUEUE_NAME,null,str.getBytes());
    //加入错误代码后事务回滚
    int i = 1/0;
    channel.txCommit();//提交事务
} catch (IOException e) {
    e.printStackTrace();
    channel.txRollback();//回滚事务
} finally {
    channel.close();
    conn.close();
}

模式缺点:事务机制通常涉及到对消息队列服务器的锁定和解锁操作,这些操作会增加额外的延迟,从而降低系统的吞吐量。在事务开启期间,通道(channel)会被阻塞,直到事务被提交或回滚,这期间无法进行其他操作。

事务会锁定涉及的资源,直到事务完成。如果事务处理时间较长,或者有大量的事务同时进行,可能会导致资源竞争和锁定问题,影响系统的并发能力。

Confirm模式

方式一:channel.waitForConfirms() 普通发送方确认模式;

Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String str = "hello rabbitmq";
channel.confirmSelect();//开启消息确认模式
channel.basicPublish("",QUEUE_NAME,null,str.getBytes());
//加入错误代码后事务回滚
int i = 1/0;
if (channel.waitForConfirms()) {
    System.out.println("消息确认发送");
}
channel.close();
conn.close();

channel.confirmSelect() 声明开启发送方确认模式,再使用 channel.waitForConfirms() 等待消息被服务器确认即可。

方式二:channel.waitForConfirmsOrDie() 批量确认模式;

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    String message = "hello rabbitmq";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会
IOException
System.out.println("全部执行完成");

channel.confirmSelect() 声明开启发送方确认模式,再使用 channel.waitForConfirmsOrDie() 等待消息被服务器确认即可。

方式三:channel.addConfirmListener() 异步监听发送方确认模式;

Connection conn = ConnectionUtils.getConnection();
Channel channel =  conn.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();//开启消息确认
//监听消息确认
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println(String.format("已确认消息,标识:%d,多个消息:%b",deliveryTag, multiple));
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未确认消息,标识:" + multiple);
    }
});
for (int i = 0; i < 30; i++) {
    //发送消息
    String msg = "hello rabbitmq";
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
Thread.sleep(10000);
System.out.println("发送成功!!!");
//释放资源
channel.close();
conn.close();
  • Multiple:是否多条
  • deliveryTag:如果是多条,这个就是最后一条消息的 tag

异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。


http://www.kler.cn/a/500548.html

相关文章:

  • 绘制三角形、正六边形、五角星、六角星
  • USRP X310 Windows 烧录镜像
  • 【C++】拷贝构造函数与运算符重载
  • 【Bluedroid】HFP连接流程源码分析(一)
  • 基于ResNet的CIFAR-10分类实现与分析
  • django基于Python的校园个人闲置物品换购平台
  • Android Framework WMS全面概述和知识要点
  • 浅谈云计算03 | 云计算的技术支撑(云使能技术)
  • 基于华为ENSP的OSPF-开放式最短路径优先协议保姆级别详解(1)
  • JAVA 嵌套列表初始化和转字符串
  • 十大排序简介
  • 新冠肺炎服务预约微信小程序的设计与实现ssm+论文源码调试讲解
  • 【Unity精品插件】Love/Hate:专注于 AI 行为与情感互动
  • 1F1B 非交错式调度模式与 GPipe 策略的内存节省优势
  • 【CSS】HTML页面定位CSS - position 属性 relative 、absolute、fixed 、sticky
  • 用JSONEncoder解决Object of type Color is not JSON serializable报错
  • 【数据结构-堆】2233. K 次增加后的最大乘积
  • 【python】str.upper()、str.join()、stri.strip()用法
  • Java 项目中引入阿里云 OSS SDK
  • Pytorch使用手册-优化 Vision Transformer 模型以用于部署(专题十六)