当前位置: 首页 > article >正文

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

  • 需求
    将接收到的Socket数据源中的字符串进行拼接
    在命令行开启socket命令:
    nc -lk 8888
    
  • 业务代码
    public class FlinkKeyedState {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // 设置并行度为1,便于观察
          env.setParallelism(1);
          // 开启Checkpoint, 8秒一个周期并开启一次性语义
          env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);
          // 指定checkpoint持久化路径
          env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
          // 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止
          env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));
          // 获取Socket数据源
          DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
          // 将数据进行分组,将分组key给一个常量值
          SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")
                  // 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等
                  .map(new RichMapFunction<String, String>() {
                      ListState<String> listState;
    
                      // open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器
                      @Override
                      public void open(Configuration parameters) throws Exception {
                          // 获取上下文
                          RuntimeContext ctx = getRuntimeContext();
                          // 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等
                          listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));
                      }
    
                      // 在map方法中正常编写业务逻辑
                      @Override
                      public String map(String s) throws Exception {
                          // 模拟Task失败
                          if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {
                              throw new Exception("Task 异常");
                          }
                          // 将数据添加到状态存储器中
                          listState.add(s);
    
                          Iterable<String> strings = listState.get();
                          StringBuilder builder = new StringBuilder();
                          for (String string : strings) {
                              builder.append(string);
                          }
                          return builder.toString();
                      }
                  });
          map.print();
          env.execute("Keyed State");
      }
    }
    
    API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.

http://www.kler.cn/a/133958.html

相关文章:

  • 【数据分享】1929-2024年全球站点的逐年平均气温数据(Shp\Excel\无需转发)
  • 计算机创造的奇迹——C语言
  • 回归预测 | MATLAB基于TCN-BiGRU时间卷积神经网络结合双向门控循环单元多输入单输出回归预测
  • 调试Hadoop源代码
  • 前沿技术趋势洞察与分析:探寻科技变革的多维密码
  • 1166 Summit (25)
  • R语言——taxize(第二部分)
  • 036、目标检测-锚框
  • 集合的运算
  • uni-app下,页面跳转后wacth持续监听的问题处理
  • LeetCode:689. 三个无重叠子数组的最大和(dp C++)
  • Java基础- StringBuilder StringBuffer
  • Android图片涂鸦,Kotlin(1)
  • k8s_base
  • 物联网AI MicroPython学习之语法 I2C总线
  • 基于SpringBoot+Redis的前后端分离外卖项目-苍穹外卖(五)
  • 随机过程-张灏
  • java springboot 在测试类中声明临时Bean对象
  • 15. Spring源码篇之获取方法参数名
  • Qt按钮大全续集(QCommandLinkButton和QDialogButtonBox )
  • 【小黑送书—第六期】>>AI时代,程序员如何应对挑战——《AI时代系列书籍》
  • Ubuntu 20.04 LTS设置系统虚拟内存大小
  • 4 redis的HyperLogLog入门原理
  • Kubernetes(k8s)进阶
  • 大数据-之LibrA数据库系统告警处理(ALM-12055 证书文件即将过期)
  • 2023年第九届数维杯国际大学生数学建模挑战赛