当前位置: 首页 > article >正文

Seatunnel解决ftp读取json文件无法读取数组以及格式化之后的json无法解析的问题

问题原因

在JsonRead这个方法里面 在源码中使用的逻辑是读取一行 然后把这个json进行解析
但是这样存在一个问题 比如如果json的格式是这样的
{
name:“zhangsan”,
age:25
}
如果是这样的话 第一行读到的内容就是 {
显然 一个 { 并不是一个json 这样会导致解析json失败

问题解决的思路

我的方法是将整个文件中的内容全部解析
然后使用Seatunnel中自带的JackJson这个工具类进行解析
然后再获取到单个的Json对象 之后再解析成一个Json的字符串
因为解析过之后的Json字符串肯定不存在换行 所以这种换行的问题算是规避了
但是这样又引发了另一个问题就是 一下子加载全部的文件内容可能会导致内存飙升 而且解析json 构造对象这个过程也是比较耗费资源的
但是我目前没有想出来更好的方法
我目前的业务需求是 这种ftp的文件都是小文件 不存在特别大的json 所以我的这个方法是可以完成现在的需求的

修改代码的内容

要修改的代码的位置是
org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java

    @Override
    public void readProcess(
            String path,
            String tableId,
            Collector<SeaTunnelRow> output,
            InputStream inputStream,
            Map<String, String> partitionsMap,
            String currentFileName)
            throws IOException {
        InputStream actualInputStream;
        switch (compressFormat) {
            case LZO:
                LzopCodec lzo = new LzopCodec();
                actualInputStream = lzo.createInputStream(inputStream);
                break;
            case NONE:
                actualInputStream = inputStream;
                break;
            default:
                log.warn(
                        "Json file does not support this compress type: {}",
                        compressFormat.getCompressCodec());
                actualInputStream = inputStream;
                break;
        }
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {
            //TODO wxt 优先使用之前的方法
            try{
                reader.lines()
                        .forEach(
                                line -> {
                                    try {
                                        SeaTunnelRow seaTunnelRow =
                                                deserializationSchema.deserialize(
                                                        line.getBytes(StandardCharsets.UTF_8));
                                        if (isMergePartition) {
                                            int index = seaTunnelRowType.getTotalFields();
                                            for (String value : partitionsMap.values()) {
                                                seaTunnelRow.setField(index++, value);
                                            }
                                        }
                                        seaTunnelRow.setTableId(tableId);
                                        output.collect(seaTunnelRow);
                                    } catch (IOException e) {
                                        String errorMsg =
                                                String.format(
                                                        "Deserialize this jsonFile data [%s] failed, please check the origin data",
                                                        line);
                                        throw new FileConnectorException(
                                                FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
                                                errorMsg,
                                                e);
                                    }
                                });
            }catch (Exception e){
                //region 我修改的内容

                //首先读取全部的内容
                // 将 BufferedReader 内容读取到一个 String
                StringWriter stringWriter = new StringWriter();
                String line;
                while ((line = reader.readLine()) != null) {
                    stringWriter.write(line);
                }
                String jsonContent = stringWriter.toString();

                // 判断 JSON 类型并处理
                ObjectMapper objectMapper = new ObjectMapper();
                JsonNode jsonNode = objectMapper.readTree(jsonContent);

                if (jsonNode.isArray()) {
                 
                    // 遍历数组并转换为单行字符串
                    for (JsonNode node : jsonNode) {
                        String singleLineJson = objectMapper.writeValueAsString(node);
                       // region 这一部分是我直接从上面复制下来的
                        try {
                            SeaTunnelRow seaTunnelRow =
                                    deserializationSchema.deserialize(
                                            singleLineJson.getBytes(StandardCharsets.UTF_8));
                            if (isMergePartition) {
                                int index = seaTunnelRowType.getTotalFields();
                                for (String value : partitionsMap.values()) {
                                    seaTunnelRow.setField(index++, value);
                                }
                            }
                            seaTunnelRow.setTableId(tableId);
                            output.collect(seaTunnelRow);
                        } catch (IOException e1) {
                            String errorMsg =
                                    String.format(
                                            "Deserialize this jsonFile data [%s] failed, please check the origin data",
                                            singleLineJson);
                            throw new FileConnectorException(
                                    FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
                                    errorMsg,
                                    e);
                        }
                        // endregion
                    }
                } else if (jsonNode.isObject()) {
                    String singleLineJson = objectMapper.writeValueAsString(jsonNode);
                    // region 这一部分是我直接从上面复制下来的
                    try {
                        SeaTunnelRow seaTunnelRow =
                                deserializationSchema.deserialize(
                                        singleLineJson.getBytes(StandardCharsets.UTF_8));
                        if (isMergePartition) {
                            int index = seaTunnelRowType.getTotalFields();
                            for (String value : partitionsMap.values()) {
                                seaTunnelRow.setField(index++, value);
                            }
                        }
                        seaTunnelRow.setTableId(tableId);
                        output.collect(seaTunnelRow);
                    } catch (IOException e1) {
                        String errorMsg =
                                String.format(
                                        "Deserialize this jsonFile data [%s] failed, please check the origin data",
                                        singleLineJson);
                        throw new FileConnectorException(
                                FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
                                errorMsg,
                                e);
                    }
                    // endregion
                }


                //endregion


            }



        }
    }

http://www.kler.cn/a/421622.html

相关文章:

  • 12月2日星期一今日早报简报微语报早读
  • node.js @ffmpeg-installer/ffmpeg 桌面推流
  • SpringBoot源码-Spring Boot启动时控制台为何会打印logo以及自定义banner.txt文件控制台打印
  • Android Camera2采集并编码为H.264
  • 源码分析之Openlayers的核心EventTarget类的实现
  • el-table 最简单的方法配置图片预览功能
  • TypeScript和JavaScript的区别
  • 如何使用brew安装phpredis扩展?
  • 零基础学安全--Burp Suite(4)proxy模块以及漏洞测试理论
  • CTF之WEB(php弱类型绕过)
  • openGauss极致RTO流程讲解及运维方法
  • vue实现懒加载
  • 30分钟学会正则表达式
  • Wwise SoundBanks内存优化
  • renderExtraFooter 添加本周,本月,本年
  • 数据库——创建索引的原则
  • 学成在线day08
  • k8s 亲和性之Affinity
  • 《Python基础》之Pandas库
  • PostgreSQL认证培训需要什么条件
  • 上天入地,智能诊断,多语言支持,璞华IETM打造产品技术信息管理极致用户体验
  • Python虚拟环境管理工具:Pipenv
  • Linux-Ubuntu16.04摄像头 客户端抓取帧并保存为PNG
  • Golang教程第24篇(语言接口)
  • Meta-Llama-3-8B-Instruct 模型的混合精度训练显存需求:AdamW优化器(中英双语)
  • STM32G4系列MCU的Direct memory access controller (DMA)功能之一