当前位置: 首页 > article >正文

开源DataX集成可视化项目Datax-Web的使用

上一篇文章我们已经搭建好了 Datax-Web 后台,这篇文章我们具体讲一下如何通过Datax-Web来配置,同步MySQL数据库。

目标

image-20230321171446281

MySql数据库全量同步

datax-web-shuju

1.执行器配置

datax-webx12

1、"调度中心OnLine:"右侧显示在线的"调度中心"列表, 任务执行结束后, 将会以failover的模式进行回调调度中心通知执行结果, 避免回调的单点风险;

2、“执行器列表” 中显示在线的执行器列表, 可通过"OnLine 机器"查看对应执行器的集群机器;

image-20230327164734793

1、AppName: (与datax-executor中application.yml的datax.job.executor.appname保持一致)
每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;

2、名称: 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;

3、排序: 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;

4、注册方式:调度中心获取执行器地址的方式;

自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;

手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;

5、机器地址:"注册方式"为"手动录入"时有效,支持人工维护执行器的地址信息;

2.创建数据源

数据源管理—>添加

image-20230327165806036

如图填写MySQL的账号信息,点击测试连接,无误之后确认。

第四步使用

3.创建任务模版

image-20230328150138528

第四步使用

4. 构建JSON脚本

1.任务批量构建

步骤一,步骤二,选择第二步中创建的数据源,JSON构建目前支持的数据源有hive,mysql,oracle,postgresql,sqlserver,hbase,mongodb,clickhouse 其它数据源的JSON构建正在开发中,暂时需要手动编写。

任务管理—>任务批量构建—>选择数据库源

image-20230328163521242

2.字段映射

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SMl6KYWJ-1680341307925)(H:/typora_images/image-20230328163800009.png)]

3.批量创建任务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1ywQmUes-1680341307925)(H:/typora_images/image-20230328163835852.png)]

手动执行一次

4.启动任务

image-20230328164122275

查看日志

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ar0DUS01-1680341307926)(H:/typora_images/image-20230328164221949.png)]

报错

2023-03-28 16:41:14 [JobThread.run-130] <br>----------- datax-web job execute start -----------<br>----------- Param:
2023-03-28 16:41:14 [BuildCommand.buildDataXParam-100] ------------------Command parameters:
2023-03-28 16:41:14 [ExecutorJobHandler.execute-57] ------------------DataX process id: 29802
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53]   File "/data/datax/bin/datax.py", line 114
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53]     print readerRef
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53]           ^
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53] SyntaxError: Missing parentheses in call to 'print'. Did you mean print(readerRef)?
2023-03-28 16:41:14 [JobThread.run-165] <br>----------- datax-web job execute end(finish) -----------<br>----------- ReturnT:ReturnT [code=500, msg=command exit value(1) is failed, content=null]
2023-03-28 16:41:14 [ProcessCallbackThread.callbackLog-186] <br>----------- datax-web job callback finish.
2023-03-28 16:41:14 [TriggerCallbackThread.callbackLog-186] <br>----------- datax-web job callback finish.

经过查询是本机装了多版本的python

[root@node3 bin]#  whereis python
python: /usr/bin/python /usr/bin/python2.7 /usr/bin/python3.6 /usr/bin/python3.6m /usr/lib/python2.7 /usr/lib/python3.6 /usr/lib64/python2.7 /usr/lib64/python3.6 /etc/python /usr/include/python2.7 /usr/include/python3.6m /root/anaconda3/bin/python /root/anaconda3/bin/python3.9 /root/anaconda3/bin/python3.9-config /usr/share/man/man1/python.1.gz

[root@node3 bin]# python -V
Python 3.9.13
[root@node3 bin]# /usr/bin/python -V
Python 2.7.5

经过修复使Python改为2.7再执行任务

[root@node3 ~]# python -V
Python 2.7.5

还有一种修复方式是

Python (2.x) (支持Python3需要修改替换datax/bin下面的三个python文件,替换文件在doc/datax-web/datax-python3下) 必选,主要用于调度执行底层DataX的启动脚本,默认的方式是以Java子进程方式执行DataX,用户可以选择以Python方式来做自定义的改造

5.查看任务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FC5KjxMD-1680341307927)(H:/typora_images/image-20230329112816134.png)]

查看日志:

image-20230329112858101

再用Navicat 查看目标库中数据是否一致。

image-20230329113024046

DataX-Web增量配置说明

一、根据日期进行增量数据抽取

1.页面任务配置

打开菜单任务管理页面,选择添加任务

按下图中5个步骤进行配置

datax-webx13

  • 1.任务类型选DataX任务
  • 2.辅助参数选择时间自增
  • 3.增量开始时间选择,即sql中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第一次同步完成后,该时间被更新为上一次的任务触发时间,任务失败不更新。
  • 4.增量时间字段,-DlastTime=‘%s’ -DcurrentTime=‘%s’ 先来解析下这段字符串
1.-D是DataX参数的标识符,必配
2.-D后面的lastTime和currentTime是DataX json中where条件的时间字段标识符,必须和json中的变量名称保持一致
3.='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致
4.注意-DlastTime='%s'和-DcurrentTime='%s'中间有一个空格,空格必须保留并且是一个空格
  • 5.时间格式,可以选择自己数据库中时间的格式,也可以通过json中配置sql时间转换函数来处理

注意,注意,注意: 配置一定要仔细看文档(后面我们也会对这块配置进行优化,避免大家犯错)

2.JSON配置

datax.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 16
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "splitPk": "id",
            "username": "root",
            "password": "root",
            "column": [
              "*"

            ],
            "connection": [
              {
                
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/test?characterEncoding=utf8"
                ],
				"querySql": [
        "select * from test_list where operationDate >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})"
                                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
           
            "username": "root",
            "password": "123456",
            "column": [
              "*"
            ],
            "batchSize": "4096",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3307/test?characterEncoding=utf8",
                "table": [
                  "test_list"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

querySql解析

select * from test_list where operationDate >= ${lastTime} and operationDate < ${currentTime}
  • 1.此处的关键点在 l a s t T i m e , {lastTime}, lastTime{currentTime},${}是DataX动态参数的固定格式,lastTime,currentTime就是我们页面配置中 -DlastTime=‘%s’ -DcurrentTime='%s’中的lastTime,currentTime,注意字段一定要一致。
  • 2.如果任务配置页面,时间类型选择为时间戳但是数据库时间格式不是时间戳,例如是:2019-11-26 11:40:57 此时可以用FROM_UNIXTIME(${lastTime})进行转换。
select * from test_list where operationDate >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})

二、根据自增Id进行增量数据抽取

1.页面任务配置

打开菜单任务管理页面,选择添加任务

按下图中4个步骤进行配置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QQI3EmxB-1680341307929)(H:/typora_images/68747470733a2f2f64617461782d7765622e6f73732d636e2d68616e677a686f752e616c6979756e63732e636f6d2f646f632f69642d696e6372656d656e742e6a7067)]

  • 1.任务类型选DataX任务
  • 2.辅助参数选择主键自增
  • 3.增量主键开始ID选择,即sql中查询ID的开始ID,用户使用此选项方便第一次的全量同步。第一次同步完成后,该ID被更新为上一次的任务触发时最大的ID,任务失败不更新。
  • 4.增量时间字段,-DstartId=‘%s’ -DendId=‘%s’ 先来解析下这段字符串
1.-D是DataX参数的标识符,必配
2.-D后面的startId和endId是DataX json中where条件的id字段标识符,必须和json中的变量名称保持一致,endId是任务在每次执行时获取当前表maxId,也是下一次任务的startId
3.='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致
4.注意-DstartId='%s'和-DendId='%s' 中间有一个空格,空格必须保留并且是一个空格
5.reader数据源,选择任务同步的读数据源
6.配置reader数据源中需要同步数据的表名及该表的主键

注意,注意,注意: 一定要仔细看文档(后续会对这块配置进行优化,避免大家犯错)

2.JSON配置

datax.json

{
   "job": {
     "setting": {
       "speed": {
         "channel": 3,
         "byte": 1048576
       },
       "errorLimit": {
         "record": 0,
         "percentage": 0.02
       }
     },
     "content": [
       {
         "reader": {
           "name": "mysqlreader",
           "parameter": {
             "username": "yRjwDFuoPKlqya9h9H2Amg==",
             "password": "yRjwDFuoPKlqya9h9H2Amg==",
             "splitPk": "",
             "connection": [
               {
                 "querySql": [
                   "select * from job_log where id>= ${startId} and id< ${endId}"
                 ],
                 "jdbcUrl": [
                   "jdbc:mysql://localhost:3306/datax_web"
                 ]
               }
             ]
           }
         },
         "writer": {
           "name": "mysqlwriter",
           "parameter": {
             "username": "mCFD+p1IMsa0rHicbQohcA==",
             "password": "PhYxJmA/nuBJD1OxKTRzZH8sxuRddOv83hdqDOVR+i0=",
             "column": [
               "`id`",
               "`job_group`",
               "`job_id`",
               "`job_desc`",
               "`executor_address`",
               "`executor_handler`",
               "`executor_param`",
               "`executor_sharding_param`",
               "`executor_fail_retry_count`",
               "`trigger_time`",
               "`trigger_code`",
               "`trigger_msg`",
               "`handle_time`",
               "`handle_code`",
               "`handle_msg`",
               "`alarm_status`",
               "`process_id`",
               "`max_id`"
             ],
             "connection": [
               {
                 "table": [
                   "job_log"
                 ],
                 "jdbcUrl": "jdbc:mysql://47.98.125.243:3306/datax_web"
               }
             ]
           }
         }
       }
     ]
   }
 }

querySql解析

select * from job_log where id>= ${startId} and id< ${endId}
  • 1.此处的关键点在 s t a r t I d , {startId}, startId{endId},${}是DataX动态参数的固定格式,startId,endId就是我们页面配置中 -DstartId=‘%s’ -DendId='%s’中的startId,endId,注意字段一定要一致。

三、JVM启动参数配置

此选择为非必选,可以配置DataX启动时JVM的参数,具体配置不做详解。

JVM启动参数拼接结果为: -j "-Xms2G -Xmx2G"

参考

https://github.com/WeiYe-Jing/datax-web

https://github.com/WeiYe-Jing/datax-web/blob/master/doc/datax-web/increment-desc.md


http://www.kler.cn/a/5263.html

相关文章:

  • LeetCode -Hot100 - 53. 最大子数组和
  • 【C++习题】20. 两个数组的交集
  • QT自定义工具条渐变背景颜色一例
  • 细说STM32F407单片机以轮询方式读写外部SRAM的方法
  • Flutter Web 中文字体显示异常问题
  • MATLAB对文件处理
  • 膳食真菌在癌症免疫治疗中的作用: 从肠道微生物群的角度
  • 【HTTP详解】常用的14个HTTP状态码
  • ChatGPT开始威胁程序员的核心能力了!
  • Java设计模式(九)—— 中介者模式
  • 从NLP视角看电视剧《狂飙》,会有什么发现?
  • 15. 三数之和(Java)
  • 软件架构class-5-ORMapping思想
  • 发现一个白嫖GPT4.0的方法!真的是完胜3.5!
  • 二叉树练习题(数据结构系列10)
  • 【linu】ARM安装vscode服务器,本地vscode远程服务器开发
  • Redis面试题 (2023最新版)
  • 嵌入式系统概括
  • unity3d:asset store上C#代码热重载插件,不需要重运行,重新加载更新后函数
  • 字符串(二)-- LC[17] 电话号码的字母组合
  • Vision Pro 自己写的一些自定义工具(c#)
  • 深入探究PyTorch:理解PyTorch的基础知识(一)
  • 大数据学完好就业么
  • Typora 修改表格宽度
  • js时间前五分钟 + utc转本地时间
  • PUSCH接收端处理流程学习