当前位置: 首页 > article >正文

Datax安装部署及读取MYSQL写入HDFS

一.DataX简介

1.DataX概述

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
源码地址:https://github.com/alibaba/DataX

2.DataX支持的数据源

https://github.com/alibaba/DataX
在这里插入图片描述
在这里插入图片描述

3.DataX架构原理

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

在这里插入图片描述

4.DataX框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
在这里插入图片描述

5.DataX运行流程

在这里插入图片描述

6.DataX调度决策思路

举例来说,用户提交了一个DataX作业,并且配置了总的并发度为20,目的是对一个有100张分表的mysql数据源进行同步。DataX的调度决策思路是:
(1)DataX Job根据分库分表切分策略,将同步工作分成100个Task。
(2)根据配置的总的并发度20,以及每个Task Group的并发度5,DataX计算共需要分配4个TaskGroup。
(3)4个TaskGroup平分100个Task,每一个TaskGroup负责运行25个Task。

7.DataX与Sqoop对比

在这里插入图片描述
在这里插入图片描述

二.DataX部署

1.下载DataX安装包并上传到linux系统

下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

2.解压datax.tar.gz到/opt/software

[root@VM-4-10-centos datax]# tar -zxvf datax.tar.gz -C ../software/

3.自检,执行如下命令

[root@VM-4-10-centos datax]# python /opt/software/datax/bin/datax.py /opt/software/datax/job/job.json

在这里插入图片描述
如果出现下图报错,说明路径有问题,检查路径。
在这里插入图片描述

三.DataX的使用

1.DataX任务提交命令

DataX的使用十分简单,用户只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可。
在这里插入图片描述

2. DataX配置文件格式

可以使用如下命名查看DataX配置文件模板

[root@VM-4-10-centos datax]# python bin/datax.py -r mysqlreader -w hdfswriter

在这里插入图片描述

3.同步MySQL数据到HDFS案例

先测试从mysql读取数据打印控制台是否正常
MySQL读插件官网:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "create_time",
                            "update_time",
                            "value"
                        ],
                        "where": "id>=3",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://10.0.4.10:3306/medical?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                                ],
                                "table": [
                                    "dict"
                                ]
                            }
                        ],
                        "password": "123456",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}

在这里插入图片描述
数据打印正常读取插件配置正常。

配置hdfs写入插件

{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", #读取数据的插件名称
                    "parameter": {
                        "column": [ #读取的表的字段
                            "id",
                            "create_time",
                            "update_time",
                            "value"
                        ],
                        "where": "id>=3", #过滤条件
                        "connection": [ #连接信息
                            {
                                "jdbcUrl": [  #连接链接
                                    "jdbc:mysql://10.0.4.10:3306/medical?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                                ],
                                "table": [  #表名
                                    "dict"
                                ]
                            }
                        ],
                        "password": "123456",  #密码
                        "splitPk": "",
                        "username": "root"  #用户名
                    }
                },
                "writer": {
                    "name": "hdfswriter",  #写入数据的插件名称
                    "parameter": {
                        "defaultFS": "hdfs://101.91.153.39:8020",   #hdfs连接信息
                        "fileType": "text",    #文件格式
                        "path": "/datax/mysql",  # 写入地址
                        "fileName": "dict",   #文件名称
                        "column": [
                            {
                                "name": "id",
                                "type": "BIGINT"
                            },
                            {
                                "name": "create_time",
                                "type": "STRING"
                            },
                            {
                                "name": "update_time",
                                "type": "STRING"
                            },
                            {
                                "name": "value",
                                "type": "STRING"
                            }
                        ],
                        "writeMode": "append",  #写入文件的方式
                        "fieldDelimiter": "\t", #字段分割符
                        "compress":"gzip"  #hdfs文件压缩类型,默认不填写意味着没有压缩。
                    }
                }
            }
        ]
    }
}

如果写入HDFS出现如下错误

在这里插入图片描述

原因:NameNode节点存放的是文件目录,也就是文件夹、文件名称。 本地可以通过公网访问
NameNode,所以可以进行文件夹的创建,当上传文件需要写入数据到DataNode时, NameNode 和DataNode是通过局域网进行通信,NameNode返回地址为 DataNode 的私有 IP,本地无法访问。

解决方法:返回的IP地址无法返回公网IP,只能返回主机名,通过主机名与公网地址的映射便可以访问到DataNode节点,问题将解决。
datax的hdfswriter设置相关连接属性的hadoopConfig。

"hadoopConfig": {
                            "dfs.client.use.datanode.hostname": true
                          }, 

在这里插入图片描述
运行成功:
在这里插入图片描述
在这里插入图片描述


http://www.kler.cn/a/145383.html

相关文章:

  • doris:异常数据处理
  • mysql.sock.lock 导致mysql重启失败
  • 联想Y7000+RTX4060+i7+Ubuntu22.04运行DeepSeek开源多模态大模型Janus-Pro-1B+本地部署
  • 分布式微服务系统架构第88集:kafka集群
  • 【QT】- QUdpSocket
  • C语言自定义数据类型详解(二)——结构体类型(下)
  • PowerShell基础
  • VBA技术资料MF86:将PPT文件批量另存为PDF文件
  • Effective Modern C++(1.顶层const与底层const)
  • java线程中,join()方法?
  • Vue 或者 React 中,什么情况下会用 Es6 的 Class类
  • 02 RANSAC算法 及 Python 实现
  • 电商项目高级篇-03 商品上架
  • Elasticsearch集群部署,配置head监控插件
  • 【spring(五)】SpringMvc总结 SSM整合流程
  • GitHub上8个强烈推荐的 Python 项目
  • Vue-报错No “exports“ main defined in xx
  • Leetcode.1590 使数组和能被 P 整除
  • Spring面向切面编程(AOP);Spring控制反转(IOC);解释一下Spring AOP里面的几个名词;Spring 的 IoC支持哪些功能
  • Java中wait()方法在synchronized方法中调用的奥秘
  • php订单发起退款(余额和微信支付)
  • STL: 容器适配器stack 与 queue
  • 什么是proxy代理?
  • NX二次开发UF_CURVE_ask_joined_parms 函数介绍
  • 存算一体还是存算分离?谈谈数据库基础设施的架构选择
  • DDD(领域驱动设计)一些基础概念