Flink从ck拉起任务脚本
#!/bin/bash
APP_NAME="orderTest"
CHECKPOINT_BASE_PATH="hdfs:///jobs/flink/checkpoints/aaa-test/"
is_running=$(yarn application -list | grep -w "$APP_NAME" | grep -c "RUNNING")
if [ $is_running -gt 0 ]; then
echo "应用程序 '$APP_NAME' 在运行中,退出脚本"
exit 1
else
echo "应用程序 '$APP_NAME' 不在运行中,准备拉起任务"
fi
get_latest_checkpoint() {
latest_checkpoint=$(hdfs dfs -ls -t -R $CHECKPOINT_BASE_PATH | grep '_metadata' | sort -k6,7r | head -n 1 | awk '{print $8}' )
if [ -z "$latest_checkpoint" ]; then
echo "没有找到适合的ck,退出执行"
exit 1
fi
checkpoint_dir=$(dirname "$latest_checkpoint")
echo $checkpoint_dir
}
LATEST_CHECKPOINT=$(get_latest_checkpoint)
echo " '$APP_NAME' 任务将从 '$LATEST_CHECKPOINT' 启动"
flink run \
-t yarn-per-job \
-d \
-p 1 \
-Dyarn.application.queue=realtime \
-Dyarn.application.name=$APP_NAME\
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=1000mb \
-Dtaskmanager.memory.managed.size=0mb \
-Dtaskmanager.memory.network.max=64mb \
-Dtaskmanager.numberOfTaskSlots=1 \
-c com.test.Main\
-s $LATEST_CHECKPOINT \
./ds-test-1.0.jar
备注:由于Flink checkpoint 个别情况下,不一定能保证落地的checkpoint文件一定有效,所以需要人工介入支持。