当前位置: 首页 > article >正文

Linux中DataX使用第三期

简介

  紧接着上期关于DataX源码的初步了解,本期来自己定义一个简单的数据读取和数据写入插件。目的为了方便了解DataX工作的流程。

环境

  •  Windows10 (linux中命令相似,为了方面调试就用windows的)
  • JDK(1.8以上,推荐1.8)
  • Python(2或3都可以)
  • Apache Maven (推荐3.x版本)
  • IntelliJ IDEA 2023.2.2 (IDE没要求,能打开maven项目就行)
  • 源码下载地址
  • 本期使用到的代码

开始制作

  1.创建新插件模块

  首先找下一个简单reader和writer的模块(这里找到是streamreaderstreamwriter),然后复制一份代码把插件名字改下(这里我改成demoreaderdemowriter),并把这个模块添加到主项目中。

2.调整新插件代码

  替换代码中的 streamreader  StreamWeader  demoreader 和 DemoReader。DemoWriter模块中也一样的操作。

这个plugin.json比较重要,就是通过这文件调用插件的

3.使用新插件

  在测试类中添加新测试类。

DemoReader2DemoWriterTest

import com.alibaba.datax.example.ExampleContainer;
import com.alibaba.datax.example.util.PathUtil;
import org.junit.Test;

/**
 * {@code Author} 
 * {@code Date} 
 */

public class DemoReader2DemoWriterTest {
    @Test
    public void testDemoReader2DemoWriter() {
        String path = "/demo2demo.json";
        String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
        ExampleContainer.start(jobPath);
    }
}

demo2demo.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "demoreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "demowriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
      }
    }
  }
}

添加依赖

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>demoreader</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>demowriter</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

执行测试类

日志打印显示调用新的插件(打印日志代码在DemoReader 和DemoWriter类的任意方法中都行 ,不一定要在初始化方法里面)

到这里这新的插件就配置好了,接下来就可以自定义开发了。

4.代码改造

DemoReader 和DemoWriter类中Job接口功能如下:

  • init: Job对象初始化工作,此时可以通过super.getPluginJobConf()获取与本插件相关的配置。读插件获得配置中reader部分,写插件获得writer部分。
  • prepare: 全局准备工作,比如odpswriter清空目标表。
  • split: 拆分Task。参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task的配置列表。
  • post: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。
  • destroy: Job对象自身的销毁工作。

DemoReader 和DemoWriter类中Task接口功能如下:

  • init:Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。这里的配置是Jobsplit方法返回的配置列表中的其中一个。
  • prepare:局部的准备工作
  • startRead: 从数据源读数据,写入到RecordSender中。RecordSender会把数据写入连接Reader和Writer的缓存队列
  • startWrite:从RecordReceiver中读取数据,写入目标数据源。RecordReceiver中的数据来自Reader和Writer之间的缓存队列
  • post: 局部的后置工作
  • destroy: Task象自身的销毁工作。

需要注意的是:

  • JobTask之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。
  • preparepostJobTask中都存在,插件需要根据实际情况确定在什么地方执行操作。

核心流程图

  上图可以通过在每个生命周期的方法类打印日志来进行验证(我测试的时候都有打印)。

  以上都是来自于官网的解释,还是比较简单易懂的。个人的理解类似HTTP请求,reader是请求方,writer是响应方。首选把数据存放到一个共享缓存位置,通过对数据的写入和读取来实现数据的传输,这里的核心在于数据的读取和数据的写入,于是做了代码的调整。

		public void startRead(RecordSender recordSender) {
			System.err.println("Task - 开始读取方法,用于开始读取数据___start。");
			while (this.sliceRecordCount > 0) {
				// 自定义数据,每循环一次生成一条数据, 方便演示就不通过*.json文件获取
				Record oneRecord = recordSender.createRecord();
				oneRecord.addColumn(new StringColumn("name"
						+ ThreadLocalRandom.current().nextInt(10000, 100000)
				));
				oneRecord.addColumn(new StringColumn("work"
						+ ThreadLocalRandom.current().nextInt(10000, 100000)
				));
				recordSender.sendToWriter(oneRecord);
				this.sliceRecordCount--;
			}
			System.err.println("Task - 开始读取方法,用于开始读取数据___end。");
		}

 public void startWrite(RecordReceiver recordReceiver) {
            System.err.println("Task - 开始写入方法,用于开始写入数据___start。");
                try {
                    Record record;
                    while ((record = recordReceiver.getFromReader()) != null) {
                        if (this.print) {
                            System.err.println(recordToString(record));
                        } else {
                            /* do nothing */
                        }
                    }

                } catch (Exception e) {
                    throw DataXException.asDataXException(DemoWriterErrorCode.RUNTIME_EXCEPTION, e);
                }
            System.err.println("Task - 开始写入方法,用于开始写入数据___end。");

        }

日志输出结果

这里如果是读取数据库里面的数据,以及是向数据库里面写入数据,这样就可以实现各种数据库数据的传输了,当然Datax核心还是对数据的处理和转换,这些需要开发者自己去思考。

结语

  本期主要说了怎么去开发一个属于自己的插件,核心就是对Datax开发插件配置以及Job和Task这个两个类的生命周期的了解,这样可以在已有的插件基础上进行快速的改造。下期来说说怎么通过DataX生命周期来进行各种数据的传输。


http://www.kler.cn/a/536206.html

相关文章:

  • 第6章《VTK与Qt集成》
  • [含文档+PPT+源码等]精品大数据项目-Django基于大数据实现的心血管疾病分析系统
  • 2025年Android NDK超全版本下载地址
  • 神经网络常见激活函数 1-sigmoid函数
  • Spring Bean生命周期
  • 10. 神经网络(二.多层神经网络模型)
  • Java 8的Stream API
  • 栈和队列的实现(C语言)
  • 解决aspose将Excel转成PDF中文变成方框的乱码问题
  • esp32 udp 客户端 广播
  • 【Elasticsearch】nested聚合
  • Day67:类的继承
  • 树莓派5添加摄像头 在C++下调用opencv
  • Junit5使用教程(6)--高级特性2
  • HTML学习之CSS三种引入方式
  • 基于JavaWeb开发的java Springboot实现教务管理系统
  • 介绍10个比较优秀好用的Qt相关的开源库
  • Linux后台运行进程
  • 网络安全 | 什么是XSS跨站脚本攻击?
  • 如何利用 Python 爬虫按关键字搜索淘宝商品
  • C++基础系列【5】namespace using
  • JAVA异步的TCP 通讯-客户端
  • 【STM32系列】利用MATLAB配合ARM-DSP库设计FIR数字滤波器(保姆级教程)
  • Shell-变量及注释
  • Qt修仙之路2-1 炼丹初成
  • JS的几种具体异常类型(报错)