当前位置: 首页 > article >正文

flink JobGraph解析

JobGraph组成

JobGraph主要是StreamGraph经过优化后生成的,主要优化的就是对符合条件节点进行chain,这样可以减少数据流动的序列化和传输。

JobGraph主要由三部分组成。

  • JobVertex:图的顶点。输入是一个JobEdge,输出是IntermediateDataSet。它可以对应多个StreamNode,将多个operator合并到一起。
  • IntermediateDataSet:中间结果集。是JobVertex处理后生成的结果集,为了方便下游复用,producer 是 JobVertex ,consumer 是 JobEdge。
  • JobEdge:边。JobGraph的传输管道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex 。

JobVertex

  • operatorIDs:该 job 节点包含的所有 operator ids,以深度优先方式存储 ids
  • results:job 节点计算出的中间结果
  • inputs:输入数据的边列表

IntermediateDataSet

  • producer:生产者,JobVertex
  • consumers:消费边,可以对应多个,但是必须具有相同的分区器和并行性
  • resultType:运行时使用的分区类型
    • BLOCKING 阻塞,批处理模式
    • PIPELINED 管道非阻塞,流处理模式

JobEdge

  • target:edge的输出,JobVertex
  • source:edge的源,IntermediateDataSet
  • distributionPattern:决定了在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式
    • ALL_TO_ALL:每个生产子任务都连接到消费任务的每个子任务
    • POINTWISE:每个生产子任务都连接到使用任务的一个或多个子任务

JobGraph生成

入口是在StreamingJobGraphGenerator的createJobGraph方法

createJobGraph过程比较多,重点是三步:

  1. 为各个StreamNode生成hash值,这样在故障恢复的时候可以识别
  2. 生成JobVertex
  3. 生成JobEdge、IntermediateDataSet

生成JobVertex(setChaining

从 Source StreamNode 实例开始设置 task chain,它将会递归地创建所有的 JobVertex 实例。

buildChainedInputsAndGetHeadInputs会得到chain的起点集合,然后遍历进行createChain

buildChainedInputsAndGetHeadInputs

private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
        final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {

    // 可以chain的source,单独处理这种节点
    final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
    // chain的起点(不能chain的souce节点、可以chain的souce节点的下一个节点)
    final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();

    // 遍历streamGraph的所有source node
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);

        if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory
                && sourceNode.getOutEdges().size() == 1) {
            // 要求source node的outEdge只有一个。有多个出边的source不能chain
            // as long as only NAry ops support this chaining, we need to skip the other parts
            final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
            final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());
            final ChainingStrategy targetChainingStrategy =
                    target.getOperatorFactory().getChainingStrategy();

            if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
                    && isChainableInput(sourceOutEdge, streamGraph)) {
                final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
                final StreamConfig.SourceInputConfig inputConfig =
                        new StreamConfig.SourceInputConfig(sourceOutEdge);
                final StreamConfig operatorConfig = new StreamConfig(new Configuration());
                setOperatorConfig(sourceNodeId, operatorConfig, Collections.emptyMap());
                setOperatorChainedOutputsConfig(operatorConfig, Collections.emptyList());
                // we cache the non-chainable outputs here, and set the non-chained config later
                opNonChainableOutputsCache.put(sourceNodeId, Collections.emptyList());

                // sources的index都是0
                operatorConfig.setChainIndex(0); // sources are always first
                operatorConfig.setOperatorID(opId);
                operatorConfig.setOperatorName(sourceNode.getOperatorName());
                chainedSources.put(
                        sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));

                final SourceOperatorFactory<?> sourceOpFact =
                        (SourceOperatorFactory<?>) sourceNode.getOperatorFactory();
                final OperatorCoordinator.Provider coord =
                        sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);

                // chainEntryPoints中添加(targetNodeId, chainInfo)
                final OperatorChainInfo chainInfo =
                        chainEntryPoints.computeIfAbsent(
                                sourceOutEdge.getTargetId(),
                                (k) ->
                                        new OperatorChainInfo(
                                                sourceOutEdge.getTargetId(),
                                                hashes,
                                                legacyHashes,
                                                chainedSources,
                                                streamGraph));
                chainInfo.addCoordinatorProvider(coord);
                chainInfo.recordChainedNode(sourceNodeId);
                continue;
            }
        }

        chainEntryPoints.put(
                sourceNodeId,
                new OperatorChainInfo(
                        sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));
    }

    return chainEntryPoints;
}

createChain

在创建chain的过程中,一个chain完成后,在头结点创建一个JobVertex。

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {

    Integer startNodeId = chainInfo.getStartNodeId();
    if (!builtVertices.contains(startNodeId)) {
        // transitiveOutEdges 过渡的出边集合,就是两个StreamNode不能再进行chain的那条边,用于生成JobEdge
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

        // chainableOutputs 两个StreamNode可以进行chain的出边集合
        // nonChainableOutputs 两个StreamNode不能进行chain的出边
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) {
            // 如果存在可以chain的边,那么就继续往这条边的target operator进行chain。
            // transitiveOutEdges最终返回给首次调用栈的是不能再继续chain的那条边
            transitiveOutEdges.addAll(
                    createChain(
                            chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            //如果存在了不可chain的边,说明该边就是StreamNode chain之间的过渡边,添加到transitiveOutEdges中,
            //继续对该边的target StreamNode进行新的createChain操作,意味着一个新的chain
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }

        chainedNames.put(
                currentNodeId,
                createChainedName(
                        currentNodeId,
                        chainableOutputs,
                        Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        chainedMinResources.put(
                currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));

        // 添加当前的StreamNode到chain中
        OperatorID currentOperatorId =
                chainInfo.addNodeToChain(
                        currentNodeId,
                        streamGraph.getStreamNode(currentNodeId).getOperatorName());

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // chain的头结点创建JobVertex
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());

        tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);

        setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());

        setOperatorChainedOutputsConfig(config, chainableOutputs);

        // we cache the non-chainable outputs here, and set the non-chained config later
        // 缓存不能chain的出边集合
        opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            // 头结点
            chainInfo.setTransitiveOutEdges(transitiveOutEdges);
            chainInfos.put(startNodeId, chainInfo);

            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            // chain尾节点
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

判断是否chainable

  • 公用一个slotGroup
  • 上下游operator可以chain
  • partitioner和exchangeMode可以chain(forward)
  • 并行度一样
  • 允许chain
  • 不能是联合操作

createJobVertex

  1. 创建对应的operator集合
  2. 创建JobVertex(InputOutputFormatVertex是一种特殊的 JobVertex,它用于处理输入输出格式相关的任务,例如读取和写入文件、数据库等)
  3. 添加对应的上游数据集
  4. 缓存JobVertex相关信息

生成JobEdge、IntermediateDataSet(setAllVertexNonChainedOutputsConfigs)

遍历jobVertices,调用connect连接起来。

connect

将两个JobVertex(headVertex、downStreamVertex)连接起来。关键方法是downStreamVertex.connectNewDataSetAsInput

connectNewDataSetAsInput

创建IntermediateDataSet和JobEdge,形成JobVertex->IntermediateDataSet->JobEdge->JobVertex


http://www.kler.cn/a/540281.html

相关文章:

  • 【网络安全】服务器安装Docker及拉取镜像教程
  • 【Javascript Day12】数组操作方法及String规则和方法
  • 继承(python)
  • 502 Bad Gateway 错误详解:从表现推测原因,逐步排查直至解决
  • Django操作指令大集合说明
  • 在请求时打印出实际代理的目标地址
  • leetcode刷题-动态规划04
  • 机器学习:学习记录(二)
  • Kotlin实战经验:将接口回调转换成suspend挂起函数
  • Bigemap Pro如何裁剪矢量数据
  • Ollama系列---【ollama使用gpu运行大模型】
  • 蓝耘智算平台部署deepseek-助力深度学习
  • webpack配置之---output.clean
  • AWS vs Azure vs 阿里云:出海企业全球扩张的技术选型指南(2024深度对比)
  • 如何使用 Redux 中间件?
  • 小白零基础如何搭建CNN
  • c# http
  • 1.1 单元测试核心原则
  • jenkins手动安装插件
  • 深度学习框架PyTorch
  • Python----PyQt开发(PyQt高级:组件大小,界面位置,按钮,文本显示,文本输入,字体大小)
  • Spring Boot + MyBatis Field ‘xxx‘ doesn‘t have a default value 问题排查与解决
  • 鸿蒙NEXT开发-发布三方库
  • CEF132 编译指南 MacOS 篇 - 启程:认识 CEF (一)
  • ubuntu下一键编译
  • qt open3d中添加统计滤波