当前位置: 首页 > article >正文

【大数据面试题】005 谈一谈 Flink Watermark 水印

一步一个脚印,一天一道面试题。

感觉我现在很难把水印描述的很好,但,完成比完美更重要。后续我再补充。各位如果有什么建议或补充也欢迎留言。(已更新1)

在实时处理任务时,由于网络延迟,人工异常,各种问题,数据往往会出现乱序,不按照我们的预期到达处理框架。
WaterMark 水印,就是为了一定程度的解决数据,延迟乱序问题的。

使用 WaterMark 一般有以下几个步骤:

  • 定义时间特性
    (Flink 1.12 已废弃,默认使用 事件时间)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • 设置 Watermark 策略,赋值事件时间
        // 分配时间戳和水位线
        DataStream<Tuple2<Long, Integer>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTimestamp) -> element.f0));
水位线特性
  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊数据。

  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

  • 水位线是基于数据的时间戳生成的。

  • 水位线必须**单调递增,**以确保任务的时间时间时钟一直向前推进。

  • 水位线可以设置延迟,来尽量保证正确处理乱序数据。

  • 一个水位线 Watermark (t), 表示在当前流中事件时间已经达到了时间戳 t,这代表 t 之前的所有数据都到齐了,之后不会出现在时间戳 (t) 之前的数据。出现了在 t 之前的数据就会被抛弃不处理。

话不多说,直接给个 Watermark 水印样例代码。


public class SimpleWatermarkExample {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 socket 文本流接收数据
        DataStream<String> input = env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1));

        // 解析输入的数据
        DataStream<Tuple2<Long, Integer>> parsedStream = input
                .map(new MapFunction<String, Tuple2<Long, Integer>>() {
                    @Override
                    public Tuple2<Long, Integer> map(String value) throws Exception {
                        String[] parts = value.split(",");
                        return new Tuple2<>(Long.parseLong(parts[0]), Integer.parseInt(parts[1]));
                    }
                });

        // 分配时间戳和水位线
        DataStream<Tuple2<Long, Integer>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTimestamp) -> element.f0));

        // 使用窗口函数统计每10秒内的最大值
        DataStream<String> maxValues = withTimestampsAndWatermarks
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new WindowFunction<Tuple2<Long, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Tuple2<Long, Integer>> values, Collector<String> out) throws Exception {
                        int maxValue = Integer.MIN_VALUE;
                        for (Tuple2<Long, Integer> value : values) {
                            maxValue = Math.max(maxValue, value.f1);
                        }
                        out.collect("Window: " + window + " Max Value: " + maxValue);
                    }
                });

        // 打印结果
        maxValues.print();

        // 执行程序
        env.execute("Simple Flink Watermark Example");
    }
}

http://www.kler.cn/news/233884.html

相关文章:

  • 突破编程_C++_面试(基础知识(9))
  • 飞书上传图片
  • FPS游戏框架漫谈第二十天
  • 【大厂AI课学习笔记】【1.5 AI技术领域】(10)对话系统
  • C++ 位运算
  • Github 2024-02-06 开源项目日报Top9
  • 波奇学Linux:文件重定向和虚拟文件系统
  • 2024年 前端JavaScript入门到精通 第一天
  • 2-8 单链表+双链表+模拟栈+模拟队列
  • [Angular 基础] - 指令(directives)
  • Go语言每日一题——链表篇(七)
  • ANSI Escape Sequence 下落的方块
  • Stable Diffusion 模型下载:ToonYou(平涂卡通)
  • python实现k路归并排序
  • 在gtkmm4 中检索子控件 (children)
  • 人工智能|深度学习——使用多层级注意力机制和keras实现问题分类
  • 70.SpringMVC怎么和AJAX相互调用的?
  • 【c++】c++入门(上)
  • 〖大前端 - ES6篇②〗- let和const
  • 11.2 OpenGL可编程顶点处理:细分着色器
  • webgis后端安卓系统部署攻略
  • 【数据分享】1929-2023年全球站点的逐月平均降水量(Shp\Excel\免费获取)
  • 数据结构红黑树
  • 分布式搜索引擎 elasticsearch
  • MySQL-视图(VIEW)
  • 基于Chrome插件的Chatgpt对话无损导出markdown格式(Typora完美显示)
  • 【数据结构】双向链表(链表实现+测试+原码)
  • 练习:鼠标类设计之1_类内容解析
  • JVM-双亲委派机制
  • 一步步建立一个C#项目(连续读取S7-1200PLC数据)