当前位置: 首页 > article >正文

flink学习(8)——窗口函数

增量聚合函数

——指窗口每进入一条数据就计算一次

例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27

 reduce

aggregate(aggregateFunction)

package com.bigdata.day04;

public class _04_agg函数 {
    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source-加载数据
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);
        // 此时我要获取每个班级的平均成绩
        // 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)
        // IN——Tuple3<String, String, Long>
        // ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩
        // OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩
        dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {
        
            // 初始化一个 累加器
            @Override
            public Tuple3<String, Integer, Long> createAccumulator() {
                return Tuple3.of(null,0,0L);
            }


            // 累加器和输入的值进行累加
            // Tuple3<String, String, Long> value 第一个是传入的值
            // Tuple3<String, Integer, Long> accumulator 第二个是累加器的值
            @Override
            public Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {

                return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);
            }

            // 获取结果——在不同节点的结果进行汇总后实现
            @Override
            public Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {

                return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);
            }


            // 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总
            // 即累加器之间的累加
            @Override
            public Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {
                return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

sum()

min()

max()

 全量聚合函数

指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)

全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间

apply 

package com.bigdata.day04;

public class _05_app函数 {
    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);

        //2. source-加载数据
        dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {

            @Override
            public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {
                Long sum = 0L;
                int length = 0;
                String key = null;
                for (Tuple3<String, String, Long> value : values) {
                    sum += value.f2;
                    length++;
                    key = value.f0;

                }
                out.collect(Tuple2.of(key,(double) sum/length));
            }
        }).print();
      
        env.execute();
    }
}

// 总结

// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象  Tuple3<String,String,Long> 传入的值  Tuple2<String,Double> 结果

// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) 

Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象

//使用窗口对象我们可以拿到窗口的起始时间
 long start = window.getStart();
 long end = window.getEnd();

process

使用方式一:在connect合流之后对两个类型不同的流进行处理

使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签

 使用方式一
new CoProcessFunction<Long, String, String>()  
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型  第三个泛型是合并后流的类型

 @Override
        public void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
            
            // Long 是数据类型 结果使用collector中的collect 收集
            collector.collect(String.valueOf(l));
        }

@Override
        public void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
        
            //  String 是数据类型 结果使用collector中的collect 收集
            collector.collect(s);
        }
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素

OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));


http://www.kler.cn/a/411523.html

相关文章:

  • mvn-mac操作小记
  • 路由器中继与桥接
  • 利用 OSHI获取机器的硬件信息
  • 软件团队的共担责任
  • 现代密码学
  • Ansible--自动化运维工具
  • Diffusion异常检测相关论文及代码整理
  • MySQL底层概述—3.InnoDB线程模型
  • godot游戏引擎_瓦片集和瓦片地图介绍
  • Python开源项目月排行 2024年10月
  • thinkphp日志记录到文件
  • 【大数据学习 | Spark-Core】RDD的五大特性(包含宽窄依赖)
  • Oracle RAC 环境下数据文件误建在本地目录的处理过程
  • TCP/IP网络编程-C++(上)
  • C++设计模式之组合模式在解决层次性问题中的好处
  • 基于Kubernetes编排部署EFK日志收集系统
  • 项目二:m序列的设计与实现(FPGA)
  • C++ 中的多继承
  • 【044】基于51单片机数码管时钟【Proteus仿真+Keil程序+报告+原理图】
  • PyQt5控件QWebEngineView(WebView)
  • yolov11剪枝
  • Hive-定时清理无用的临时表
  • Ajax局部刷新,异步请求
  • Java Map
  • 使用ElementUI中的el-table制作可编辑的表格
  • 做好技术文档的几大要素(按过往经验整理)