flink学习(9)——time+water mark
Time的分类 (时间语义)
EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间
IngestionTime:摄入时间,是事件/数据到达流处理系统的时间
ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间
EventTime
一个订单数据,支付数据的事件时间是11点59分(发送支付请求的时间),而支付数据的处理时间是12点01分(完成订单,扣钱的时间)
问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。
——要想使用eventTime进行统计,那么必须添加waterMark(水印)
Watermark——水印,水位线
为什么会有WaterMark?
当flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:
————只能解决短期时间的问题
Watermark的核心本质可以理解成一个延迟触发机制。
假如没有这个水位线
1、错过了就是错过了,然后坐下一班车
2、这几辆车均留下,一直等着,那么什么时候开车呢——waterMark
3、当此时出现的最大事件事件 - 最大允许的延迟时间(或最大允许的乱序度时间) = waterMark
4、车走的时机,也就是窗口触发的时机
有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发:
1.窗口有数据
2.Watermark >= 窗口的结束时间
可以认为水印时间就是一个标志时间戳,用于判断这个窗口等待多长时间
举例:
窗口5秒,延迟(水印)3秒,按照事件时间计算
数据事件时间3, 落入窗口0-5.水印时间0
来一条数据事件时间7, 落入窗口6-10,水印时间4
来一条数据事件时间4,落入窗口0-5,水印时间4
来一条数据事件时间8,落入窗口6-10,水印时间5
这一条数据水印时间大于等于窗口0-5的窗口结束时间。
满足了对窗口0-5的提交,这个窗口关闭,并触发数据计算
可以看出,第三条数据,其是延迟数据,它的事件时间是4,却来的比事件时间为7的数据还要晚。
但是因为水印的机制,这个数据未错过它的窗口,依旧成功进入属于它的窗口并且被计算
这就是水印的功能:在不影响按照事件时间判断数据属于哪个窗口的前提下,延迟某个窗口的关闭时间,让其等待一会儿延迟数据。
多并行度的水印触发
在多并行度下,每个并行有一个水印
比如并行度是6,那么程序中就有6个watermark
分别属于这6个并行度(线程)
那么,触发条件以6个水印中最小的那个为准
比如, 有个窗口是0-5
其中5个并行度的水印都超过了5
但有一个并行度的水印是3
那么,不管另外5个并行度中的水印达到了多大,都不会触发
因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件
Watermark代码演示
实体类 OrderInfo
package com.bigdata.day05;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderInfo {
private String orderId;
private int uId;
private double money;
private long timeStamp;
}
自定义source
// 为了防止并行度的影响,因此设置为一个并行度
package com.bigdata.day05;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
import java.util.UUID;
public class MySource implements SourceFunction<OrderInfo>{
private boolean flag = true;
@Override
public void run(SourceContext<OrderInfo> ctx) throws Exception {
Random random = new Random();
while (flag){
String orderId = UUID.randomUUID().toString();
int uId = random.nextInt(10);
double money =Math.round(random.nextDouble() * 100);
long timeStamp = System.currentTimeMillis() - random.nextInt(3000);
OrderInfo orderInfo = new OrderInfo(orderId,uId,money,timeStamp);
ctx.collect(orderInfo);
Thread.sleep(20);
}
}
@Override
public void cancel() {
flag = false;
}
}
CartInfo案例_waterMark
// 滑动窗口
每隔5秒收集5秒的数据
计算每个用户总共的订单金额
package com.bigdata.day05;
import java.time.Duration;
public class _02_CartInfo案例_waterMark {
public static void main(String[] args) throws Exception {
// 获取环境
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
// 添加数据
DataStreamSource<OrderInfo> orderInfoDataStreamSource = env.addSource(new MySource());
// 设置水印
SingleOutputStreamOperator<OrderInfo> watermarkSource = orderInfoDataStreamSource
.assignTimestampsAndWatermarks(
WatermarkStrategy.
// 最大延迟时间为3秒
<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
@Override
public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
// 注意:时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。
return orderInfo.getTimeStamp();
}
}));
watermarkSource.keyBy(new KeySelector<OrderInfo, Integer>() {
@Override
public Integer getKey(OrderInfo value) throws Exception {
return value.getUId();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
@Override
public void apply(Integer uId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
double sumMoney = 0;
for (OrderInfo orderInfo : input) {
sumMoney += orderInfo.getMoney();
}
String startStr = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");
String endStr = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");
out.collect("uid: "+uId+","+"sumMoney: "+sumMoney+","+"startTime: "+startStr+","+"endTime: "+endStr);
}
}).print().setParallelism(1);
env.execute();
}
}
1、时间戳的分配与 watermark 的生成是齐头并进的,
2、可以告诉 Flink 应用程序事件时间的进度。通过指定 WatermarkGenerator 来配置 watermark 的生成方式。
3、Watermark定义方式,首先assignTimestampsAndWatermarks
然后
1、实现WatermarkStrategy 接口
2、通过WatermarkStrategy 工具类
第二种方式的具体实现:
orderInfoDataStreamSource
.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
@Override
public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
// 注意:时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。 若不是则需要转为毫秒值
return orderInfo.getTimeStamp();
}
}));
4、WatermarkStrategy 可以在 Flink 应用程序中的两处使用,
第一种是直接在数据源上使用
第二种是直接在非数据源的操作之后使用