当前位置: 首页 > article >正文

大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink 并行度
  • Flink 并行度详解
  • Flink 并行度 案例
    在这里插入图片描述

状态类型

Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。

  • 有状态计算:依赖之前或之后的事件
  • 无状态计算:独立

根据数据结构不同,Flink定义了多种State,应用于不同的场景。

  • ValueState:即类型为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
  • ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
  • ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时候,会调用ReduceFunction,最后合并到一个单一的状态值。
  • FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态会在未来的Flink版本当中删除)
  • MapState:即状态值为一个Map,用户通过put和putAll方法添加元素

State按照是否有Key划分为:

  • KeyedState
  • OperatorState

案例1 利用State求平均值

实现思路

  • 读数据源
  • 将数据源根据Key分组
  • 按照Key分组策略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果

编写代码

package icu.wzk;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class FlinkStateTest01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<Long, Long>> data = env
                .fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(1L, 4L),
                        Tuple2.of(1L, 2L)
                );
        KeyedStream<Tuple2<Long, Long>, Long> keyed = data
                .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, Long> value) throws Exception {
                        return value.f0;
                    }
                });
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed
                .flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    private transient ValueState<Tuple2<Long, Long>> sum;

                    @Override
                    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                        Tuple2<Long, Long> currentSum = sum.value();
                        if (currentSum == null) {
                            currentSum = Tuple2.of(0L, 0L);
                        }
                        // 更新
                        currentSum.f0 += 1L;
                        currentSum.f1 += value.f1;
                        System.out.println("currentValue: " + currentSum);
                        // 更新状态值
                        sum.update(currentSum);
                        // 如果 count >= 5 清空状态值 重新计算
                        if (currentSum.f0 >= 5) {
                            out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));
                            sum.clear();
                        }
                    }

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                                "average",
                                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
                        );
                        sum = getRuntimeContext().getState(descriptor);
                    }
                });
        flatMapped.print();
        env.execute("Flink State Test");
    }
}

运行结果

在这里插入图片描述

执行分析

在这里插入图片描述
在这里插入图片描述

Keyed State

表示和Key相关的一种State, 只能用于KeyedStream类型数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。

KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变化时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例可能运行一个或者多个KeyGroups的Keys。

Operator State

与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变化时自动重新分配状态数据。

同时在Flink中KeyedState和OperatorState均具有两种形式,其中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久话到CheckPoints中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发CheckPoint中,当从CheckPoint恢复任务时,算子自己再返序列化出状态的数据结构。

DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要原因是ManagedState能够更好地支持状态数据的重平衡以及更加完善的内存管理。

状态描述

在这里插入图片描述

State既然是暴露给用户的,那么就需要有一些属性需要指定:

  • State名称
  • Value Serializer
  • State Type Info

在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptot)

http://www.kler.cn/a/294768.html

相关文章:

  • go reflect 反射
  • [项目代码] YOLOv5 铁路工人安全帽安全背心识别 [目标检测]
  • 基于BILSTM及其他RNN序列模型的人名分类器
  • 大语言模型在序列推荐中的应用
  • 如何在 Ubuntu 16.04 上设置 NFS 挂载
  • 「QT」几何数据类 之 QLine 整型直线类
  • 网页开发 HTML
  • [代码已更新]2024数学建模国赛高教社杯C题:农作物的种植策略 思路代码文章助攻手把手保姆级
  • uniapp网站和微信小程序 添加 百度统计
  • 一起学习LeetCode热题100道(71/100)
  • 大数据时代的技术hive:hive的数据类型和数据模型
  • How can I provide a RGBA png file to OpenAI PHP library
  • 前缀和 — 利用前缀信息解决子数组问题
  • 【Azure Redis】Redis-CLI连接Redis 6380端口始终遇见 I/O Error
  • 实践reflex:项目架构解析
  • 去中心化网络:Web3如何颠覆传统互联网
  • 标准IO与系统IO
  • Java架构师未来篇大模型
  • 新加坡服务器:亚洲地区的优选之选
  • 【软件工程】软件开发模型
  • k8s中的层级结构,及节点组件的作用
  • Termius for Mac/Win:高效、安全的跨平台多协议远程管理软件
  • 黑马点评2——商户查询缓存(P37店铺类型查询业务添加缓存练习题答案)redis缓存、更新、穿透、雪崩、击穿、工具封装
  • 2-85 基于matlab的FrFT下时变幅度LFM信号参数估计
  • ROADM(可重构光分插复用器)-介绍
  • LTE PSS主同步信号搜索 MATLAB实现