当前位置: 首页 > article >正文

datax源码分析

文章目录

  • 前言
  • 一、加载配置文件
  • 二、根据加载的配置文件进行调度
  • 三、根据配置文件执行读取写入任务
  • 总结


前言

在上一篇文章当中我们已经了解了datax的启动原理,以及datax的最基础的配置,datax底层java启动类的入口及关键参数。

接下来我将进行启动类执行的源码分析,设计到datax当中的一些关键的配置文件的初始化,各种组件之间的加载原理及运行原理,带领大家深度理解datax的底层运行逻辑。


一、加载配置文件

在上一篇我们已经知道了,datax的调用的是Engine类的mian方法,代码当中只有一行关键代码Engine.entry(args)。

public static void main(String[] args) throws Exception {
		//我们人为设置的启动参数
        args = new String[]{
                "-mode", "standalone",
                "-jobid", "-1",
                "-job", "D:\\chrom下载\\datax\\datax\\bin\\mysql2mysql.json"
        };
        //设置datax的解压目录
        System.setProperty("datax.home", "D:\\chrom下载\\datax\\datax");
        int exitCode = 0;
        try {
        	//所有的逻辑都在这类方法里面
            Engine.entry(args);
        } catch (Throwable e) {
            //异常处理省略。。。
        }
        System.exit(exitCode);
    }
  1. Engine.entry(args)核心入口方法,该方法最核心内容主要是**解析datax运行所需的所有的参数,将其封装成Configuration,后续调用engine.start(configuration)**进行所有的业务逻辑的处理。我们需要着重的关注的方法只有一个就是Configuration configuration = ConfigParser.parse(jobPath)
    public static void entry(final String[] args) throws Throwable {
       	//省略不重要代码。。。
		//核心的解析逻辑,我们需要特别关注的代码逻辑
        Configuration configuration = ConfigParser.parse(jobPath);
     	//省略不重要代码。。。
        ConfigurationValidate.doValidate(configuration);
        Engine engine = new Engine();
        engine.start(configuration);
    }
  1. 解析我们需要的配置文件,job任务的json配置文件mysql2mysql.json,{you datax path}/conf/core.json文件,遍历{you datax patch}/plugin/reader/下的所有reade和{you datax patch}/plugin/writer/下的所有writer,加载符合的reader/writer的配置的plugin.json文件,reader/writer配置就是在mysql2mysql.json当中配置的,需要注意我们配置的reader/writer名称必须和reader/writer目录下的plugin_job_template.json下的name属性完全一致。处理还会加载前置处理plugin和后置的plugin,博主在源码debug时并未配置这两项的plugin。
    public static Configuration parse(final String jobPath) {
        //加载我们配置job的json配置文件,我这里是mysql2mysql.json配置文件,配置文件
        //可以参考我上一篇文章当中的配置
        Configuration configuration = ConfigParser.parseJobConfig(jobPath);
        //将我们配置mysql2mysql.json文件的属性和core.json文件的属性合并。
        configuration.merge(
        		//加载{you datax path}/conf/core.json文件
                ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
                false);
        //根据配置文件读取reader配置
        String readerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        //根据配置文件读取writer配置
        String writerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
		//读取前置处理plugin
        String preHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
		//读取后置处理plugin
        String postHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

        Set<String> pluginList = new HashSet<String>();
        pluginList.add(readerPluginName);
        pluginList.add(writerPluginName);

        if(StringUtils.isNotEmpty(preHandlerName)) {
            pluginList.add(preHandlerName);
        }
        if(StringUtils.isNotEmpty(postHandlerName)) {
            pluginList.add(postHandlerName);
        }
        try {
            //加载所有的插件配置文件,我们暂时只有reader和writer插件,即mysqlReader和mysqlWriter,将配置文件合并
            configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
        }catch (Exception e){
            //吞掉异常,保持log干净。这里message足够。
            LOG.warn(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                //
            }
            configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
        }

        return configuration;
    }

到此为止我们的前期最关键的一个步骤就完成了,解析所有需要的配置项json,最终我们加载出来的配置文件格式如下:

{
    "internal": {
        "common": {
            "column": {
                "dateFormat": "yyyy-MM-dd",
                "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
                "encoding": "utf-8",
                "extraFormats": [
                    "yyyyMMdd"
                ],
                "timeFormat": "HH:mm:ss",
                "timeZone": "GMT+8"
            }
        },
        "core": {
            "container": {
                "job": {
                    "id": -1,
                    "mode": "standalone",
                    "reportInterval": 10000
                },
                "taskGroup": {
                    "channel": 5
                },
                "trace": {
                    "enable": "false"
                }
            },
            "dataXServer": {
                "address": "http://localhost:7001/api",
                "reportDataxLog": false,
                "reportPerfLog": false,
                "timeout": 10000
            },
            "statistics": {
                "collector": {
                    "plugin": {
                        "maxDirtyNumber": 10,
                        "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
                    }
                }
            },
            "transport": {
                "channel": {
                    "byteCapacity": 67108864,
                    "capacity": 512,
                    "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                    "flowControlInterval": 20,
                    "speed": {
                        "byte": -1,
                        "record": -1
                    }
                },
                "exchanger": {
                    "bufferSize": 32,
                    "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
                }
            }
        },
        "entry": {
            "jvm": "-Xms1G -Xmx1G"
        },
        //mysql2mysql.json当中的配置都在此项当中
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": [
                                "id",
                                "name",
                                "age"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        "jdbc:mysql://localhost:3306/database?useSSL=false"
                                    ],
                                    "table": [
                                        "stu_copy1"
                                    ]
                                }
                            ],
                            "password": "123456",
                            "username": "root"
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "column": [
                                "id",
                                "name",
                                "age"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://localhost:3306/database?useSSL=false&useUnicode=true&characterEncoding=utf8",
                                    "table": [
                                        "stu_copy2"
                                    ]
                                }
                            ],
                            "password": "123456",
                            "username": "root",
                            "writeMode": "replace"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 5
                }
            }
        },
        //reader/writer配置都在此项当中。
        "plugin": {
            "reader": {
                "mysqlreader": {
                    "class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
                    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
                    "developer": "alibaba",
                    "name": "mysqlreader",
                    "path": "D:\\chrom下载\\datax\\datax\\plugin\\reader\\mysqlreader"
                }
            },
            "writer": {
                "mysqlwriter": {
                    "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
                    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql. warn: The more you know about the database, the less problems you encounter.",
                    "developer": "alibaba",
                    "name": "mysqlwriter",
                    "path": "D:\\chrom下载\\datax\\datax\\plugin\\writer\\mysqlwriter"
                }
            }
        }
    }
}

二、根据加载的配置文件进行调度

  1. 默认使用JobContainer进行任务调度,AbstractContainer 实现类有两个,一个是JobContainer。另外一个是TaskGroupContainer。对于它们两个的关系,JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker
    TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。简单理解就是JobContainer负责任务调度,TaskGroupContainer负责执行。在源代码当中也有体现,在使用JobContainer调度后最终还是使用TaskGroupContainer进行执行。
public void start(Configuration allConf) {

        // 绑定column转换信息
        ColumnCast.bind(allConf);

        /**
         * 初始化PluginLoader,可以获取各种插件配置
         */
        LoadUtil.bind(allConf);

        //我们位配置任务组的配置信息,isJob为ture
        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
                .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
        //JobContainer会在schedule后再行进行设置和调整值
        int channelNumber =0;
        AbstractContainer container;
        long instanceId;
        int taskGroupId = -1;
        if (isJob) {
            //创建一个JobContainer,将配置信息都设置到JobContainer中,
            //后续使用container执行
            allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
            container = new JobContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

        } else {
            container = new TaskGroupContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
            taskGroupId = allConf.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
            channelNumber = allConf.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
        }

        //缺省打开perfTrace
        boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
        boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);

        //standalone模式的 datax shell任务不进行汇报
        if(instanceId == -1){
            perfReportEnable = false;
        }

        int priority = 0;
        try {
            priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
        }catch (NumberFormatException e){
            LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
        }

        Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
        //初始化PerfTrace
        PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
        perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
        //最终的实际执行的逻辑
        container.start();

    }
  1. 执行任务的调度,JobContainer.start(),最终会通过JobContainer.schedule()进行任务的调度处理。
public void start() {
        LOG.info("DataX jobContainer starts job.");

        boolean hasException = false;
        boolean isDryRun = false;
        try {
            this.startTimeStamp = System.currentTimeMillis();
            isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
            //模拟运行,不进行实际的数据的读取和写出。只是进行数据的预校验。该配置在属性job当中,
            //即我们配置mysql2mysql.json当中,默认我们未配置isDryRun为false。
            if(isDryRun) {
                LOG.info("jobContainer starts to do preCheck ...");
                this.preCheck();
            } else {
                userConf = configuration.clone();
                LOG.debug("jobContainer starts to do preHandle ...");
                //前置处理preHandler,实现AbstractPlugin接口,
                //最终调用AbstractPlugin.preHandler()方法
                this.preHandle();
                LOG.debug("jobContainer starts to do init ...");
                //初始化JobContainer的reader/writer属性,调用实际引用的
                //Reader.Job.init()和Writer.Job.init()方法
                this.init();
                LOG.info("jobContainer starts to do prepare ...");
                //调用对应的reader/writer的prepare前置处理方法
                this.prepare();
                LOG.info("jobContainer starts to do split ...");
                //重新设置reader/writer的配置,当我们配置了多个任务时,即
                //job.content设置多项时会出现,博主测下来只有第一个任务可以执行
                //后续的不会执行,就算每个配置项的reader/writer配置相同,不清楚是不是bug。
                this.totalStage = this.split();
                LOG.info("jobContainer starts to do schedule ...");
                //实际调用调度处理的逻辑,该逻辑是代码当中最核心的逻辑
                this.schedule();
                LOG.debug("jobContainer starts to do post ...");
                //调用对应的reader/writer的post后置处理方法
                this.post();
                LOG.debug("jobContainer starts to do postHandle ...");
                //后置处理postHandle,实现AbstractPlugin接口,
                //最终调用AbstractPlugin.postHandler()方法
                this.postHandle();
                LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
                //调用外部hook
                this.invokeHooks();
            }
        } catch (Throwable e) {
           //异常信息处理,此处省略。。。
        } finally {
        	//如果实际执行调度,则打印统计信息
            if(!isDryRun) {
                this.destroy();
                this.endTimeStamp = System.currentTimeMillis();
                if (!hasException) {
                    //最后打印cpu的平均消耗,GC的统计
                    VMInfo vmInfo = VMInfo.getVmInfo();
                    if (vmInfo != null) {
                        vmInfo.getDelta(false);
                        LOG.info(vmInfo.totalString());
                    }
                    LOG.info(PerfTrace.getInstance().summarizeNoException());
                    this.logStatistics();
                }
            }
        }
    }
  1. 任务调度处理JobContainer.schedule(),起内部调用了StandAloneScheduler.schedule(taskGroupConfigs)进行实际的调度处理。
    public void schedule(List<Configuration> configurations) {
        //省略部分代码。。。
        //关键点,根据配置信息执行任务调度分配
        startAllTaskGroup(configurations);
		//省略部分代码。。。
    }
  1. 通过线程池进行任务的调度处理,ProcessInnerScheduler.startAllTaskGroup。在实际调用当中我们可以发现最终使用的是TaskGroupContainer进行的处理,在job执行的配置文件,我们可以选择通过TaskGroupContainer直接执行还是通过JobContainer进行调度执行,如果core.container.model为taskGroup就直接通过TaskGroupContainer执行,不会进行前置插件后置插件等逻辑的处理。
    public void startAllTaskGroup(List<Configuration> configurations) {
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());

        for (Configuration taskGroupConfiguration : configurations) {
        	//TaskGroupContainer taskGroupContainer = new 
        	// TaskGroupContainer(configuration);
			//TaskGroupContainer就是实际执行的调用的执行组件,最开始我们执行的JobContainer
			//我们可以根据mysql2mysql.json的配置直接执行,也可以通过JobContainer进行调度。
        return new TaskGroupContainerRunner(taskGroupContainer);
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
            //使用线程池进行并行处理调用
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }

        this.taskGroupContainerExecutorService.shutdown();
    }

三、根据配置文件执行读取写入任务

  1. TaskGroupContainerRunner封装了TaskGroupContainer,TaskGroupContainerRunner实现了Runnable接口,当被线程池调用时内部实际调用了TaskGroupContainer.start()方法进行处理。
   public void start() {
        try {
			//省略一部分代码。。。
            while (true) {
            	//1.判断task状态,省略。。。
                // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误,省略代码。。。 
                //3.有任务未执行,且正在运行的任务数小于最大通道限制
                Iterator<Configuration> iterator = taskQueue.iterator();
                while(iterator.hasNext() && runTasks.size() < channelNumber){
					//省略部分代码。。。
                    Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
                	TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                    //代码最关键的地方。此处就是实际调用reader和writer的入口,
                    //我们着重分析这个代码。
                	taskExecutor.doStart();

                    iterator.remove();
                    runTasks.add(taskExecutor);

                    //上面,增加task到runTasks列表,因此在monitor里注册。
                    taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));

                    taskFailedExecutorMap.remove(taskId);
                    LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
                            this.taskGroupId, taskId, attemptCount);
                }

                //4.任务列表为空,executor已结束, 搜集状态为success--->成功
                //5.如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报
                Thread.sleep(sleepIntervalInMillSec);
            }
            //6.最后还要汇报一次
            reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
        } catch (Throwable e) {
           //省略。。。
        }
    }
  1. 通过taskExecutor.doStart()执行数据的同步,该类当中有几个和重要的属性,taskConfig:job.content的内容、readerThread:读操作线程、writerThread:写操作线程、readerRunner:读操作的实际执行对象,它也为Runnable接口实现、writerRunner:写操作的实际执行对象,它也为Runnable接口实现。
public void doStart() {
			//1.先启动写线程,执行WriterRunner.run()方法
            this.writerThread.start();
            if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        this.taskCommunication.getThrowable());
            }
            //2.后启动读线程,执行ReaderRunner.run()方法
            this.readerThread.start();
            if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        this.taskCommunication.getThrowable());
            }

        }
  1. 读操作是通过执行读线程readerThread执行ReaderRunner.run方法,最终会调用到我们自定义的reader实现类,即MysqlReader执行startRead方法。
   public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
			//这里会执行真正的查询操作
            this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
                    super.getTaskPluginCollector(), fetchSize);
        }
  1. 读取操作最终对数据库进行读取操作时会调用CommonRdbmsReader.startRead方法。
public void startRead(Configuration readerSliceConfig,
                              RecordSender recordSender,
                              TaskPluginCollector taskPluginCollector, int fetchSize) {
            //需要读取的数据的sql                 
            String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
           	//需要读取的数据表
            String table = readerSliceConfig.getString(Key.TABLE);

            PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);

            LOG.info("Begin to read record by Sql: [{}\n] {}.",
                    querySql, basicMsg);
            PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
            queryPerfRecord.start();
			//获取数据库连接
            Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
                    username, password);

            DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
                    this.dataBaseType, basicMsg);
            int columnNumber = 0;
            ResultSet rs = null;
            try {
            	//获取所有的查询结果列表数据。
                rs = DBUtil.query(conn, querySql, fetchSize);
                queryPerfRecord.end();

                ResultSetMetaData metaData = rs.getMetaData();
                columnNumber = metaData.getColumnCount();
                PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
                allResultPerfRecord.start();
                long rsNextUsedTime = 0;
                long lastTime = System.nanoTime();
                while (rs.next()) {
                    rsNextUsedTime += (System.nanoTime() - lastTime);
                    //将读取的到的所有的数据都通过recordSender传递给writer,默认使用的是
                    //BufferedRecordExchanger,将读取到的数据写入到缓冲区当中。
                    this.transportOneRecord(recordSender, rs,
                            metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
                    lastTime = System.nanoTime();
                }

                allResultPerfRecord.end(rsNextUsedTime);
                LOG.info("Finished read record by Sql: [{}\n] {}.",
                        querySql, basicMsg);

            }catch (Exception e) {
                throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
            } finally {
                DBUtil.closeDBResources(null, conn);
            }
        }
  1. WriterRunner通过读取到缓冲区BufferedRecordExchanger的数据后,交由具体的writer进行处理,我们示例使用的是Mysqlwriter,最终调用Mysqlwriter.startWrite()方法处理缓冲区的数据。
        public void startWrite(RecordReceiver recordReceiver) {
        //调用CommonRdbmsWriter进行处理收到的缓冲区数据
            this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
                    super.getTaskPluginCollector());
        }
  1. 通过CommonRdbmsWriter将读取到的缓冲区的数据进行处理写入到配置的数据库当中。
public void startWrite(RecordReceiver recordReceiver,
                               Configuration writerSliceConfig,
                               TaskPluginCollector taskPluginCollector) {
            //获取连接
            Connection connection = DBUtil.getConnection(this.dataBaseType,
                    this.jdbcUrl, username, password);
            DBUtil.dealWithSessionConfig(connection, writerSliceConfig,
                    this.dataBaseType, BASIC_MESSAGE);
            //处理数据
            startWriteWithConnection(recordReceiver, taskPluginCollector, connection);
        }
       public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
            this.taskPluginCollector = taskPluginCollector;
            // 用于写入数据的时候的类型根据目的表字段类型转换
            this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
                    this.table, StringUtils.join(this.columns, ","));
            // 写数据库的SQL语句
            calcWriteRecordSql();
            List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
            int bufferBytes = 0;
            try {
                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    if (record.getColumnNumber() != this.columnNumber) {
                        // 源头读取字段列数与目的表字段写入列数不相等,直接报错
                        throw DataXException
                                .asDataXException(
                                        DBUtilErrorCode.CONF_ERROR,
                                        String.format(
                                                "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                                                record.getColumnNumber(),
                                                this.columnNumber));
                    }

                    writeBuffer.add(record);
                    bufferBytes += record.getMemorySize();

                    if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
                        doBatchInsert(connection, writeBuffer);
                        writeBuffer.clear();
                        bufferBytes = 0;
                    }
                }
                if (!writeBuffer.isEmpty()) {
                    doBatchInsert(connection, writeBuffer);
                    writeBuffer.clear();
                    bufferBytes = 0;
                }
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                writeBuffer.clear();
                bufferBytes = 0;
                DBUtil.closeDBResources(null, null, connection);
            }
        }

总结

datax的整体原理就是,根据我们配置的文件,加载不同的reader和writer,reader通过配置的单独的read线程将源数据库数据读取出来写入到缓冲区当中,writer通过线程将缓冲区的数据读取出来写入到目标数据库当中。

我们在实际的工作当中可能更需要的关注对于这个reader和writer的扩展,对于datax都已经提供了plugin生命周期的函数处理,前置处理后置处理等函数,如果我们需要自定义数据库的reader和writer逻辑时,就需要自定义Reader.Task的startRead接口,以及Writer.Task的startWrite接口去自定对于缓冲区数据的处理逻辑。


http://www.kler.cn/a/586261.html

相关文章:

  • unity导出比例问题
  • Ansible相关工具:ansible-doc、ansible
  • Linux系统中切换CUDA版本的完整指南(含vim使用方法)
  • NAFNet:Simple Baselines for Image Restoration
  • 深度解读DeepSeek部署使用安全(48页PPT)(文末有下载方式)
  • SSA-朴素贝叶斯分类预测matlab代码
  • 食品配送管理系统(源码+文档+讲解+演示)
  • 西门子S7-1200 PLC远程上下载程序方案
  • Springboot中的异常处理
  • 大模型——Qwen2-VL OCR能力微调与量化
  • 蓝桥杯2024年第十五届省赛真题-回文数组
  • OpenCV中文路径图片读写终极指南(Python实现)
  • 光伏储能:未来能源的黄金搭档
  • 【品铂科技】在高精度定位行业内的口碑怎么样?
  • 【说下线程本地变量ThreadLocal及其用法】
  • 游戏引擎学习第151天
  • ShadowCracker智能口令破解工具架构
  • 【工具】C#游戏防沉迷小工具
  • 17 | 实现简洁架构的 Biz 层
  • 【无标题】ffmpeg 合并文件夹下所有视频