当前位置: 首页 > article >正文

hadoop大数据平台操作笔记(下)

–接hive数据库的操作

函数的操作

聚合函数

函数名说明
sun()求和
max()最大值
min()最小值
count()统计
avg()平均值

单行函数

数字类型
函数名说明
abs()绝对值
ceil()进一取整
floor()去尾取整
round()四舍五入
pow()幂运算
rand()随机值,获取0~1的小数
percentile(字段,百分比)获取字段的任意比例,例如中位数是0.5
字符串类型
函数名说明
substr()字符串截取
instr()字符串查找
replace()字符串整体替换
translate()按顺序依次替换
lower()/upper()/initcap()大小写转换
l/rtrim()清除两边空格
l/rpad()左右拼接
length()长度
concat()字符串拼接,效果相当于oracle中的“||”
concat_ws(‘连接符’,字段)指定连接符的拼接
split()字符串分割,将字符串拆分成数组
regexp_replace(column,re,new)通过正则表达式进行匹配然后替换,oracle通用
get_json_object(column,path)从一个json数据中,提取需要的内容;json数据中字段的读取要使用 $. 的格式
时间日期类型
函数名说明
months_between()求两日期的间隔月份
months_add()增加\减少月份
last_day()求当月最后一天的日期
next_day()求下一个星期几的日期
current_date当前系统时间(年月日)
current_timestamp当前系统时间(年月日时分秒毫秒)
unix_timestamp()返回系统时间戳
from_unixtime(timestamp,column)根据时间戳计算相应的元素(年月日等)
date_add(日期,天数)日期的计算
datediff(date1,date2)计算日期的间隔(天数)
year()/month()/day()/hour()/minute()/second()时间维度的提取
数据转换操作

cast(数据类型 as 新的数据类型)
拼接时,可以用此函数将所有数据都转换为字符串类型

select cast(1.666 as int);
select concat_ws('-',cast(empno as string),ename,job) from emp;
数组类型
函数名说明
array()将多个数据组成一个数组
sort_array()对数组内容进行排序
collect_list()将一个列的所有数据,拼接为一个数组
collect_set()将一个列的所有数据,拼接为一个数组,并对数组内容进行去重
explode()炸裂函数,将一个数组拆分成一个字段显示。explode函数默认只能显示被拆分的字段,不能显示除了拆分字段外的其他字段的信息,如果一定要显示其他信息,需要用到hive中虚拟视图的句型:select 其他字段,被拆分字段的别名 from 表名 lateral view explode(拆分字段) v as 被拆分字段的别名

数组还可以和字符串里面的concat_ws()函数拼接起来使用(当数组的内容全部都是字符串的时候):

select concat_ws('/',addr) from test15;
映射类型
函数名说明
map()将成对的数据当成映射数据进行存储(参数必须为偶数个)
map_key()/map_values()查看map数据的关键字和值的操作

explode和map的结合

create table sc(
userid int,
score map<string,int>
)row format delimited fields terminated by ','
collection items terminated by '/'
map keys terminated by '-';

1,math-88/chinese-99
2,math-76/chinese-92/english-60

load data local inpath '/root/data04.txt' into table sc;

用explode进行拆分,取别名时多个字段一定要括起来
select explode(score) as (course,degree) from sc;

用虚拟视图拆分map的多个字段,别名不能用括号括起来
select userid,course,degree from sc lateral view explode(score) v as course,degree;

size():map与array两个类型通用的函数,查看数据中的元素个数

select userid,size(score) from test16;
select userid,size(addr) from test15;
和逻辑判断相关的函数以及语法
函数名说明
nvl()判断字段是否为空,为空赋默认值
case when对各种不同判断进行语句的执行
if(表达式,判断为真,判断为假)通过判断表达式进行不同的操作,if语句可以嵌套使用
select ename, comm,
if(comm is null or comm=0,'没有奖金','有奖金')
from emp;

判断每个人的工资等级
select ename,sal,
if(sal<2000,'C', if(sal<3000,'B','A'))
from emp;
窗口函数
偏移函数

lag()
lead()

排名函数

row_number(): 排名不重复且连续
rank(): 排名重复且不连续
dense_rank(): 排名重复且连续

自定义函数的使用

自定义函数分为UDF、UDAF、UDTF三个部分:
UDF输入一行数据,输出一行数据;
UDAF输入多行数据,返回一行或者多行;
UDTF输入一行数据,输出多行数据。

在Hive的统计过程中,经常会遇到数据量很大的表格,使用group by之类的统计语句运行的时间很长或者造成性能问题之类的,或者数据本身很复杂的,例如嵌套层次很多的json之类的,我们就会使用自定义函数来辅助进行数据的统计和计算。
hive可以结合python或者java进行自定义函数的编写。
在这里插入图片描述
UDF的例子:
例如下面一个表格:

create table test17(
userid int,
card string
)row format delimited fields terminated by ',';

1,756464199809158463
2,85734837
3,87459219900808844x
4,98473420000205859X
5,847836199812148473
load data local inpath '/root/17.txt' into table test17;

接着我们使用Python来处理数据:

#coding=utf-8

# 导入控制台拦截的模块,是我们的系统模块
import sys

# 保存和捕获控制台的数据,line变量是每一行数据
for line in sys.stdin:
    li=line.strip().split("\t")        # 首先先清除一下获取的行数据里面左右多余的空格,然后按照tab切割字符串
    userid=li[0]
    card=li[1]
    status=''
    if len(card)!=18 or not card[-1].isdigit() and card[-1] not in ('x','X'):
        status='不是正确的身份证'
    else:
        if int(card[-2])%2==1:
            status='男生'
        else:
            status='女生'
    print(str(userid)+'\t'+card+'\t'+status)

将这个python文件上传到Linux的服务器中,并且要在hive数据库中添加这个python文件并且使用python运行和读取表格数据:

添加python文件到hive数据库中
add file /root/checkCard.py;

运行sql语句
select transform(userid,card)
using 'python checkCard.py'
as (userid,card,status)
from test17;

UDAF的例子:
读取test14的表格,计算apple和pear各自的平均价格:

#coding=utf-8

import sys,json

apple=[]
pear=[]

for line in sys.stdin:
    #拿到每一行的数据,根据\t切割字段
    li=line.strip().split("\t")
    #需要的数据在列表的第二个列
    datas=li[1]
    #将json数据转换成字典的格式
    dict_datas=json.loads(datas)
    #  {"dt":"2020-1-4","list":[{"fruit":"apple","price":8},{"fruit":"pear","price":7.5}]}
    for d in dict_datas['list']:
        if d['fruit']=='apple':
            apple.append(d['price'])
        elif d['fruit']=='pear':
            pear.append(d['price'])

s1=0
for a in apple:
    s1+=a
s1=s1/len(apple)

s2=0
for b in pear:
    s2+=b
s2=s2/len(pear)

print('apple'+'\t'+str(s1))
print('pear'+'\t'+str(s2))
add file /root/avgPrice.py;

select transform(id,datas)
using 'python avgPrice.py'
as (fruit, avgPrice)
from test14;
建表语句:
create table lx001(
month int,
sale int
)row format delimited fields terminated by ',';

插入数据:
1,100
2,\N
3,150
4,\N
5,\N
6,90
7,99
8,\N

题目:
当某一行的sale值为空时,用上一个离它最近的有值数据进行替换
select month, 
max(sale) over(partition by s2) sales
from (
select month,sale, 
sum(s) over(order by month) s2
from (
select month, sale,
case when sale is null then 0 else 1 end s
from lx001) t) t2;

如果用oracle写这个练习:
select month, 
last_value(sale ignore nulls) over(order by month) sales
from lx001;

hive数据库的其它语句

没有dcl:权限控制和tcl:事务控制

DDL

create:
create [external] [temporary] table 表名(
列名 数据类型
) [partitioned by (分区字段 分区类型)]
[clustered by (分桶字段) into xx buckets]
[row format delimited fields terminated by ‘’
collection items terminated by ‘’
map keys terminated by ‘’]
[tblproperties (属性名=属性值)]
[stored as 存储类型]
[location ‘hdfs文件夹存储位置’];

create table A like B;

create table A as select * from B;

create view A as select * from B;

drop:
drop table A;

truncate:
truncate table A;

alter:
分区:
alter table A add partition (分区字段=分区值) [location ‘hdfs的文件夹位置’];
alter table A drop partition (分区字段=分区值);

属性:
alter table A set tblproperties (‘属性名字’=‘属性值’);

表格结构:
– 新增字段
alter table A add columns(列名 数据类型); 可以一次新增多个字段

– 修改字段的数据类型和字段名字
alter table A change 已有列名 新的列名 新的数据类型;

– 修改表名
alter table A rename to B;

DML

insert:
insert into A (列名) values(值);
insert into A (列名) select 查询;
insert overwrite table A [partition(分区信息)] select 查询;
insert into table A [partition(分区信息)] select 查询;

特殊语句
查询一次表格,将查询的表格的结果分别插入到N个不同的表格中去。
from 表A
insert overwrite table 表B select 字段 where 筛选
insert overwrite table 表C select 字段 where 筛选
insert overwrite table 表D select 字段 where 筛选
… ;
update 和 delete:
只有开启了事务的orc表格才能使用update和delete的语句,其他的表格要实现数据修改的话,只能使用 insert overwrite table 进行代替。

DQL

关键字执行顺序
select 字段4
from 表名1
where 筛选2
group by 分组3
having 聚合之后的筛选5
order by 排序6
limit 分页7

limit 分页
select * from A limit 3; 查询表格的前三行数据
limit 开始位置, 连续行数
limit 3 其实就是 limit 0,3 如果是从第一行开始取值,那么这个0可以省略。
如果要取5-8的数据,写成 limit 4,4,在数据库中只有limit的序号从0开始。

Hive数据库的优化(面试重点)

执行计划

explain select 查询语句;
在Hive的执行计划中看不到资源的消耗和语句的执行时间,所以sql效率的比较只能通过执行计划日志的长度来比较,谁运行的步骤更多,谁消耗的时间就更多。在hive中没有办法很直观的去查看语句的运行效率的。(可以通过执行计划检查sql语句是否正确)

具体的sql优化的步骤和出现问题的原因

在运行过程中,先查看map和reduce运行的日志。
map是数据读取阶段,reduce是数据计算和汇总阶段
出现性能的问题,基本上都是在reduce阶段。

  1. 如果reduce卡在中间的位置不动,一般都是服务器资源不足导致运行不下去了。
    解决办法:
    可以将很大的表格进行拆分,拆成多个表格分别运行
    例如 select deptno,sum(sal) s from emp group by deptno; 这个表格有10亿行数据,运行卡在了中间,将这个表格拆分,例如拆分三个表格先试着运行看看。

    首先: 在Linux中创建一个xxx.hql的文件:
    使用 hive -f xxx.hql运行文件

use bigdata;

set hive.exec.mode.local.auto=true;

create table emp_a like emp;
create table emp_b like emp;
create table emp_c like emp;

from emp
insert overwrite table emp_a select * limit 0,5
insert overwrite table emp_b select * limit 4,5
insert overwrite table emp_c select * limit 10,4
;

create table tmp_a as
select deptno,sum(sal) s from emp_a group by deptno;

create table tmp_b as
select deptno,sum(sal) s from emp_b group by deptno;

create table tmp_c as
select deptno,sum(sal) s from emp_c group by deptno;

select deptno,sum(s) s from (
select * from tmp_a
union all
select * from tmp_b
union all
select * from tmp_c ) t
group by deptno;

drop table emp_a;
drop table emp_b;
drop table emp_c;
drop table tmp_a;
drop table tmp_b;
drop table tmp_c;
  1. 如果reduce卡在了99%或100%的位置,这种现象被称为数据倾斜,数据倾斜有两个主要的原因,第一个是group by统计时,key值分布不均匀,即各个分组的数据量不均匀,第二个原因时表格的联合查询出现了问题。
    • key值分布不均匀:
      • 当分组的字段有大量空值时,所有的空值会被放在一个组中被处理,这是就要对扣空值进行打散的操作,把空值替换成带标志的随机值(字符串+随机数)。
			nvl(列名, concat('random', substr(rand(),-2)%3))--分成三组
   		 在emp表中,comm为空的记录有十条,可以将这些记录分成三组,分别统计每组的记录,这样就实现了将空值打散的效果。
select count(1) c1 from emp where comm is not null group by comm
union all
select  sum(c) c1 from(
select comm, count(1) c from
(select nvl(comm,concat('random',substr(rand(),-2)%3)) comm from emp where comm is null) t
group by comm) t1;
  • 字段没有什么空值,就是有的组数据多,有的组数据少,例如两个表格,一个是店铺的维度表,一个是销售商品的事实表,现在对这两个表格进行拼接和分组统计,计算每个店铺订单的销售情况。
    • a数据是10行,b数据是10000行,倾斜的差距是9910,将表格进行拆分,拆分3个表格,那么数据量有可能就是a是3行,b是3333行,倾斜的差距是3330,表格拆分越多,不同key值之间数据的倾斜的差距就越小,所以这种场景也可以通过拆分表格的方式来操作。
    • 通过设置分桶对group by进行优化;
    • 通过设置参数对分组进行优化:set hive.groupby.skewindata;负载均衡开关,开启之后,当有的reduce完成而有的没完成,完成的会去帮没完成的,实现负载均衡。
  1. join阶段产生的数据倾斜
    • 在写sql的时候,将数据量小的表格写在join左边,大的写在join右边,小表驱动大表。
    • 使用分桶表进行表格的联合查询,用分桶减少表连接的筛选次数。
    • 使用mapjoin的优化器来操作连接表:select /*+ 优化器内容 */
	select  /*+ mapjoin(a,b) */ 字段  from  a join b on a.xx=b.yy;

mapjoin将小的表格先读取到内存中,用内存里面的数据和硬盘中的大表格数据进行匹配。

    • 对大的表格进行拆分,用小的表格和拆分后的表格分别连接,最后union all合并

小文件导致服务器数据读取的性能出现问题

出现小文件过多的原因
  1. 分桶的操作会让大的表格变成N个小文件
  2. 从业务系统中读取的数据,由很多小文件组成
  3. 数据仓库里面有一套实时数据流的业务系统,例如每隔五分钟就会上传一个服务器数据到数据仓库中,会产生大量的小文件。
导致的问题
  1. 每个文件都会启动一个map进程来读取它,小文件过多的话,map进程就会过多,每个map的启动都是需要消耗时间和服务器资源的。
  2. NameNode中会保存数据和文件的元数据信息,加入一个文件元数据是1kb,默认保存三份,存储消耗3kb,这个文件被拆成100份的话,存储就要消耗300kb的大小。小文件会额外占用很多的NameNode的存储空间。
解决的办法

通过设置hive给的参数去调整优化,例如有合并文件的参数,合并计算量大小的参数,合并服务器节点的参数,合并机架数据节点的参数等等。

--每个Map最大输入大小,决定合并后的文件数
set mapred.max.split.size=256000000;
--一个节点上split的至少的大小,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.node=100000000;
--一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000;
--执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapfiles=true#在Map-only的任务结束时合并小文件
set hive.merge.mapredfiles=true#在Map-Reduce的任务结束时合并小文件
set hive.merge.size.per.task=256*1000*1000#合并文件的大小
set hive.merge.smallfiles.avgsize=16000000#当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.auto.convert.join=false;
set mapreduce.map.memory.mb=8192;
set mapreduce.reduce.memory.mb=8192;
set mapred.child.java.opts=-Xmx1536m;
set mapreduce.job.reduce.slowstart.completedmaps=0.8;
set mapred.reduce.tasks=20;  --此处设置了reducer的个数为20个则会生成20个文件
set hive.exec.parallel=true;

http://www.kler.cn/a/320733.html

相关文章:

  • C++ 内联函数
  • 【MySQL】优化方向+表连接
  • AI大模型如何重塑软件开发流程:智能化与自动化的新时代
  • LlamaIndex+本地部署InternLM实践
  • [ACTF2020]Upload 1--详细解析
  • 前端埋点、监控
  • C++/Qt 集成 AutoHotkey
  • 这本书简直就是自然语言处理学习者的福音!
  • 408算法题leetcode--第14天
  • Git 版本控制--git restore和git reset
  • 大数据Flink(一百二十三):五分钟上手Flink MySQL连接器
  • SQLServer TOP(Transact-SQL)
  • 【C++类的设计】题目(二):设计圆柱Column类
  • 【NLP】循环神经网络--RNN学习.day3
  • Rust编程的if选择语句
  • HTML中的表单(超详细)
  • pycirclize python包画circos环形图
  • .net 未能加载文件或程序集“System.Diagnostics.DiagnosticSource, Version=6.0.0.1 解决方案
  • OpenCV运动分析和目标跟踪(4)创建汉宁窗函数createHanningWindow()的使用
  • C++速通LeetCode中等第20题-随机链表的复制(三步简单图解)
  • 优化算法(四)—蚁群算法(附MATLAB程序)
  • spark的stage划分的原理
  • 如何完成一个每天自定义的主题,然后提出该主题的100个问题,然后自动完成。使用playwright
  • Chroma 向量数据入门
  • zico2打靶记录
  • 结合人工智能,大数据,物联网等主流技术实现业务流程的闭环整合的名厨亮灶开源了