当前位置: 首页 > article >正文

2. Flink分区策略

一. Flink分区策略概述

Flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition),TaskManager中的一个slot的SubTask就是一个stream partition(流分区)。

Flink分区之间进行数据传递模式有两种。
1. one-to-one模式

  • 数据不需要重新分布,上游SubTask生产的数据与下游SubTask收到的数据完全一致,常见的map,filter等算子的SubTask的数据传递都是one-to-one的对应关系,类似于spark中的窄依赖。

2. redistributing模式

  • 数据需要经过重新分区,比如keyBy算子的SubTask的数据传递都是Redistributing方式,类似于spark中的宽依赖。
二. Flink内置分区策略

Flink内置的分区策略有如下8种:

  • ForwardPartitioner
  • RebalancePartitioner
  • ShufflePartitioner
  • BroadcastPartitioner
  • GlobalPartitioner
  • KeyGroupStreamPartitioner
  • RescalePartitioner
  • BinaryHashPartitioner

在上下游的算子没有指定分区策略的情况下,如果上下游的算子并行度一致且满足one-to-one模式,则默认使用ForwardPartitioner,否则使用RebalancePartitioner。在StreamGraph类的源码中可以看到该规则:

private void createActualEdge(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            IntermediateDataSetID intermediateDataSetId) {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);
        
        // 如果没有指定分区器,上下游并行度一致的情况下使用ForwardPartitioner, 
        // 否则使用RebalancePartitioner
        if (partitioner == null
                && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner =
                    dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }
 // ...
}

1. ForwardPartitioner
ForwardPartitioner策略将上游同一个分区的元素发送到了下游同一个分区中。
在这里插入图片描述

代码示例:

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        // 禁止operator chain 优化
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        source.print();
        env.execute("PartitionDemo");
    }
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

2. RebalancePartitioner

在这里插入图片描述
RebalancePartitioner会先随机选一个下游分区,之后轮询(round-robin)遍历下游所有分区进行数据传输。

代码示例:

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // 禁止operator chain 优化
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        source.print();
        
        env.execute("PartitionDemo");
    }
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

3. ShufflePartitioner

在这里插入图片描述

ShufflePartitioner会随机选取下游分区进行数据传输。下游分区由Random生成的随机数决定。

代码示例:

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // 禁止operator chain 优化
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);

        source.shuffle().print();

        env.execute("PartitionDemo");
    }
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

4. BroadcastPartitioner

在这里插入图片描述

BroadcastPartitioner会将每条数据发送给下游每一个分区。

代码示例:

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // 禁止operator chain 优化
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);

        source.broadcast().print();
        env.execute("PartitionDemo");
    }
}

运行结果:

# scoket发送hello
[root@hadoop3 ~]# nc -lk 8888
hello

最终输出了4条消息

4> hello
1> hello
2> hello
3> hello

此时WebUI上算子链的数据流转关系如下:

在这里插入图片描述

5. GlobalPartitioner
在这里插入图片描述
GlobalPartitioner只将消息下发给下游算子的第一个分区。

代码示例

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // 禁止operator chain 优化
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        
        source.global().print();
        env.execute("PartitionDemo");
    }
}

运行结果:

# 向socket发送2条消息
[root@hadoop3 ~]# nc -lk 8888
hello
world

最终两条消息都被发送到了下游1号分区

1> hello
1> world

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

6. KeyGroupStreamPartitioner
在这里插入图片描述
KeyGroupStreamPartitioner将消息发送给key值经过hash计算后的下游分区。

示例代码:

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // 禁止operator chain 优化
        env.disableOperatorChaining();
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = env.socketTextStream("192.168.47.130", 8888)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                });

        mapped.keyBy(0).sum(1).print();

        env.execute("PartitionDemo");
    }
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

7. RescalePartitioner
在这里插入图片描述
RescalePartitioner在pointwise模式下会先根据上下游并行度进行匹配,再从匹配后的下游中从0号分区轮询传输数据。

代码示例

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // 禁止operator chain 优化
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);

        source.rescale().print();
        env.execute("PartitionDemo");
    }
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

8. BinaryHashPartitioner
BinaryHashPartitioner是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化与反序列化。

三. Flink自定义分区策略

如何Flink内置的分区策略不能满足业务需求,可以通过调用DataStream的partitionCustom()方法实现自定义分区策略。

代码实现

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(2);
        // 禁止operator chain 优化
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);

        source.partitionCustom(new MyPartitioner(), new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        }).print();

        env.execute("PartitionDemo");
    }
}



class MyPartitioner implements Partitioner<String> {

    @Override
    public int partition(String key, int numPartitions) {
    	// 偶数分到分区1,奇数分到分区2
        return Integer.parseInt(key) % numPartitions;
    }
}

运行结果:

# 向socket发送数据
[root@hadoop3 ~]# nc -lk 8888
1
3
5
2
4
6

奇数和偶数输出到不同的分区中

2> 1
2> 3
2> 5
1> 2
1> 4
1> 6

http://www.kler.cn/a/516853.html

相关文章:

  • 使用 Babylon.js 开发时如何通过 CSS 实现 UI 自适应
  • 云原生时代,如何构建高效分布式监控系统
  • 大华相机DH-IPC-HFW3237M支持的ONVIF协议
  • 把网站程序数据上传到服务器的方法和注意事项
  • BW复制ERP数据源跑程序激活后才可见
  • centos9编译安装opensips 二【进阶篇-定制目录+模块】推荐
  • vue3的组件v-model(defineModel()宏)
  • 第十五届蓝桥杯大赛软件赛省赛C/C++ 大学 B 组
  • 深度学习|表示学习|卷积神经网络|通道 channel 是什么?|05
  • 怎样使用树莓派自己搭建一套ADS-B信号接收系统
  • 栈和队列刷题篇
  • 新能源汽车充电桩选型以及安装应用
  • 2025.1.20——四、[强网杯 2019]Upload1 文件上传|反序列化
  • STM32——KEY按键
  • ETLCloud在iPaas中的是关键角色?
  • 若依 v-hasPermi 自定义指令失效场景
  • Java核心技术解析:泛型与类型安全全面指南
  • android wifi AsyncChannel(WifiManager和WifiP2pManager)
  • 【CS61A 2024秋】Python入门课,全过程记录P3(Week5 Sequences开始,更新于2025/1/23)
  • 韩国机场WebGIS可视化集合Google遥感影像分析
  • Java EE 进阶:Spring MVC(1)
  • HarmonyOS快速入门
  • 【YOLOv10改进[Backbone]】使用LSKNet替换Backbone | 用于遥感目标检测的大型选择性核网络
  • 在centos上编译安装opensips【初级-默认安装】
  • Nginx 性能优化技巧与实践(一)
  • PLC通信