27、使用StreamPark管理Flink作业中,关于对Flink客户端的conf进行占位符替换的修改
上一个文档说了,用的flink on k8s,镜像也是用的官方开源的基础镜像(flink:1.15.0)。
之前提到过<flink on k8s的作业日志收集>中,我会将作业日志按照作业名称,ip等信息记录在对应的obs对象存储路径下。
还有就是每个任务的 cp和sp都是按照作业名称来配置路径的,如果在代码中配置的话,增加代码行数,而且也不规范,还是得在streampark和提交客户端上做文章。
但是在用flink客户端提交作业到k8s上时,flink-conf都是从客户端的conf目录里传上去的成一个 configMap,作业是去读取自己对应的configmap,同一个客户端,那么所有任务的configmap都是一样的(包括log的),如何让各自任务获取到各自的作业名称呢?
**解决办法:**环境变量+占位符。而且flink的官方镜像也是这么做的,我们看看flink基础镜像的启动脚本:
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
....
prepare_configuration() {
set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
set_config_option blob.server.port 6124
set_config_option query.server.port 6125
if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
fi
if [ -n "${FLINK_PROPERTIES}" ]; then
echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
fi
envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}
可以看到启动脚本里会使用envsubst将环境注入到flink-conf.yaml生产一个flink-conf.yaml.tmp的临时文件,然后将临时文件替代flink-conf.yaml。一切看似美好,但是用起来可不太行:
可以看到,这个启动脚本就这行报错,不能替换文件,搞了两天没解决~~😅
另辟蹊径:阅读streampark对应提交flink作业的源码,发现这段代码
可以看到streampark是把客户端的flink-conf读出来了的,既然是读出来然后提交作业的,那就好办了,在代码里进行占位符的注入。这里主要是把作业名称给它替换了😅,这也是临时的一个办法,后面看启动脚本环境变量注入失败的问题的时候发现,确实是一个普遍,有大佬用其它办法解决的,社区也在后面的版本中修复了,后面再说吧😅
private[this] def placeHolderCheck(
flinkConfig: Configuration,
submitRequest: SubmitRequest): Unit = {
val keys: util.Set[String] = flinkConfig.keySet()
// TODO 后续有需要再加map内容
val map = Map(
"FLINK_JOB_NAME" -> submitRequest.appName,
"CLUSTER_ID" -> submitRequest.clusterId
)
keys.foreach(
key => {
val value = flinkConfig.getString(key, null)
if (value != null) {
val replaceValue = Try(replacePlaceholders(value, map)) match {
case Success(s) => s
case Failure(e) => {
logger.error("对Flink-conf进行占位符替换时报错")
e.printStackTrace()
value
}
}
flinkConfig.setString(key, replaceValue)
}
})
}
private[this] def replacePlaceholders(input: String, values: Map[String, String]): String = {
// 正则表达式,只匹配 ${} 格式的占位符
val placeholderPattern = """\$\{([^\}]+)\}""".r
// 替换所有符合 ${} 的占位符
placeholderPattern.replaceAllIn(
input,
m => {
// 获取 ${} 中的键
val key: String = m.group(1)
val v: String = m.matched
// 从 map 中获取对应的值,如果不存在则保留原样
values.getOrElse(key, "\\$\\{" + key + "\\}")
}
)
}