当前位置: 首页 > article >正文

Flink学习连载文章9--状态(State)

State

state 可以理解为-- 历史计算结果

有状态计算和无状态计算

  • 无状态计算:
    • 不需要考虑历史数据, 相同的输入,得到相同的输出!
    • 如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
  • 有状态计算:
    • 需要考虑历史数据, 相同的输入,可能会得到不同的输出!
    • 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)

注意: Flink默认已经支持了无状态和有状态计算!

例如WordCount代码:已经做好了状态维护, 输入hello,输出(hello,1),再输入hello,输出(hello,2)

有状态计算和无状态计算的应用场景

  • 无状态计算:数据转换,过滤等操作直接使用无状态的map/filter即可
  • 有状态计算:需要做聚合/比较的操作得使用有状态的sum/reduce/maxBy/minBy....

以wordcout为例,说明上图的流程

对Managed State继续细分,它又有两种类型:Keyed State和Operator State。

Flink状态 
 - 托管状态
   - KeyedState ( 在keyBy之后可以使用状态 )
      - ValueState  (存储一个值)
      - ListState   (存储多个值)
      - MapState    (存储key-value) 
   - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]
 - 原生状态 (不用)

Keyed State (键控状态)

Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。

需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(...) 来得到 KeyedStream 。

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

· ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。

· ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。

· ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。

· AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。

· FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。

· MapState:维护 Map 类型的状态。

代码演示-Managed State-Keyed State

//nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/state/

案例1:

使用KeyedState中的ValueState获取数据中的最大值(获取每个key的最大值)(实际中直接使用maxBy即可)

也就是我们自己使用KeyState中的ValueState来模拟实现maxBy

代码实现:

package com.bigdata.state;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-11-26 15:27:21
 **/
public class _01_KeyedStateDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L),
                Tuple2.of("北京", 7L)
        );

        //2. source-加载数据
        tupleDS.keyBy(new KeySelector<Tuple2<String, Long>, String>() {

            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        }).map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String,Long>>() {

            // 借助状态这个API实现

            ValueState<Long> maxValueState= null;
            @Override
            public void open(Configuration parameters) throws Exception {
                // 就是对ValueState初始化
                ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<Long>("valueState",Long.class);
                maxValueState = getRuntimeContext().getState(stateDescriptor);
            }



            @Override
            public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {

                Long val = value.f1;
                if(maxValueState.value() == null){
                    maxValueState.update(val);
                }else{
                    if(maxValueState.value() < val){
                        maxValueState.update(val);
                    }
                }
                return Tuple2.of(value.f0,maxValueState.value());
            }
        }).print();
                //.maxBy(1).print();
        //3. transformation-数据处理转换
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

案例2:

如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]


            姓名,温度
            输入                      输出
            张三,37
            张三,38
            张三,39
            张三,35
            张三,40
            张三,41               张三,[39,40,41]
            张三,40               张三,[39,40,41,40]
package com.bigdata.state;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-11-26 15:54:07
 **/
public class _02_KeyedStateDemo2 {

    // 如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]
    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);
        //3. transformation-数据处理转换   zs,37
        dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] arr = value.split(",");
                return Tuple2.of(arr[0],Integer.valueOf(arr[1]));
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {

            ValueState<Integer> valueState = null;
            ListState<Integer> listState = null;

            @Override
            public void open(Configuration parameters) throws Exception {

                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);
                valueState = getRuntimeContext().getState(stateDescriptor);

                ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);
                listState = getRuntimeContext().getListState(listStateDescriptor);

            }

            @Override
            public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, ArrayList<Integer>>> out) throws Exception {


                Integer tiwen = value.f1;
                if(tiwen >= 38){
                    valueState.update(valueState.value()==null?1:(valueState.value()+1));
                    listState.add(tiwen);
                }
                if(valueState.value()!=null && valueState.value() >= 3){
                    ArrayList<Integer> list = new ArrayList<>();
                    Iterable<Integer> iterable = listState.get();
                    for (Integer tiwenwen : iterable) {
                        list.add(tiwenwen);
                    }
                    out.collect(Tuple2.of(value.f0,list));
                }

            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}


http://www.kler.cn/a/420398.html

相关文章:

  • Vue框架开发一个简单的购物车(Vue.js)
  • 【娱乐项目】基于cnchar库与JavaScript的汉字查询工具
  • 【机器学习】CatBoost 模型实践:回归与分类的全流程解析
  • lua download
  • 使用STM32CubeMX配置串口各种功能
  • Unity类银河战士恶魔城学习总结(P156 Audio Settings音频设置)
  • 群控系统服务端开发模式-应用开发-邮箱短信通道功能开发
  • 65页PDF | 企业IT信息化战略规划(限免下载)
  • idea中git的将A分支某次提交记录合并到B分支
  • 大模型缩放法则放缓,OpenAI、谷歌、Anthropic推进AI模型创新 | LeetTalk Daily
  • 大数据新视界 -- 大数据大厂之 Hive 数据压缩算法对比与选择(下)(20 / 30)
  • SpringMVC:入门案例
  • 机器学习任务功略
  • 切换python版本;vscode切换版本与cmd版本不一致
  • SpringBoot 助力下的在线家具商城设计与实现之旅
  • SpringBoot 架构下的在线家具商城:规划与实践之路
  • 【webApp之h5端实战】项目基础结构搭建及欢迎页面的实现
  • 科技为翼 助残向新 高德地图无障碍导航规划突破1.5亿次
  • 数据结构基础之《(9)—归并排序》
  • WPF DataGrid 列隐藏
  • 普通单向有头链表,用于内存资源受限,不带mmu的单片机
  • redis升级
  • 人工智能机器学习算法分类全解析
  • 带Burst AOT Settings移植问题
  • CSS:Web美学的革新之旅
  • 【Java基础入门篇】三、面向对象和JVM底层分析(3)