Flink实时写Hudi报NumberFormatException异常
Flink实时写Hudi报NumberFormatException异常
问题描述
在Flink项目中,针对Hudi表 xxxx_table 的 bucket_write 操作由于 java.lang.NumberFormatException 异常而从运行状态切换到失败状态。异常信息显示在解析字符串"ddd7a1ec"为整数时出现了问题。报错如下:
bucket_write: xxxx_table switched from RUNNING to FAILED with failure cause: java.lang.NumberFormatException: For input string: "ddd7a1ec"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)
at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$1(BucketStreamWriteFunction.java:162)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:160)
at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:112)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
原因分析:
报错相关源码如下:
public static int bucketIdFromFileId(String fileId) {
return Integer.parseInt(bucketIdStrFromFileId(fileId));
}
public static String bucketIdStrFromFileId(String fileId) {
return fileId.substring(0, 8);
}
通过查看 BucketIdentifier 源代码,发现 bucketIdFromFileId 方法尝试将 fileId 参数的子字符串解析为整数。 fileId 应包含作为前缀的桶标识符,而 bucketIdStrFromFileId 方法则通过取 fileId 的前8个字符来提取桶标识符。
这说明异常发生在解析历史数据文件时。查看hdfs文件目录发现,历史数据文件未按照桶索引逻辑进行编写,正常桶索引写入的文件名具有桶标识符作为前缀,而历史文件则缺乏此桶标识符前缀。因此,在尝试从历史文件名中解析桶标识符时,由于缺少预期的桶标识符前缀,解析过程失败。
那么历史数据是谁写入的呢?经过调查发现,该非法的历史数据是由下游系统为方便调试程序时写入。
解决方案:
删除无用的非法数据文件即可解决。如果非法文件数据有留存必要,那可能要备份后再处理。
为确保该问题不再发生,需要确保历史数据文件遵循桶索引逻辑,这样可以保证解析过程顺利进行,避免 java.lang.NumberFormatException 异常的发生。