当前位置: 首页 > article >正文

高效改进!防止DataX从HDFS导入关系型数据库丢数据

在这里插入图片描述

高效改进!防止DataX从HDFS导入关系型数据库丢数据

针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题,优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片,确保了每个分片数据都得到全面读取和传输,有效提升了数据导入的可靠性和效率。这些改动不仅解决了丢数据的问题,还显著提高了处理多分片数据的性能。

背景

我们数据中台设计,数据同步功能是datax完成,在orc格式时datax从hdfs导数据到关系型数据库数据丢失,而在textfile格式时丢失数据,当文件超过250M多时会丢数据。因想使用orc格式节省数据空间,提高spark运行效率,需要解决这个问题。

问题

在这里插入图片描述
在这里插入图片描述

只读取了256M 左右的数据,数据条数对不上,导致hdfs,orc格式导入数据到pg,mysql等关系型数据库,数据丢失。

解决

修改hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java

问题代码

 InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
                Object key = reader.createKey();
                Object value = reader.createValue();
                // 获取列信息
                List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                List<Object> recordFields;
                while (reader.next(key, value)) {
                    recordFields = new ArrayList<Object>();

                    for (int i = 0; i <= columnIndexMax; i++) {
                        Object field = inspector.getStructFieldData(value, fields.get(i));
                        recordFields.add(field);

修改后

 // OrcInputFormat getSplits params numSplits not used, splits size = block numbers
                InputSplit[] splits = in.getSplits(conf, -1);
                for (InputSplit split : splits) {
                    {
                        RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
                        Object key = reader.createKey();
                        Object value = reader.createValue();
                        // 获取列信息
                        List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                        List<Object> recordFields;
                        while (reader.next(key, value)) {
                            recordFields = new ArrayList<Object>();

                            for (int i = 0; i <= columnIndexMax; i++) {
                                Object field = inspector.getStructFieldData(value, fields.get(i));
                                recordFields.add(field);
                            }
                            transportOneRecord(column, recordFields, recordSender,
                                    taskPluginCollector, isReadAllColumns, nullFormat);
                        }
                        reader.close();

点击参考查看

重新打包替换hdfsreader.jar即可

解析

  1. 新增循环处理所有分片的逻辑: 之前的代码只处理了第一个分片(splits[0]),现在改为了处理所有的分片。新增的部分如下:

    java
    InputSplit[] splits = in.getSplits(conf, -1);
    for (InputSplit split : splits) {
        RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
        Object key = reader.createKey();
        Object value = reader.createValue();
    

    旧的逻辑是:

    java
    InputSplit[] splits = in.getSplits(conf, 1);
    RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
    Object key = reader.createKey();
    Object value = reader.createValue();
    

    这样改动的目的是,同时处理多个分片,从而提升数据读取的效率。

  2. 移除了重复的分片处理逻辑: 不使用重复的分片处理逻辑:

    java
    // OrcInputFormat getSplits params numSplits not used, splits size = block numbers
    InputSplit[] splits = in.getSplits(conf, -1);
    
  3. 代码块的重构: 将读取分片、解析记录以及处理记录的逻辑放入一个循环中,使代码更简洁、更易读:

    改之前:

    java
    InputSplit[] splits = in.getSplits(conf, 1);
    RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
    Object key = reader.createKey();
    Object value = reader.createValue();
    

    改后使用循环:

    java
    InputSplit[] splits = in.getSplits(conf, -1);
    for (InputSplit split : splits) {
        RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
        Object key = reader.createKey();
        Object value = reader.createValue();
    
  4. 处理每个记录字段并传输记录: 保持对每条记录的字段读取并将其传输转移到了新的循环处理逻辑中:

    改之前:

    while (reader.next(key, value)) {
        recordFields = new ArrayList<Object>();
        for (int i = 0; i <= columnIndexMax; i++) {
            Object field = inspector.getStructFieldData(value, fields.get(i));
            recordFields.add(field);
        }
        transportOneRecord(column, recordFields, recordSender,
                           taskPluginCollector, isReadAllColumns, nullFormat);
    }
    reader.close();
    

    改后:

    for (InputSplit split : splits) {
        RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
        Object key = reader.createKey();
        Object value = reader.createValue();
        List<? extends StructField> fields = inspector.getAllStructFieldRefs();
        List<Object> recordFields;
        while (reader.next(key, value)) {
            recordFields = new ArrayList<Object>();
            for (int i = 0; i <= columnIndexMax; i++) {
                Object field = inspector.getStructFieldData(value, fields.get(i));
                recordFields.add(field);
            }
            transportOneRecord(column, recordFields, recordSender,
                               taskPluginCollector, isReadAllColumns, nullFormat);
        }
        reader.close();
    }
    
  5. 为什么是256M没有更改前他是按每个文件进行分割,而在datax的配置中Java heap size 即默认xmx设置时256M,所以当单个文件超过256M时,超过的部分就被丢掉了,造成数据缺失,而更改后的是按hdfs block size 块的大小进行分割,循环遍历,所以直接修改xmx也能解决问题,但是你要想万一文件超过128G那,你不可能一直调大Java heap size,所以按hdfs block size分割是合理的解决方案

reader单个分片(InputSplit)的大小

在DataX的数据读取过程中,reader单个分片(InputSplit)的大小通常取决于底层存储系统和具体的配置参数。对于HDFS(Hadoop Distributed File System)를的读取,分片大小主要由以下几个因素决定:

  1. HDFS块大小(Block Size): HDFS将文件分为多个块,每个块通常是64MB、128MB或256MB大小,具体大小可以通过HDFS的配置参数dfs.blocksize进行设置。DataX会根据这些块来创建分片,也就是一个分片通常对应一个或多个HDFS块。
  2. 文件本身的大小: 如果文件比HDFS块小,或者没有跨越多个块,则一个文件可能只对应一个分片。
  3. DataX的任务配置: DataX允许在其配置文件中指定一些与分片相关的参数,类似于Hadoop的mapreduce.input.fileinputformat.split.maxsizemapreduce.input.fileinputformat.split.minsize,这些参数可以影响分片的逻辑。
  4. InputFormat: DataX使用的Hadoop的InputFormat也能控制分片的逻辑,比如FileInputFormatTextInputFormatOrcInputFormat等。这些格式定义了如何分割输入数据,结合文件大小和块大小来决定分片。

总结

  • 主要改动是将之前只处理单个分片的逻辑重构为一个循环,处理所有分片。这使代码更具扩展性和效率,也适应不同的输入数据量。
  • 移除了无用且重复的注释和代码行,以保持代码清晰。

http://www.kler.cn/a/368834.html

相关文章:

  • c语言中整数在内存中的存储
  • spark on kubernetes运行测试
  • 基于SSM+小程序的旅游社交登录管理系统(旅游4)
  • 鸿蒙-窗口什么时候有叉按钮
  • web pdf 图片拖动图片合成
  • 【vue+leaflet】自定义控件(五)
  • 学习threejs,使用粒子实现雨滴特效
  • 计算机网络协议
  • 14 Docker容器单机网络架构全攻略:docker网络细节揭秘
  • 【mysql 进阶】3 MySQL架构和存储引擎
  • esp32c6 开发实战:http 协议
  • Pytorch学习--如何下载及使用Pytorch中自带数据集,如何把数据集和transforms联合在一起使用
  • 【WIN】WIN10_WSL_Ubuntu18.04_ROS_rviz_docker
  • Mbox网关——氢能制造产业的智能桥梁
  • 4.rabbitmq安装【Docker】
  • 【Spring】控制反转 依赖注入(本文内容由大模型生成)
  • USART串口通信:配置与实践详解(下篇)
  • css模糊遮罩效果
  • vue20.17.0-全局注册
  • 102. 管道漫游案例
  • Ubuntu20.04版本的NVIDIA显卡驱动程序安装(宝宝级攻略)
  • vue 项目i18n国际化,快速抽离中文,快速翻译
  • vscode使用make编译c的问题
  • 机器人和智能的进化速度远超预期-ROS-AI-
  • git的学习之本地进行操作
  • 【数据结构】队列和栈相互实现