当前位置: 首页 > article >正文

Flink (Windows Function 窗口函数)

1、窗口函数的分类

窗口函数,即数据划分窗口后可以调用的处理函数。

l 全量函数:窗口先缓存所有元素,等到触发条件后对窗口内的全量元素执行计算。

l 增量函数:窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据。

2、增量聚合函数

指窗口每进入一条数据就计算一次

 

实现方法(常见的增量聚合函数如下):
reduce(reduceFunction)
aggregate(aggregateFunction)
sum()
min()
max()

reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 <T>
maxBy、minBy、sum这3个底层都是由reduce实现的
aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R>

 

AggregateFunction 【了解】

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

输入类型是输入流中的元素类型,AggregateFunction有一个add方法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。

package com.bigdata.windows;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _04_AggDemo {

    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L)
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);

        KeyedStream<Tuple3<String,String,Long>, String> keyedStream = dataStreamSource.keyBy(new KeySelector<Tuple3<String,String,Long>, String>() {

            @Override
            public String getKey(Tuple3<String,String,Long> tuple3) throws Exception {
                return tuple3.f0;
            }
        });

        //3. transformation-数据处理转换
        // 三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)
        keyedStream.countWindow(3).aggregate(new AggregateFunction<Tuple3<String,String,Long>, Tuple3<String,Long,Integer>, Tuple2<String,Double>>() {

            // 初始化一个中间变量
            Tuple3<String,Long,Integer> tuple3 = Tuple3.of(null,0L,0);
            @Override
            public Tuple3<String,Long,Integer> createAccumulator() {
                return tuple3;
            }

            @Override
            public Tuple3<String,Long,Integer> add(Tuple3<String, String, Long> value, Tuple3<String,Long,Integer> accumulator) {
                long tempScore = value.f2 + accumulator.f1;
                int length = accumulator.f2 + 1;
                return Tuple3.of(value.f0, tempScore,length);
            }

            @Override
            public Tuple2<String, Double> getResult( Tuple3<String,Long,Integer> accumulator) {
                return Tuple2.of(accumulator.f0,(double) accumulator.f1 / accumulator.f2);
            }

            @Override
            public Tuple3<String, Long, Integer> merge(Tuple3<String, Long, Integer> a, Tuple3<String, Long, Integer> b) {
                return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
            }


        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

3、全量聚合函数

指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求) 

实现方法
apply(windowFunction)
process(processWindowFunction)

全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。
ProcessWindowFunction一次性迭代整个窗口里的所有元素,比较重要的一个对象是Context,可以获取到事件和状态信息,这样我们就可以实现更加灵活的控制,该算子会浪费很多性能,主要原因是不增量计算,要缓存整个窗口然后再去处理,所以要设计好内存。
package com.bigdata.day04;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
public class Demo03 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        Tuple3[] ENGLISH = new Tuple3[] {
                Tuple3.of("class1", "张三", 100L),
                Tuple3.of("class1", "李四", 40L),
                Tuple3.of("class1", "王五", 60L),
                Tuple3.of("class2", "赵六", 20L),
                Tuple3.of("class2", "小七", 30L),
                Tuple3.of("class2", "小八", 50L)
        };
        // 先求每个班级的总分数,再求每个班级的总人数
        DataStreamSource<Tuple3<String,String,Long>> streamSource = env.fromElements(ENGLISH);
        KeyedStream<Tuple3<String, String, Long>, String> keyedStream = streamSource.keyBy(v -> v.f0);
        // 每个分区中的数据都达到了3条才能触发,哪个分区达到了三条,哪个就触发,不够的不计算
        // //Tuple3<String, String, Long> 输入类型
        //    //Tuple2<Long, Long> 累加器ACC类型,保存中间状态 第一个值代表总成绩,第二个值代表总人数
        //    //Double 输出类型
        // 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象
        keyedStream.countWindow(3).apply(new WindowFunction<Tuple3<String, String, Long>, Double, String, GlobalWindow>() {
            @Override
            public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, Long>> input, Collector<Double> out) throws Exception {
                // 计算总成绩,计算总人数
                int sumScore = 0,sumPerson=0;
                for (Tuple3<String, String, Long> tuple3 : input) {
                    sumScore += tuple3.f2;
                    sumPerson += 1;
                }
                out.collect((double)sumScore/sumPerson);
            }
        }).print();


        //5. execute-执行
        env.execute();
    }
}

 

 


http://www.kler.cn/a/414624.html

相关文章:

  • Python标识符命名规则
  • Harbor安装、HTTPS配置、修改端口后不可访问?
  • 什么是串联谐振
  • Dapper简易入门
  • 【网络安全设备系列】12、态势感知
  • 数据结构 ——— 归并排序算法的实现
  • [Maven]3.5.3配置
  • HTTP超文本协议
  • Python轴承故障诊断 (21)基于VMD-CNN-BiTCN的创新诊断模型
  • C++练级计划-> 《IO流》iostream fstream sstream详解
  • 操作系统 | 学习笔记 | 王道 | 2.1 进程与线程
  • 若依前端报错 components.at is not a function
  • JVM(JAVA虚拟机)内存溢出导致内存不足,Java运行时环境无法继续
  • 外卖点餐系统小程序
  • LeetCode --- 424周赛
  • 光伏功率预测!Transformer-LSTM、Transformer、CNN-LSTM、LSTM、CNN五模型时序预测
  • Springboot项目搭建(7)-Layout界面布局
  • Vue.js - axios网络请求
  • C/C++ 中volatile 关键字
  • 【DERPNSTINK靶场渗透】
  • [在线实验]-Redis Docker镜像的下载与部署
  • C++中智能指针的使用及其原理 -- RAII,内存泄漏,shared_ptr,unique_ptr,weak_ptr
  • vue安装cypress及其部分用法
  • 基于C#+SQLite开发数据库应用的示例
  • 从传统IT运维到智能化运维的转型之路
  • 数据结构 (10)队列