当前位置: 首页 > article >正文

flink StreamGraph 构造flink任务

文章目录

      • 背景
      • 主要步骤
      • 代码

背景

通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节
下面通过构造一个有向无环图,来实际看一下

主要步骤

1.增加source
2.增加operator
3. 增加一条边,连接source和operator
4. 增加sink
5. 增加一条边,连接operator和sink

代码

 // Step 1: Create basic configurations
        Configuration configuration = new Configuration();
        ExecutionConfig executionConfig = new ExecutionConfig();
        CheckpointConfig checkpointConfig = new CheckpointConfig();
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();

        // Step 2: Create a new StreamGraph instance
        StreamGraph streamGraph = new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);

        // Step 3: Add a source operator

        GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
        DataGeneratorSource<String> source = new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);
        SourceOperatorFactory<String> sourceOperatorFactory = new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());
        streamGraph.addSource(1, "sourceNode", "sourceDescription", sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sourceSlot");

        // Step 4: Add a map operator to transform the data
        StreamMap<String, String> mapOperator = new StreamMap<>(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });
        SimpleOperatorFactory<String> mapOperatorFactory = SimpleOperatorFactory.of(mapOperator);
        streamGraph.addOperator(2, "mapNode", "mapDescription", mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "mapSlot");

        // Step 5: Connect source and map operator
        streamGraph.addEdge(1, 2, 0);

        // Step 6: Add a sink operator to consume the data
        StreamMap<String, String> sinkOperator = new StreamMap<>(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                System.out.println(value);
                return value;
            }
        });
        SimpleOperatorFactory<String> sinkOperatorFactory = SimpleOperatorFactory.of(sinkOperator);
        streamGraph.addSink(3, "sinkNode", "sinkDescription", sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sinkSlot");

        // Step 7: Connect map and sink operator
        streamGraph.addEdge(2, 3, 0);
        streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        streamGraph.setMaxParallelism(1,1);
        streamGraph.setMaxParallelism(2,1);
        streamGraph.setMaxParallelism(3,1);
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);


        // Step 8: Convert StreamGraph to JobGraph
        JobGraph jobGraph = streamGraph.getJobGraph();


        // Step 9: Set up a MiniCluster for local execution
        MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
                .setNumTaskManagers(10)
                .setNumSlotsPerTaskManager(10)
                .build();
        MiniCluster miniCluster = new MiniCluster(miniClusterConfig);

        // Step 10: Start the MiniCluster
        miniCluster.start();

        // Step 11: Submit the job to the MiniCluster
        JobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);
        System.out.println("Job completed with result: " + result);

        // Step 12: Stop the MiniCluster
        miniCluster.close();

http://www.kler.cn/a/401829.html

相关文章:

  • 第13天:高级主题 - ViewModel 和 LiveData
  • uni-app快速入门(十)--常用内置组件(下)
  • 基于Java Springboot二手书籍交易系统
  • 【蓝桥杯C/C++】深入解析I/O高效性能优化:std::ios::sync_with_stdio(false)
  • RHCE——系统的延迟任务及定时任务
  • HarmonyOs鸿蒙开发实战(16)=>沉浸式效果第一种方案一窗口全屏布局方案
  • Blender vs 3dMax谁才是3D软件的未来?
  • 【Unity踩坑】Unity编辑器占用资源过高
  • SSH公钥有什么用?Windows 11操作系统上如何获取SSH公钥
  • 厦门凯酷全科技有限公司正规吗?
  • 【设计模式】行为型模式(三):责任链模式、状态模式
  • 【Python模拟websocket登陆-拆包封包】
  • 优化装配,提升品质:虚拟装配在汽车制造中的关键作用
  • 悬浮框前端效果查看与造数
  • 硬件工程师之电子元器件—二极管(10)之可变电容和TVS二极管
  • 从0开始学PHP面向对象内容之常用设计模式(建造者,原型)
  • 【PGCCC】PostgreSQL 数据库设计中的文本标识符 | 翻译
  • docker有哪些网络模式
  • 【计算机网络实验】之静态路由配置
  • 前端项目接入单元测试手册
  • 白蚁自动化监测系统的装置和优势
  • 【网络安全】(一) 0成本添加访问级监控
  • 【C/C++】随机数生成的现代化封装
  • 前端注册代码
  • C#获取视频第一帧_腾讯云媒体处理获取视频第一帧
  • C函数从lua中读取数据接口常用接口