使用 Flink StreamGraph 手动构造 Flink 任务
文章目录
- 背景
- 主要步骤
- 代码
背景
我们通常使用 Flink 提供的高级算子来编写 Flink 任务,因此对底层实现不太了解,尤其是作业图(JobGraph)是如何生成的。
下面通过直接使用 StreamGraph 手动构造一个有向无环图(DAG),来实际看一下这个过程。
主要步骤
1. 增加 source
2. 增加 operator
3. 增加一条边,连接 source 和 operator
4. 增加 sink
5. 增加一条边,连接 operator 和 sink
代码
// Step 1: Build the four settings objects the StreamGraph constructor takes.
// Each one is created without any customization, and no savepoint is restored.
SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.none();
CheckpointConfig checkpointing = new CheckpointConfig();
ExecutionConfig execution = new ExecutionConfig();
Configuration jobConfiguration = new Configuration();
// Step 2: Create the StreamGraph instance that the following steps add nodes and edges to.
StreamGraph streamGraph =
        new StreamGraph(jobConfiguration, execution, checkpointing, restoreSettings);
// Step 3: Add a source operator as vertex 1.
// The generator turns each Long index into the text "Number: <index>".
GeneratorFunction<Long, String> indexToText = index -> "Number: " + index;
// Long.MAX_VALUE and perSecond(1) are passed as the count and rate-limit arguments;
// presumably this yields a practically endless stream of one record per second — confirm
// against the DataGeneratorSource documentation.
DataGeneratorSource<String> generatorSource =
        new DataGeneratorSource<>(
                indexToText,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1),
                Types.STRING);
SourceOperatorFactory<String> sourceFactory =
        new SourceOperatorFactory<>(generatorSource, WatermarkStrategy.noWatermarks());
// NOTE(review): in StreamGraph#addSource the 2nd and 3rd arguments appear to be the slot
// sharing group and the co-location group, and the last argument the operator name, so the
// literal values below may not mean what they suggest — confirm for the Flink version in use.
streamGraph.addSource(
        1,
        "sourceNode",
        "sourceDescription",
        sourceFactory,
        TypeInformation.of(String.class),
        TypeInformation.of(String.class),
        "sourceSlot");
// Step 4: Add a map operator as vertex 2.
// The user function returns every record unchanged.
MapFunction<String, String> passThrough = new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value;
    }
};
StreamMap<String, String> passThroughOperator = new StreamMap<>(passThrough);
SimpleOperatorFactory<String> mapFactory = SimpleOperatorFactory.of(passThroughOperator);
// NOTE(review): in StreamGraph#addOperator the 2nd and 3rd arguments appear to be the slot
// sharing group and the co-location group, and the last argument the operator name —
// confirm for the Flink version in use.
streamGraph.addOperator(
        2,
        "mapNode",
        "mapDescription",
        mapFactory,
        TypeInformation.of(String.class),
        TypeInformation.of(String.class),
        "mapSlot");
// Step 5: Connect vertex 1 (source) to vertex 2 (map); the third argument is passed as 0.
streamGraph.addEdge(1, 2, 0);
// Step 6: Add a sink node as vertex 3.
// The "sink" is implemented as a StreamMap whose function prints each record to stdout
// and then returns it unchanged.
MapFunction<String, String> printAndForward = new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        System.out.println(value);
        return value;
    }
};
StreamMap<String, String> printingOperator = new StreamMap<>(printAndForward);
SimpleOperatorFactory<String> sinkFactory = SimpleOperatorFactory.of(printingOperator);
// NOTE(review): in StreamGraph#addSink the 2nd and 3rd arguments appear to be the slot
// sharing group and the co-location group, and the last argument the operator name —
// confirm for the Flink version in use.
streamGraph.addSink(
        3,
        "sinkNode",
        "sinkDescription",
        sinkFactory,
        TypeInformation.of(String.class),
        TypeInformation.of(String.class),
        "sinkSlot");
// Step 7: Connect vertex 2 (map) to vertex 3 (sink); the third argument is passed as 0.
streamGraph.addEdge(2, 3, 0);
// Graph-wide settings: processing time as the time characteristic.
streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// Vertices 1, 2 and 3 (source, map, sink) each get a max parallelism of 1.
for (int vertexId = 1; vertexId <= 3; vertexId++) {
    streamGraph.setMaxParallelism(vertexId, 1);
}
// Use the ALL_EDGES_PIPELINED exchange mode for the whole graph.
streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);
// Step 8: Convert the StreamGraph into the JobGraph that is submitted for execution.
JobGraph jobGraph = streamGraph.getJobGraph();
// Step 9: Configure a MiniCluster for local execution (10 TaskManagers with 10 slots each).
MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
        .setNumTaskManagers(10)
        .setNumSlotsPerTaskManager(10)
        .build();
// Steps 10-12: Start the cluster, run the job, and shut the cluster down.
// try-with-resources ensures close() is called even when start() or the job execution
// throws; previously the cluster was left running in that case.
try (MiniCluster miniCluster = new MiniCluster(miniClusterConfig)) {
    // Step 10: Start the MiniCluster
    miniCluster.start();
    // Step 11: Submit the job and block until it finishes
    JobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);
    System.out.println("Job completed with result: " + result);
}
// Step 12: The MiniCluster has been closed automatically at the end of the try block.