当前位置: 首页 > article >正文

Flink之OperatorState

在Flink中状态主要分为三种:

  • Operator State(算子状态)
  • Keyed State(键控状态)
  • Broadcast State(广播状态)

这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解

  • 数据源
    这里选用Socket作为Source输入,便于测试
    ➜  ~ nc -lk 8888
    a
    b
    c
    k
    k
    k
    
  • 状态算子代码
    /**
    * @Description TODO 自定义状态MapFunc
    **/
    // 状态算子必须要实现对应的算子接口和CheckpointFunction接口
    class StateMapFunc implements MapFunction<String, String>, CheckpointedFunction{
      private ListState<String> strListState;
    
      /**
       * @Param o
       * @return String
       * @Description TODO map方法的正常处理逻辑
      **/
      @Override
      public String map(String s) throws Exception {
          // 模拟Task失败
          if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {
              throw new Exception("Task 异常");
          }
          // 将数据添加到状态存储器中
          strListState.add(s);
    
          Iterable<String> strings = strListState.get();
          StringBuilder builder = new StringBuilder();
          for (String string : strings) {
              builder.append(string);
          }
          return builder.toString();
      }
    
      /**
       * @Param functionSnapshotContext
       * @return void
       * @Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控
      **/
      @Override
      public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
          System.out.println("快照生成, checkpointId: " + functionSnapshotContext.getCheckpointId());
      }
    
      /**
       * @Param functionInitializationContext
       * @return void
       * @Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化
      **/
      @Override
      public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
          // 获取算子状态存储器
          OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();
    
          /**
           * ListStateDescriptor状态描述
           * 参数1:一个自定义名称
           * 参数2:存储的数据类型
          **/
          ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("demo", String.class);
          /**
           * 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据
           * getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载
          **/
          strListState = operatorStateStore.getListState(stateDescriptor);
      }
    }
    
    要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.
  • 业务代码
      public class FlinkOperatorState {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // 设置并行度1
          env.setParallelism(1);
          // 开启Checkpoint, 8秒一个周期并开启一次性语义
          env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);
          // 指定checkpoint持久化路径
          env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
          // 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止
          env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));
          // 获取Socket数据源
          DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
          // 将自定义的StateOperator传入
          SingleOutputStreamOperator<String> map = socketSource.map(new StateMapFunc());
          // 打印结果
          map.print();
          env.execute("Operator State");
      }
    }
    

具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.


http://www.kler.cn/news/135705.html

相关文章:

  • Android MQTT开发之 Hivemq MQTT Client
  • 全志R128内存泄漏调试案例
  • 鸿蒙4.0开发笔记之DevEco Studio之配置代码片段快速生成(三)
  • 【Python 千题 —— 基础篇】输出可以被5整除的数
  • 嵌入式QTGit面试题
  • 计算机毕业设计选题推荐-高校后勤报修微信小程序/安卓APP-项目实战
  • 可逆矩阵的性质
  • 获取阿里云Docker镜像加速器
  • Arduino驱动DS18B20数字温度传感器(温湿度传感器)
  • OpenCV快速入门:直方图、掩膜、模板匹配和霍夫检测
  • 第四篇 《随机点名答题系统》——基础设置详解(类抽奖系统、在线答题系统、线上答题系统、在线点名系统、线上点名系统、在线考试系统、线上考试系统)
  • AtCoder Beginner Contest 329 题解A~F
  • 【数据机构】最小生成树(prim算法)
  • Harmony Ble 蓝牙App (一)扫描
  • .babyk勒索病毒解析:恶意更新如何威胁您的数据安全
  • SpringCloud相关
  • Mac安装win程序另一个方案
  • TCP传输的三次握手、四次挥手策略是什么
  • 【苏州元德维康生物医药-注册】
  • 2.3IP详解及配置
  • 给大伙讲个笑话:阿里云服务器开了安全组防火墙还是无法访问到服务
  • java 位运算 表示状态小记
  • HDD与QLC SSD深度对比:功耗与存储密度的终极较量
  • C#中ManualResetEvent的Reset,Set,WaitOne
  • 手把手从零开始训练YOLOv8改进项目(官方ultralytics版本)教程
  • uniapp如何使用api相关提示框
  • Springboot框架中使用 Redis + Lua 脚本进行限流功能
  • Flutter最新稳定版3.16 新特性介绍
  • 达尔优EK87键盘说明书
  • sapjco3.dll has version “721.619“, but required is at least version “721.913“