datax源码分析
文章目录
- 前言
- 一、加载配置文件
- 二、根据加载的配置文件进行调度
- 三、根据配置文件执行读取写入任务
- 总结
前言
在上一篇文章当中我们已经了解了datax的启动原理,以及datax的最基础的配置,datax底层java启动类的入口及关键参数。
接下来我将进行启动类执行的源码分析,设计到datax当中的一些关键的配置文件的初始化,各种组件之间的加载原理及运行原理,带领大家深度理解datax的底层运行逻辑。
一、加载配置文件
在上一篇我们已经知道了,datax的调用的是Engine类的mian方法,代码当中只有一行关键代码Engine.entry(args)。
public static void main(String[] args) throws Exception {
//我们人为设置的启动参数
args = new String[]{
"-mode", "standalone",
"-jobid", "-1",
"-job", "D:\\chrom下载\\datax\\datax\\bin\\mysql2mysql.json"
};
//设置datax的解压目录
System.setProperty("datax.home", "D:\\chrom下载\\datax\\datax");
int exitCode = 0;
try {
//所有的逻辑都在这类方法里面
Engine.entry(args);
} catch (Throwable e) {
//异常处理省略。。。
}
System.exit(exitCode);
}
- Engine.entry(args)核心入口方法,该方法最核心内容主要是**解析datax运行所需的所有的参数,将其封装成Configuration,后续调用engine.start(configuration)**进行所有的业务逻辑的处理。我们需要着重的关注的方法只有一个就是
Configuration configuration = ConfigParser.parse(jobPath)
public static void entry(final String[] args) throws Throwable {
//省略不重要代码。。。
//核心的解析逻辑,我们需要特别关注的代码逻辑
Configuration configuration = ConfigParser.parse(jobPath);
//省略不重要代码。。。
ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
engine.start(configuration);
}
- 解析我们需要的配置文件,job任务的json配置文件mysql2mysql.json,{you datax path}/conf/core.json文件,遍历{you datax patch}/plugin/reader/下的所有reade和{you datax patch}/plugin/writer/下的所有writer,加载符合的reader/writer的配置的plugin.json文件,reader/writer配置就是在mysql2mysql.json当中配置的,需要注意我们配置的reader/writer名称必须和reader/writer目录下的plugin_job_template.json下的name属性完全一致。处理还会加载前置处理plugin和后置的plugin,博主在源码debug时并未配置这两项的plugin。
public static Configuration parse(final String jobPath) {
//加载我们配置job的json配置文件,我这里是mysql2mysql.json配置文件,配置文件
//可以参考我上一篇文章当中的配置
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
//将我们配置mysql2mysql.json文件的属性和core.json文件的属性合并。
configuration.merge(
//加载{you datax path}/conf/core.json文件
ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
false);
//根据配置文件读取reader配置
String readerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
//根据配置文件读取writer配置
String writerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
//读取前置处理plugin
String preHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
//读取后置处理plugin
String postHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
Set<String> pluginList = new HashSet<String>();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
if(StringUtils.isNotEmpty(preHandlerName)) {
pluginList.add(preHandlerName);
}
if(StringUtils.isNotEmpty(postHandlerName)) {
pluginList.add(postHandlerName);
}
try {
//加载所有的插件配置文件,我们暂时只有reader和writer插件,即mysqlReader和mysqlWriter,将配置文件合并
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}catch (Exception e){
//吞掉异常,保持log干净。这里message足够。
LOG.warn(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
//
}
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}
return configuration;
}
到此为止我们的前期最关键的一个步骤就完成了,解析所有需要的配置项json
,最终我们加载出来的配置文件格式如下:
{
"internal": {
"common": {
"column": {
"dateFormat": "yyyy-MM-dd",
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"encoding": "utf-8",
"extraFormats": [
"yyyyMMdd"
],
"timeFormat": "HH:mm:ss",
"timeZone": "GMT+8"
}
},
"core": {
"container": {
"job": {
"id": -1,
"mode": "standalone",
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"dataXServer": {
"address": "http://localhost:7001/api",
"reportDataxLog": false,
"reportPerfLog": false,
"timeout": 10000
},
"statistics": {
"collector": {
"plugin": {
"maxDirtyNumber": 10,
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
}
}
},
"transport": {
"channel": {
"byteCapacity": 67108864,
"capacity": 512,
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"flowControlInterval": 20,
"speed": {
"byte": -1,
"record": -1
}
},
"exchanger": {
"bufferSize": 32,
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
}
}
},
"entry": {
"jvm": "-Xms1G -Xmx1G"
},
//mysql2mysql.json当中的配置都在此项当中
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"age"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/database?useSSL=false"
],
"table": [
"stu_copy1"
]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name",
"age"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/database?useSSL=false&useUnicode=true&characterEncoding=utf8",
"table": [
"stu_copy2"
]
}
],
"password": "123456",
"username": "root",
"writeMode": "replace"
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
},
//reader/writer配置都在此项当中。
"plugin": {
"reader": {
"mysqlreader": {
"class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba",
"name": "mysqlreader",
"path": "D:\\chrom下载\\datax\\datax\\plugin\\reader\\mysqlreader"
}
},
"writer": {
"mysqlwriter": {
"class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba",
"name": "mysqlwriter",
"path": "D:\\chrom下载\\datax\\datax\\plugin\\writer\\mysqlwriter"
}
}
}
}
}
二、根据加载的配置文件进行调度
- 默认使用JobContainer进行任务调度,AbstractContainer 实现类有两个,一个是JobContainer。另外一个是TaskGroupContainer。对于它们两个的关系,JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker
TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。简单理解就是JobContainer负责任务调度,TaskGroupContainer负责执行。在源代码当中也有体现,在使用JobContainer调度后最终还是使用TaskGroupContainer进行执行。
public void start(Configuration allConf) {
// 绑定column转换信息
ColumnCast.bind(allConf);
/**
* 初始化PluginLoader,可以获取各种插件配置
*/
LoadUtil.bind(allConf);
//我们位配置任务组的配置信息,isJob为ture
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
//JobContainer会在schedule后再行进行设置和调整值
int channelNumber =0;
AbstractContainer container;
long instanceId;
int taskGroupId = -1;
if (isJob) {
//创建一个JobContainer,将配置信息都设置到JobContainer中,
//后续使用container执行
allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
container = new JobContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
} else {
container = new TaskGroupContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
taskGroupId = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
channelNumber = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
}
//缺省打开perfTrace
boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
//standalone模式的 datax shell任务不进行汇报
if(instanceId == -1){
perfReportEnable = false;
}
int priority = 0;
try {
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
}catch (NumberFormatException e){
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
}
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
//最终的实际执行的逻辑
container.start();
}
- 执行任务的调度,JobContainer.start(),最终会通过JobContainer.schedule()进行任务的调度处理。
public void start() {
LOG.info("DataX jobContainer starts job.");
boolean hasException = false;
boolean isDryRun = false;
try {
this.startTimeStamp = System.currentTimeMillis();
isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
//模拟运行,不进行实际的数据的读取和写出。只是进行数据的预校验。该配置在属性job当中,
//即我们配置mysql2mysql.json当中,默认我们未配置isDryRun为false。
if(isDryRun) {
LOG.info("jobContainer starts to do preCheck ...");
this.preCheck();
} else {
userConf = configuration.clone();
LOG.debug("jobContainer starts to do preHandle ...");
//前置处理preHandler,实现AbstractPlugin接口,
//最终调用AbstractPlugin.preHandler()方法
this.preHandle();
LOG.debug("jobContainer starts to do init ...");
//初始化JobContainer的reader/writer属性,调用实际引用的
//Reader.Job.init()和Writer.Job.init()方法
this.init();
LOG.info("jobContainer starts to do prepare ...");
//调用对应的reader/writer的prepare前置处理方法
this.prepare();
LOG.info("jobContainer starts to do split ...");
//重新设置reader/writer的配置,当我们配置了多个任务时,即
//job.content设置多项时会出现,博主测下来只有第一个任务可以执行
//后续的不会执行,就算每个配置项的reader/writer配置相同,不清楚是不是bug。
this.totalStage = this.split();
LOG.info("jobContainer starts to do schedule ...");
//实际调用调度处理的逻辑,该逻辑是代码当中最核心的逻辑
this.schedule();
LOG.debug("jobContainer starts to do post ...");
//调用对应的reader/writer的post后置处理方法
this.post();
LOG.debug("jobContainer starts to do postHandle ...");
//后置处理postHandle,实现AbstractPlugin接口,
//最终调用AbstractPlugin.postHandler()方法
this.postHandle();
LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
//调用外部hook
this.invokeHooks();
}
} catch (Throwable e) {
//异常信息处理,此处省略。。。
} finally {
//如果实际执行调度,则打印统计信息
if(!isDryRun) {
this.destroy();
this.endTimeStamp = System.currentTimeMillis();
if (!hasException) {
//最后打印cpu的平均消耗,GC的统计
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
vmInfo.getDelta(false);
LOG.info(vmInfo.totalString());
}
LOG.info(PerfTrace.getInstance().summarizeNoException());
this.logStatistics();
}
}
}
}
- 任务调度处理JobContainer.schedule(),起内部调用了StandAloneScheduler.schedule(taskGroupConfigs)进行实际的调度处理。
public void schedule(List<Configuration> configurations) {
//省略部分代码。。。
//关键点,根据配置信息执行任务调度分配
startAllTaskGroup(configurations);
//省略部分代码。。。
}
- 通过线程池进行任务的调度处理,ProcessInnerScheduler.startAllTaskGroup。在实际调用当中我们可以发现最终使用的是TaskGroupContainer进行的处理,在job执行的配置文件,我们可以选择通过TaskGroupContainer直接执行还是通过JobContainer进行调度执行,
如果core.container.model为taskGroup就直接通过TaskGroupContainer执行,不会进行前置插件后置插件等逻辑的处理。
public void startAllTaskGroup(List<Configuration> configurations) {
this.taskGroupContainerExecutorService = Executors
.newFixedThreadPool(configurations.size());
for (Configuration taskGroupConfiguration : configurations) {
//TaskGroupContainer taskGroupContainer = new
// TaskGroupContainer(configuration);
//TaskGroupContainer就是实际执行的调用的执行组件,最开始我们执行的JobContainer
//我们可以根据mysql2mysql.json的配置直接执行,也可以通过JobContainer进行调度。
return new TaskGroupContainerRunner(taskGroupContainer);
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
//使用线程池进行并行处理调用
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
this.taskGroupContainerExecutorService.shutdown();
}
三、根据配置文件执行读取写入任务
- TaskGroupContainerRunner封装了TaskGroupContainer,TaskGroupContainerRunner实现了Runnable接口,当被线程池调用时内部实际调用了TaskGroupContainer.start()方法进行处理。
public void start() {
try {
//省略一部分代码。。。
while (true) {
//1.判断task状态,省略。。。
// 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误,省略代码。。。
//3.有任务未执行,且正在运行的任务数小于最大通道限制
Iterator<Configuration> iterator = taskQueue.iterator();
while(iterator.hasNext() && runTasks.size() < channelNumber){
//省略部分代码。。。
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
//代码最关键的地方。此处就是实际调用reader和writer的入口,
//我们着重分析这个代码。
taskExecutor.doStart();
iterator.remove();
runTasks.add(taskExecutor);
//上面,增加task到runTasks列表,因此在monitor里注册。
taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
taskFailedExecutorMap.remove(taskId);
LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
this.taskGroupId, taskId, attemptCount);
}
//4.任务列表为空,executor已结束, 搜集状态为success--->成功
//5.如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报
Thread.sleep(sleepIntervalInMillSec);
}
//6.最后还要汇报一次
reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
} catch (Throwable e) {
//省略。。。
}
}
- 通过taskExecutor.doStart()执行数据的同步,该类当中有几个和重要的属性,
taskConfig:job.content的内容、readerThread:读操作线程、writerThread:写操作线程、readerRunner:读操作的实际执行对象,它也为Runnable接口实现、writerRunner:写操作的实际执行对象,它也为Runnable接口实现。
public void doStart() {
//1.先启动写线程,执行WriterRunner.run()方法
this.writerThread.start();
if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}
//2.后启动读线程,执行ReaderRunner.run()方法
this.readerThread.start();
if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}
}
- 读操作是通过执行读线程readerThread执行ReaderRunner.run方法,最终会调用到我们自定义的reader实现类,即MysqlReader执行startRead方法。
public void startRead(RecordSender recordSender) {
int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
//这里会执行真正的查询操作
this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
super.getTaskPluginCollector(), fetchSize);
}
- 读取操作最终对数据库进行读取操作时会调用CommonRdbmsReader.startRead方法。
public void startRead(Configuration readerSliceConfig,
RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize) {
//需要读取的数据的sql
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
//需要读取的数据表
String table = readerSliceConfig.getString(Key.TABLE);
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);
LOG.info("Begin to read record by Sql: [{}\n] {}.",
querySql, basicMsg);
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();
//获取数据库连接
Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
username, password);
DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
this.dataBaseType, basicMsg);
int columnNumber = 0;
ResultSet rs = null;
try {
//获取所有的查询结果列表数据。
rs = DBUtil.query(conn, querySql, fetchSize);
queryPerfRecord.end();
ResultSetMetaData metaData = rs.getMetaData();
columnNumber = metaData.getColumnCount();
PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allResultPerfRecord.start();
long rsNextUsedTime = 0;
long lastTime = System.nanoTime();
while (rs.next()) {
rsNextUsedTime += (System.nanoTime() - lastTime);
//将读取的到的所有的数据都通过recordSender传递给writer,默认使用的是
//BufferedRecordExchanger,将读取到的数据写入到缓冲区当中。
this.transportOneRecord(recordSender, rs,
metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
lastTime = System.nanoTime();
}
allResultPerfRecord.end(rsNextUsedTime);
LOG.info("Finished read record by Sql: [{}\n] {}.",
querySql, basicMsg);
}catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
DBUtil.closeDBResources(null, conn);
}
}
- WriterRunner通过读取到缓冲区BufferedRecordExchanger的数据后,交由具体的writer进行处理,我们示例使用的是Mysqlwriter,最终调用Mysqlwriter.startWrite()方法处理缓冲区的数据。
public void startWrite(RecordReceiver recordReceiver) {
//调用CommonRdbmsWriter进行处理收到的缓冲区数据
this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
super.getTaskPluginCollector());
}
- 通过CommonRdbmsWriter将读取到的缓冲区的数据进行处理写入到配置的数据库当中。
public void startWrite(RecordReceiver recordReceiver,
Configuration writerSliceConfig,
TaskPluginCollector taskPluginCollector) {
//获取连接
Connection connection = DBUtil.getConnection(this.dataBaseType,
this.jdbcUrl, username, password);
DBUtil.dealWithSessionConfig(connection, writerSliceConfig,
this.dataBaseType, BASIC_MESSAGE);
//处理数据
startWriteWithConnection(recordReceiver, taskPluginCollector, connection);
}
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
this.taskPluginCollector = taskPluginCollector;
// 用于写入数据的时候的类型根据目的表字段类型转换
this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
this.table, StringUtils.join(this.columns, ","));
// 写数据库的SQL语句
calcWriteRecordSql();
List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
int bufferBytes = 0;
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != this.columnNumber) {
// 源头读取字段列数与目的表字段写入列数不相等,直接报错
throw DataXException
.asDataXException(
DBUtilErrorCode.CONF_ERROR,
String.format(
"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
record.getColumnNumber(),
this.columnNumber));
}
writeBuffer.add(record);
bufferBytes += record.getMemorySize();
if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
}
}
if (!writeBuffer.isEmpty()) {
doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
}
} catch (Exception e) {
throw DataXException.asDataXException(
DBUtilErrorCode.WRITE_DATA_ERROR, e);
} finally {
writeBuffer.clear();
bufferBytes = 0;
DBUtil.closeDBResources(null, null, connection);
}
}
总结
datax的整体原理就是,根据我们配置的文件,加载不同的reader和writer,reader通过配置的单独的read线程将源数据库数据读取出来写入到缓冲区当中,writer通过线程将缓冲区的数据读取出来写入到目标数据库当中。
我们在实际的工作当中可能更需要的关注对于这个reader和writer的扩展,对于datax都已经提供了plugin生命周期的函数处理,前置处理后置处理等函数,如果我们需要自定义数据库的reader和writer逻辑时,就需要自定义Reader.Task的startRead接口,以及Writer.Task的startWrite接口去自定对于缓冲区数据的处理逻辑。