Data Collection (Full and Incremental)

Full collection: pull the entire table on every run.

1. Full collection

vim students_all.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "splitPk": "id",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "update_time"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "students"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/bigdata31"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/data/students_all/dt=${dt}",
                        "fileName": "students",
                        "column": [
                            {
                                "name": "id",
                                "type": "STRING"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "gender",
                                "type": "STRING"
                            },
                            {
                                "name": "clazz",
                                "type": "STRING"
                            },
                            {
                                "name": "update_time",
                                "type": "STRING"
                            }
                        ],
                        "writeMode": "truncate",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

The ${dt} in the writer's path is a placeholder that DataX fills from the -p option at run time, so the same JSON serves every daily partition:

# Create the partition directory in HDFS
hdfs dfs -mkdir -p /data/students_all/dt=2024-10-21
# Run the DataX job, supplying the date for ${dt}
datax.py -p"-Ddt=2024-10-21" students_all.json
# Register the partition with Hive (the students_all table is created in step 3 below)
hive -e "alter table students_all add if not exists partition(dt='2024-10-21');"

Incremental collection: pull only the rows that were newly inserted or modified since the last run.

2. The source table needs an update-time column

CREATE TABLE `students` (
  `id` bigint(20),
  `name` varchar(255),
  `age` bigint(20),
  `gender` varchar(255),
  `clazz` varchar(255),
  `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
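The ON UPDATE CURRENT_TIMESTAMP clause is what makes incremental collection work: MySQL stamps update_time on insert and refreshes it on every modification, with no application change. A quick illustration, assuming the mysql client is available and using the root/123456 credentials from the DataX config (the sample row is hypothetical):

# update_time is filled in automatically on insert ...
mysql -uroot -p123456 bigdata31 -e "insert into students(id,name,age,gender,clazz) values(1001,'zhangsan',18,'male','class01');"
# ... and refreshed automatically on update, which is what the
# incremental job's where condition filters on
mysql -uroot -p123456 bigdata31 -e "update students set age=19 where id=1001; select id,update_time from students where id=1001;"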

3. Create the partitioned table in Hive

create external table if not exists students_all(
    id bigint comment 'student id'
    ,name string comment 'student name'
    ,age bigint comment 'student age'
    ,sex string comment 'student gender'
    ,clazz string comment 'student class'
    ,update_time string comment 'update time'
) comment 'student information table (full)'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile
location 'hdfs://master:9000/data/students_all';
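Because this is an external partitioned table, Hive does not discover new dt=... directories on its own: each one must be registered, either with the alter table ... add partition command used in step 1, or in bulk (assuming the default dt=YYYY-MM-DD directory layout shown here):

# Register every dt=... directory under the table location in one pass
hive -e "msck repair table students_all;"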

4. Create the incremental table in Hive

create external table if not exists students_acc(
    id bigint comment 'student id'
    ,name string comment 'student name'
    ,age bigint comment 'student age'
    ,sex string comment 'student gender'
    ,clazz string comment 'student class'
    ,update_time string comment 'update time'
) comment 'student information table (incremental)'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile
location 'hdfs://master:9000/data/students_acc';

5. Incrementally collect the updated data

The job is identical to the full one except for the reader's where condition, which keeps only rows whose update_time falls on the target day:

vim students_acc.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "splitPk": "id",
            		    "where": "substr(update_time,1,10)='${dt}'",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "update_time"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "students"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/bigdata31"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                   "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/data/students_acc/dt=${dt}",
                        "fileName": "students",
                        "column": [
                            {
                                "name": "id",
                                "type": "STRING"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "gender",
                                "type": "STRING"
                            },
                            {
                                "name": "clazz",
                                "type": "STRING"
                            },
                            {
                                "name": "update_time",
                                "type": "STRING"
                            }
                        ],
                        "writeMode": "truncate",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}
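With the where parameter set, mysqlreader appends that condition to the query it sends to MySQL (combined with a splitPk range when the job is split into multiple tasks), so only rows touched on the target day are read. For dt=2024-10-22 the effective query is roughly the one below; running it manually previews what the job will pull (same credentials as the config, and not the literal statement DataX logs):

# Preview of what the incremental job reads for 2024-10-22
mysql -uroot -p123456 bigdata31 -e "select id,name,age,gender,clazz,update_time from students where substr(update_time,1,10)='2024-10-22';"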
# Create the partition directory in HDFS
hdfs dfs -mkdir -p /data/students_acc/dt=2024-10-22
# Run the DataX job for the day's increment
datax.py -p"-Ddt=2024-10-22" students_acc.json
# Register the partition with Hive
hive -e "alter table students_acc add if not exists partition(dt='2024-10-22');"

6. Merge the data

Take yesterday's full partition (diff_dt) plus today's incremental partition (dt), keep the newest version of every id by update_time, and overwrite today's partition of students_all with the result:

vim student_merge.sql
insert overwrite table students_all partition(dt='${dt}')
select
    id,
    name,
    age,
    sex,
    clazz,
    update_time
from
    (
        select
            id,
            name,
            age,
            sex,
            clazz,
            update_time,
            row_number() over (
                partition by
                    id
                order by
                    update_time desc
            ) as r
        from
            (
                select
                    *
                from
                    students_all
                where
                    dt = '${diff_dt}'
                union all
                select
                    *
                from
                    students_acc
                where
                    dt = '${dt}'
            ) as a
    ) as b
where
    r = 1;
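The script unions yesterday's full snapshot (diff_dt) with today's changed rows (dt); an id that was updated appears in both inputs, and row_number() ordered by update_time desc keeps only the newer version. Before running the insert, the ids that will be deduplicated can be listed with a dry-run query (a check added here, not part of the original script):

# ids present in both inputs, i.e. rows updated on 2024-10-22;
# the merge keeps only the students_acc version of these
hive -e "
select id, count(*) as versions
from (
    select id from students_all where dt='2024-10-21'
    union all
    select id from students_acc where dt='2024-10-22'
) t
group by id
having count(*) > 1;"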
# Run the merge with Hive, passing both dates as variables
hive -f student_merge.sql -d dt=2024-10-22 -d diff_dt=2024-10-21


# Alternatively, run the same script with spark-sql on YARN;
# spark.sql.shuffle.partitions=1 keeps the merged partition in a single output file
spark-sql \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 2G \
--conf spark.sql.shuffle.partitions=1 \
-f student_merge.sql -d dt=2024-10-22 -d diff_dt=2024-10-21

