30、Flink中操作已经配置好的远程文件系统
背景:flink作业中既配置了obs作为chk的远程文件系统,又在作业中读取obs文件内容时,使用obsclient会导致任务无法创建chk目录而启动失败。
解决办法:使用flink-core里的fileSystem来操作 。这样就不用去使用对应文件系统的客户端了,而是直接使用的是当前flink中配置的远程文件系统。
public static Long getKafkaOffsetFromHoodie2(String hdfsPath) throws IOException {
final Path path = new Path(hdfsPath);
//获取文件系统
final FileSystem fileSystem = path.getFileSystem();
final FileStatus[] fileStatuses = fileSystem.listStatus(path);
if (fileSystem.exists(path)) {
if (fileStatuses.length > 0) {
// 获取最新commit文件
final FileStatus latestFile =
Arrays.stream(fileStatuses)
.filter(x -> x.getPath().getName().endsWith(".commit"))
.max(Comparator.comparingLong(FileStatus::getModificationTime))
.orElse(null);
if (latestFile != null) {
LOG.info("最新commit文件为:{}", latestFile.getPath().getPath());
// 读取文件内容
try (FSDataInputStream inputStream = fileSystem.open(latestFile.getPath());
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
StringBuilder stringBuilder = new StringBuilder();
while ((line = reader.readLine()) != null) {
stringBuilder.append(line);
}
JSONObject jsonObject = JSON.parseObject(stringBuilder.toString());
if (jsonObject.containsKey("extraMetadata")) {
JSONObject extraMetadata = jsonObject.getJSONObject("extraMetadata");
if (extraMetadata.containsKey("deltastreamer.checkpoint.key")) {
String string = extraMetadata.getString("deltastreamer.checkpoint.key");
String offset = string.split(",")[1].split(":")[1];
LOG.info("当前偏移量==> " + offset);
return Long.valueOf(offset);
}
} else {
LOG.error("this is not an delta-stream mission");
throw new IOException("this is not an delta-stream mission");
}
} catch (IOException e) {
System.err.println("读取文件时发生错误: " + e.getMessage());
}
}
} else {
LOG.warn("{} 目录为空或无法访问", hdfsPath);
}
} else {
LOG.error("{} 指定路径不是一个有效的目录", hdfsPath);
}
return null;
}