20250124 Flink中 窗口开始时间和結束時間
增量聚合的 ProcessWindowFunction #
ProcessWindowFunction
可以与 ReduceFunction
或 AggregateFunction
搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction
将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。
你也可以对过时的 WindowFunction
使用增量聚合。
使用 ReduceFunction 增量聚合 #
- 下例展示了如何将
ReduceFunction
与ProcessWindowFunction
组合,返回窗口中的最小元素和窗口的开始时间。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
通俗解释:窗口开始时间的作用
我们可以用一个更贴近生活的例子来理解 窗口开始时间 的意义。
场景比喻:每天上午的「温度统计报告
假设你有一个气象站,每5分钟记录一次户外温度。现在需要 每小时(例如8:00-9:00)统计一次该时段内的最低温度,并在报告中标注这个小时段的起始时间(如“8:00-9:00的最低温度是15°C”)。
关键点
-
窗口开始时间:就是时间段的起点(如8:00)。
-
窗口结束时间:就是时间段的终点(如9:00)。
-
为什么要记录开始时间?
方便人类理解数据属于哪个时段(比如“8点档”的数据)。
代码示例解析
1. 窗口如何划分?
假设使用 滚动窗口(Tumbling Window),每1小时划分一次:
Copy
8:00-9:00 → 窗口1 9:00-10:00 → 窗口2
所有时间戳在8:00≤t<9:00的数据会被分配到窗口1。
2. 窗口触发计算的时机
当系统时间(或事件时间)到达9:00时,窗口1关闭,触发计算:
-
调用
MyReduceFunction
找出该窗口内的最低温度。 -
调用
MyProcessWindowFunction
将结果与窗口开始时间(8:00)绑定。
3. 为什么输出的是开始时间(8:00)而不是结束时间(9:00)?
-
业务需求:通常更关心数据所属时段的起点(例如“8点档的数据”)。
-
避免歧义:如果输出9:00,可能被误解为“9点档的数据”(实际是8:00-9:00的数据)。
代码中具体如何获取开始时间?
在 MyProcessWindowFunction
中:
context.window().getStart(); // 返回窗口的起始时间戳(如8:00对应的毫秒值)
-
context
对象:包含窗口的元信息(起止时间、触发时间等)。 -
实际输出时:将时间戳转换为人类可读格式(如
8:00
)。
常见疑问解答
Q1:如果数据延迟到达(比如8:59的数据在9:05才到),会进入哪个窗口?
-
取决于时间语义:
-
若使用 事件时间(Event Time):按数据自带的时间戳分配到8:00-9:00窗口。
-
若使用 处理时间(Processing Time):按到达系统的时间分配到9:00-10:00窗口。
-
(示例代码未显式设置时间语义,默认可能是处理时间)
-
Q2:窗口开始时间是如何计算的?
-
由窗口分配器(Window Assigner)决定:
-
滚动窗口按固定间隔对齐(如整点)。
-
滑动窗口按步长对齐(如每30分钟滑动一次的1小时窗口)。
-
会话窗口根据数据活跃度动态划分。
-
Q3:可以同时输出开始时间和结束时间吗?
可以!修改 ProcessWindowFunction
:
out.collect(new Tuple3<>(context.window().getStart(), context.window().getEnd(), min));
总结
-
窗口开始时间 标记了数据所属时间段的起点(如“8:00档”)。
-
在 Flink 中,通过
ProcessWindowFunction
的context
可以轻松获取这一信息。 -
这种设计让数据处理结果更易理解(如统计报告、监控仪表盘)。