hive(hdfs)补数脚本
pb级别迁移通常要持续1个月以上。一般的过程是,全量迁移,追平数据,增量同步,校验,补数。
这里的指定补数脚本:
输入需要补数的表,如Input.txt,如果有分区则加补此分区,没有分区,则重迁移此表。
input.txt
ods_xxx dt=202403
dwd_xxx
dwd_xxxd temp
则补数脚本应该满足,迁移ods_xxx/dt=202403分区的数据,迁移dwd_xxx的数据,迁移dwd_xxxd/temp的数据。
input.txt格式
ods_xxr dt=20250214
ods_xi_202402_monitor dt=20250214
代码
#!/bin/bash
#场景:数据在同一库下,并且hive是内部表(前缀的hdfs地址是相同的)
#1.读取一个文件,获取表名
con_address=hdfs://xx.xx.xx.104:4007
#数组,这个值是存在的 表名/分区名或者文件名
fenqu_allow_list=()
#检测即将迁移的数据量
total_size=0
while IFS=' ' read -r table_name fenqu_name
do
hdfs dfs -count $con_address/apps/hive/warehouse/bigdata.db/$table_name/$fenqu_name
if [ -z "$fenqu_name" ]; then
echo "没有分区,迁移表,检测表大小"
else
echo "有分区或文件,检测分区大小或者文件大小"
fi
#文件大小,字节
fenqu_size=$(hdfs dfs -count $con_address/apps/hive/warehouse/bigdata.db/$table_name/$fenqu_name | awk -F ' ' '{print $3}')
if [[ "$fenqu_size" =~ ^[0-9]+$ ]]; then
#变成GB
fenqu_gb=$(echo "scale=2; $fenqu_size / 1073741824" | bc)
total_size=$(echo "scale=2;$total_size + $fenqu_gb"| bc)
fenqu_allow_list+=("$table_name/$fenqu_name")
else
echo "无值$table_name/$fenqu_name" >> fail.txt
fi
done < "$1"
echo "此批迁移的总大小是$total_size GB"
read -p "是否继续? (y/n): " user_input
# 判断用户输入
if [[ "$user_input" == "y" || "$user_input" == "Y" ]]; then
echo "即将执行脚本"
elif [[ "$user_input" == "n" || "$user_input" == "N" ]]; then
echo "退出脚本..."
exit 0
else
echo "无效输入,退出脚本..."
exit 1
fi
declare -A repair_map
#迁移指定分区或者文件
for element in "${fenqu_allow_list[@]}"; do
table_name=$(echo "$element" | awk -F'/' '{print $1}')
echo "表名是$table_name"
#删除对应的内容
echo "执行命令 hdfs dfs -rm -r $con_address/apps/hive/warehouse/bigdata.db/$element"
hdfs dfs -rm -r $con_address/apps/hive/warehouse/bigdata.db/$element
#迁移
source_path=hdfs://xx.xx.xx.7:8020/apps/hive/warehouse/bigdata.db/$element
target_path=hdfs://xx.xx.xx.104:4007/apps/hive/warehouse/bigdata.db/$element
echo "执行命令 hadoop distcp -skipcrccheck -i -strategy dynamic -bandwidth 30 -m 20 $source_path $target_path"
hadoop distcp -skipcrccheck -i -strategy dynamic -bandwidth 30 -m 20 $source_path $target_path
#table_name去重
repair_map["$table_name"]=1
echo "-----------------------"
done
for key in "${!repair_map[@]}"; do
table_name=$key
#修复元数据
BEELINE_CMD="beeline -u 'jdbc:hive2://xx.xx.xx.104:7001/cfc;principal=hadoop/xx.xx.xx.104@TBDS-09T7KXLE'"
# 执行MSCK REPAIR TABLE命令
echo "Repairing partitions for table: $table_name"
$BEELINE_CMD -e "MSCK REPAIR TABLE $table_name;"
if [ $? -eq 0 ]; then
echo "Successfully repaired partitions for table: $table_name"
else
echo "Failed to repair partitions for table: $table_name"
fi
done