当前位置: 首页 > article >正文

Filnk并行度和算子链

1. 并行度(Parallelism)

1.1 什么是并行度?

  • 基本概念:
    在 Flink 中,每个操作(算子)都可以被拆分成多个“子任务”(subtasks),这些子任务可以同时在不同的线程、机器或容器上运行。这种同时运行的方式就称为 并行处理
  • 类比:
    想象你有一个大任务需要做,比如在一大堆苹果中挑出红色的苹果。你可以自己一个人做,也可以叫上好几个人一起帮忙。如果你叫了 3 个人,那么总共就有 4 个人在同时挑苹果,这里就相当于并行度为 4。
  • 在 Flink 中:
    如果一个算子的并行度设置为 2,那么这个算子就会被分成 2 个子任务同时运行,从而提高处理速度。

1.2 如何设置并行度?

有几种方法可以设置并行度:

  • 代码中设置:
    对于某个具体算子:

    stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
    

    这表示这个 map 操作会被分成 2 个并行子任务。

  • 全局设置:
    在创建执行环境时:

    env.setParallelism(2);
    

    这样,默认情况下,所有算子都会使用 2 个并行任务。

  • 提交任务时设置:
    在使用命令提交作业时加上参数 -p,例如:

    bin/flink run -p 2 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
    
  • 配置文件设置:
    flink-conf.yaml 中:

    parallelism.default: 2
    

    如果代码和提交参数中都没有设置并行度,则采用配置文件中的默认值。

2. 算子链(Operator Chain)

2.1 什么是算子链?

  • 基本概念:
    在数据处理过程中,数据需要经过多个算子(例如 mapfilter 等)处理。如果这些算子之间的处理关系是一对一的(即一个算子处理完的数据直接传给下一个算子,而不需要重新分区或打乱顺序),它们可以被“链”在一起执行,这样就称为 算子链
  • 类比:
    想象你在做一道菜,需要经过洗菜、切菜、炒菜三个步骤。如果你能把这三个步骤连在一起连续进行,就不需要每一步都停下来,这样能提高做菜的效率。在 Flink 中,把多个一对一的操作合并到同一个线程中运行,就能减少线程切换和数据在不同线程间传递的开销,从而提高效率。

示例说明

假设我们有一个简单的 Flink 流处理任务,它从 Socket 读取数据,然后依次执行以下操作:

  1. Map:将每个字符串转换成一个二元组(单词, 1)。
  2. Filter:过滤出以字母 “A” 开头的单词。
  3. KeyBySum:按单词分组并求和。

在默认情况下,Flink 会对 MapFilter 这两个一对一的算子进行链式合并,也就是说,它们会被打包到同一个任务(Task)中,由同一个线程执行。这可以减少线程切换和数据在不同任务之间传递的开销,从而提升性能。

代码示例

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorChainExample {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 Socket 读取数据(例如:localhost 9999 端口)
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // 执行 Map、Filter、KeyBy 和 Sum 操作
        DataStream<Tuple2<String, Integer>> result = text
            // Map 算子:将输入字符串转换成 (单词, 1)
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            })
            // Filter 算子:只保留以 "A" 开头的单词
            .filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f0.startsWith("A");
                }
            })
            // keyBy 和 sum 通常会引入重分区(shuffle),这里单独作为一个阶段
            .keyBy(value -> value.f0)
            .sum(1);

        // 打印结果
        result.print();

        // 执行任务
        env.execute("Operator Chain Example");
    }
}

具体说明

  1. Map 与 Filter 的链合

    • 在上述代码中,map()filter() 都是简单的、一对一的算子,它们的输入与输出之间没有重分区或者数据打乱。
    • Flink 会自动将这两个算子合并到同一个任务中运行,这就是 算子链。合并后,数据从 map() 输出后会直接传递给 filter(),而不需要经过额外的数据传输步骤。
  2. KeyBy 与 Sum 的阶段

    • 当你调用 keyBy() 时,会引入 重分区,因为需要根据 key 把数据重新分组。重分区通常会单独成为一个阶段,因此 keyBy() 之后的 sum() 可能会运行在不同的任务中,而不与前面的算子链合并。
  3. 手动调整算子链

    • 如果你希望 map()filter() 单独运行(例如为了调试或隔离资源),可以使用 disableChaining()

      .map(new MapFunction<String, Tuple2<String, Integer>>() { ... })
      .disableChaining() // 禁用后,该算子就不会与后续算子链合并
      
    • 或者使用 startNewChain() 来从当前算子开始新的链:

      .map(new MapFunction<String, Tuple2<String, Integer>>() { ... })
      .startNewChain() // 从此算子开始,新链不与前面的链合并
      
  • 算子链:通过将多个一对一算子合并到同一个任务中运行,可以减少线程切换和数据传输的开销,从而提高处理效率。
  • 自动与手动控制:Flink 默认会自动合并满足条件的算子链,但你可以通过 disableChaining()startNewChain() 手动控制合并情况。

2.2 什么时候使用算子链?

  • 默认情况:
    Flink 会自动对满足条件的一对一算子进行链式合并,从而优化执行效率。

  • 手动控制:
    如果你希望某个算子单独运行,不参与合并,可以使用:

    .map(word -> Tuple2.of(word, 1L)).disableChaining();
    

    如果你希望从某个算子开始新链,也可以使用:

    .map(word -> Tuple2.of(word, 1L)).startNewChain();
    

小结

  • 并行度 让我们可以同时处理多个数据片段,就像叫多个人同时帮忙完成一项任务一样,可以大大提高处理速度。
  • 算子链 则是优化处理过程的技术,把多个简单操作合并在一起执行,减少中间传递和线程切换的开销,从而提高整体效率。

通过这两个机制,Flink 能够高效地处理大规模数据流,并在集群环境下发挥出更强的并行处理能力。


http://www.kler.cn/a/599565.html

相关文章:

  • Python前缀和(例题:异或和,求和)
  • Java中static final才是修饰常量的,单独的final并不能修饰常量这样理解对吗?
  • Codeforces Round 1012 (Div. 2)
  • JSONP 漏洞
  • Thinkphp(TP)框架漏洞攻略
  • HarmonyOS NEXT (六):系统安全架构
  • 内核中的互斥量
  • 《Python实战进阶》第31集:特征工程:特征选择与降维技术
  • 【蓝桥杯】每日练习 Day10
  • 平芯微PW2606过压保护芯片应用电路
  • 第二阶段面试题
  • 安宝特分享 | AR眼镜技术解析:B端与C端应用场景与设计差异
  • MediaPipe软件包如何构建和安装
  • Simula语言的安全开发
  • 嵌入式八股文学习笔记——C++学习笔记面向对象相关
  • 质检LIMS系统在临床试验机构的实践 临床试验的LIMS应用突破
  • Java实习生面试题(2025.3.23 be)
  • 安宝特分享|AR智能装备赋能企业效率跃升
  • redis7.4.2单机配置
  • CentOS 7 更换 yum 源(阿里云)+ 扩展 epel 源