当前位置: 首页 > article >正文

数据集成工具 ---- datax 3.0

1、datax:

        是一个异构数据源离线同步工具,致力于实现关系型数据库(mysql、oracle等)hdfs、hive、hbase等各种异构数据源之间的数据同步

2、参考网址文献:

https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/master/userGuid.mdhttps://github.com/alibaba/DataX/blob/master/introduction.md

3、Datax的框架设计:

Datax作为离线数据同步工具,主要的是采用了Framework+plugin架构构成,将数据源的读数据和写数据封装成对应的Reader和Writer插件,纳入到整体的同步框架中。

        1、Reader:作为数据的采集模块,负责采集数据源的数据,将数据发送给Framework

        2、Writer:作为数据写入模块,负责不断的向Framework取出数据,将数据写入到对应的目的端

        3、Framework:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、数据转换等核心技术问题。

4、Datax的核心架构:

Datax3.0 开源版本是支持单机多线程来完成同步作业运行,因为底层是使用java做开发。整体的架构:

模块的核心介绍:

        1、Datax完成单个数据同步做作业,被称之为job,Datax接收到一个job时就会启动一个进程来完成数据同步工作,所以Datax job 模块是单个作业的中枢管理中心,主要是承担数据清理,子任务切分、TaskGroup 管理。

        2、当Datax启动后,Datax job会根据不同的源数据将job切分成不同的Task,所以Task是Datax的最小作业单位,每一个Task都会负责一部分的数据同步。

        3、切分成多个Task后,Datax job 就会调用scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup,每一个TaskGroup都负责一定的Task任务的执行,默认TaskGroup并发数量数5个。

        4、每一个Task都是由TaskGroup所监控执行启动,每一个Task启动后都会按照Reader---Channel---Writer的执行顺序执行。

        5、当任务启动后,Datax job就会监控所有的TaskGroup的执行情况,当所有的TaskGroup任务完成后,job就会退出,当出现异常,就会异常退出并且进程退出值非0.

5、Datax的核心优势:

        1、可靠的数据质量监控

        2、丰富的数据转换功能

        3、精准的控制速度

        4、容错机制:

                1、线程内部重试

                        DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

                2、线程级别重试

                        目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

        5、极简的体验

6、Datax与Sqoop的区别:
功能Dataxsqoop
运行模式单进程多线程MR
分布式是不支持分布式支持
流控需要定制
统计信息支持不支持,分布式的数据收集不方便
数据校验只有core部分有校验功能不支持,分布式的数据收集不方便
监控需要定制需要定制
7、Datax部署:

1、下载jar包:

        下载路径:https://github.com/alibaba/DataX

2、解压文件,配置环境变量:  

#解压jar包
tar -zxvf datax.tar.gz

#配置环境变量:
vim /etc/profile


export   DATAX_HOME=/user/loacl/soft/datax
export   PATH=.:$PATH:$DATAX_HOME/bin

#配置好环境变量,让配置文件生效
source /etc/profile

3、使执行文件拥有执行权:

添加执行权:

chmod +x  data.py
8、Datax的使用:
在datax中会自动的生成模板的命令:

datax.py -r streamreader -w streamwriter
        1、streamreader  to  streamwriter,数据打印在控制台上面

参数说明:

"sliceRecordCount": 100  #指定打印的个数

"channel": 1  #指定并发度
#创建json文件:

vim streamreadertostreamwriter.json


{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [
                            {
                                "type":"string",
                                "value":"wyz"
                            },
                            {
                                "type":"int",
                                "value":"18"
                            }
                        ], 
                        "sliceRecordCount": 100  #指定打印的个数
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1  #指定并发度
            }
        }
    }
}



#脚本执行命令:
datax.py streamreadertostreamwriter.json
        2、mysql to mysql
1、可以通过命令获取模板:
datax.py  -r mysqlreader -w mysqlwriter 

2、可以通过github上的模板进行编写:分别是mysqlreader和mysqlwriter,参数会比较详细


3、在插入数据的需要注意是在将数据写入的时候如果出现在数据,那么此时可能是创建的表出了问题
    例如:表中的某个字段是主键,主键唯一
vim mysqlreaderTomysqlwriter.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                                "id",
							 "name",
							 "age",
							 "clazz",
							 "gender"
                        
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.226.1:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""      #不是必须要写的,作用是可以在读数据时进行一次过滤
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                                "id",
							 "name",
							 "age",
							 "clazz",
							 "gender"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.226.1:3306/bigdata25", 
                                "table": ["data_test"]
                            }
                        ], 
                        "password": "123456", 
                        "preSql": [],   #不是必须写的,作用是再写入数据前可以执行该sql
                        "session": [], 
                        "username": "root", 
                        "writeMode": "insert"   #必选,指定数据写入的模式,分成三种:insert(一般默认)、replace、update
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}




#执行脚本:
datax.py   mysqlreaderTomysqlwriter.json

 将数据写入到mysql时,写入的表是需要提前创建的。

        3、mysql to hdfs

参数解释:

使用datax的时候hdfswriter只支持两种文件形式,分别是text和orc

"fileType": "text",  #支持两种方式:text和orc,text表示的是textfile,orc表示的orcfile

"compress": "", #指定文件的压缩形式,不指定代表不用压缩,
                text支持的压缩方式:gzip,bzip2,
                orc支持的压缩方式有NONE和SNAPPY

 "writeMode": "append"   #表示的是数据在写入的操作,分成三种:
                            append,写入前不做任何处理  
                            nonconflit 如果文件存在,直接报错  
                            truncate:如果文件存在,那就先删除在写入

"path": "/bigdata25/datax/datax_mysqltohdfs/"  #文件不存在是需要提前创建的

vim mysqlTohdfs.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "stu_mysqltohdfs", 
                        "fileType": "text",  
                        "path": "/bigdata25/datax/datax_mysqltohdfs/", 
                        "writeMode": "append"                         												
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

#执行脚本:
datax.py mysqlTohdfs.json
        4、mysql to hive

 原理思想:

实际上还是将数据存入到hdfs上面,hive通过记录元数据信息来获取数据
原理:
	创建好hive表在保存在hdfs上,是有文件路径,然后通过写入到指定的hdfs文件路径就能将数据写入到hive表中
当开启hive的时候,在hive中创建的表会默认的存储hdfs的/user/hive/warehouse/目录下

前期准备:

前期准备:
启动hive(后台启动):nohup hive --service metastore &
连接hive:hive

创建hive表(在没有说明的情况下一般在都是创建一个外部表)
创建一个datax数据库:
	create database datax;
切换数据库:use datax
创建外部表:
	create external table if not exists datax_mysqltohive(
    	 id string,
         name string,
         age int,
         clazz string,
		gender string
    		)
row format delimited  fields terminated  by ',' stored as textfile

编写脚本:

vim mysqlTohive.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "datax_mysqltohive1", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}


#执行脚本命令:
datax.py  mysqlTohive.json
        5、mysql to hbase(Hbase11XWriter)

参数解释:

"mode": "normal" 写hbase的模式,目前只支持normal模式

"hbaseConfig"  {"hbase.zookeeper.quorum": "***"} 描述:连接HBase集群需要的配置信息,JSON格式

"table": "writer" 表的名称,大小写比较敏感

"encoding" 编码方式

"rowkeyColumn" 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量


"versionColumn" 表示指定写入hbase的时间戳,支持当前时间、指定时间列、指定时间

前期准备:

前期准备:

启动zookeeper:
	zkServer.sh start(每一个节点上都是需要启动的)
查看zk的状态:
	zkServer.sh status 

启动hbase:
	start-hbase.sh

连接hbase:
	sqlline.py master,node1,node2

进入hbase的客户端:hbase shell 

hbase中查看表的命令:!table
退出命令 !quit
查看表:list
在hbase中创建创建表,指定表名和列簇:create 'student','cf1'
				

编写脚本:

vim mysqlTohbase.json



{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                 "writer": {
                      "name": "hbase11xwriter",
                      "parameter": {
                        "hbaseConfig": {
                          "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
                        },
                        "table": "NEW_STU",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {
                              "index":0,
                              "type":"string"
                            },
                            {
                              "index":-1,
                              "type":"string",
                              "value":"_"
                            }
                        ],
                        "column": [
                          {
                            "index":1,
                            "name": "cf1:name",
                            "type": "string"
                          },
                          {
                            "index":2,
                            "name": "cf1:age",
                            "type": "int"
                          },
                          {
                            "index":3,
                            "name": "cf1:clazz",
                            "type": "string"
                          },
                          {
                            "index":4,
                            "name": "cf1:gender",
                            "type": "string"
                          }
                        ],
                        "versionColumn":{
                          "index": -1,
                          "value":"123456789"
                        },
                        "encoding": "utf-8"
                      
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }  
    }
}

#执行脚本
datax.py mysqlTohbase.json
        6、mysql增量同步数据到hive中。

最主要的工作就是在原先的mysql数据导入的hive中的基础上进行where过滤

vim mysqlTohive.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": "id >20"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "datax_mysqltohive1", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}


#执行脚本命令:
datax.py  mysqlTohive.json
9、在使用datax过程中出现的错误:

1、配置文件出现错误,脚本不完整:


http://www.kler.cn/a/272076.html

相关文章:

  • 构建高效稳定的网络环境
  • 简识JVM私有内存区域栈、数据结构
  • WordPress果果对象存储插件
  • 网络(一)
  • 【玩转全栈】----Django制作部门管理页面
  • 电子应用设计方案101:智能家庭AI喝水杯系统设计
  • 6.【Linux】进程间通信(管道命名管道||简易进程池||简易客户端服务端通信)
  • Rust基础知识讲解
  • 图像分割的定义
  • ChatGPT登陆提示:“Please unblock challenges.cloudflare.com to proceed…”
  • 替代imx6ull mfgtool的方法
  • Leetcode 1. 两数之和
  • 新!PCA+DBO+K-means聚类,蜣螂优化算法DBO优化K-means,适合学习,也适合发paper。
  • 编译原理 第1章:概述
  • SpringBoot Servlet容器启动解析
  • 【论文精读】DDPM:Denoising Diffusion Probabilistic Models 去噪扩散概率模型
  • 每日OJ题_简单多问题dp④_力扣LCR 091. 粉刷房子
  • ROS2+NAV2如何快捷的在docker中使用主机的CAN
  • WPF中使用LiveCharts绘制散点图
  • 如何降低云计算成本?
  • 数字后端 EDA 软件分享
  • Find My游戏机|苹果Find My技术与游戏机结合,智能防丢,全球定位
  • Linux 块设备驱动
  • 算法第二十九天-森林中的兔子
  • LGB2028 反向输出一个三位数
  • 动态规划题目集一(代码 注解)