当前位置: 首页 > article >正文

大数据-234 离线数仓 - 异构数据源 DataX 将数据 从 HDFS 到 MySQL

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容(留存会员模块):

  • DWS 层
  • ADS 层
  • 创建 Hive 执行脚本

在这里插入图片描述

基本架构

之前已经完成了Flume的数据采集到HDFS中,现在我们将依次走通流程:

  • ODS

  • DWD

  • DWS

  • ADS

  • DataX数据导出到MySQL
    在这里插入图片描述
    ADS有4张表需要从数据仓库的ADS层导入MySQL,即:Hive => MySQL

    ads.ads_member_active_count
    ads.ads_member_retention_count
    ads.ads_member_retention_rate
    ads.ads_new_member_cnt

在Hive中可以看到这几张表:
在这里插入图片描述

创建库表

-- MySQL 建表
-- 活跃会员数
create database dwads;
drop table if exists dwads.ads_member_active_count;
create table dwads.ads_member_active_count(
  `dt` varchar(10) COMMENT '统计日期',
  `day_count` int COMMENT '当日会员数量',
  `week_count` int COMMENT '当周会员数量',
  `month_count` int COMMENT '当月会员数量',
  primary key (dt)
);

-- 新增会员数
drop table if exists dwads.ads_new_member_cnt;
create table dwads.ads_new_member_cnt
(
  `dt` varchar(10) COMMENT '统计日期',
  `cnt` int,
  primary key (dt)
);

-- 会员留存数
drop table if exists dwads.ads_member_retention_count;
create table dwads.ads_member_retention_count
(
  `dt` varchar(10) COMMENT '统计日期',
  `add_date` varchar(10) comment '新增日期',
  `retention_day` int comment '截止当前日期留存天数',
  `retention_count` bigint comment '留存数',
  primary key (dt)
) COMMENT '会员留存情况';

-- 会员留存率
drop table if exists dwads.ads_member_retention_rate;
create table dwads.ads_member_retention_rate
(
  `dt` varchar(10) COMMENT '统计日期',
  `add_date` varchar(10) comment '新增日期',
  `retention_day` int comment '截止当前日期留存天数',
  `retention_count` bigint comment '留存数',
  `new_mid_count` bigint comment '当日会员新增数',
  `retention_ratio` decimal(10,2) comment '留存率',
  primary key (dt)
) COMMENT '会员留存率';

执行结果如下图:
在这里插入图片描述

DataX

DataX 之前章节已经介绍过了 这里就简单一说 详细教程看之前的

基本介绍

DataX 是阿里巴巴开源的一款分布式数据同步工具,用于实现各种异构数据源之间高效、稳定的数据同步。其主要功能包括数据的批量导入、导出和实时传输,支持多种主流数据源,例如关系型数据库、NoSQL 数据库、大数据存储系统等。

DataX 的核心思想是“插件化架构”,通过灵活的 Reader 和 Writer 插件实现不同数据源之间的数据交换。

DataX 的特点

插件化架构
  • Reader:用于从数据源读取数据。
  • Writer:用于将数据写入目标存储。
  • 插件开发简单,可以根据需要扩展支持新的数据源。
高性能与高扩展性
  • 支持大规模数据同步,处理速度快。
  • 支持多线程并发传输,利用 CPU 和 IO 性能。
  • 可配置分片任务(Shard),实现分布式同步。
兼容性强
  • 支持丰富的异构数据源,包括 MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、HDFS、Hive、ODPS、ElasticSearch 等。
  • 可在不同系统之间传输数据,比如从传统 RDBMS 数据库迁移到大数据系统。
易用性
  • 配置简单,基于 JSON 文件定义任务,易于上手。
  • 提供详尽的运行日志,便于定位和解决问题。
  • 开源代码,支持二次开发。
可监控性
  • 提供详细的任务运行指标,比如吞吐量、数据量等。
  • 支持失败任务自动重试,确保数据同步过程的可靠性。

配置文件

导出活跃会员数(ads_member_active_count),编写一个JSON出来:

vim /opt/wzk/datax/export_member_active_count.json

hdfsreader => mysqlwriter

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [{
      "reader": {
        "name": "hdfsreader",
        "parameter": {
          "path":
          "/user/hive/warehouse/ads.db/ads_member_active_count/dt=$do_date/*",
          "defaultFS": "hdfs://h121.wzk.icu:9000",
          "column": [{
            "type": "string",
            "value": "$do_date"
          }, {
            "index": 0,
            "type": "string"
          },
            {
              "index": 1,
              "type": "string"
            },
            {
              "index": 2,
              "type": "string"
            }
          ],
          "fileType": "text",
          "encoding": "UTF-8",
          "fieldDelimiter": ","
        }
      },
      "writer": {
        "name": "mysqlwriter",
        "parameter": {
          "writeMode": "replace",
          "username": "hive",
          "password": "hive@wzk.icu",
          "column": ["dt","day_count","week_count","month_count"],
          "preSql": [
            ""
          ],
          "connection": [{
            "jdbcUrl":
            "jdbc:mysql://h122.wzk.icu:3306/dwads?
            useUnicode=true&characterEncoding=utf-8",
            "table": [
              "ads_member_active_count"
            ]
          }]
        }
      }
    }]
  }
}

写入的内容如下所示:
在这里插入图片描述

编写命令

DataX的运行的方式如下所示:

python datax.py -p "-Ddo_date=2020-07-21" /opt/wzk/datax/export_member_active_count.json

编写脚本

编写一个脚本用来完成这个流程:

vim /opt/wzk/hive/export_member_active_count.sh

写入的内容如下所示:

#!/bin/bash
JSON= /opt/wzk/datax
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" $JSON/export_member_active_count.json

写入的内容如下所示:
在这里插入图片描述


http://www.kler.cn/a/472136.html

相关文章:

  • 腾讯云AI代码助手编程挑战赛-图片转换工具
  • el-table 多级表头
  • 使用JMeter玩转tidb压测
  • PostgreSQL学习笔记(二):PostgreSQL基本操作
  • NodeLocal DNS 全攻略:从原理到应用实践
  • Keepalived 进阶秘籍:全方位配置优化
  • SQL编程语言
  • pytorch 比较两个张量的是否相等的函数介绍
  • Python爬虫应用领域
  • 计算机网络:虚拟机虚拟网络配置
  • 鸿蒙中黑白版
  • 基于RedHat9部署WordPress+WooCommerce架设购物网站
  • SQL Server存储过程来实现分页功能
  • TRELLIS - 生成 3D 作品的开源模型
  • KUKA机器人如何修改程序并下载到机器人控制器中?
  • uniapp uni-popup使用scroll-view滚动时,底部按钮设置position:fixed失效,部分ios设置有问题
  • 大模型RAG面试内容有哪些?(附面试资料合集)
  • redis学习-value数据结构
  • # Kafka组件化及拓展
  • Bash语言的数据类型
  • H5通过URL Scheme唤醒手机地图APP
  • 分享3个国内使用正版GPT的网站【亲测有效!2025最新】
  • CV-LLM经典论文解读|VTimeLLM: Empower LLM to Grasp Video MomentsVTimeLLM:赋能大语言模型理解视频片段
  • 使用react开发一个外卖程序
  • 【Pytorch报错】AttributeError: cannot assign module before Module.__init__() call
  • Powerbi官方认证!2025年入行数据分析