Stability of Offline Tasks
Low-level data synchronization scripts
Log tracing, keyword extraction, and a restart-on-failure strategy for tasks
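Every script in this section follows the same stabilization pattern: pipe the spark2-submit output through tee into a per-run log file, scan that log for known fatal keywords, and re-invoke the submit function, guarded by a bounded retry counter, whenever a keyword matches. Stripped of job-specific detail, a minimal sketch of the pattern looks like this (the submit command and the keyword are placeholders, not the real job):
#!/bin/bash
# Minimal sketch of the tee-and-scan retry pattern used by the scripts below.
retries=0
max_retries=10
run_job(){
retries=$(($retries+1))
if [ $retries -gt $max_retries ]
then
echo "retry count exceeded $max_retries, giving up"
exit 1
fi
log_file="./job.log"
# Placeholder for the real spark2-submit command line.
some_spark_submit_command 2>&1 | tee "$log_file"
# Scan the captured output; on a fatal keyword, wait and retry.
while read -r line
do
if [[ $line == *"org.apache.spark.SparkException"* ]]
then
sleep 1m
run_job
return
fi
done < "$log_file"
}
run_job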
Mysql_to_hive.sh
#!/bin/bash
echo "mysql host is" $1
echo "mysql db is" $2
echo "mysql table is" $3
echo "mysql username is" $4
echo "mysql passwd is" $5
echo "hive db is" $6
echo "hive table prefix is" $7
echo "format is" $8
echo "create_time is" $9
echo "update_time is" ${10}
echo "query_begin_date is" ${11}
echo "query_end_date is" ${12}
echo "hive_tables is" ${13}
echo "condition is" ${14}
echo "dropCols is" ${15}
host=$1
db=$2
table=$3
username=$4
passwd=$5
hive_db=$6
hive_table_prefix=$7
format=$8
create_time=$9
update_time=${10}
dt=${11}
dt1=${12}
hive_tables=${13}
condition=${14}
dropCols=${15}
s=0
limit_cnts=10
f(){
s=$(($s+1))
echo "函数f被调用次数:$s"
if [ $s -gt $limit_cnts ]
then
echo "the cycle times is gt $limit_cnts,exit"
exit 1
fi
query_begin_date=${1}
query_end_date=${2}
WORK_DIR=$(cd "$(dirname "$0")";pwd)
echo "WORK_DIR:"${WORK_DIR}
LOG_PATH="${WORK_DIR}/log/${hive_db}/${query_end_date}"
echo "LOG_PATH:${LOG_PATH}"
mkdir -p ${LOG_PATH}
FILE_LOG="${LOG_PATH}/${table}_to_hive.log"
echo "FILE_LOG:${FILE_LOG}"
/opt/cloudera/parcels/SPARK2/bin/spark2-submit \
--class com.mingzhi.common.universal.common_mysql_to_hive \
--master yarn \
--deploy-mode client \
--driver-memory 2g \
--num-executors 3 \
--executor-memory 3G \
--executor-cores 3 \
--conf spark.default.parallelism=200 \
--conf spark.port.maxRetries=1000 \
--conf spark.rpc.numRetries=1000000 \
--conf spark.sql.shuffle.partitions=10 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.rpc.askTimeout=3600 \
--conf spark.rpc.lookupTimeout=3600 \
--conf spark.network.timeout=3600 \
--conf spark.rpc.io.connectionTimeout=3600 \
/mnt/db_file/jars/common-1.0-SNAPSHOT-jar-with-dependencies.jar \
"$host" "$db" "$table" "$username" "$passwd" "$hive_db" "$hive_table_prefix" "$format" "$create_time" "$update_time" "$query_begin_date" "$query_end_date" "$hive_tables" "$condition" "$dropCols" 2>&1 | tee ${FILE_LOG}
while read -r line
do
#echo "$line"
error1="SparkContext has been shutdown"
error2="Failed to send RPC"
error3="java.nio.channels.ClosedChannelException"
error4="Marking as slave lost"
error5="org.apache.spark.SparkException"
error6="Exception in thread"
error7="SparkContext was shut down"
error8="org.apache.spark.sql.AnalysisException"
error9="java.util.concurrent.RejectedExecutionException"
if [[ ${line} == *${error9}* || ${line} == *${error1}* || ${line} == *${error2}* || ${line} == *${error3}* || ${line} == *${error4}* || ${line} == *${error5}* || ${line} == *${error6}* || ${line} == *${error7}* || ${line} == *${error8}* ]]
then
echo "SPARK SQL EXECUTION FAILED ......AND JOB FAILED DATE IS 【${query_begin_date},${query_end_date}】......"
#exit 1
sleep 1m
f "$query_begin_date" "$query_end_date"
fi
done < ${FILE_LOG}
}
interval_day=5
start_date=${dt}
end_date=${dt1}
while [[ $start_date < $end_date || $start_date = $end_date ]]
do
query_begin_date=`date -d "$start_date" "+%Y-%m-%d"`
query_end_date=`date -d "+${interval_day} day ${start_date}" +%Y-%m-%d`
start_date=`date -d "+${interval_day} day +1 day ${start_date}" +%Y-%m-%d`
if [[ $query_end_date > $end_date ]]
then
query_end_date=$end_date
fi
echo "【本次任务开始时间:$query_begin_date,本次任务结束时间:$query_end_date,下一次任务的开始时间:$start_date】"
#开始执行spark任务
sleep 1s
f "$query_begin_date" "$query_end_date"
done
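For reference, a hypothetical invocation spelling out all 15 positional arguments (host, credentials, dates, and the database/table names are placeholders; the real call in the Import script further below omits the trailing optional arguments):
/root/bin/mysql_to_hive.sh \
192.168.1.10 order_db tbwork_order root 'secret' \
paascloud '' parquet create_time update_time \
2023-01-01 2023-01-31 '' '' ''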
Hive_to_mysql.sh
#!/bin/bash
mysql_host=$1
from_db=$2
from_tables=$3
to_db=$4
to_tables=$5
username=$6
passwd=$7
dt=$8
dt1=$9
savemode=${10}
dropCols=${11}
echo "mysql host is $1"
echo "from_db is ${2}"
echo "from tables is $3"
echo "to_db is $4"
echo "to_tables is $5"
echo "username is $6"
echo "passwd is $7"
echo "dt is $8"
echo "dt1 is $9"
echo "savemode is ${10}"
echo "dropCols is ${11}"
if [ $8 ];
then
dt=$8
else
dt=`date -d "-1 day" +%F`
fi
if [ $9 ];
then
dt1=$9
else
dt1=`date -d "-1 day" +%F`
fi
if [ ${10} ];
then
savemode=${10}
else
savemode='OverWriteByDt'
fi
echo '==============================================================================='
echo "final dt is $dt"
echo "final dt1 is $dt1"
echo "final savemode is $savemode"
echo "final dropCols is $dropCols"
#/mnt/db_file/jars/common-1.0-SNAPSHOT-jar-with-dependencies_202201.jar "${mysql_host}" "${from_db}" "${from_tables}" "${to_db}" "${to_tables}" "$username" "${passwd}" "${dt}" "${dt1}" "${savemode}"
s=0
limit_cnts=10
f(){
s=$(($s+1))
echo "函数f被调用次数:$s"
if [ $s -gt $limit_cnts ]
then
echo "the cycle times is gt $limit_cnts,exit"
exit 1
fi
query_begin_date=${1}
query_end_date=${2}
WORK_DIR=$(cd "$(dirname "$0")";pwd)
echo "WORK_DIR:"${WORK_DIR}
LOG_PATH="${WORK_DIR}/log/${from_db}/${query_end_date}"
echo "LOG_PATH:${LOG_PATH}"
mkdir -p ${LOG_PATH}
FILE_LOG="${LOG_PATH}/${from_tables}_to_mysql.log"
echo "FILE_LOG:${FILE_LOG}"
/opt/cloudera/parcels/SPARK2/bin/spark2-submit \
--class com.mingzhi.common.universal.hive_to_mysql \
--master yarn \
--deploy-mode client \
--executor-memory 2G \
--num-executors 2 \
--executor-cores 4 \
--conf spark.dynamicAllocation.maxExecutors=3 \
--conf spark.driver.cores=4 \
--conf spark.driver.memory=2g \
/mnt/db_file/jars/common-1.0-SNAPSHOT-jar-with-dependencies.jar \
"${mysql_host}" "${from_db}" "${from_tables}" "${to_db}" "${to_tables}" "$username" "${passwd}" "${query_begin_date}" "${query_end_date}" "${savemode}" "${dropCols}" 2>&1 | tee ${FILE_LOG}
while read -r line
do
#echo "$line"
error1="SparkContext has been shutdown"
error2="Failed to send RPC"
error3="java.nio.channels.ClosedChannelException"
error4="Deadlock found"
error5="org.apache.spark.SparkException"
error6="Exception in thread"
error7="SparkContext was shut down"
error8="org.apache.spark.sql.AnalysisException"
if [[ ${line} == *${error1}* || ${line} == *${error2}* || ${line} == *${error3}* || ${line} == *${error4}* || ${line} == *${error5}* || ${line} == *${error6}* || ${line} == *${error7}* || ${line} == *${error8}* ]]
then
echo "SPARK SQL EXECUTION FAILED ......AND JOB FAILED DATE IS 【${query_begin_date},${query_end_date}】......"
#exit 1
sleep 1m
f "$query_begin_date" "$query_end_date"
fi
done < ${FILE_LOG}
}
interval_day=4
start_date=${dt}
end_date=${dt1}
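# Sentinel handling (inferred): a start date beginning with "9999" appears to mark a
# special full-load partition, so the window shrinks to a single day; the matching
# "10000"* guard below then ends the loop (9999-12-31 + 1 day rolls into year 10000).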
if [[ $start_date == "9999"* ]]
then
interval_day=0
fi
while [[ $start_date < $end_date || $start_date = $end_date ]]
do
query_begin_date=`date -d "$start_date" "+%Y-%m-%d"`
query_end_date=`date -d "+${interval_day} day ${start_date}" +%Y-%m-%d`
start_date=`date -d "+${interval_day} day +1 day ${start_date}" +%Y-%m-%d`
if [[ $query_end_date > $end_date ]]
then
query_end_date=$end_date
fi
echo "【本次任务开始时间:$query_begin_date,本次任务结束时间:$query_end_date,下一次任务的开始时间:$start_date】"
#开始执行spark任务
if [[ $query_begin_date == "10000"* ]]
then
exit
Fi
s=0
f "$query_begin_date" "$query_end_date"
done
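A hypothetical invocation with all 11 positional arguments (host, credentials, and table names are placeholders; dt, dt1 and savemode fall back to defaults when omitted, as the parameter checks above show):
/root/bin/hive_to_mysql.sh \
192.168.1.20 paascloud ads_order_stats mz_olap ads_order_stats \
root 'secret' 2023-01-01 2023-01-05 OverWriteByDt ''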
Es_to_hive.sh (/root/bin/es_to_hive.sh)
#!/bin/bash
echo "es_host is" $1
echo "es_indexes is" $2
echo "hive_db is" $3
echo "hive_tables is" $4
echo "create_time is" $5
echo "update_time is" $6
echo "dt is" $7
echo "dt1 is" $8
echo "format is $9"
echo "partitions is" ${10}
es_host=$1
es_indexes=$2
hive_db=$3
hive_tables=$4
create_time=$5
update_time=$6
dt=$7
dt1=$8
format=$9
partitions=${10}
cnts=0
cnts_limit=5
memory=1
memory_limit=4
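# Note: each retry raises executor memory by 1G, capped at memory_limit (4G),
# so memory-related failures get more headroom on the next attempt.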
f(){
cnts=$(($cnts+1))
memory=$(($memory+1))
if [ $memory -gt $memory_limit ]
then
memory=$memory_limit
fi
echo "函数f被调用次数cnts:$cnts and memory is $memory G"
if [ $cnts -gt $cnts_limit ]
then
echo "the cycle times is gt $cnts_limit,exit"
exit 1
fi
query_begin_date=$1
query_end_date=$2
WORK_DIR=$(cd "$(dirname "$0")";pwd)
echo "WORK_DIR:"${WORK_DIR}
LOG_PATH="${WORK_DIR}/log/${hive_db}/${query_end_date}"
echo "LOG_PATH:${LOG_PATH}"
mkdir -p ${LOG_PATH}
FILE_LOG="${LOG_PATH}/${hive_tables}_to_hive.log"
echo "FILE_LOG:${FILE_LOG}"
/opt/cloudera/parcels/SPARK2/bin/spark2-submit \
--class com.mingzhi.common.universal.common_es_to_hive \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-memory ${memory}G \
--executor-cores 2 \
--conf spark.default.parallelism=200 \
--conf spark.port.maxRetries=300 \
/mnt/db_file/jars/common-1.0-SNAPSHOT-jar-with-dependencies.jar \
"$es_host" "$es_indexes" "$hive_db" "$hive_tables" "$create_time" "$update_time" "$query_begin_date" "$query_end_date" "$format" "$partitions" 2>&1 | tee ${FILE_LOG}
while read -r line
do
#echo "$line"
error1="SparkContext has been shutdown"
error2="Failed to send RPC"
error3="java.nio.channels.ClosedChannelException"
error4="Marking as slave lost"
error5="org.apache.spark.SparkException"
error6="Exception in thread"
error7="SparkContext was shut down"
error8="org.apache.spark.sql.AnalysisException"
error9="java.util.concurrent.RejectedExecutionException"
error0="java.io.IOException"
if [[ ${line} == *${error9}* || ${line} == *${error1}* || ${line} == *${error2}* || ${line} == *${error3}* || ${line} == *${error4}* || ${line} == *${error5}* || ${line} == *${error6}* || ${line} == *${error7}* || ${line} == *${error8}* || ${line} == *${error0}* ]]
then
echo "SPARK SQL EXECUTION FAILED ......AND JOB FAILED DATE IS 【${query_begin_date},${query_end_date}】......"
#exit 1
sleep 5s
f "$query_begin_date" "$query_end_date"
fi
done < ${FILE_LOG}
}
interval_day=0
start_date=${dt}
end_date=${dt1}
while [[ $start_date < $end_date || $start_date = $end_date ]]
do
query_begin_date=`date -d "$start_date" "+%Y-%m-%d"`
query_end_date=`date -d "+${interval_day} day ${start_date}" +%Y-%m-%d`
start_date=`date -d "+${interval_day} day +1 day ${start_date}" +%Y-%m-%d`
if [[ $query_end_date > $end_date ]]
then
query_end_date=$end_date
fi
echo "【本次任务开始时间:$query_begin_date,本次任务结束时间:$query_end_date,下一次任务的开始时间:$start_date】"
#开始执行spark任务
cnts=0
f "${query_begin_date}" "${query_end_date}"
done
Business-level data synchronization scripts
Import
#!/bin/bash
source /root/bin/common_config/db_config.properties
if [ $1 ];
then
dt=$1
else
dt=`date -d "-1 day" +%F`
fi
if [ $2 ];
then
dt1=$2
else
dt1=`date -d "-1 day" +%F`
fi
/root/bin/mysql_to_hive.sh "$wfs_host" $wfs_db tbwork_order "$wfs_user" "$wfs_pwd" paascloud '' '' 'create_time' 'update_time' $dt $dt1
wfs_order_list_index.sh
#!/bin/bash
source /root/bin/common_config/es_config.properties
if [ $1 ];
then
dt=$1
else
dt=`date -d "-1 day" +%F`
fi
if [ $2 ];
then
dt1=$2
else
dt1=`date -d "-1 day" +%F`
fi
year=${dt:0:4}
cur_day=`date -d "-1 day" +%F`
cur_year=${cur_day:0:4}
echo "year is ${year} and cur_year is ${cur_year}"
#if [ ${year} == '2023' ];
if [ ${year} == ${cur_year} ];
then
port=9200
else
port=9500
fi
echo "port is ${port}"
/root/bin/es_to_hive.sh "$wfs_es_host:${port}" wfs_order_list_index_${year} paascloud wfs_order_list_index "orderCreateTime" "orderUpdateTime" $dt $dt1 'parquet' 1
Export
Export with updates
hive_to_olap_4_tcm_parse.sh (/mnt/db_file/tcm/)
#!/bin/bash
source /root/bin/common_config/db_config.properties
hive_table=$1
target_table=$2
if [ $3 ];
then
dt=$3
else
dt=`date -d "-1 day" +%F`
fi
if [ $4 ];
then
dt1=$4
else
dt1=`date -d "-1 day" +%F`
fi
echo "起始日期为$dt"
echo "结束日期为$dt1"
f(){
do_date=$1
echo "===函数执行日期为 $do_date==="
/root/bin/hive_to_mysql.sh "$olap_host" tcm "$hive_table" "$olap_db" "$target_table" "$olap_user" "$olap_pwd" $1 $1
}
if [[ $dt == $dt1 ]]
then
echo "dt = dt1......"
for i in `/mnt/db_file/tcm/get_changed_dt.sh $dt`
do
echo "同步变化的日期======================>$i"
f $i
done
else
echo "batch process..."
start_day=$dt
end_day=$dt1
dt=$start_day
while [[ $dt < `date -d "+1 day $end_day" +%Y-%m-%d` ]]
do
echo "批处理===>"$dt
f $dt
dt=`date -d "+1 day $dt" +%Y-%m-%d`
done
fi
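get_changed_dt.sh itself is not shown in this section. A minimal sketch of what such a helper could look like, assuming it emits the distinct dt partitions whose rows were touched on the given day (the table and column names below are illustrative, not the actual implementation):
#!/bin/bash
# Hypothetical sketch: print the distinct business dates whose rows changed
# on $1, so only those partitions are re-exported.
do_date=$1
hive=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/hive
$hive -S -e "
use tcm;
select distinct dt
from some_dwd_table
where to_date(update_time) = '$do_date';
"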
Export without updates
Direct export
wfs_ads_order_material_stats_to_olap.sh
#!/bin/bash
source /root/bin/common_config/db_config.properties
echo "olap_host is :${olap_host}"
if [ $1 ];
then
dt=$1
else
dt=`date -d "-1 day" +%F`
fi
echo "dt:$dt"
if [ $2 ];
then
dt1=$2
else
dt1=`date -d "-1 day" +%F`
fi
/root/bin/hive_to_mysql.sh "${olap_host}" paascloud wfs_ads_order_material_stats mz_olap wfs_ads_order_material_stats $olap_user $olap_pwd ${dt} ${dt1}
Generic Spark compute job
#!/bin/bash
if [ $1 ];
then
className=$1
else
echo "need className"
exit
fi
if [ $2 ];
then
jarPath=$2
else
echo "need jarPath"
exit
fi
if [ $3 ];
then
dt=$3
else
dt=`date -d "-1 day" +%F`
fi
if [ $4 ];
then
dt1=$4
else
dt1=`date -d "-1 day" +%F`
fi
echo "起始日期:$dt,结束日期:$dt1"
f(){
query_begin_date=${1}
query_end_date=${2}
WORK_DIR=$(cd "$(dirname "$0")";pwd)
echo "WORK_DIR:"${WORK_DIR}
LOG_PATH="${WORK_DIR}/log/${query_end_date}"
echo "LOG_PATH:${LOG_PATH}"
mkdir -p ${LOG_PATH}
FILE_LOG="${LOG_PATH}/$className.log"
echo "FILE_LOG:${FILE_LOG}"
/opt/cloudera/parcels/SPARK2/bin/spark2-submit \
--class $className \
--master yarn \
--deploy-mode client \
--driver-memory 4g \
--num-executors 2 \
--executor-memory 4G \
--executor-cores 4 \
--conf spark.driver.cores=5 \
--conf spark.port.maxRetries=1000 \
--conf spark.rpc.numRetries=1000000 \
--conf spark.sql.shuffle.partitions=3 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.rpc.askTimeout=3600 \
--conf spark.rpc.lookupTimeout=3600 \
--conf spark.network.timeout=3600 \
--conf spark.rpc.io.connectionTimeout=3600 \
--conf spark.default.parallelism=50 \
$jarPath \
$query_begin_date $query_end_date 2>&1 | tee ${FILE_LOG}
while read -r line
do
#echo "$line"
error1="SparkContext has been shutdown"
error2="Failed to send RPC"
error3="java.nio.channels.ClosedChannelException"
error4="Marking as slave lost"
error5="org.apache.spark.SparkException"
error6="Exception in thread"
error7="SparkContext was shut down"
error8="org.apache.spark.sql.AnalysisException"
if [[ ${line} == *${error1}* || ${line} == *${error2}* || ${line} == *${error3}* || ${line} == *${error4}* || ${line} == *${error5}* || ${line} == *${error6}* || ${line} == *${error7}* || ${line} == *${error8}* ]]
then
echo "SPARK SQL EXECUTION FAILED ......AND JOB FAILED DATE IS 【${query_begin_date},${query_end_date}】......"
#exit 1
sleep 10s
f "$query_begin_date" "$query_end_date"
fi
done < ${FILE_LOG}
}
interval_day=0
start_date=${dt}
end_date=${dt1}
while [[ $start_date < $end_date || $start_date = $end_date ]]
do
query_begin_date=`date -d "$start_date" "+%Y-%m-%d"`
query_end_date=`date -d "+${interval_day} day ${start_date}" +%Y-%m-%d`
start_date=`date -d "+${interval_day} day +1 day ${start_date}" +%Y-%m-%d`
if [[ $query_end_date > $end_date ]]
then
query_end_date=$end_date
fi
echo "【本次任务开始时间:$query_begin_date,本次任务结束时间:$query_end_date,下一次任务的开始时间:$start_date】"
#开始执行spark任务
f "$query_begin_date" "$query_end_date"
done
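A hypothetical invocation (the class and jar here mirror the wfs wrapper shown next; the dates are placeholders):
sh /root/bin/spark_job.sh \
com.mingzhi.wfs.dwd.dwd_order_info_abi \
/mnt/db_file/wfs/jar/wfs-1.0-SNAPSHOT-jar-with-dependencies.jar \
2023-01-01 2023-01-02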
Business-specific Spark compute jobs
spark_job_4_wfs.sh
#!/bin/bash
if [ $1 ];
then
className=$1
else
echo "need className"
exit
fi
if [ $2 ];
then
dt=$2
else
dt=`date -d "-1 day" +%F`
fi
if [ $3 ];
then
dt1=$3
else
dt1=`date -d "-1 day" +%F`
fi
sh /root/bin/spark_job.sh $className /mnt/db_file/wfs/jar/wfs-1.0-SNAPSHOT-jar-with-dependencies.jar $dt $dt1
dwd_order_info_abi.sh
#!/bin/bash
if [ $1 ];
then
dt=$1
else
dt=`date -d "-1 day" +%F`
fi
if [ $2 ];
then
dt1=$2
else
dt1=`date -d "-1 day" +%F`
fi
echo "起始日期:$dt,结束日期:$dt1"
/mnt/db_file/wfs/spark_job_4_wfs.sh com.mingzhi.wfs.dwd.dwd_order_info_abi $dt $dt1
Hive compute job
#!/bin/bash
db=paascloud
hive=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/hive
if [ $1 ];
then
dt=$1
else
dt=`date -d "-1 day" +%F`
fi
if [ $2 ];
then
dt1=$2
else
dt1=`date -d "-1 day" +%F`
fi
echo "起始日期:$dt,结束日期:$dt1"
f(){
do_date=$1
echo "===函数日期为 $do_date==="
sql="
use $db;
set hive.exec.dynamic.partition.mode=nonstrict;
add jar /mnt/db_file/wfs/jar/udf-1.4.3-SNAPSHOT-jar-with-dependencies.jar;
create temporary function str_distinct as 'com.mingzhi.StringDistinct';
insert overwrite table ads_order_overall_cube partition(dt)
select
corcode_f3,max(sort_f3),
corcode_f2,max(sort_f2),
corcode_f1,max(sort_f1),
orderlargertype,
--ordersecondtype,
--orderthirdlytype,
orderSource,
orderStatus,
count(1) as cnts,
str_distinct(concat_ws(',',collect_set(deal_user_ids))) as all_persons,
if(str_distinct(concat_ws(',',collect_set(deal_user_ids)))='' ,0, size(split(str_distinct(concat_ws(',',collect_set(deal_user_ids))),','))) as all_person_cnts,
--regexp_replace(regexp_replace(regexp_replace(lpad(bin(cast(grouping_id() as bigint)),5,'0'),"0","x"),"1","0"),"x","1") as dim
reverse(lpad(bin(cast(GROUPING__ID as bigint)),6,'0')) as dim
,'$do_date'
from
dwd_order_info_abi where dt='$do_date'
group by
corcode_f3,
corcode_f2,
corcode_f1,
orderlargertype,
--ordersecondtype,
--orderthirdlytype,
orderSource,
orderStatus
grouping sets(corcode_f2,corcode_f1,(corcode_f2,orderlargertype),(corcode_f2,orderlargertype,orderSource,orderStatus),(corcode_f1,orderlargertype),(corcode_f1,orderlargertype,orderSource,orderStatus) )
;
"
# resolve the script's own directory
WORK_DIR=$(cd "$(dirname "$0")";pwd)
LOG_PATH="$WORK_DIR/log/$do_date"
mkdir -p $LOG_PATH
FILE_NAME="ads_order_overall_cube"
#*****************************************************************************
$hive -e "$sql" 2>&1 | tee $LOG_PATH/${FILE_NAME}.log
while read -r line
do
echo "$line"
error="FAILED"
if [[ $line == *$error* ]]
then
echo "HIVE JOB EXECUTION FAILED AND DATE IS 【${do_date}】......"
#exit 1
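# Note: unlike the sync scripts above, this retry has no counter cap, so a
# persistently failing query would recurse indefinitely.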
f ${do_date}
fi
done < ${LOG_PATH}/${FILE_NAME}.log
}
start_day=$dt
end_day=$dt1
dt=$start_day
while [[ $dt < `date -d "+1 day $end_day" +%Y-%m-%d` ]]
do
#for i in `/mnt/db_file/wfs/get_changed_dt.sh $dt`
for i in `cat /mnt/db_file/wfs/log/$dt/get_changed_dt_result`
do
echo "=================== processing changed date: $i ==========================="
f $i
done
dt=`date -d "+1 day $dt" +%Y-%m-%d`
done
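The dim column encodes which GROUP BY columns are active in each grouping set: GROUPING__ID is cast to a bigint bitmask, rendered in binary, left-padded to the six grouping columns, and reversed so the flag string reads in GROUP BY column order. The bit semantics of GROUPING__ID changed in Hive 2.3.0, so it is safer to verify the layout on your own cluster than to assume it; a throwaway check along these lines (illustrative only) would do:
hive -e "
select c1, c2, GROUPING__ID,
reverse(lpad(bin(cast(GROUPING__ID as bigint)),2,'0')) as dim
from (select 'a' as c1, 'b' as c2) t
group by c1, c2
grouping sets(c1, c2, (c1, c2));
"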