记一次kafka消息丢失问题排查
背景
我写了一个 自定义分区器 ,自测发送了一些简单的如Hello world
之类的消息成功了,并且日志现实确实调用了我自己的分区器,然后我自认为已经完美了。
后来我发现很多消息消费者没有消费
, 且发送完成回调
(CallBack)没有被调用。 消息就这么石沉大海,也没有错误日志。
排查过程
一开始我从消息丢失
方向排查,我前面文章也提到 消息发送几种方式 ,其中异步发送确实可能丢失,但我发送代码如下,我处理了异常,如果发送失败应该有异常打印出来的。
我一时也陷入迷茫,于是我网上查阅资料,关于生产者几个重要配置参数
acks 分区中有多少副本收到消息,生产者才认为消息发送成功 ,默认值1 ,并且我本地使用单节点,因此排除该配置
有些网上说
batch.size
和request.timeout.ms
不对,我半信半疑,改了参数试试,依旧有问题
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println(content + " send sucesss " + recordMetadata.toString());
}
}
});
定位问题
肯定是我修改了什么导致的问题, 于是我先把所有配置修改为默认的,发现能正常发送消息,消息也被消费。最终发现一旦使用我自定义分区器
,消息发送就有问题。代码如下,聪明的你看出问题在哪了吗?
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//获取分区总个数
int numPartitions = partitions.size();
int partition = key != null ? key.hashCode() % numPartitions : value.hashCode() % numPartitions;
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
答案揭晓
问题在于 HashCode 返回的可能是负数 ,负数再对 分区数 取模,得到的分区序号
就为负数。分区序号一旦为负,分区序号有问题了当然消息就发不出去,只是没想到也没报错。定位问题修复也很简单,给HashCode 取绝对值就解决了。
int partition = key != null ? Math.abs(key.hashCode()) % numPartitions : Math.abs(value.hashCode()) % numPartitions;
举一反三
如果返回的分区序号大于分区数
消息发送也同负数情况一样,石沉大海。
分区数配置为 10 ,分区器返回序号固定11 测试结果同预期一样,和负数情况一样,消息丢失。
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 11;
}
反思
-
没意识到 默认HashCode 可能为负数(当时误任务默认的不会为负数,除非恶意重写了HashCod方法)
-
没有参考默认的分区器
DefaultPartitioner
使用了 Utils.toPositive 将HashCode 转成了正数 - 排查过程也熟悉了不少 kafka重要参数
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
结语
限于篇幅一些代码没有给出,欢迎私下交流学习。