Linux中DataX使用第三期
简介
紧接着上期关于DataX源码的初步了解,本期来自己定义一个简单的数据读取和数据写入插件。目的为了方便了解DataX工作的流程。
环境
- Windows10 (linux中命令相似,为了方面调试就用windows的)
- JDK(1.8以上,推荐1.8)
- Python(2或3都可以)
- Apache Maven (推荐3.x版本)
- IntelliJ IDEA 2023.2.2 (IDE没要求,能打开maven项目就行)
- 源码下载地址
- 本期使用到的代码
开始制作
1.创建新插件模块
首先找下一个简单reader和writer的模块(这里找到是streamreader和streamwriter),然后复制一份代码把插件名字改下(这里我改成demoreader和demowriter),并把这个模块添加到主项目中。
2.调整新插件代码
替换代码中的 streamreader 和 StreamWeader 为 demoreader 和 DemoReader。DemoWriter模块中也一样的操作。
这个plugin.json比较重要,就是通过这文件调用插件的
3.使用新插件
在测试类中添加新测试类。
DemoReader2DemoWriterTest
import com.alibaba.datax.example.ExampleContainer;
import com.alibaba.datax.example.util.PathUtil;
import org.junit.Test;
/**
* {@code Author}
* {@code Date}
*/
public class DemoReader2DemoWriterTest {
@Test
public void testDemoReader2DemoWriter() {
String path = "/demo2demo.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}
}
demo2demo.json
{
"job": {
"content": [
{
"reader": {
"name": "demoreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "demowriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
添加依赖
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>demoreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>demowriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
执行测试类
日志打印显示调用新的插件(打印日志代码在DemoReader 和DemoWriter类的任意方法中都行 ,不一定要在初始化方法里面)
到这里这新的插件就配置好了,接下来就可以自定义开发了。
4.代码改造
DemoReader 和DemoWriter类中Job
接口功能如下:
init
: Job对象初始化工作,此时可以通过super.getPluginJobConf()
获取与本插件相关的配置。读插件获得配置中reader
部分,写插件获得writer
部分。prepare
: 全局准备工作,比如odpswriter清空目标表。split
: 拆分Task
。参数adviceNumber
框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task
的配置列表。post
: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。destroy
: Job对象自身的销毁工作。
DemoReader 和DemoWriter类中Task
接口功能如下:
init
:Task对象的初始化。此时可以通过super.getPluginJobConf()
获取与本Task
相关的配置。这里的配置是Job
的split
方法返回的配置列表中的其中一个。prepare
:局部的准备工作。startRead
: 从数据源读数据,写入到RecordSender
中。RecordSender
会把数据写入连接Reader和Writer的缓存队列。startWrite
:从RecordReceiver
中读取数据,写入目标数据源。RecordReceiver
中的数据来自Reader和Writer之间的缓存队列。post
: 局部的后置工作。destroy
: Task象自身的销毁工作。
需要注意的是:
Job
和Task
之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。prepare
和post
在Job
和Task
中都存在,插件需要根据实际情况确定在什么地方执行操作。
核心流程图
上图可以通过在每个生命周期的方法类打印日志来进行验证(我测试的时候都有打印)。
以上都是来自于官网的解释,还是比较简单易懂的。个人的理解类似HTTP请求,reader是请求方,writer是响应方。首选把数据存放到一个共享缓存位置,通过对数据的写入和读取来实现数据的传输,这里的核心在于数据的读取和数据的写入,于是做了代码的调整。
public void startRead(RecordSender recordSender) {
System.err.println("Task - 开始读取方法,用于开始读取数据___start。");
while (this.sliceRecordCount > 0) {
// 自定义数据,每循环一次生成一条数据, 方便演示就不通过*.json文件获取
Record oneRecord = recordSender.createRecord();
oneRecord.addColumn(new StringColumn("name"
+ ThreadLocalRandom.current().nextInt(10000, 100000)
));
oneRecord.addColumn(new StringColumn("work"
+ ThreadLocalRandom.current().nextInt(10000, 100000)
));
recordSender.sendToWriter(oneRecord);
this.sliceRecordCount--;
}
System.err.println("Task - 开始读取方法,用于开始读取数据___end。");
}
public void startWrite(RecordReceiver recordReceiver) {
System.err.println("Task - 开始写入方法,用于开始写入数据___start。");
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
if (this.print) {
System.err.println(recordToString(record));
} else {
/* do nothing */
}
}
} catch (Exception e) {
throw DataXException.asDataXException(DemoWriterErrorCode.RUNTIME_EXCEPTION, e);
}
System.err.println("Task - 开始写入方法,用于开始写入数据___end。");
}
日志输出结果
这里如果是读取数据库里面的数据,以及是向数据库里面写入数据,这样就可以实现各种数据库数据的传输了,当然Datax核心还是对数据的处理和转换,这些需要开发者自己去思考。
结语
本期主要说了怎么去开发一个属于自己的插件,核心就是对Datax开发插件配置以及Job和Task这个两个类的生命周期的了解,这样可以在已有的插件基础上进行快速的改造。下期来说说怎么通过DataX生命周期来进行各种数据的传输。