当前位置: 首页 > article >正文

消息可靠投递

消息可靠投递

  • 生产者消息投递到 Broker 时,万一网络断了,生产者以为自己投递成功了,消息队列以为没有生产者投递
  • RabbitMQ 提供了两种方式控制可靠投递,confirm 确认模式,return 退回模式
  • RabbitMQ 提供事务机制,但是性能较差,不做讲解,可自行研究

image-20210816144827884

ConfrmCallBack

消息从生产者投递交换机,交换机给生产者一个响应,生产者收到肯定应答,才能保证消息成功投递到交换机,但是如果没有设置持久化,这时候交换机断电重启,仍然丢失,需要做到以下3个方法才能保证可靠投递到交换机

  1. 队列设置持久化
  2. 消息设置持久化
  3. ConfirmCallBack回调

单个同步确认

生产者投递一个消息,交换机回应,生产者确认之后再发布下一个,吞吐量很低

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//单条同步确认
public class SingleSyncConfirm {
    /**
     * 生产者 → 交换机
     * 生产者投递 1条 消息给交换机
     * 交换机持久化,保存本地之后,给生产者一个应答
     * 生产者接收到成功应答之后,再投递下一条消息
     * 10000条,每条都回执一次 总耗时: 1958 毫秒
     * @param args
     */
    private static final String SINGLE_SYNC_CONFIRM_EXCHANGE = "singleSyncConfirmExchange";
    private static final String SINGLE_SYNC_CONFIRM_QUEUE = "singleSyncConfirmQueue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //交换机声明,队列声明,交换机绑定队列
        channel.exchangeDeclare(SINGLE_SYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(SINGLE_SYNC_CONFIRM_QUEUE,true,false,false,null);
        channel.queueBind(SINGLE_SYNC_CONFIRM_QUEUE,SINGLE_SYNC_CONFIRM_EXCHANGE,SINGLE_SYNC_CONFIRM_QUEUE);
        /**
         * 开启 confirm 确认
         */
        channel.confirmSelect();

        long begin = System.currentTimeMillis();//开始时间

        for (int i = 0; i < 1000; i++) {
            String str = i + "";
            channel.basicPublish(SINGLE_SYNC_CONFIRM_EXCHANGE,SINGLE_SYNC_CONFIRM_QUEUE,null,str.getBytes());
            //生产者等待确认
            boolean b = channel.waitForConfirms();
            if (b) {
                System.out.println("第 " + i + " 条发送成功");
            } else {
                System.out.println("第 " + i + " 条发送失败================");
            }
        }

        long end = System.currentTimeMillis();//结束时间
        System.out.println("总耗时: " + (end - begin) + " 毫秒");
    }
}

批量同步确认

发布批量消息之后,等待,当有某一个故障的时候,不知道是哪个消息出问题

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//多条同步确认
public class MultipleSyncConfirm {

    private static final String MULTIPLE_SYNC_CONFIRM_EXCHANGE = "multipleSyncConfirmExchange";
    private static final String MULTIPLE_SYNC_CONFIRM_QUEUE = "multipleSyncConfirmQueue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //交换机声明
        channel.exchangeDeclare(MULTIPLE_SYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(MULTIPLE_SYNC_CONFIRM_QUEUE,true,false,false,null);
        channel.queueBind(MULTIPLE_SYNC_CONFIRM_QUEUE,MULTIPLE_SYNC_CONFIRM_EXCHANGE,MULTIPLE_SYNC_CONFIRM_QUEUE);

        /**
         * 开启 confirm 确认
         */
        channel.confirmSelect();

        long begin = System.currentTimeMillis();//开始时间

        for (int i = 1; i <= 1000; i++) {
            String str = i + "";
            channel.basicPublish(MULTIPLE_SYNC_CONFIRM_EXCHANGE, MULTIPLE_SYNC_CONFIRM_QUEUE, null, str.getBytes());

            if (i % 100 == 0) {
                //生产者等待确认
                boolean b = channel.waitForConfirms();
                if (b) {
                    System.out.println("第 " + i + " 条发送成功");
                } else {
                    System.out.println("第 " + i + " 条发送失败================");
                }
            }
        }

        long end = System.currentTimeMillis();//结束时间
        System.out.println("总耗时: " + (end - begin) + " 毫秒");
    }
}

异步批量确认

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

//多条异步确认
public class MultipleAsyncConfirm {

    private static final String MULTIPLE_ASYNC_CONFIRM_EXCHANGE = "multipleAsyncConfirmExchange";
    private static final String MULTIPLE_ASYNC_CONFIRM_QUEUE = "multipleAsyncConfirmQueue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //交换机声明
        channel.exchangeDeclare(MULTIPLE_ASYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(MULTIPLE_ASYNC_CONFIRM_QUEUE, true, false, false, null);
        channel.queueBind(MULTIPLE_ASYNC_CONFIRM_QUEUE, MULTIPLE_ASYNC_CONFIRM_EXCHANGE, MULTIPLE_ASYNC_CONFIRM_QUEUE);

        /**
         * 开启 confirm 确认
         */
        channel.confirmSelect();

        long begin = System.currentTimeMillis();//开始时间


        /**
         * 开启 confirm 回调函数
         */
        ConfirmCallback ackCallBack = new ConfirmCallback() {
            /**
             * 成功回调
             * @param deliveryTag
             * @param multiple      true表示已确认多条,false表示已确认单条
             * @throws IOException
             */
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("已确认多条: " + deliveryTag);
                } else {
                    System.out.println("已确认单条: " + deliveryTag);
                }
            }
        };
        ConfirmCallback nackCallBack = new ConfirmCallback() {
            /**
             * 失败回调
             * @param deliveryTag
             * @param multiple      true表示未确认多条,false表示未确认单条
             * @throws IOException
             */
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("未确认多条: " + deliveryTag);
                } else {
                    System.out.println("未确认多条: " + deliveryTag);
                }
            }
        };
        channel.addConfirmListener(ackCallBack, nackCallBack);

        for (int i = 1; i <= 1000; i++) {
            String str = i + "\t" + new Date().getTime();
            channel.basicPublish(MULTIPLE_ASYNC_CONFIRM_EXCHANGE, MULTIPLE_ASYNC_CONFIRM_QUEUE, null, str.getBytes());
        }

        long end = System.currentTimeMillis();//结束时间
        System.out.println("总耗时: " + (end - begin) + " 毫秒==========================");
    }
}

对比

单条确认:简单,吞吐量低

批量确认:简单,吞吐量比单条确认高,当一批消息中有一个出问题,不知道是哪一个

异步批量确认:代码复杂,吞吐量高

ReturnCallBack

交换机投递到队列


SpringBoot 整合

ConfirmCallback

引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件中开启
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
  • 较低版本中配置是

    publisher-confirms: true
    
  • publisher-confirm-type:取值有3种

    • none 禁用发布确认模式,默认
    • correlated 消息从生产者投递到交换机成功后触发回调
    • simple 类似 correlated ,支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法
配置类

在 Publish/Subscribe 基础之上整合

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class ConfirmCallBackImpl implements RabbitTemplate.ConfirmCallback {

    /**
     * @param correlationData 相关数据,一般存储一个id,用来辨识唯一性
     * @param ack             确认,如果成功返回 true,如果失败返回 false
     * @param cause           原因,失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("correlationData = " + correlationData);
        System.out.println("ack = " + ack);
        System.out.println("cause = " + cause);

        if (ack) {
            System.out.println("投递到交换机成功");
        } else {
            System.out.println("投递失败,原因是:\t" + cause);
            System.out.println("将失败的消息 " + correlationData.getId() + " 保存到数据库");
        }
    }
}
//设置确认回调
rabbitTemplate.setConfirmCallback(confirmCallBackI);

image-20210816182706901

因为是和之前的一起整合,所以设置 ConfirmCallBack 在注入 RabbitTemplate 时一起设置
也可以使用匿内部类或者Lambda表达式
测试
  • http://localhost:8080/confirm/test/abc 正常投递
  • http://localhost:8080/confirm/test-fail/abc 失败投递
import com.example.constants.Constants;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("confirm")
public class ConfirmController {
    /**
     * 注入 RabbitTemplate 模板对象
     */
    @Resource
    private RabbitTemplate rabbitTemplate;


    @RequestMapping("test/{name}")
    public String send(@PathVariable("name") String name) {

        /**
         * 发送消息
         */
        String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(Constants.MY_FANOUT_EXCHANGE, "", str, correlationData);

        try {
            /**
             * 睡眠,程序还没运行结束
             * 确认回调已经执行,说明是异步的
             */
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消息发送成功!\t" + str);
        return "success";
    }

    /**
     * 测试失败
     *
     * @param name
     * @return
     */
    @RequestMapping("test-fail/{name}")
    public String sendFail(@PathVariable("name") String name) {
        String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        /**
        * 创建一个不存在的交换机名称,用来测试投递失败
        */
        String errorExchangeName = Constants.MY_FANOUT_EXCHANGE + "abc";
        rabbitTemplate.convertAndSend(errorExchangeName, "", str, correlationData);

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消息发送成功!\t" + str);
        return "success";
    }
}

ReturnCallBack

引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件中开启
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-returns: true
配置类
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class ReturnsCallBackImpl implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("returnedMessage = " + returnedMessage);
        System.out.println("returnedMessage.getMessage() = " + returnedMessage.getMessage());
        System.out.println("returnedMessage.getReplyCode() = " + returnedMessage.getReplyCode());
        System.out.println("returnedMessage.getReplyText() = " + returnedMessage.getReplyText());
        System.out.println("returnedMessage.getExchange() = " + returnedMessage.getExchange());
        System.out.println("returnedMessage.getRoutingKey() = " + returnedMessage.getRoutingKey());
    }
}
//设置确认回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallBackImpl);

image-20210817032048020

测试
  • 在 Direct 模式下,修改路由为一个不存在的路由
import com.example.constants.Constants;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("return")
public class ReturnController {
    /**
     * 注入 RabbitTemplate 模板对象
     */
    @Resource
    private RabbitTemplate rabbitTemplate;


    @RequestMapping("test/{name}")
    public String send(@PathVariable("name") String name) {

        /**
         * 发送消息
         */
        String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, "sms", str, correlationData);
        rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, "email", str, correlationData);

        try {
            /**
             * 睡眠,程序还没运行结束
             * 确认回调已经执行,说明是异步的
             */
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消息发送成功!\t" + str);
        return "success";
    }

    /**
     * 测试失败
     *
     * @param name
     * @return
     */
    @RequestMapping("test-fail/{name}")
    public String sendFail(@PathVariable("name") String name) {
        String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        /**
         * 使用错误的路由键,用来测试交换机不能达到队列
         */
        String routingKey = "abc";
        rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, routingKey, str, correlationData);

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消息发送成功!\t" + str);
        return "success";
    }
}


http://www.kler.cn/a/293594.html

相关文章:

  • 金属箔电阻
  • npm i忽略依赖冲突
  • HTML 基础架构:理解网页的骨架
  • C#里演示使用数学的三角函数
  • 【IC每日一题:IC验证面试--UVM验证-2】
  • 如何在Android中自定义property
  • 数据赋能(199)——开发:数据开发管理——概述、关注焦点
  • 批量文件编码转换用python实现的utf8转gb2312,vscode设置特殊文件的默认打开编码
  • 数据赋能(198)——开发:数据应用——技术方法、主要工具
  • DAY69
  • vue , 微信小程序 , uni-app绑定变量属性
  • 【2024】MySQL库表基本操作
  • 算法:图片压缩算法【Z字行扫描】(Java实现)
  • 相亲交友系统商业开发
  • 【最新华为OD机试E卷-支持在线评测】分糖果(100分)-多语言题解-(Python/C/JavaScript/Java/Cpp)
  • 用ACF和PACF计算出一堆数据的周期个数以及周期时长,数据分析python
  • Linux系统练习笔记【完整版】
  • .NET/C#⾯试题汇总系列:⾯向对象
  • SpringBoot整合openApi
  • 数据分析的革命:Docker容器化在数据分析中的应用
  • ssm微信小程序校园失物招领论文源码调试讲解
  • 【HTML】置换元素(替换元素)
  • 什么是区块链?
  • vue3+ts项目import导入路径用@/报错找不到模块“@/components/也没有快捷提示
  • 小米红米系列机型 机型代码查询总目录 adb指令查询步骤
  • 【论文精读】SCINet-基于降采样和交互学习的时序卷积模型