当前位置: 首页 > article >正文

消息队列:如何确保消息不会丢失?

引言

对业务系统来说,丢消息意味着数据丢失,这是无法接受的。

主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。

虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

检测消息丢失

消息队列的有序性

在这里插入图片描述
大概流程如下:

  1. 发送端在拦截器中,给每条消息附加一个连续递增的序号;
  2. 消费端在拦截器中检测消息的连续性,如果消息没有丢失,消息序号必然是库中保存的上一次消费到的消息序号+1;

对于 kafka 和 rocketmq,不保证在 Topic 上的严格顺序,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

除了在拦截器生成消息序号外,我们也可以利用消息位移来实现。

位移(Offset)是消息队列中用于标识消息位置的指标,它指向了消息在队列中的确切位置。在如Kafka这样的消息队列中,位移是消费者读取消息的起始点。消费者在读取消息后,会更新位移,表明已经读取了这些消息。如果位移连续,那么可以认为没有消息丢失。如果位移不连续,比如位移从100直接跳到了102,那么101的消息就可能丢失了。

例如:

 public void onMessage(List<ConsumerRecord<String, EventRecord>> data, Acknowledgment acknowledgment) {

        try {
             
            /**
             * 
             * 业务处理
             * 
             * */


            long initStartOffset = redisCluster.exists(String.format(Constants.INIT_OFFSET, bizName))
                    ? Long.valueOf(redisCluster.get(String.format(Constants.INIT_OFFSET, bizName)))
                    : 0L;
            if (initStartOffset == 0) {
                redisCluster.set(String.format(Constants.INIT_OFFSET, bizName), String.valueOf(
                        data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).max().getAsLong()));
                acknowledgment.acknowledge();
                return;
            }
            long minOffsetRecord = data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).min()
                    .getAsLong();
            long maxOffsetRecord = data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).max()
                    .getAsLong();



            log.info("initStartOffset={}, minOffsetRecord={}, maxOffsetRecord={}", initStartOffset, minOffsetRecord, maxOffsetRecord);

            if (minOffsetRecord - initStartOffset > 1) {
                log.info(String.format(Constants.INIT_OFFSET, bizName) + "存在数据丢失。。。。。。。。。。。。。。。。。。minOffsetRecord = " + minOffsetRecord
                        + ",initStartOffset = " + initStartOffset);
                consumeUtils.sendDingWarn("同步消费异常", bizName + "数据丢失!!!!!!!");
            }
            redisCluster.set(String.format(Constants.INIT_OFFSET, bizName), String.valueOf(maxOffsetRecord));
            acknowledgment.acknowledge();

            
        } catch (Exception e) {
            throw new RuntimeException("kafka message process error!", e);
        }

}

在检测到消息丢失时,我们可以钉钉报警,也可以直接抛出异常,停止消费。

确保消息可靠传递

在这里插入图片描述

  • 生产阶段: 消息在 Producer 创建出来,经过网络传输发送到 Broker 端;
  • 存储阶段: 消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上;
  • 消费阶段: Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

在上面三个阶段中,哪些会发生消息丢失呢?

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。

有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。

以 Kafka 为例,我们看一下如何可靠地发送消息:同步发送时,只要注意捕获异常即可。

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("消息发送成功。");
} catch (Throwable e) {
    System.out.println("消息发送失败!");
    System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。

producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        System.out.println("消息发送成功。");
    } else {
        System.out.println("消息发送失败!");
        System.out.println(exception);
    }
});

存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

比如,上面的那段代码,只有业务逻辑处理完成后,才会发送消费确认。

acknowledgment.acknowledge();

参考资料
《消息队列高手课》


http://www.kler.cn/news/311760.html

相关文章:

  • 自然语言处理实战项目全解析
  • 阻止冒泡事件
  • Python中的异步编程:从基础知识到高级应用
  • vi | vim基本使用
  • 视频相关处理
  • 基于Delphi的题库生成系统
  • spark读mongodb
  • HTB-Jerry(tomcat war文件、msfvenom)
  • Unity制作角色溶解变成光点消失
  • GPT提示词分享 —— 深度思考助手
  • 【Vue】VueRouter路由
  • Spring系统学习(一)——初识Spring框架
  • 第五届“马栏山杯”国际音视频算法大赛创新应用赛投票环节正式启动啦!
  • Json和Http专栏
  • linux如何查看当前的目录所在位置
  • GDPU 信息安全 天码行空1 用Wireshark分析典型TCP/IP体系中的协议
  • 【vue】vue3+ts对接科大讯飞大模型3.5智能AI
  • MongoDB的安装和使用
  • React Zustand状态管理库的使用
  • 性能优化一:oracle 锁的原则
  • 手机实时提取SIM卡打电话的信令和声音-新的篇章(一、可行的方案探讨)
  • 【简单记录】Linux系统安装ZooKeeper
  • 【电路笔记】-运算放大器比较器
  • 在线查看 Android 系统源代码 Git repositories on android
  • YOLOv9改进策略【注意力机制篇】| MCAttention 多尺度交叉轴注意力
  • vue和thinkphp路由伪静态配置
  • 前端vue-子组件对于父组件的传值的约束
  • cuda与机器学习
  • C++ ——string的模拟实现
  • 字节跳动的微服务独家面经