hadoop大数据平台操作笔记(下)
–接hive数据库的操作
函数的操作
聚合函数
函数名 | 说明 |
---|---|
sun() | 求和 |
max() | 最大值 |
min() | 最小值 |
count() | 统计 |
avg() | 平均值 |
单行函数
数字类型
函数名 | 说明 |
---|---|
abs() | 绝对值 |
ceil() | 进一取整 |
floor() | 去尾取整 |
round() | 四舍五入 |
pow() | 幂运算 |
rand() | 随机值,获取0~1的小数 |
percentile(字段,百分比) | 获取字段的任意比例,例如中位数是0.5 |
字符串类型
函数名 | 说明 |
---|---|
substr() | 字符串截取 |
instr() | 字符串查找 |
replace() | 字符串整体替换 |
translate() | 按顺序依次替换 |
lower()/upper()/initcap() | 大小写转换 |
l/rtrim() | 清除两边空格 |
l/rpad() | 左右拼接 |
length() | 长度 |
concat() | 字符串拼接,效果相当于oracle中的“||” |
concat_ws(‘连接符’,字段) | 指定连接符的拼接 |
split() | 字符串分割,将字符串拆分成数组 |
regexp_replace(column,re,new) | 通过正则表达式进行匹配然后替换,oracle通用 |
get_json_object(column,path) | 从一个json数据中,提取需要的内容;json数据中字段的读取要使用 $. 的格式 |
时间日期类型
函数名 | 说明 |
---|---|
months_between() | 求两日期的间隔月份 |
months_add() | 增加\减少月份 |
last_day() | 求当月最后一天的日期 |
next_day() | 求下一个星期几的日期 |
current_date | 当前系统时间(年月日) |
current_timestamp | 当前系统时间(年月日时分秒毫秒) |
unix_timestamp() | 返回系统时间戳 |
from_unixtime(timestamp,column) | 根据时间戳计算相应的元素(年月日等) |
date_add(日期,天数) | 日期的计算 |
datediff(date1,date2) | 计算日期的间隔(天数) |
year()/month()/day()/hour()/minute()/second() | 时间维度的提取 |
数据转换操作
cast(数据类型 as 新的数据类型)
拼接时,可以用此函数将所有数据都转换为字符串类型
select cast(1.666 as int);
select concat_ws('-',cast(empno as string),ename,job) from emp;
数组类型
函数名 | 说明 |
---|---|
array() | 将多个数据组成一个数组 |
sort_array() | 对数组内容进行排序 |
collect_list() | 将一个列的所有数据,拼接为一个数组 |
collect_set() | 将一个列的所有数据,拼接为一个数组,并对数组内容进行去重 |
explode() | 炸裂函数,将一个数组拆分成一个字段显示。explode函数默认只能显示被拆分的字段,不能显示除了拆分字段外的其他字段的信息,如果一定要显示其他信息,需要用到hive中虚拟视图的句型:select 其他字段,被拆分字段的别名 from 表名 lateral view explode(拆分字段) v as 被拆分字段的别名 |
数组还可以和字符串里面的concat_ws()函数拼接起来使用(当数组的内容全部都是字符串的时候):
select concat_ws('/',addr) from test15;
映射类型
函数名 | 说明 |
---|---|
map() | 将成对的数据当成映射数据进行存储(参数必须为偶数个) |
map_key()/map_values() | 查看map数据的关键字和值的操作 |
explode和map的结合
create table sc(
userid int,
score map<string,int>
)row format delimited fields terminated by ','
collection items terminated by '/'
map keys terminated by '-';
1,math-88/chinese-99
2,math-76/chinese-92/english-60
load data local inpath '/root/data04.txt' into table sc;
用explode进行拆分,取别名时多个字段一定要括起来
select explode(score) as (course,degree) from sc;
用虚拟视图拆分map的多个字段,别名不能用括号括起来
select userid,course,degree from sc lateral view explode(score) v as course,degree;
size():map与array两个类型通用的函数,查看数据中的元素个数
select userid,size(score) from test16;
select userid,size(addr) from test15;
和逻辑判断相关的函数以及语法
函数名 | 说明 |
---|---|
nvl() | 判断字段是否为空,为空赋默认值 |
case when | 对各种不同判断进行语句的执行 |
if(表达式,判断为真,判断为假) | 通过判断表达式进行不同的操作,if语句可以嵌套使用 |
select ename, comm,
if(comm is null or comm=0,'没有奖金','有奖金')
from emp;
判断每个人的工资等级
select ename,sal,
if(sal<2000,'C', if(sal<3000,'B','A'))
from emp;
窗口函数
偏移函数
lag()
lead()
排名函数
row_number(): 排名不重复且连续
rank(): 排名重复且不连续
dense_rank(): 排名重复且连续
自定义函数的使用
自定义函数分为UDF、UDAF、UDTF三个部分:
UDF输入一行数据,输出一行数据;
UDAF输入多行数据,返回一行或者多行;
UDTF输入一行数据,输出多行数据。
在Hive的统计过程中,经常会遇到数据量很大的表格,使用group by之类的统计语句运行的时间很长或者造成性能问题之类的,或者数据本身很复杂的,例如嵌套层次很多的json之类的,我们就会使用自定义函数来辅助进行数据的统计和计算。
hive可以结合python或者java进行自定义函数的编写。
UDF的例子:
例如下面一个表格:
create table test17(
userid int,
card string
)row format delimited fields terminated by ',';
1,756464199809158463
2,85734837
3,87459219900808844x
4,98473420000205859X
5,847836199812148473
load data local inpath '/root/17.txt' into table test17;
接着我们使用Python来处理数据:
#coding=utf-8
# 导入控制台拦截的模块,是我们的系统模块
import sys
# 保存和捕获控制台的数据,line变量是每一行数据
for line in sys.stdin:
li=line.strip().split("\t") # 首先先清除一下获取的行数据里面左右多余的空格,然后按照tab切割字符串
userid=li[0]
card=li[1]
status=''
if len(card)!=18 or not card[-1].isdigit() and card[-1] not in ('x','X'):
status='不是正确的身份证'
else:
if int(card[-2])%2==1:
status='男生'
else:
status='女生'
print(str(userid)+'\t'+card+'\t'+status)
将这个python文件上传到Linux的服务器中,并且要在hive数据库中添加这个python文件并且使用python运行和读取表格数据:
添加python文件到hive数据库中
add file /root/checkCard.py;
运行sql语句
select transform(userid,card)
using 'python checkCard.py'
as (userid,card,status)
from test17;
UDAF的例子:
读取test14的表格,计算apple和pear各自的平均价格:
#coding=utf-8
import sys,json
apple=[]
pear=[]
for line in sys.stdin:
#拿到每一行的数据,根据\t切割字段
li=line.strip().split("\t")
#需要的数据在列表的第二个列
datas=li[1]
#将json数据转换成字典的格式
dict_datas=json.loads(datas)
# {"dt":"2020-1-4","list":[{"fruit":"apple","price":8},{"fruit":"pear","price":7.5}]}
for d in dict_datas['list']:
if d['fruit']=='apple':
apple.append(d['price'])
elif d['fruit']=='pear':
pear.append(d['price'])
s1=0
for a in apple:
s1+=a
s1=s1/len(apple)
s2=0
for b in pear:
s2+=b
s2=s2/len(pear)
print('apple'+'\t'+str(s1))
print('pear'+'\t'+str(s2))
add file /root/avgPrice.py;
select transform(id,datas)
using 'python avgPrice.py'
as (fruit, avgPrice)
from test14;
建表语句:
create table lx001(
month int,
sale int
)row format delimited fields terminated by ',';
插入数据:
1,100
2,\N
3,150
4,\N
5,\N
6,90
7,99
8,\N
题目:
当某一行的sale值为空时,用上一个离它最近的有值数据进行替换
select month,
max(sale) over(partition by s2) sales
from (
select month,sale,
sum(s) over(order by month) s2
from (
select month, sale,
case when sale is null then 0 else 1 end s
from lx001) t) t2;
如果用oracle写这个练习:
select month,
last_value(sale ignore nulls) over(order by month) sales
from lx001;
hive数据库的其它语句
没有dcl:权限控制和tcl:事务控制
DDL
create:
create [external] [temporary] table 表名(
列名 数据类型
) [partitioned by (分区字段 分区类型)]
[clustered by (分桶字段) into xx buckets]
[row format delimited fields terminated by ‘’
collection items terminated by ‘’
map keys terminated by ‘’]
[tblproperties (属性名=属性值)]
[stored as 存储类型]
[location ‘hdfs文件夹存储位置’];
create table A like B;
create table A as select * from B;
create view A as select * from B;
drop:
drop table A;
truncate:
truncate table A;
alter:
分区:
alter table A add partition (分区字段=分区值) [location ‘hdfs的文件夹位置’];
alter table A drop partition (分区字段=分区值);
属性:
alter table A set tblproperties (‘属性名字’=‘属性值’);
表格结构:
– 新增字段
alter table A add columns(列名 数据类型); 可以一次新增多个字段
– 修改字段的数据类型和字段名字
alter table A change 已有列名 新的列名 新的数据类型;
– 修改表名
alter table A rename to B;
DML
insert:
insert into A (列名) values(值);
insert into A (列名) select 查询;
insert overwrite table A [partition(分区信息)] select 查询;
insert into table A [partition(分区信息)] select 查询;
特殊语句
查询一次表格,将查询的表格的结果分别插入到N个不同的表格中去。
from 表A
insert overwrite table 表B select 字段 where 筛选
insert overwrite table 表C select 字段 where 筛选
insert overwrite table 表D select 字段 where 筛选
… ;
update 和 delete:
只有开启了事务的orc表格才能使用update和delete的语句,其他的表格要实现数据修改的话,只能使用 insert overwrite table 进行代替。
DQL
关键字 | 执行顺序 |
---|---|
select 字段 | 4 |
from 表名 | 1 |
where 筛选 | 2 |
group by 分组 | 3 |
having 聚合之后的筛选 | 5 |
order by 排序 | 6 |
limit 分页 | 7 |
limit 分页
select * from A limit 3; 查询表格的前三行数据
limit 开始位置, 连续行数
limit 3 其实就是 limit 0,3 如果是从第一行开始取值,那么这个0可以省略。
如果要取5-8的数据,写成 limit 4,4,在数据库中只有limit的序号从0开始。
Hive数据库的优化(面试重点)
执行计划
explain select 查询语句;
在Hive的执行计划中看不到资源的消耗和语句的执行时间,所以sql效率的比较只能通过执行计划日志的长度来比较,谁运行的步骤更多,谁消耗的时间就更多。在hive中没有办法很直观的去查看语句的运行效率的。(可以通过执行计划检查sql语句是否正确)
具体的sql优化的步骤和出现问题的原因
在运行过程中,先查看map和reduce运行的日志。
map是数据读取阶段,reduce是数据计算和汇总阶段
出现性能的问题,基本上都是在reduce阶段。
-
如果reduce卡在中间的位置不动,一般都是服务器资源不足导致运行不下去了。
解决办法:
可以将很大的表格进行拆分,拆成多个表格分别运行
例如 select deptno,sum(sal) s from emp group by deptno; 这个表格有10亿行数据,运行卡在了中间,将这个表格拆分,例如拆分三个表格先试着运行看看。首先: 在Linux中创建一个xxx.hql的文件:
使用 hive -f xxx.hql运行文件
use bigdata;
set hive.exec.mode.local.auto=true;
create table emp_a like emp;
create table emp_b like emp;
create table emp_c like emp;
from emp
insert overwrite table emp_a select * limit 0,5
insert overwrite table emp_b select * limit 4,5
insert overwrite table emp_c select * limit 10,4
;
create table tmp_a as
select deptno,sum(sal) s from emp_a group by deptno;
create table tmp_b as
select deptno,sum(sal) s from emp_b group by deptno;
create table tmp_c as
select deptno,sum(sal) s from emp_c group by deptno;
select deptno,sum(s) s from (
select * from tmp_a
union all
select * from tmp_b
union all
select * from tmp_c ) t
group by deptno;
drop table emp_a;
drop table emp_b;
drop table emp_c;
drop table tmp_a;
drop table tmp_b;
drop table tmp_c;
- 如果reduce卡在了99%或100%的位置,这种现象被称为数据倾斜,数据倾斜有两个主要的原因,第一个是group by统计时,key值分布不均匀,即各个分组的数据量不均匀,第二个原因时表格的联合查询出现了问题。
- key值分布不均匀:
- 当分组的字段有大量空值时,所有的空值会被放在一个组中被处理,这是就要对扣空值进行打散的操作,把空值替换成带标志的随机值(字符串+随机数)。
- key值分布不均匀:
nvl(列名, concat('random', substr(rand(),-2)%3))--分成三组
在emp表中,comm为空的记录有十条,可以将这些记录分成三组,分别统计每组的记录,这样就实现了将空值打散的效果。
select count(1) c1 from emp where comm is not null group by comm
union all
select sum(c) c1 from(
select comm, count(1) c from
(select nvl(comm,concat('random',substr(rand(),-2)%3)) comm from emp where comm is null) t
group by comm) t1;
- 字段没有什么空值,就是有的组数据多,有的组数据少,例如两个表格,一个是店铺的维度表,一个是销售商品的事实表,现在对这两个表格进行拼接和分组统计,计算每个店铺订单的销售情况。
- a数据是10行,b数据是10000行,倾斜的差距是9910,将表格进行拆分,拆分3个表格,那么数据量有可能就是a是3行,b是3333行,倾斜的差距是3330,表格拆分越多,不同key值之间数据的倾斜的差距就越小,所以这种场景也可以通过拆分表格的方式来操作。
- 通过设置分桶对group by进行优化;
- 通过设置参数对分组进行优化:set hive.groupby.skewindata;负载均衡开关,开启之后,当有的reduce完成而有的没完成,完成的会去帮没完成的,实现负载均衡。
- join阶段产生的数据倾斜
- 在写sql的时候,将数据量小的表格写在join左边,大的写在join右边,小表驱动大表。
- 使用分桶表进行表格的联合查询,用分桶减少表连接的筛选次数。
- 使用mapjoin的优化器来操作连接表:select /*+ 优化器内容 */
select /*+ mapjoin(a,b) */ 字段 from a join b on a.xx=b.yy;
mapjoin将小的表格先读取到内存中,用内存里面的数据和硬盘中的大表格数据进行匹配。
-
- 对大的表格进行拆分,用小的表格和拆分后的表格分别连接,最后union all合并
小文件导致服务器数据读取的性能出现问题
出现小文件过多的原因
- 分桶的操作会让大的表格变成N个小文件
- 从业务系统中读取的数据,由很多小文件组成
- 数据仓库里面有一套实时数据流的业务系统,例如每隔五分钟就会上传一个服务器数据到数据仓库中,会产生大量的小文件。
导致的问题
- 每个文件都会启动一个map进程来读取它,小文件过多的话,map进程就会过多,每个map的启动都是需要消耗时间和服务器资源的。
- NameNode中会保存数据和文件的元数据信息,加入一个文件元数据是1kb,默认保存三份,存储消耗3kb,这个文件被拆成100份的话,存储就要消耗300kb的大小。小文件会额外占用很多的NameNode的存储空间。
解决的办法
通过设置hive给的参数去调整优化,例如有合并文件的参数,合并计算量大小的参数,合并服务器节点的参数,合并机架数据节点的参数等等。
--每个Map最大输入大小,决定合并后的文件数
set mapred.max.split.size=256000000;
--一个节点上split的至少的大小,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.node=100000000;
--一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000;
--执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapfiles=true#在Map-only的任务结束时合并小文件
set hive.merge.mapredfiles=true#在Map-Reduce的任务结束时合并小文件
set hive.merge.size.per.task=256*1000*1000#合并文件的大小
set hive.merge.smallfiles.avgsize=16000000#当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.auto.convert.join=false;
set mapreduce.map.memory.mb=8192;
set mapreduce.reduce.memory.mb=8192;
set mapred.child.java.opts=-Xmx1536m;
set mapreduce.job.reduce.slowstart.completedmaps=0.8;
set mapred.reduce.tasks=20; --此处设置了reducer的个数为20个则会生成20个文件
set hive.exec.parallel=true;