Linux中DataX使用第四期
简介
紧接着上期关于定义如何一个简单的插件,本期了解下关系型数据库的数据读取和数据写入。
环境
- Windows10 (linux中命令相似,为了方面调试就用windows的)
- JDK(1.8以上,推荐1.8)
- Python(2或3都可以)
- Apache Maven (推荐3.x版本)
- IntelliJ IDEA 2023.2.2 (IDE没要求,能打开maven项目就行)
- 源码下载地址
内容
DataX运行的时序图
- 对于关系型数据库DataX内有一个公共的调用工具。
CommonRdbmsReader
包含两个内部类Job
和Task
的公共类,分别用于处理作业级别和任务级别的操作。
Job
类负责作业的初始化、预检查、拆分、以及作业结束时的清理工作。
init(Configuration originalConfig)
: 初始化作业配置。preCheck(Configuration originalConfig, DataBaseType dataBaseType)
: 预检查数据库连接和查询语句的有效性。split(Configuration originalConfig, int adviceNumber)
: 将作业拆分为多个任务。post(Configuration originalConfig)
和destroy(Configuration originalConfig)
: 作业结束时的清理工作。
Task
类负责单个任务的初始化、数据读取和任务结束时的清理工作。
init(Configuration readerSliceConfig)
: 初始化任务配置,包括数据库连接信息。startRead(Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector, int fetchSize)
: 开始读取数据,执行查询并将结果发送到记录发送器。post(Configuration originalConfig)
和destroy(Configuration originalConfig)
: 任务结束时的清理工作。transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector)
: 处理并传输一条记录。buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector)
: 根据结果集构建记录。
核心代码
public void startRead(Configuration readerSliceConfig,
RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize) {
// 从配置中获取查询SQL语句
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
// 从配置中获取表名
String table = readerSliceConfig.getString(Key.TABLE);
// 将任务详情添加到性能跟踪实例中
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);
// 打印日志,表示开始读取记录
LOG.info("Begin to read record by Sql: [{}\n] {}.",
querySql, basicMsg);
// 创建一个性能记录对象,记录SQL查询阶段
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();
// 获取数据库连接
Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
username, password);
// session config .etc related
DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
this.dataBaseType, basicMsg);
int columnNumber = 0;
ResultSet rs = null;
try {
rs = DBUtil.query(conn, querySql, fetchSize);
queryPerfRecord.end();
ResultSetMetaData metaData = rs.getMetaData();
columnNumber = metaData.getColumnCount();
//这个统计干净的result_Next时间
PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allResultPerfRecord.start();
long rsNextUsedTime = 0;
long lastTime = System.nanoTime();
while (rs.next()) {
rsNextUsedTime += (System.nanoTime() - lastTime);
this.transportOneRecord(recordSender, rs,
metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
lastTime = System.nanoTime();
}
allResultPerfRecord.end(rsNextUsedTime);
//目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间
LOG.info("Finished read record by Sql: [{}\n] {}.",
querySql, basicMsg);
}catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
DBUtil.closeDBResources(null, conn);
}
}
这段Java代码是一个数据库读取任务的核心实现,用于从数据库中读取数据。它实现了startRead
方法,该方法接收几个参数,包括配置信息、记录发送器、任务插件收集器以及每次查询的记录数。
方法参数
readerSliceConfig
:包含数据库连接和查询配置的Configuration
对象。recordSender
:用于发送读取到的记录的对象。taskPluginCollector
:用于收集任务插件的错误信息的对象。fetchSize
:每次查询数据库时获取的记录数。
1.变量初始化
- 从配置中获取SQL查询语句和表名。
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);
2.性能跟踪
- 使用
PerfTrace
和PerfRecord
进行性能跟踪,记录任务开始时间和详细信息。
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);
LOG.info("Begin to read record by Sql: [{}\n] {}.", querySql, basicMsg);
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();
3.数据库连接
- 使用
DBUtil
工具类获取数据库连接,并根据配置处理会话配置。
Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl, username, password);
DBUtil.dealWithSessionConfig(conn, readerSliceConfig, this.dataBaseType, basicMsg);
4.查询数据库
- 执行SQL查询并获取结果集,记录查询结束时间,获取结果集的元数据(列数)。
int columnNumber = 0;
ResultSet rs = null;
try {
rs = DBUtil.query(conn, querySql, fetchSize);
queryPerfRecord.end();
ResultSetMetaData metaData = rs.getMetaData();
columnNumber = metaData.getColumnCount();
5.读取结果集
- 使用
while
循环遍历结果集,记录每条记录的读取时间,并通过transportOneRecord
方法发送记录。 transportOneRecord表示
从数据库查询结果集中读取数据,并根据数据类型将其转换为相应的数据列,最终构建一个完整的记录对象
PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allResultPerfRecord.start();
long rsNextUsedTime = 0;
long lastTime = System.nanoTime();
while (rs.next()) {
rsNextUsedTime += (System.nanoTime() - lastTime);
this.transportOneRecord(recordSender, rs, metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
lastTime = System.nanoTime();
}
allResultPerfRecord.end(rsNextUsedTime);
LOG.info("Finished read record by Sql: [{}\n] {}.", querySql, basicMsg);
6.异常处理
- 捕获异常并抛出自定义的数据库查询异常,最后确保关闭数据库连接。
} catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
DBUtil.closeDBResources(null, conn);
}
总结
这段代码的主要功能是从数据库中读取数据,并通过性能跟踪记录查询和读取过程的时间。它使用了JDBC来连接和查询数据库,并通过RecordSender
发送读取到的记录。代码中还包含了异常处理和资源释放的逻辑,确保在发生异常时能够正确处理并释放数据库连接。
CommonRdbmsWriter
也包含两个内部类Job
和Task
的公共类,分别用于处理作业级别和任务级别的操作。
Job
类负责作业的初始化、预检查、拆分、以及作业结束时的清理工作。
init(Configuration originalConfig)
:初始化作业配置。writerPreCheck(Configuration originalConfig, DataBaseType dataBaseType)
:进行写前检查,包括SQL语法检查和权限检查。prepare(Configuration originalConfig)
:执行作业前的准备工作,如执行预SQL语句。split(Configuration originalConfig, int mandatoryNumber)
:将作业配置分割成多个任务配置。post(Configuration originalConfig)
:执行作业后的操作,如执行后SQL语句。destroy(Configuration originalConfig)
:销毁作业资源。
Task
类负责单个任务的初始化、数据读取和任务结束时的清理工作。
init(Configuration writerSliceConfig)
:初始化任务配置。prepare(Configuration writerSliceConfig)
:执行任务前的准备工作。startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection)
:使用给定的数据库连接开始写入数据。startWrite(RecordReceiver recordReceiver, Configuration writerSliceConfig, TaskPluginCollector taskPluginCollector)
:开始写入数据。post(Configuration writerSliceConfig)
:执行任务后的操作。destroy(Configuration writerSliceConfig)
:销毁任务资源。doBatchInsert(Connection connection, List<Record> buffer)
:批量插入数据。doOneInsert(Connection connection, List<Record> buffer)
:逐条插入数据。fillPreparedStatement(PreparedStatement preparedStatement, Record record)
:填充PreparedStatement对象。fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column)
:根据字段类型填充PreparedStatement对象。calcWriteRecordSql()
:计算写入记录的SQL语句。calcValueHolder(String columnType)
:计算值占位符。
核心代码
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
System.err.println("CommonRdbmsWriter Task startWriteWithConnection");
this.taskPluginCollector = taskPluginCollector;
// 用于写入数据的时候的类型根据目的表字段类型转换
this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
this.table, StringUtils.join(this.columns, ","));
// 写数据库的SQL语句
calcWriteRecordSql();
List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
int bufferBytes = 0;
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != this.columnNumber) {
// 源头读取字段列数与目的表字段写入列数不相等,直接报错
throw DataXException
.asDataXException(
DBUtilErrorCode.CONF_ERROR,
String.format(
"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
record.getColumnNumber(),
this.columnNumber));
}
writeBuffer.add(record);
bufferBytes += record.getMemorySize();
if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
}
}
if (!writeBuffer.isEmpty()) {
doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
}
} catch (Exception e) {
throw DataXException.asDataXException(
DBUtilErrorCode.WRITE_DATA_ERROR, e);
} finally {
writeBuffer.clear();
bufferBytes = 0;
DBUtil.closeDBResources(null, null, connection);
}
}
方法参数
recordReceiver
:用于接收从数据源读取的数据记录。taskPluginCollector
:用于收集任务执行过程中的信息,比如错误信息。connection
:数据库连接对象,用于与数据库进行交互。
1.初始化任务收集器
将传入的任务收集器赋值给当前对象的成员变量。
this.taskPluginCollector = taskPluginCollector;
2.获取列元数据
调用DBUtil
工具类的方法,获取目标表的列元数据,包括列名、类型等。
this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(this.columns, ","));
3.计算写入SQL语句
调用方法计算写入数据库的SQL语句。
calcWriteRecordSql();
4.初始化缓冲区
初始化一个记录列表作为缓冲区,用于批量写入数据。
List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
int bufferBytes = 0;
5.读取并写入数据
- 从
recordReceiver
中读取记录,直到没有更多记录。 - 检查读取的记录列数是否与目标表列数一致,不一致则抛出异常。
- 将记录添加到缓冲区,并计算缓冲区的大小。
- 当缓冲区达到指定大小(行数或字节数)时,调用
doBatchInsert
方法批量写入数据,并清空缓冲区
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != this.columnNumber) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.", record.getColumnNumber(), this.columnNumber));
}
writeBuffer.add(record);
bufferBytes += record.getMemorySize();
if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
}
}
6.处理剩余记录
如果在读取完所有记录后,缓冲区中还有剩余的记录,则进行最后一次批量写入。
if (!writeBuffer.isEmpty()) {
doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
}
7.异常处理
捕获并抛出自定义的DataX异常。
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
8.资源清理
清空缓冲区,释放数据库连接资源。
finally {
writeBuffer.clear();
bufferBytes = 0;
DBUtil.closeDBResources(null, null, connection);
}
总结
这段代码的主要目的是从recordReceiver
中读取记录读取数据,然后按照配置的批量大小和字节数,将数据写入目标数据库。
结语
对于关系型数据库DataX基本都实现了,如果有定制化需求可以通过对上面的代码模块进行复制后改造来实现。同时DataX还支持非结构化存储(图片、文件、视频等)数据同步,可参考plugin-unstructured-storage-util模块。