记录下数仓相关的东西
1、指标体系
1.1、指标体系如何搭建
指标中心是数仓来做,统一指标收口,可以用低代码平台、可视化门户搭建;
维护由数仓和数分一起维护,数分提供指标口径,数仓负责加工和归纳
1.2、如何从0到1构建复购率分析体系
首先要从业务视角出发,通过逻辑树将复购率这一北极星指标进行系统性拆解,形成分子分母明确的派生指标体系。
这个过程中需要与数据产品、分析师和业务方多方沟通,在确保指标全面性的同时避免几余,最终收敛出清晰的业务口径和技术定义。
在完成指标体系设计后,下一步是梳理支撑这些指标的数据基础。
这需要深入业务场景,盘点现有数据仓库中可用的数据表,识别数据缺口并及时补充数据源。
若涉及跨部门数据,还需要提前与相关团队对齐数据使用规范、关联方式等细节问题。
基于完整的数据支撑,才能构建起稳定可靠的指标数据模型。
当指标体系和数据基础都准备就绪,便可以着手开发分析应用。
当然,最为重要的是深入挖掘数据背后的业务洞察可以从用户属性、场景特征等多个维度展开分析,找出影响复购的关键因素,形成有价值的分析报告。这些发现需要及时与业务团队交流,帮助他们发现问题、优化运营策略。
这样一个完整的复购率分析体系,不仅能够准确监测业务表现,更能够驱动业务持续改善。要搭建这样的完整的复购率分析体系,需要数据团队与业务团队的紧密配合,在指标设计、数据治理和分析应用等环节中不断打磨优化,最终实现数据驱动业务的目标。
1.3 指标体系方法论
1.分析痛点:
了解当前数仓侧与业务应用方对指标到不到、难使用的痛点及日常指标使用习惯,制定指标中心所需功能并设计指标中心样式。
2.制定指标规范:
定义指标类型、指标使用方、确定指标域(这里是数据域)、指标要具备的属性(业务/技术口径、负责人、类型等)
3.梳理当前指标:
用在线excel整理数据/主题域对应的指标,优先找ads交集的核心指标,并区分核心与非核心,溯源每一层级并记录
4.与业务方沟通:
按照数据/主题域与业务方确定指标口径与来源唯一性,并对高频、低频指标打标(无应用指标优先下线)
5.建设指标中心:
完成指标中心各个模块(创建、编辑等)及搜索跳转功能开发,并创建后台数据库及指标内容数据表用于后续数据存放,后续接入数仓,通过用户指标行为的明细数据加工及指标维度表下维度退化,建设指标侧应用数据模型,用于统计指标颗粒度下使用情况(热度、查询率、被引用情况等),再与前端开发团队配合完成可视化界面搭建支撑界面应用。
6.公共指标模型建设:
将已梳理的重复的公共逻辑提取出来,并集中建设成公共数据模型,消除冗余代码,解决复用性。
7.设立指标规范:
与业务方共创维护指标上下线/变更流程,设立接口人,保障流程规范性
8.指标维护:
制定指标评分规范,通过指标热度及使用情况打分,对无用指标定期提醒及下线。
1.4 指标模型开发流程
1.前期准备工作:
1.梳理看板SQL逻辑
2.统一指标口径:
1.业务口径
2.技术口径
3.识别颗粒度:
1.员工维度
2. 部门维度等
2.指标分层存储:
1.ADS层:
1.按主题划分(员工晋升、员工绩效)
2.存放基础指标
DWS层使用场景:
指标量大且复杂
依赖表数量>2
·示例:近3年员工绩效
·包含A绩效次数等指标
·存在多表关联计算
3.数据应用建议:
1.避免指标存放在数据集
2.数据集直接读取ADS表
3.最佳实践:
1.数据落地OLAP
2.数据集从OLAP读取
3.提升查询性能
2、组件问题
2.1、Flink四个基础问题 ???
2.1.1 Flink处理乱序的方法
Flink处理乱序的方法:waterMark + window + eventtime,如果只是waterMark + eventtime,只能处理超时数据,无妨处理乱序(因为没用window累计数据,来一条处理一条)
2.2 Spark ui快速定位数据倾斜的sql位置
Spark数据倾斜问题分析和解决_spark 有一个任务运行得特别慢 没有数据倾斜-CSDN博客
2.2.1 job、stage、task区别
当Driver起来后,Driver会根据用户程序逻辑准备任务,并根据Executor资源情况逐步分发任务,一个Spark应用程序包括job、stage和task三个概念
job:已action方法为界,一个action触发一个job
Stage:它是job的子集,已RDD宽依赖为界,遇到宽依赖即划分stage
task:它是stage的子集,以分区数来衡量,分区数多少,task就有多少
spark 任务从发起到执行可用下图表示
Client— >ResourceManage
(1). Client 端通过 spark-submit + 参数 发起任务,即向ResourceManage 提交 application,注意该 application 包含了一堆参数,如 Executor 数,Executor 内存,Driver 内存等;
(2). ResourceManage 需要先判断现在资源是否能满足该 application,如果满足,则响应该 application,如果不满足,报错;
(3). 如果资源满足,Client 端准备 ApplicationMaster 的启动上下文,并交给 ResourceManage;
(4). 并且循环监控 application 的状态;
ResourceManage— >ApplicationMaster
(1). ResourceManage 找一个 worker 启动 ApplicationMaster;
(2). ApplicationMaster 向 ResourceManage 申请 Container;
(3). ResourceManage 收集可用资源,并告诉 ApplicationMaster;
(4). ApplicationMaster 尝试在对应的 Container 上启动 Executor 进程;
ApplicationMaster-Driver
(1). 有了资源,ApplicationMaster 启动 Driver;
//Driver 线程主要是初始化 SparkContext 对象,准备运行所需上下文,并保持与 ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源
(2). Driver 启动成功后,告诉 ApplicationMaster;
Driver-Executor
(1). Executor 启动成功后,反向注册到 Driver 上,并持续向 Driver 发送心跳;
(2). Driver 启动 task,分发给 Executor,并监控 task 状态;
(3). 当 Executor 任务执行完毕后,将任务状态发送给 Driver;
spark 的核心就是资源申请和任务调度,主要通过 ApplicationMaster、Driver、Executor 来完成
spark 任务调度分为两层,一层是 stage 级的调度,一层是 task 级的调度
2.3 流量激增问题
大促场景:无法实时解决,需要提前做好预案
日常流量突增:
1、kafka承压问题
症状:kafka顶不住突发流量
解决方案:联系后端增加消费者分区
配置多Topic并发流
2、Flink消费问题
排查方向:
通过Flink UI定位具体卡点
观察任务运行状态和资源使用情况
增加计算资源
限制流入读取量
长期优化建议
1、事后复盘
分析大流量场景下的系统表现
优化监控看板的慢SQL
进行实时压测,确定合理的资源配比
2、架构优化
准备柴流方案
实现双链路架构:多Topic设计;多OLAP方案(如ClickHouse + Doris)
提前准备多屏备选方案
3、预案准备
制定完整的应急预案
定期进行压力测试
建立监控告警机制
2.4 Hive调优的几个细节
问题1:set
hive.exec.max.dynamic.partitions=1000;--所有节点的总(默认)限制为1000个动态分区sehive.exec.max.dynamic.partitions.pernode=1(0;-默认值为每个节点100个动态分区我看某个博客上:sethive.exec.max.dynamic.partitions.pernode=100#表示每个maper或reducer可以允许创建的最大动态分区个数这种说法对吗?如果正确,是不是:意味着每个节点上只能有1个maper或reducer
问题2:输入的数据量比较大,比如1T数据此时启动2000个Mapper任务去读取,这些Mapper任务的并行度是多少呢,如何确定的?开启动态分区后,这个任务的小文件数是如何确定的呢?博客上说是每个任务下有100个分区,将产生2000*100=200000个小文件,这个说法对吗?
问题3:如图,dynamic.partitions/dynamic.partitions.pernode<=mapred.reduce.tasks如何理解?
第一个问题是最大上限.
第二个问题你需要理解一下mapper数量的生成规则,他是根据文件与你设置的参数和集群hdfs对应的block大小,取一个max计算出来的。而且也不是说你设置了这些参数,有些map数量也不一定就是完全就会处理对应的数据可以追踪一下源码,分析一下hive4.0之后应该是做了很多优化了。
还有个问题,小文件大多是shuffe过程中出现的,也就是说,你设置的动态分区上限,往往需要考虑对应reduce数量,才可能会得到理想的效果。
你可以看下,如果你1t的数据,只谈论mapper,可能设置20000个map数量中,不一定所有的map会真正的处理数据的,甚至你设置的这个结果都不一定会生效,这是hive的情况,不过现在企业基本都切sparksql了,可能跟hive还不太一样了。
2.5 流量域下数据很多该怎么存储
流量数据一般做数据分析一般是近期数据探查分析以及td的指标
1、如果真的要省存储就把最近一两年的数据存热数据,历史数据放在片会议的存储里,或者根据生命周期直接删一定周期前的数据
2、数仓做好公共层建设,避免重复建设导致的存储浪费
3、流批一体架构解决存储异构的问题,这种一般对基础架构的能力要求非常高
4、lambda架构下,离线做最高实效的数据,低实效数据基于高实效数据做视图,实时基于流量底表按数据域进行分流处理,减小下游消费压力
2.6 Taozex的数据湖分享
数据湖的出现主要是为了解决Hive速度慢的问题,利用表格式和索引的加速实现细粒度的数据过滤。在使用数据湖后,链路可以做到10-15min左右,所以湖的作用就是为了将离线链路转化为近实时链路
这里简要说-下hudi和iceberg的小区别:hudi的upsert能力很优秀,而iceberg在公司内部优化后,也比开源版本hudi的upsert能力差一些iceberg的设计更优雅简洁,它的表存储格式抽象的最好的,包括读写引擎、Table Schema、文件存储格式都是pluggable的,可以进行比较灵活的扩展并保证和开源以及之前版本的兼容性,另外开源版本代码质量上相对而言iceberg也更好些
2.7 Flink CDC
什么是CDC?
CDC(Change Data Capture)是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。
CDC 可以捕获数据库中的以下类型的数据变化
1.插入(Insert):当新数据被插入到数据库表中时。
2.更新(Update):当数据库表中的现有数据被修改时。
3.删除(Delete):当数据从数据库表中被删除时
什么是FlinkCDC
Flink CDC(Change Data Capture,即数据变更抓取)是一个开源的数据库变更日志捕获和处理框架,它可以实时地从各种数据库(如MySQLPostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据。FlinkCDC 可以帮助实时应用程序实时地处理和分析这些流数据,从而实现数据同步、数据管道、实时分析和实时应用等功能。
本质上是一系列的Flink Source Connector集合,用于来获取数据库的实时变更,底层基于Debezium实现。
https://github.com/ververica/flink-cdc-connectors
2.8 实时开发入门攻略
实时数据开发入门路线可分为起步和提升两个主要阶段:
起步阶段核心要点:
-以 Kafka 为切入点,掌握精确一致性、ConsumerGroup、Topic 分区等基础概念
选择 Doris 或 StarRocks 作为目标端,构建完整数据链路
-重点练习消息处理、JSON 解析及高效写入等业务场景
环境配置建议:
主机配置:1台 32GB 主机+3台 8GB 虚拟机
预装组件:Hadoop、Spark、Flink、Hive.HBase、Redis、Kafka、Zookeeper、 Doris.StarRocks
提升阶段重点
深入学习 CDC(Change Data Capture) 技术
掌握 binlog/WAL 日志解析方法
练习不同格式数据向下游系统的写入
这个学习路线帮助你循序渐进地掌握实时数据处理技能,从基础概念到进阶应用,为成为数据工程师打下扎实基础。
要点:先通过 Kafka 打好基础,再通过CDC拓展能力边界。
3、 数据仓库&数据分析
3.1、 数仓汇报如何去讲故事
需要4个大图去讲故事
业务版图:根据当前公司的产品还有业务线大体梳理出业务具体内容
数仓定位:这里可以写一段内容去介绍我们数仓具体在公司承担什么位置;去解决什么问题,最后能支持业务什么场景,如何输出给他们内容的,如何帮助业务提效。
数仓版图:介绍我们现在数仓都有什么组件,有什么内容,在未来我们是如何规划的,可以附带一份方案,梳理出未来几个阶段我们都要做哪些
数仓架构图:数仓架构和数仓开发中用到的架构与业务方的关系,这里梳理清楚你们的上下游,以及在中间处理数据对接哪些同学以及哪些部门
3.2、 数仓问题 - 维度和颗粒度;数据集市和数据仓库区别
维度与颗粒度关系:
颗粒度是数据聚合的级别
维度是数据的分析角度
同一字段在不同场景下可以是维度也可以是颗粒度
数据集市和数据仓库的区别
数据集市特点:面相具体业务线;范围较小,聚焦特定领域;更适合部门级应用
3.3、 数据表打上核心表和相关热度标签有什么用
1、出问题时分优先级
核心表出问题,所有人立马放下手头工作
临时表出问题,先看看影响面再说
2、做运维更有针对性
热度高的表,多备份、多优化
冷表就没必要投入太多资源
3、帮助新人快速上手
看到标签就知道这表要不要谨慎处理
3.4、 数仓的关键能力
沟通能力(解决写作交流,高效交付)、学习能力(提升开发效率)、自取能力(能尽早发现业务能力,通过数据展示及预防)、业务能力(对业务理解,能更快解读指标,熟悉业务流程从而根据业务场景及环节构建好数据模型)、细心观察能力(因为做数据室比较容易出现问题行业,所以对数据观察对数据交付是非常重要的,同时也会给业务对接的人给更好的印象)
3.5、 数据域和主题域
数据域:从数据视角构建业务流程
数据域是从数据视角自下而上搭建的,对每个业务环节进行切割划分,形成不同环节的数据集,并组装成完整的业务流程
主题域:从业务视角进行专项分析
主题域是从业务视角自上而下分析出来的,针对整体业务环节进行升华,形成大的专项分析模块
主题域结合了业务范围和行业形态,从更高的视角去洞察整个业务流程。
类比解释
数据域是做菜需要的原材料,如萝卜、青菜、肉等
这些原材料对应数据开发中业务库的数据,而主题域则类似菜谱的分类
数据分域的好处:
方便整体数据管理:分域能够使得数据的管理更有条理,便于查找和使用
业务环节模块独立化:使得每个业务环节模块更加独立,有助于问题的定位和解决
明确职责划分:让数据仓库和数据分析人员能够清晰划分自己的责任域
提高业务分析的准确性:从不同视角对数据进行分域,有助于更精准的业务分析和决策
4、 数据质量&数据资产
4.1、 如何保证脚本开发的准确性
1、充分自测:检查数据分布情况,最大值、最小值和空值
2、采用抽样验证的方法:从ODS层提取样本数据,将其与开发的指标数据进行对比
4.2、 数仓开发中数据校验不一致时的排查思路
1、指标口径核验
确认指标的业务口径是否完全一致
检查是否存在遗漏的过滤条件
验证时间窗口、维度限定等是否统一
2、分层追踪法
选定特定用户ID作为样本进行追踪
由ODS层向上追溯至DWD层比对
如果DWD与ODS数据一致,说明明细层加工正确,若不一致,则需重点排查DWD处理逻辑
3、ADS层问题排查
使用count distinct检测是否存在笛卡尔积
验证关联键的正确性和完整性
检查where子句中的过滤条件
4、技术层面检查
考虑spark函数使用是否恰当
对于金融业务,特别注意bigint和decimal类型字段的处理
4.3、 数据价值对业务的帮助
用户增长/经营性分析
用户增长/经营性分析即通过数据仓库建设的数据模型帮助业务方带来用户增长,是数据仓库对业务的核心价值,例如数据模型支撑了用户基础画像、用户在业务中全流程行为分析、用户在消费行为中表现情况等,通过数据模型帮助业务能够快速定位未来活动、未来业务走向等,为业务在拉新、促活挽留等方面提供精准流量(而非原有广告投放导致流量较为广泛,定位不精准),在此处数据仓库帮助业务达到了用户规模增长,从原有规模增长到现有规模(例如5000万用户增长到8000万)、为业务新的经营分析场景,为业务带来规模性营收(例如金额收益增长40%+),对于数据仓库同学来说,可以从数据表中查询已知的增长规模,还需要和业务方确认增长的方向和其他数据。
数据质量/产出稳定
数据质量/产出稳定更多的价值在于提供的数据能让业务方使用的安心无忧,如果数据质量问题经常被业务反馈、每天都无法提供数据给业务因果难过那业务方对于数据仓库的依赖会逐渐降低,丧失用数信心,因此稳定数据质量及任务产出也较为重要,此处的价值可从原有数据质量问题触发情况与现有情况进行比对,问题触发降低了多少等(例如每个月下游反馈40-50bug 现在降低每个月下游反馈7个bug),任务产出这里也可以从基线/SLA破线无法交付次数降低了多少去评价(每周5次破线降至每周小于等于1次破线),同时还需要做数据质量问题质量可视化监控看板提供给业务方查看,并按照周/月形式进行定期邮件反馈。
查数/用数提效
查数/用数提效为业务方提升了更快的效率,能够快速找到并使用数据,在这里数据仓库侧需要对元数据维护、定制相关提效数据服务(数据资产门户指标中心、ONE-ID等),通过数据服务以及元数据维护将查数/用数成本降低,将原业务方几小时查询及询问时长降低至分钟内自助查询定位(例如原来业务方自己找数据表 查询指标要3小时,现在能够实现自助查询,并能再5分钟内定位),极大降低了成本,同时减少了数据仓库侧问题答疑次数,达到快速定位效果。
降低部门支出
由于数据仓库任务及数据表日积月累式增长导致计算及存储的费用不断提升,从而增加部门整体费用(例如计算/存储金额从3000万降低至2100万等),通过数据治理或数据技术架构更换,帮助业务降低整体支出,为部门节省整体开支。
5、数据治理
包括模型合规、数据质量、数据安全、计算/存储资源、数据价值等治理
5.1、 计算资源存在问题
(1)30+高消耗任务:由于数仓前中期业务扩张要覆盖大量场景应用,存在大量问题代码运行时数据倾斜,在消耗大量集群计算资源下,产出时间也久;
(2)200w+的小文件:当前任务存在未合并小文件、任务Reduce数量过多、上游数据源接入(尤其是API数据接入)会造成过多小文件出现,小文件过多会开启更多数据读取,执行会浪费大量的资源,严重影响性能;
(3)任务调度安排不合理:多数任务集中在凌晨25点执行且该区间CPU满载,导致该时间段资源消耗成了重灾区,所有核心/非核心任务都在争抢资源部分核心任务不能按时产出一直在等待阶段,
(4)线上无效DQC(数据质量监控)&监控配置资源过小:存在部分历史任务没下线表及DQC场景每日都在空跑无意义DQC浪费资源,同时DQC资源过少导致DQC需要运行过长时间;
(5)重复开发任务/无用任务:早期协助下游做了较多烟囱数据模型,因为种种原因,部分任务不再被使用,烟囱模型分散加工导致资源复用率降低;
(6)任务缺少调优参数&部分任务仍然使用MapReduce/Spark2计算引擎:任务缺少调优参数导致资源不能适配及动态调整,甚至线上仍有早期配置MapReduce/Spark2计算引擎导致运行效率较低。
5.1.1、 解决方案
对当前治理优先级/改动成本大小/难度做了一个排序,我们先选择从简单的参数调优&任务引擎切换开始->小文件治理->DQC治理->高消耗任务治理->调度安排>下线无用模型及沉淀指标到其他数据资产
(1)大部分任务切换至Spark3计算引擎&补充任务调优参数
任务统一使用Spark3引擎加速,并充分利用补充Spark调优参数(参数内容详见文末),Spark3的AQE特性及Z-0rder排序算法特性。
AQE解释:Spark 社区在 DAG Scheduler 中,新增了一个 API在支持提交单个 Map 阶段以及在运行时修改 shuffe 分区数等等,而这些就是 AQE,在 Spark 运行时,每当一个Shuffe、Map 阶段进行完毕,AQE 就会统计这个阶段的信息,并且基于规则进行动态调整并修正还未执行的任务逻辑计算与物理计划(在条件运行的情况下),使得 Spark程序在接下来的运行过程中得到优化。
Z-0rder解释:2-0rder 是一种可以将多维数据压缩到一维的技术,在时空索引以及图像方面使用较广,比如我们常用order by a,b,c会面临索引覆盖的问题,Z-0rder by a,b,c效果对每个字段是对等的
(2)小文件治理
在这里我们使用内部数据治理平台-数据治理360对存在小文件较多表提供内容展示(本质采集HDFS对应路径下文件数的日志去显示)
当前小文件处理:
(Spark3具备小文件自动合并功能,如未使对于分区较多使用Spark3进行动态分区刷新,用Spark3可配置Spark3/Hive小文件合并参数刷新,参数详见文末),代码如下:
set hive.exec.dynamic.partition.mode=nonstrict
insert overwrite table xxx.xxx partition(ds)
select column,ds
from xxx.xxX
对于分区较少或未分区的表采用重建表,补数据方法回刷。
小文件预防
使用Spark3引擎,自动合并小文件
减少Reduce的数量(可以使用参数进行控制)
用Distribute By Rand控制分区中数据量
添加合并小文件参数
将数据源抽取后的表做一个任务(本质也是回刷分区合并小文件任务)去处理小文件保障从数据源开始小文件不向下游流去
(3)DQC治理
无效DQC下线:难点在于需要查找所有DQC对应的线上任务,查看该DQC任务是否与线上任务--匹配,从而找到无效DQC任务下线,内容繁杂耗时较多。
DQC资源:由于之前DQC配置资源为集群默认参数,效率极低导致所有DQC运行时长均超过10min,从而使得整体任务链路运行时长过久,调整Driver内存为2048M,Executor个数为2,Executor内存为4096M
(4)高消耗任务调优
这里存在2个难点:优化效果不可控、高消耗任务调整到何种程度算合适,针对这个这个难点我们取所有核心数据资产任务均值,保障单个任务消耗小于平均消耗,同时我们针对当前高消耗任务列举出如下可优化的方式:
关联表过多,需拆分
关联时一对多,数据膨胀.
资源配置过多,运行时资源严重浪费,需要将配置调小(包括Driver内存、Executor个数、Executor内存)
代码结尾添加Distribute ByRand(),用来控制Map输出结果的分发
查询中列和行未裁剪、分区未限定、Where条件未限定
SQL中Distinct切换为Group by(Distinct会被hive翻译成一个全局唯-Reduce任务来做去重操作,Group by则会被hive翻译成分组聚合运算,会有多个Reduce任务并行处理,每个Reduce对收到的一部分数据组,进行每组聚合(去重))
关联后计算切换为子查询计算好后再关联
使用Map Join(Map Join会把小表全部读入内存中,在Map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在Map是进行了Join操作,省去了Reduce运行的效率也会高很多)可用参数代替
(5)任务调度合理优化
对于调度优化一开始会无从下手,统计凌晨2-5点区间下大概600+任务难理,同时存在任务依赖,修改起来可能会对下游整体有大的影响,因此我们选择循序渐进先梳理再改善。
找到所有表的输出输入点即启始ODS与末尾ADS
划分其中核心表/非核心表,及对应任务开始时间与结束时间
按照梳理内容把非核心的任务穿插在当前集群资源非高峰时期(2点前与5点后),同时把核心任务调度提前,保障CDM层任务及时产出
对实践后内容再度调优,达到资源最大利用率
(6)烟囱任务下沉&无用任务下线
烟囱表过多,需下沉指标到DWS中提升复用性,对于无用任务也需要及时下线(这里需要拿到元数据血缘最好到报表层级的数据血缘,防止任务下线后导致可视化内容问题产生),减少开发资源消耗。
5.1.2、 总结
计算资源治理核心在于降本增效,用有效资源区运行更多任务,通过一系列治理操作也让数仓同学积累技术经验同事规范化自身开发标准,让治理反推进组内技术进步
计算资源治理是一件长久之事,并不能因为资源紧张才去治理,而要将计算治理常态化,可通过周/月资源三秒内容及时推送给每个同学,并为之打分,让每个任务都有源可循,有方法可优化。
5.1.3、 参数内容
Hive:
(1)set hive.auto.convert.join=true;(是否自动转化成Map Join)
(2)set hive.map.aggr=true;(用于控制负载均衡,顶层的聚合操作放在Map阶段执行从而减轻清洗阶段数据传输和Reduce阶段的执行时间,提升总体性能,该设置会消耗更多的内存)
(3)set hive.groupby.skewindata=true;(用于控制负载均衡,当数据出现倾斜时,如果该变量设置为true,那么Hive会自动进行负载均衡)
(4)set hive.merge.mapfiles=true;(用于hive引擎合并小文件使用)
(5)set mapreduce.map.memory.mb=4096;(设置Map内存大小,解决Memory占用过大/小)
(6)set mapreduce.reduce.memory.mb=4096;(设置Reduce内存大小,解决Memory占用过大/小)
(7)set hive.exec.dynamic.partition.mode=nonstrict;(动态分区开启)
Spark:
(1)set spark.sql.legacy.parquet.datetimeRebaseModeInRead=LEGACY;(用于spark3中字段类型不匹配(例如datetime无法转换成date),消除sql中时间歧义,将Spark.sqlLEGACY.timeparserpolicy设置为LEGACY来恢复Spark 3.0之前的状态来转化)
(2)set spark.sql.adaptive.enabled=true;(是否开启调整Partition功能,如果开启,spark.sql.shuffe.partitions设置的Partition可能会被合并到一个Reducer里运行。平台默认开启,同时强烈建议开启。理由:更好利用单个Executor的性能,还能缓解小文件问题)
(3)set spark.sql.hive.convertInsertingPartitionedTable=false;(解决数据无法同步Impala问题,使用Spark3引擎必填)
(4)set spark.sqlfinalStage.adaptive.advisoryPartitionSizelnBytes=2048M; (Spark小文件合并)
5.2、 数据表合规治理
存在问题:数据表难用,想要的指标都不知道存放在哪个数据表、数据表明明缺少规范不易读
思路:从简单的开始,数据标准重制定 --> 无用/临时数据表下线 --> 应用指标公共下沉复用 --> 解决ODS穿透问题 --> 烟囱数据表重构及下线 --> 元数据非合规数据表(包括元数据字段信息)修改
治理过程
1、数据标准重制定
重新制定元数据规范(表名、字段名、字段数据格式等),用于新数据表建设中规范内容(还要保证元数据的清晰包括数据表使用说明、存储标准、数据表负责人注明、字段类型统一等)
数据域和主题域的划分
主题域:从业务视角自上而下分析,从整体业务环节中升华出来大的专项分析模块,结合对接的业务范围和行业形态从更高的视角去洞察整个业务流程
数据域:从数据视角从下而上搭建,对每个业务环境进行切割划分,形成不同环节的数据集,组装为完整的业务流程
2、 无用/临时数据表下线
根据表血缘对线上长期无用表、下游无血缘且空跑数据表、临时表进行扫描及下线,降低无用存储及计算损耗
元数据信息采集:使用第三方工具ApacheAmbari、Apache Atlas 等对hive元数据信息手机,并后续存储在数据表中存放,并对表进行重要等级打标,可以使用DataHub开源数据中心(The #1 Open Source Metadata Platform | DataHub)
3、应用指标公共下沉复用
4、解决ODS穿透问题
使用血缘图找到跨层引用表的问题,并对这些数据表按照模型5要素(数据域、颗粒度、度量、维度、事实表类型)构建CDM(DWD和DWS)层,并严重引用新的DWD表的质量情况
5、烟囱数据表重构/下线
整合相似场景下数据表一段内容到一个或多个数据表中,提升数据表易用性,从而避免由内容不足而导致的相互依赖和任务链路延长等问题
6、元数据非合规数据表重构/修改
对于非合规数据表元数据修改,要同时修改下游表名依赖及代码中字段引用信息,避免线上故障发生
7、数据表合规后续维护
可以从数据表价值(被引用次数、查询次数、被收藏次数)、数据表元数据规范(按新制定规范去检测打分)
思考
对于数据表合规治理是持续性的工作,数仓侧无法保障每个数据表就一定是合规的、易用的,要把数据表治理常态化,强制规范化从而减少问题发生。
对于本次遇到治理困难为部门之间协调配合问题其实在事后自己也有一个复盘思考,我觉得治理工作配合要从3个点出发:
(1)让下游配合其实最重要的是调动他们积极性,因为数据表治理对于下游来说可有可无,没有你数据表治理线上任务依日在跑着,数仓只是修改了数据表内容,保障了易用性,可能对于下游来说毫无感知,所以可以从下游使用数据中的痛点去沟通,在优先业务支持的同时给予时间设计数据表内容,共同维护好数据标准。
(2)除了这些还可以加一些奖惩措施活动,让下游觉得配合是有价值的,例如通过红黑榜定期也给他们发送邮件或者信息,并开展简单的培训,让下游具备治理的意识,同时在他们自助治理后提供激励。
(3)如果治理在周边部门起到了效果,可以做更大的推进作用,比如我们在和下游一起做治理并取到了效果,可以发治送理效果月报/周报 发送全部门,让其他人也有感知,并定期分享自己治理心得与其他部门数据部沟通,提升数据部在公司的影响力。
数据标准(附录)
1.数据表命名规范
ODS层(接入层)
ods_-{业务数据库名}_{业务数据表名}(可以在结尾补充增量或全量情况,或者在元数据侧补充)
DWD层(明细层)
dwd_{一级数据域)_{(二级数据域}_{三级数据域)_{业务过程(不清楚或没有写detail)}存储策略(df/di,df为全量数据,di为增量数据))
DWS层(汇总层)
dws_{一级数据域)_{二级数据域)_{三级数据域)_{颗粒度)(例如员工/部门)_{业务过程}_{周期粒度}(例如近30天写30d、90天写3m)
ADS层(应用层)
ads_{应用主题/应用场景}_{颗粒度}(例如买家/卖家)_{业务过程}_{调度周期}(例如1天调度一次写1d)
DIM表(维度表)
dim_{维度定义}_{更新周期(可不添加)}(例如日期写date)
TMP表(临时表)
tmp_{表名}_{临时表编号)
VIEW(视图)
{表名}_view
备份表:.
{表名}_bak
2.数据表命名词根
(1)存储策略
日全量。df:
。di:日增量
。hf:小时全量
。hi:小时增量
。mf:月全量
。mi:月增量
。wf:周全量
。wi:周增量
(2)颗粒度
。buyer:买家
。seller:卖家
。user:用户
。emp:员工
。order:订单
(3)统计周期
·1d:近一天指标统计
。1m:近一月指标统计
。1y:近一年指标统计
。3m:近三个月指标统计
。6m:近六个月指标统计
·nd:近n天指标统计(无法确定具体天可用nd替代)
。td:历史累计
(4)调度周期
。1d:天调度
。1m:月调度
·1y:小时调度
3.字段命名规范
是否某某类型用户,字段命名规范:is_(内容)。
。枚举值类型字段命名规范:xxx_type
时间戳类型字段命名规范:xxx_date,xxx_time.
周期指标命名:{内容}_{时间描述}(如最近一次lst1,最近两次lst2,历史his,最近第.
次last2nd date)
百分比命名:{内容}_rate
·数值类型(整型)命名:{内容}_cnt_{周期}(周期看情况添加)
。数值类型(小数)金额命名:{内容}_amt_{周期}(周期看情况添加)
4.字段类型规范
文本:String。
。日期:String
。整数:Bigint
。小数:高精度用Decimal、正常使用Double
。枚举值:单枚举-'Y/'N'用String、多枚举用String
。各类:IDString
5.数据表中其他元数据规范
数据表负责人(0wner);.
数据表中文名及使用说明;。
·每个开发字段中文名(中文名需要包含该字段内容,例如是否为某某类型用户,需要写出包含内容(Y/N))
数据表的颗粒度;
数据表的主键或联合主键;
6.数据表中存储周期规范
·ODS层1年
。DWD层3-5年
。DWS层10年(部分可永久)
。ADS层10年(部分可永久)
。DIM层3-5年(部分可永久)
数据表分区建议最多2级分区,超过2级分区会造成数据长周期存储问题,1级分区为业务日期,2级根据业务场景设置。
5.3、 小文件治理
小文件如何产生的:
日常任务及动态分区插入数据(使用的Spark2 MapReduce引擎),产生大量的小文件,从而导致Map数量剧增
Reduce数量越多,小文件也越多(Reduce的个数和输出文件是对应的)
数据源本身就包含大量的小文件,Api、Kafka等
实时数据落Hive也会产生大量小文件
小文件问题的影响:
从Hive的角度看,小文件会开很多Map,一个Map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能
在HDFS中,每个小文件对象约占150Byte,如果小文件过多会占用大量内存,会直接影响NameNode性能,相对的如果HDFS读写小文件也回更加耗时,并与对应的DataNode获取元信息,并与对应的DataNode建立连接,如果NameNode在宕机中恢复,也需要更多的时间从元数据文件中加载
占用HDFS存储
解决方案
计算引擎使用spark3合并小文件(AQE特新自动合并较小的分区)
减少reduce的数量(计算引擎为Hive,可以使用参数进行控制)
DIstribute By Rand():控制分区中数据量,使得sparkSQL的执行计划中多一个shuffle,用于代码结尾(用来控制Map输出结果的分发,即Map端如何拆分数据给Reduce端。会根据Reduce的个数进行数据分发,默认采用hash算法,当Distribute by后面跟的列是Rand()时,即保证每个分区的数据量基本一致)
在数据传输任务后再做一个清洗任务(本质也是回刷分区合并小文件任务,去处理小文件保障从数据源开始小文件不向下游流去)
实时任务传输hive后采用每天调度任务来合并小文件
通过参数方式合并小文件
小文件处理
使用spark3进行动态刷新代码
重建表:如果无分区的表可以考虑直接将表删掉,再重建,使用spark3跑数据
治理问题点
遇到问题点1:
用spark3+动态分区合并小文件发现一个问题,如果我给分区固定日期 小文件原30个会合并1个,如果用动态分区的话刷完可能部分分区还是30个,后面问了数据平台大佬,大佬说没加spark.sql.optimizer.insertRepartitionBeforeWritelfNoShuffe.enabled=true ,原理是把inset select from这种简单的没shffule的合并小文件关掉的,动态分区写入和静态分区写入时候创建文件的姿势确实是不一样的
遇到问题点2:
实时数据传入hive小文件居多也需要合并,这里我们可以把历史数据通过spark3+动态分区先回刷 后续建一个每日spark3的调度任务刷t-1的小文件即可
遇到问题点3:
使用Spark3刷小文件时候如果用到Impala同学记住一定要加这个参数,解决Spark3刷新数据后无法同步到Imapla
5.4、 数据质量治理
1、基线告警问题处理:
产出延迟任务,通过血缘找到实际运行时间最长的任务,然后看怎么处理,是否可以调整依赖为t+2
链路过长导致的,找到所有依赖表,看是否存在指标反复加工计算或数据表反复依赖(ADS依赖ADS再依赖ADS)把指标进行公共沉淀到DWS,形成公共指标复用
高优先级基线调度时间重叠,把重叠调度时间提前,进行削峰填平,充分使用其他闲置时间资源
任务失败导致告警,完善评审机制
跨部门数据表依赖未产出告警,强依赖需拉齐夜间值班群,非强依赖可以临时取消依赖修改代码取最新分区
夜间值班手册,列出最近常出现问题,给出解决步骤
2、DQC整治
对于长期未告警DQC可以采取下线,时常告警DQC,例如之前课程讲过的用户基础信息、业务流程数据丢失,同时也对这块数据配置了业务DQC,导致时常告警,此时需要暂停DQC,通过血缘进行数据丢失溯源,如果是源头数据问题需要起修复。
3、数据开发/校验流程重制定
需求评估:从接到需求开始进行评估,除了对接口径需求背景外,此时可以添加一道流程,即让业务阐述需求带来的价值和数据增长,从而判断需求要不要接,从源头降低无效需求出现。
模型设计:在接入需求后按照模型5要素(数据域、颗粒度、维度、度量、实时)完成数据模型设计,并进行需求模型评审(开个简单的会5分钟评审沟通)。
(1)设计是否满足5要素(颗粒度(买家 OR 卖家这种)清晰、主题域有划分、维度属性退化到模型、度量值是否合理(DWD存明细不做聚合操作)、全量增量是否有做区分)
(2)内容是否添加插入数据日期及时间字段、内容是否将code转化、脱敏字段是否处理(看业务场景不-定要做)
(3)表/字段命名是否按照规范来命名、字段属性是否按照规范、字段Comment是否清晰(要精确到内容例如xxx date Comment xxx日期类型(yyyy-mm-dd))。
(4)任务owner是否标清楚、模型对应业务过程是否描述清楚、颗粒度是否有描述、表使用说明是否添加(可不写)、表生命周期/存储格式是否按照标准、核心表是否有重点标注。
数据校验:重点一定是再次确认好业务口径
(1)通过平台工具或自测去看看数据分布情况最大值最小值空值也就是数据探查,这里可以通过SQL自查例如探查空选择where is nul,通过数据比对平台或用开发环境数据表与线上环境对比检查是否数据膨胀等问题。
(2)采用抽样数据写从ODS 取出来逻辑与自己开发的抽样指标数据比对保障开发内容能对产,从ODS取数写SQL去关联清洗,通过算子加工与最后呈现指标去比对,但这里数据只能完成80%准确度,剩余20%需要与数据分析同学进行联调。
Code Review:除了代码合理性外,需要检查依赖数据校验报告、任务在开发环境中运行情况是否超时等
5.5、 存储资源治理
下线无用数据表节省存储 --> 存储格式季压缩格式配置 --> 分区生命周期优化 --> 根据业务情景实现节省存储(模型优化)
无用表,可以定位最近30天/60天数据表被检索次数、饮用次数、读取次数
表存储格式+压缩:存储使用orc或者parquet,如果使用spark3跑任务,建议使用parquet格式,Spark SQL可以更有效的进行数据的调度和执行。(可以优化执行路径,减少stage的执行消耗,并降低CPU的消耗,还可以利用下推过滤器等技术来进一步减少磁盘I/O和内存的占用);压缩格式选择snappy or gzip,snappy适用于需要快速压缩和解压的场景,gzip偏向于高压缩率,使用存储归档或传输大量数据以节省存储空间或带宽
分区生命周期治理:1要确认表分区是否永久,2要确认表分区范围是否合理
分区优化及增全量修改:二级分区(日期+场景/日期+规则)过大的话可以直接拆表按二级分区内容再做分类;如之前存储采用全量分区存放,虽然提升便捷但分区不能这么支撑,所以需要全改增