当前位置: 首页 > article >正文

Flink滑动窗口(Sliding)中window和windowAll的区别

滑动窗口的使用,主要是计算,在reduce之前添加滑动窗口,设置好间隔和所统计的时间,然后再进行reduce计算数据即可。

窗口设置好时间间隔,和处理时间窗口的时间,比如将滑动窗口的时间间隔都设置为5s,处理时间为15s,意思是每隔五秒,就处理15s秒的数据

滑动窗口(window)

比如打了3s的输入,到了第五秒的时候,滑动window开始处理15秒的数据,数据就像滑动一样,用一个线段展示。

代码展示:


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo4Window {
    public static void main(String[] args) throws Exception {
        //1、创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、读取数据
        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        //使用lambda表达式处理数据
        DataStream<String> wordsDS = linesDS
                .flatMap((line, out) -> {
                    for (String word : line.split(",")) {
                        out.collect(word);
                    }
                }, Types.STRING);

        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                //指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        /*
         * SlidingProcessingTimeWindows:滑动的处理时间窗口
         */
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                //每隔5秒计算最近15秒的数据
                .window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));


        //kv1代表之前的结果(状态),kv2代码最新一条数据
        //reduce:有状态计算
        DataStream<Tuple2<String, Integer>> countDS = windowDS
                .reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));

        countDS.print();

        //execute方法会触发任务执行(任务调度)
        env.execute("lambda");
    }
}

滑动窗口(windowAll) 

将同一个窗口的数据放在一起计算,将之前计算的结果与最新统计的结果相加

 代码展示:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo4WindowAll {
    public static void main(String[] args) throws Exception {
        //1、创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、读取数据
        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        //使用lambda表达式处理数据
        DataStream<String> wordsDS = linesDS
                .flatMap((line, out) -> {
                    for (String word : line.split(",")) {
                        out.collect(word);
                    }
                }, Types.STRING);

        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                //指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        /*
         * SlidingProcessingTimeWindows:滑动的处理时间窗口
         */
        AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowAllDS = kvDS
                //每隔5秒计算最近15秒的数据
                //windowAll:将同一个窗口的数据发一起进行计算
                .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));

        //kv1代表之前的结果(状态),kv2代码最新一条数据
        //reduce:有状态计算
        DataStream<Tuple2<String, Integer>> countDS = windowAllDS
                .reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));

        countDS.print();

        //execute方法会触发任务执行(任务调度)
        env.execute("lambda");
    }
}


http://www.kler.cn/a/388010.html

相关文章:

  • 《AI 使生活更美好》
  • JDBC-Dao层模式
  • 云防护单节点2T抗攻击能力意味着什么?
  • 速盾:cdn和反向代理的关系是什么?
  • sql server启用远程连接与修改默认端口
  • Vue3入门介绍及快速上手
  • redis用法(二)
  • 项目功能--运营数据统计报表导出
  • 【真题笔记】21年系统架构设计师案例理论点总结
  • 【SpringBoot】19 文件/图片下载(MySQL + Thymeleaf)
  • 说说webpack中常见的Plugin?解决了什么问题?
  • Ubuntu18.04更换PREEMPT RT内核
  • 软考:论容器编排
  • 微信小程序——01开发前的准备和开发工具
  • C++builder中的人工智能(20):如何在C++中开发一个简单的Hopfield网络
  • 推荐一款管道数据检索工具:Pipedata-Pro
  • C语言复习第9章 字符串/字符/内存函数
  • 卫星授时服务器,单北斗授时服务器,北斗卫星时钟服务器
  • 域名邮箱推荐:安全与稳定的邮件域名邮箱!
  • 【go从零单排】迭代器(Iterators)
  • chrony服务器(linux)
  • w029基于springboot的网上购物商城系统研发
  • Kubernetes中的statefulset控制器
  • 构建智能防线 灵途科技光电感知助力轨交全向安全防护
  • 现代Web开发:React Hooks深入解析
  • 鸿蒙移动应用开发-------初始arkts