当前位置: 首页 > article >正文

27、使用StreamPark管理Flink作业中,关于对Flink客户端的conf进行占位符替换的修改

上一个文档说了,用的flink on k8s,镜像也是用的官方开源的基础镜像(flink:1.15.0)。
之前提到过<flink on k8s的作业日志收集>中,我会将作业日志按照作业名称,ip等信息记录在对应的obs对象存储路径下。
还有就是每个任务的 cp和sp都是按照作业名称来配置路径的,如果在代码中配置的话,增加代码行数,而且也不规范,还是得在streampark和提交客户端上做文章。
但是在用flink客户端提交作业到k8s上时,flink-conf都是从客户端的conf目录里传上去的成一个 configMap,作业是去读取自己对应的configmap,同一个客户端,那么所有任务的configmap都是一样的(包括log的),如何让各自任务获取到各自的作业名称呢?
**解决办法:**环境变量+占位符。而且flink的官方镜像也是这么做的,我们看看flink基础镜像的启动脚本:

CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
....
prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}


可以看到启动脚本里会使用envsubst将环境注入到flink-conf.yaml生产一个flink-conf.yaml.tmp的临时文件,然后将临时文件替代flink-conf.yaml。一切看似美好,但是用起来可不太行:
在这里插入图片描述
可以看到,这个启动脚本就这行报错,不能替换文件,搞了两天没解决~~😅
另辟蹊径:阅读streampark对应提交flink作业的源码,发现这段代码
在这里插入图片描述
可以看到streampark是把客户端的flink-conf读出来了的,既然是读出来然后提交作业的,那就好办了,在代码里进行占位符的注入。这里主要是把作业名称给它替换了😅,这也是临时的一个办法,后面看启动脚本环境变量注入失败的问题的时候发现,确实是一个普遍,有大佬用其它办法解决的,社区也在后面的版本中修复了,后面再说吧😅

private[this] def placeHolderCheck(
    flinkConfig: Configuration,
    submitRequest: SubmitRequest): Unit = {
    val keys: util.Set[String] = flinkConfig.keySet()
    // TODO 后续有需要再加map内容
    val map = Map(
        "FLINK_JOB_NAME" -> submitRequest.appName,
        "CLUSTER_ID" -> submitRequest.clusterId
    )
    keys.foreach(
        key => {
        val value = flinkConfig.getString(key, null)
        if (value != null) {
            val replaceValue = Try(replacePlaceholders(value, map)) match {
                case Success(s) => s
                case Failure(e) => {
                logger.error("对Flink-conf进行占位符替换时报错")
                e.printStackTrace()
                value
            }
        }
        flinkConfig.setString(key, replaceValue)
    }
    })
}

private[this] def replacePlaceholders(input: String, values: Map[String, String]): String = {
    // 正则表达式,只匹配 ${} 格式的占位符
    val placeholderPattern = """\$\{([^\}]+)\}""".r
    // 替换所有符合 ${} 的占位符
    placeholderPattern.replaceAllIn(
        input,
        m => {
        // 获取 ${} 中的键
        val key: String = m.group(1)
        val v: String = m.matched
        // 从 map 中获取对应的值,如果不存在则保留原样
        values.getOrElse(key, "\\$\\{" + key + "\\}")
    }
    )
}

http://www.kler.cn/a/466506.html

相关文章:

  • 2024年河北省职业院校技能大赛云计算应用赛项赛题第4套(容器云)
  • 安卓漏洞学习(十七):维京海盗-Strandhogg漏洞
  • 概述(讲讲python基本语法和第三方库)
  • Spring boot接入xxl-job
  • Mysql SQL 超实用的7个日期算术运算实例(10k)
  • 我们公司只有3个人,一个前端,一个后端
  • 将数组转换为laravel中的对象
  • ts中 type 和 interface 定义类型有什么区别?
  • 【Unity】【图形渲染】Unity Shader基础操作3:内置文件与包含文件的使用
  • 实时数仓与离线数仓的全面对比
  • 上升沿下降沿递增
  • 高等数学学习笔记 ☞ 函数的极限
  • 微信小程序滑动解锁、滑动验证
  • git 退出编辑模式
  • AI对嵌入式开发行业的影响
  • 家政上门小程序如何创建?家政服务怎么能少了小程序帮手
  • Unity 对Sprite或者UI使用模板测试扣洞
  • 安装并配置Ubuntu22.04桌面
  • 【Python系列】处理空请求体Body
  • 中间件自动化测试框架cmdlinker
  • SQL 中复杂 CASE WHEN 嵌套逻辑优化
  • ros2 笔记-1.1 体验C++编译
  • Deepseek v3 的笔记
  • 如何使用OpenCV进行抓图-多线程
  • 基于AI边缘计算盒子的智慧零售场景智能监控解决方案
  • (NIPS-2023)ProlificDreamer:通过变分分数蒸馏实现高保真、多样化的文本到 3D 生成