当前位置: 首页 > article >正文

30、Flink中操作已经配置好的远程文件系统

背景:flink作业中既配置了obs作为chk的远程文件系统,又在作业中读取obs文件内容时,使用obsclient会导致任务无法创建chk目录而启动失败。
解决办法:使用flink-core里的fileSystem来操作 。这样就不用去使用对应文件系统的客户端了,而是直接使用的是当前flink中配置的远程文件系统。

 public static Long getKafkaOffsetFromHoodie2(String hdfsPath) throws IOException {

    final Path path = new Path(hdfsPath);
    //获取文件系统
    final FileSystem fileSystem = path.getFileSystem();
    final FileStatus[] fileStatuses = fileSystem.listStatus(path);

    if (fileSystem.exists(path)) {

      if (fileStatuses.length > 0) {
        // 获取最新commit文件
        final FileStatus latestFile =
            Arrays.stream(fileStatuses)
                .filter(x -> x.getPath().getName().endsWith(".commit"))
                .max(Comparator.comparingLong(FileStatus::getModificationTime))
                .orElse(null);
        if (latestFile != null) {
          LOG.info("最新commit文件为:{}", latestFile.getPath().getPath());
          // 读取文件内容
          try (FSDataInputStream inputStream = fileSystem.open(latestFile.getPath());
              BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
            String line;
            StringBuilder stringBuilder = new StringBuilder();
            while ((line = reader.readLine()) != null) {
              stringBuilder.append(line);
            }
            JSONObject jsonObject = JSON.parseObject(stringBuilder.toString());

            if (jsonObject.containsKey("extraMetadata")) {
              JSONObject extraMetadata = jsonObject.getJSONObject("extraMetadata");
              if (extraMetadata.containsKey("deltastreamer.checkpoint.key")) {
                String string = extraMetadata.getString("deltastreamer.checkpoint.key");
                String offset = string.split(",")[1].split(":")[1];
                LOG.info("当前偏移量==>  " + offset);
                return Long.valueOf(offset);
              }
            } else {
              LOG.error("this is not an delta-stream mission");
              throw new IOException("this is not an delta-stream mission");
            }

          } catch (IOException e) {
            System.err.println("读取文件时发生错误: " + e.getMessage());
          }
        }

      } else {
        LOG.warn("{} 目录为空或无法访问", hdfsPath);
      }
    } else {
      LOG.error("{} 指定路径不是一个有效的目录", hdfsPath);
    }
    return null;
  }

http://www.kler.cn/a/533584.html

相关文章:

  • AJAX笔记进阶篇
  • CSS 值和单位详解:从基础到实战
  • unordered_map/set的哈希封装
  • JVM 四虚拟机栈
  • 安全策略实验报告
  • 当WebGIS遇到智慧文旅-以长沙市不绕路旅游攻略为例
  • PyTorch Geometric(PyG)机器学习实战
  • deepseek设计硬件电路之设计一个pA级电流测量电路
  • Android 常用命令和工具解析之Battery Historian
  • 基于HTML生成网页有什么优势
  • Java—不可变集合
  • 最新黑马商城运行问题解决
  • 优化数据库结构
  • 服务器安装了esxi,通过esxi创建了N个虚拟机,如何实现类似于阿里云或者腾讯云的类似的云端管理虚拟机监控虚拟机的系统,要求开源,中文界面
  • 【Block总结】MDCR,多尺度深度可分离卷积,捕捉不同感受野范围的空间特征
  • LabVIEW图像采集与应变场测量系统
  • 【算法】动态规划专题③ ——二维DP python
  • 【PromptCoder + Bolt.new】Cascade模式自动生成页面和对应的路由
  • 10.单例模式 (Singleton Pattern)
  • 防火墙策略
  • react的antd表格数据回显在form表单中
  • 2024 TCSVT: LS2T2NN
  • 深入解析 Chrome 浏览器的多进程架构:标签页是进程还是线程?(中英双语)
  • 20250205——Windows系统基于ollama的DeepSeek-R1本地安装
  • 备战蓝桥杯-并查集
  • 【力扣】54.螺旋矩阵