当前位置: 首页 > article >正文

记一次kafka消息丢失问题排查

背景

我写了一个 自定义分区器 ,自测发送了一些简单的如Hello world 之类的消息成功了,并且日志现实确实调用了我自己的分区器,然后我自认为已经完美了。

后来我发现很多消息消费者没有消费,  且发送完成回调(CallBack)没有被调用。 消息就这么石沉大海,也没有错误日志。

排查过程

一开始我从消息丢失方向排查,我前面文章也提到 消息发送几种方式 ,其中异步发送确实可能丢失,但我发送代码如下,我处理了异常,如果发送失败应该有异常打印出来的。

我一时也陷入迷茫,于是我网上查阅资料,关于生产者几个重要配置参数

acks 分区中有多少副本收到消息,生产者才认为消息发送成功 ,默认值1 ,并且我本地使用单节点,因此排除该配置

有些网上说 batch.size 和 request.timeout.ms 不对,我半信半疑,改了参数试试,依旧有问题

producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println(content + " send sucesss  " + recordMetadata.toString());
        }
    }
});

定位问题

肯定是我修改了什么导致的问题, 于是我先把所有配置修改为默认的,发现能正常发送消息,消息也被消费。最终发现一旦使用我自定义分区器,消息发送就有问题。代码如下,聪明的你看出问题在哪了吗?

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //获取分区总个数
        int numPartitions = partitions.size();

        int partition = key != null ? key.hashCode() % numPartitions : value.hashCode() % numPartitions;
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

答案揭晓

问题在于 HashCode 返回的可能是负数 ,负数再对 分区数 取模,得到的分区序号 就为负数。分区序号一旦为负,分区序号有问题了当然消息就发不出去,只是没想到也没报错。定位问题修复也很简单,给HashCode 取绝对值就解决了。

 int partition = key != null ? Math.abs(key.hashCode()) % numPartitions : Math.abs(value.hashCode()) % numPartitions;

举一反三

如果返回的分区序号大于分区数 消息发送也同负数情况一样,石沉大海。

分区数配置为 10 ,分区器返回序号固定11 测试结果同预期一样,和负数情况一样,消息丢失。

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 11;
    }

反思

  • 没意识到 默认HashCode 可能为负数(当时误任务默认的不会为负数,除非恶意重写了HashCod方法)

  • 没有参考默认的分区器 DefaultPartitioner使用了 Utils.toPositive 将HashCode 转成了正数

  • 排查过程也熟悉了不少 kafka重要参数
   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
        return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

结语

限于篇幅一些代码没有给出,欢迎私下交流学习。


http://www.kler.cn/news/312305.html

相关文章:

  • [SDX35+WCN6856]SDX35 + WCN6856 WiFi可以up起来之后无法扫描到SSID
  • 7.sklearn-逻辑回归、精确率和召回率、ROC曲线和AUC指标
  • Java项目: 基于SpringBoot+mybatis+maven旅游管理系统(含源码+数据库+毕业论文)
  • nvm node管理工具常用指令
  • 编程基础:函数栈帧的创建和销毁
  • (十六)Ubuntu 20.04 下搭建PX4+MATLAB 仿真环境(HITL)
  • 无人机之AI跟踪篇
  • Facebook直播限流是什么原因?是ip地址导致的吗
  • Java商城的技术优势有哪些
  • Spring 出现 No qualifying bean of type ‘com.xxx‘ available 解决方法
  • 35.贪心算法2
  • [ffmpeg] 视频格式转换
  • 开发板与ubuntu建立网络通信(NFS和TFTP协议搭建)
  • elasticsearch实战应用
  • NAT网络地址转换
  • 【spring】spring框架中使用的设计模式有哪些,经典的设计模式应用,spring框架中哪些地方使用了哪些优秀的设计模式
  • 制作炫酷个人网页:用 HTML 和 CSS3 展现你的风格
  • macos清理垃圾桶时提示 “操作无法完成,因为该项目正在使用中” 解决方法 , 强制清理mac废纸篓 方法
  • 外国药品位置检测系统源码分享
  • 好用的XML解析库——fast-xml-parser
  • 应用案例|开源 PolarDB-X 在互联网安全场景的应用实践
  • YOLOv9改进系列,YOLOv9主干网络替换为RepViT (CVPR 2024,清华提出,独家首发),助力涨点
  • 基于Springboot的无接触外卖配送系统
  • 电风扇制造5G智能工厂物联数字孪生平台,推进制造业数字化转型
  • 35. MyBatis中的缓存失效机制是如何工作的?
  • pytorch入门(1)——pytorch加载数据初认识
  • Nginx:高性能Web服务器与反向代理的深度剖析
  • 契约锁与您相约2024新疆数字经济创新大会暨新疆数字丝路博览会
  • QT支持C/C++工业边缘计算网关带RS485、HDMI视频输出
  • pinia在vue3中的使用