2. Flink Partitioning Strategies
I. Overview of Flink Partitioning Strategies
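While a Flink job runs, a stream consists of one or more stream partitions. Each SubTask running in a TaskManager slot corresponds to one stream partition.
Data is passed between Flink partitions in one of two modes:
1. one-to-one mode
- The data is not redistributed: what an upstream SubTask produces is exactly what the corresponding downstream SubTask receives. The SubTasks of operators such as map and filter exchange data one-to-one, which is similar to narrow dependencies in Spark.
2. redistributing mode
- The data must be repartitioned. For example, the SubTasks of a keyBy exchange data in redistributing mode, which is similar to wide (shuffle) dependencies in Spark.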
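II. Flink's Built-in Partitioners
Flink's main built-in partitioners are the following eight:
- ForwardPartitioner
- RebalancePartitioner
- ShufflePartitioner
- BroadcastPartitioner
- GlobalPartitioner
- KeyGroupStreamPartitioner
- RescalePartitioner
- BinaryHashPartitioner
Suppose no partitioner is specified between an upstream and a downstream operator. If the two have the same parallelism (the one-to-one case), Flink uses ForwardPartitioner by default. Otherwise it uses RebalancePartitioner. The rule can be seen in the source of the StreamGraph class: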
private void createActualEdge(
        Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        OutputTag outputTag,
        StreamExchangeMode exchangeMode,
        IntermediateDataSetID intermediateDataSetId) {
    StreamNode upstreamNode = getStreamNode(upStreamVertexID);
    StreamNode downstreamNode = getStreamNode(downStreamVertexID);
    // If no partitioner is specified, use ForwardPartitioner when upstream and
    // downstream parallelism are equal; otherwise use RebalancePartitioner
    if (partitioner == null
            && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
        partitioner =
                dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();
    } else if (partitioner == null) {
        partitioner = new RebalancePartitioner<Object>();
    }
    // ...
}
1. ForwardPartitioner
ForwardPartitioner sends all elements of an upstream partition to the single downstream partition with the same index.
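The rule can be made concrete with a minimal plain-Java simulation. This is not Flink code, and `ForwardSim` and its method are hypothetical names. The sketch assumes equal upstream and downstream parallelism. In that case upstream subtask i is wired only to downstream subtask i:

```java
import java.util.Arrays;

// Hypothetical simulation of forward partitioning (not Flink's ForwardPartitioner).
class ForwardSim {
    // With equal parallelism, upstream subtask i has one output channel: downstream subtask i.
    static int target(int upstreamSubtask, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new IllegalArgumentException("forward requires equal parallelism");
        }
        return upstreamSubtask;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int[] targets = new int[parallelism];
        for (int i = 0; i < parallelism; i++) {
            targets[i] = target(i, parallelism, parallelism);
        }
        System.out.println(Arrays.toString(targets)); // [0, 1, 2, 3]
    }
}
```

The exception mirrors the precondition in the StreamGraph excerpt. Forward is only chosen when the parallelism matches.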
Code example:
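import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        // Local environment with the Web UI (needs flink-runtime-web on the classpath)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // Source and print sink both run with parallelism 1, so ForwardPartitioner is chosen by default
        env.setParallelism(1);
        // Disable operator chaining so each operator appears separately in the Web UI
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        source.print();
        env.execute("PartitionDemo");
    }
}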
The data flow between the operators in the Web UI is shown below (the edge is labeled FORWARD):
2. RebalancePartitioner
RebalancePartitioner first picks a random downstream partition. It then cycles through all downstream partitions in round-robin order.
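Below is a simplified plain-Java model of this behavior. `RebalanceSim` is a hypothetical class. The sketch assumes the first record goes to the channel right after the random start; treat that detail as an assumption of the sketch. A fixed start is used in `main` so the output is reproducible:

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical simulation of rebalance partitioning (not Flink's RebalancePartitioner).
class RebalanceSim {
    private final int numberOfChannels;
    private int nextChannel;

    RebalanceSim(int numberOfChannels, int start) {
        this.numberOfChannels = numberOfChannels;
        this.nextChannel = start;
    }

    // Random start, so that many upstream subtasks do not all begin on channel 0.
    static RebalanceSim withRandomStart(int numberOfChannels) {
        return new RebalanceSim(numberOfChannels, ThreadLocalRandom.current().nextInt(numberOfChannels));
    }

    // Advance by one channel per record, wrapping around (round-robin).
    int selectChannel() {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }

    public static void main(String[] args) {
        RebalanceSim sim = new RebalanceSim(4, 2); // fixed start for a reproducible demo
        int[] picks = new int[6];
        for (int i = 0; i < picks.length; i++) {
            picks[i] = sim.selectChannel();
        }
        System.out.println(Arrays.toString(picks)); // [3, 0, 1, 2, 3, 0]
    }
}
```

Whatever the start, any n consecutive records hit all n channels exactly once. That is what makes rebalance an even, deterministic spread.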
Code example:
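import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // socketTextStream is a non-parallel source (parallelism 1) while print runs with 4 subtasks,
        // so with no explicit partitioner Flink uses RebalancePartitioner
        env.setParallelism(4);
        // Disable operator chaining
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        source.print();
        env.execute("PartitionDemo");
    }
}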
The data flow between the operators in the Web UI is shown below (the edge is labeled REBALANCE):
3. ShufflePartitioner
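ShufflePartitioner sends each record to a randomly chosen downstream partition. The target partition is determined by a random number generated with Random.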
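Here is a minimal sketch that counts where records land under uniform random selection. `ShuffleSim` is hypothetical. The seeded `java.util.Random` is an assumption made only to keep the demo reproducible; Flink's partitioner does not take a seed here. Over many records each channel gets roughly an equal share. Unlike round-robin, however, a short run of records can cluster on one channel:

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical simulation of shuffle partitioning (not Flink's ShufflePartitioner).
class ShuffleSim {
    // Count how many of `records` land on each of `numberOfChannels` channels.
    static int[] distribute(long seed, int numberOfChannels, int records) {
        Random random = new Random(seed); // seeded only so the demo is reproducible
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            // Each record independently picks a uniformly random channel.
            counts[random.nextInt(numberOfChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Each of the 4 counts should be close to 2500.
        System.out.println(Arrays.toString(distribute(42L, 4, 10_000)));
    }
}
```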
Code example:
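import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // Disable operator chaining
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        // Explicitly use ShufflePartitioner between the source and print
        source.shuffle().print();
        env.execute("PartitionDemo");
    }
}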
The data flow between the operators in the Web UI is shown below (the edge is labeled SHUFFLE):
4. BroadcastPartitioner
BroadcastPartitioner sends every record to every downstream partition.
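The hypothetical `BroadcastSim` below is a plain-Java sketch. It models one record being copied to every channel. It also mimics the `n> ` prefix that `print()` adds when the sink runs with parallelism greater than 1, where n is the subtask index plus 1. The sketch emits the lines in channel order; in a real job the lines may appear in any order:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of broadcast partitioning (not Flink's BroadcastPartitioner).
class BroadcastSim {
    // Copy one record to every channel and format it the way print() labels subtasks.
    static List<String> broadcast(String record, int numberOfChannels) {
        List<String> printed = new ArrayList<>();
        for (int channel = 0; channel < numberOfChannels; channel++) {
            printed.add((channel + 1) + "> " + record);
        }
        return printed;
    }

    public static void main(String[] args) {
        // Prints "1> hello" through "4> hello", one line per channel.
        broadcast("hello", 4).forEach(System.out::println);
    }
}
```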
Code example:
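import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // Disable operator chaining
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        // Explicitly use BroadcastPartitioner: every record goes to all 4 print subtasks
        source.broadcast().print();
        env.execute("PartitionDemo");
    }
}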
Output:
# send hello to the socket
[root@hadoop3 ~]# nc -lk 8888
hello
Four lines are printed, one by each print subtask:
4> hello
1> hello
2> hello
3> hello
The data flow between the operators in the Web UI is shown below (the edge is labeled BROADCAST):
5. GlobalPartitioner
GlobalPartitioner sends every record only to the first partition (subtask index 0) of the downstream operator.
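A minimal sketch in plain Java follows. `GlobalSim` is a hypothetical name. The channel choice ignores both the record and the downstream parallelism. Channel 0 shows up in `print()` output with the prefix `1> `:

```java
// Hypothetical simulation of global partitioning (not Flink's GlobalPartitioner).
class GlobalSim {
    // Every record goes to channel 0, regardless of the downstream parallelism.
    static int selectChannel(String record, int numberOfChannels) {
        return 0;
    }

    // Format the record the way print() labels subtasks (index + 1).
    static String printed(String record, int numberOfChannels) {
        return (selectChannel(record, numberOfChannels) + 1) + "> " + record;
    }

    public static void main(String[] args) {
        for (String record : new String[] {"hello", "world"}) {
            System.out.println(printed(record, 4)); // "1> hello", then "1> world"
        }
    }
}
```

Because everything funnels into one subtask, the downstream operator effectively runs with parallelism 1 and can become a bottleneck.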
Code example:
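import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // Disable operator chaining
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        // Explicitly use GlobalPartitioner: all records go to print subtask 0
        source.global().print();
        env.execute("PartitionDemo");
    }
}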
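Output:
# send 2 messages to the socket
[root@hadoop3 ~]# nc -lk 8888
hello
world
Both messages are sent to the first downstream partition (subtask index 0, which print() labels as 1>):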
1> hello
1> world
The data flow between the operators in the Web UI is shown below (the edge is labeled GLOBAL):
6. KeyGroupStreamPartitioner
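KeyGroupStreamPartitioner sends each record to the downstream partition determined by hashing its key. The key's hash first selects a key group, and each downstream subtask owns a contiguous range of key groups. As a result, records with the same key always reach the same subtask.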
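The two-step mapping from key to key group to subtask can be sketched in plain Java. `KeyGroupSim` is hypothetical. It assumes a max parallelism of 128, which is Flink's default lower bound for small jobs. Flink applies a murmur hash to `key.hashCode()`; this sketch substitutes a plain `Math.floorMod` of `hashCode()`, so the concrete subtask numbers will differ from a real job. The range formula `keyGroup * parallelism / maxParallelism` is the part the sketch is meant to illustrate:

```java
// Hypothetical simulation of key-group partitioning (not Flink's KeyGroupStreamPartitioner).
class KeyGroupSim {
    // Assumption: 128 key groups (Flink's default lower bound for max parallelism).
    static final int MAX_PARALLELISM = 128;

    // Substitute hash: Flink uses a murmur hash of key.hashCode() here.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskFor(Object key, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, MAX_PARALLELISM), MAX_PARALLELISM, parallelism);
    }

    public static void main(String[] args) {
        // The same word always maps to the same subtask.
        for (String word : new String[] {"hello", "world", "hello"}) {
            System.out.println(word + " -> subtask " + subtaskFor(word, 4));
        }
    }
}
```

With parallelism 4 and 128 key groups, subtask 0 owns key groups 0 to 31, subtask 1 owns 32 to 63, and so on. Because the number of key groups stays fixed, keyed state can be redistributed in whole key groups when the job is rescaled.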
Code example:
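import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // Disable operator chaining
        env.disableOperatorChaining();
        // Turn each line into a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = env.socketTextStream("192.168.47.130", 8888)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                });
        // keyBy uses KeyGroupStreamPartitioner; key by field f0 instead of the deprecated keyBy(0)
        mapped.keyBy(t -> t.f0).sum(1).print();
        env.execute("PartitionDemo");
    }
}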
The data flow between the operators in the Web UI is shown below (the edge from map to the keyed sum is labeled HASH):
7. RescalePartitioner
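RescalePartitioner works in pointwise mode. It first matches upstream and downstream subtasks according to their parallelism, so each upstream subtask is connected to only a subset of the downstream subtasks. It then sends data round-robin over that subset, starting from its partition 0.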
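The two steps can be sketched in plain Java with the hypothetical class `RescaleSim`. The matching step assumes that subtasks are split into contiguous, near-equal ranges. The exact boundary arithmetic below is this sketch's assumption, not something stated above. For example, with 2 upstream and 4 downstream subtasks, upstream 0 feeds downstream 0 and 1, and upstream 1 feeds downstream 2 and 3. The round-robin step then starts at the first connected channel:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of rescale partitioning (not Flink's RescalePartitioner).
class RescaleSim {
    // Step 1 (assumed wiring): which downstream subtasks an upstream subtask is connected to.
    static List<Integer> connectedDownstream(int upstream, int upParallelism, int downParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (upParallelism <= downParallelism) {
            // Fan-out: each upstream subtask gets a contiguous block of downstream subtasks.
            int start = (upstream * downParallelism + upParallelism - 1) / upParallelism;
            int end = ((upstream + 1) * downParallelism + upParallelism - 1) / upParallelism;
            for (int d = start; d < end; d++) targets.add(d);
        } else {
            // Fan-in: each downstream subtask collects a contiguous block of upstream subtasks.
            for (int d = 0; d < downParallelism; d++) {
                int start = d * upParallelism / downParallelism;
                int end = (d + 1) * upParallelism / downParallelism;
                if (upstream >= start && upstream < end) targets.add(d);
            }
        }
        return targets;
    }

    private final List<Integer> channels;
    private int next = -1;

    RescaleSim(int upstream, int upParallelism, int downParallelism) {
        this.channels = connectedDownstream(upstream, upParallelism, downParallelism);
    }

    // Step 2: round-robin over the connected subset, starting from local channel 0.
    int nextTarget() {
        if (++next >= channels.size()) next = 0;
        return channels.get(next);
    }

    public static void main(String[] args) {
        System.out.println(connectedDownstream(0, 2, 4)); // [0, 1]
        System.out.println(connectedDownstream(1, 2, 4)); // [2, 3]
    }
}
```

Because each upstream subtask only talks to its own subset, rescale avoids the all-to-all connections of rebalance.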
Code example:
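import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);
        // Disable operator chaining
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        // Explicitly use RescalePartitioner; the single source subtask feeds all 4 print subtasks round-robin
        source.rescale().print();
        env.execute("PartitionDemo");
    }
}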
The data flow between the operators in the Web UI is shown below (the edge is labeled RESCALE):
8. BinaryHashPartitioner
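BinaryHashPartitioner is a hash partitioner for BinaryRowData, used by Flink's Table/SQL runtime. BinaryRowData is an implementation of RowData that keeps a row in binary form. This significantly reduces the serialization and deserialization of Java objects.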
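The key idea is that the hash is computed directly on the row's bytes, without first deserializing the row into Java objects. A rough plain-Java sketch follows. `BinaryHashSim` is hypothetical. The "row" is just a UTF-8 byte array with the key in the first bytes, which is not BinaryRowData's real layout. The polynomial byte hash stands in for the hash function Flink actually uses:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical simulation of hashing a binary row in place (not Flink's BinaryHashPartitioner).
class BinaryHashSim {
    // Hash a slice of the row's bytes directly; no Java objects are materialized.
    static int hashBytes(byte[] row, int offset, int length) {
        int h = 1;
        for (int i = offset; i < offset + length; i++) {
            h = 31 * h + row[i];
        }
        return h;
    }

    // Map the key bytes to a channel; floorMod keeps the result non-negative.
    static int selectChannel(byte[] row, int keyOffset, int keyLength, int numberOfChannels) {
        return Math.floorMod(hashBytes(row, keyOffset, keyLength), numberOfChannels);
    }

    public static void main(String[] args) {
        byte[] rowA = "user42|click".getBytes(StandardCharsets.UTF_8);
        byte[] rowB = "user42|view".getBytes(StandardCharsets.UTF_8);
        // Key = first 6 bytes ("user42"), so both rows land on the same channel.
        System.out.println(selectChannel(rowA, 0, 6, 4) == selectChannel(rowB, 0, 6, 4)); // true
    }
}
```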
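III. Custom Partitioning in Flink
If Flink's built-in partitioners do not meet your business requirements, you can implement a custom partitioning strategy by calling DataStream's partitionCustom() method.
Implementation: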
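import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(2);
        // Disable operator chaining
        env.disableOperatorChaining();
        DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);
        // Use each incoming line itself as the partitioning key
        source.partitionCustom(new MyPartitioner(), new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        }).print();
        env.execute("PartitionDemo");
    }
}

class MyPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        // Even numbers go to partition 0 (printed as "1>"), odd numbers to partition 1 (printed as "2>").
        // floorMod keeps the index non-negative for negative input; non-numeric lines throw NumberFormatException.
        return Math.floorMod(Integer.parseInt(key.trim()), numPartitions);
    }
}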
Output:
# send data to the socket
[root@hadoop3 ~]# nc -lk 8888
1
3
5
2
4
6
Odd and even numbers are printed by different partitions:
2> 1
2> 3
2> 5
1> 2
1> 4
1> 6