当前位置: 首页 > article >正文

[实战-12] flinkSql 时间属性

flinkSql 时间属性

  • 一. 摘要
  • 二.flink 时间数据类型
      • 2.1. TIMESTAMP(n)
      • 2.2. TIMESTAMP_LTZ(n)
  • 三.flink 受时区影响的时间函数
  • 五. time Attributes
    • 5.1. event_time
    • 5.2. process_time
  • 六. event_time 时间字段转化
  • 六. process_time 时间字段转化
  • 七.参考链接
    • flink-doc-time-attr [time-zone](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/timezone/)

一. 摘要

在阅读下面知识之前,读者需要先看 Flinksql设置时区

二.flink 时间数据类型

flink兼容很多上游数据库,一般要求数据库中的字段类型为datetime或者是timestamp的时候,flinksql create table的时候可以用TIMESTAMP()或者TIMESTAMP_ltz()接收
如果上游数据是string或者是long则需要转化,这块后面再说。

2.1. TIMESTAMP(n)

是 TIMESTAMP(n) WITHOUT TIME ZONE 的简写,不携带时区信息。意思是你再源数据库中看到的yyyy-MM-dd
hh:MM:ss是什么样子, 在flink中看到的就是什么样子。换句话说
TIMESTAMP(n)相当于一个字符串类型,无论作业时区怎么变,得到的字符串是不变的。

注意:TIMETSAMP(n) 在flink中是 字符串,字符串,字符串,重要的事情说三遍

2.2. TIMESTAMP_LTZ(n)

是TIMESTAMP§ WITH LOCAL TIME ZONE的简写, 是一个全球统一的时间点类型,
它底层不是字符串,属于Bigint类型。如果将其转为字符串,则结果会随着作业时区改变。
flinksql查看执行结果的时候,不管是TIMESTAMP()还是TIMESTAMP_LTZ我们看到的都是字符串.
TIMESTAMP_LTZ展示给我们的 字符串 会随着时区的table.local-time-zone 的设置而变化。这点要注意哦。
而TIMESTAMP(n)底层本身就是 字符串 , 是和时区没有关系的。 这就是两个的区别。

注意:TIMETSAMP(n) 在flink中是 long,long,long,重要的事情说三遍

三.flink 受时区影响的时间函数

函数例子
LOCALTIME15:18:36
LOCALTIMESTAMP2021-04-15 15:18:36.384
CURRENT_DATE2021-04-15
CURRENT_TIME15:18:36.384
CURRENT_TIMESTAMP2021-04-15 15:18:36.384
CURRENT_ROW_TIMESTAMP()2021-04-15 15:18:36.384
NOW()2021-04-15 15:18:36.384
PROCTIME()2021-04-15 15:18:36.384
  • TIMESTAMP(n) 对flink来说是string不受到时区的影响
    不管如何设置时区,我们看到的日期字符串都是不变的
  • TIMESTAMP_LTZ(n) 对flink来说是long,受到时区影响
    对我们来说看到的都是yyyy-MM-dd hh:MM:ss格式的字符串,
    long->可见的字符串这个过程 是受到时区影响的 因此时区不同我们看到的字符串也是不同的。

五. time Attributes

我们知道,flink常见的事件事件和处理时间, 二者都可以用来开窗口计算。
event_time和process_time都是时间类型,被称为time Attributes(时间属性)

5.1. event_time

可用必须是timestamp()或者是timestamp_ltz 的类型

5.2. process_time

flink-1.13之前PROCTIME()  用的是TIMESTAMP(3)字符串类型,且这个字符串是UTC时区转化的字符串
flink-1.13只后PROCTIME() 用的是TIMESTAMP_LTZ() 会自动按照当前sessoin的时区转换。

对flink来说外界数据时间,需要转化为fink识别的TIMESTAMP或者TIMESTAMP_LTZ(),才可以进行flink后续的窗口计算。窗口以及水位线的声明需要依赖转化后的 时间字段

六. event_time 时间字段转化

  • DDL 指定水位线和进行时间字段的转化

    	CREATE TABLE user_actions (
    	  time_str STRING,
    	  t1 as TO_TIMESTAMP(time_str,'yyyy-MM-dd HH:mm:ss'),
    	  t2 AS TO_TIMESTAMP_LTZ(time_long,3),
    	  time_datetime TIMESTAMP(3),
    	  //WATERMARK FOR t1 AS t1 - INTERVAL '5' SECOND,
    	 // WATERMARK FOR t2 AS t2 - INTERVAL '5' SECOND,
    	  //WATERMARK FOR time_datetime  AS time_datetime - INTERVAL '5' SECOND,
    	) WITH (
    	  ...
    	);
    
    

    主要分为几种情况

    1. 数据库中是yyyy-MM-dd hh:MM:dd 的string
      需要借助函数TO_TIMESTAMP(字符串字段,'yyyy-MM-dd hh:MM:ss)
    2. 数据库是datetime或者timestamp
      可直接被识别为TIMESTAMP(n)不需要转化
    3. 数据库中是long 时间戳
      需要借助函数需要借助函数TO_TIMESTAMP_LTZ(long字段,精度)
      ts AS TO_TIMESTAMP_LTZ(time_long,3) 意思是将time_long 按照flink中设置的时区转化为 flink可识别的TIMESTAMP_LTZ(3)
      并将转化后的值 命名为ts字段

    注意上面sql中被注释的部分,意思是转化后的时间字段,就可以被用于水位线设置了

  • fromDataStream 或fromChangeStream()的时候
    这种情况必须借助 Scheme 定义水位线和时间属性

  1. 如果DatasStream 提前定义了事件事件和水位线,则Scheme可直接提取
    DataStream Api中有个隐藏是rowtime 属性,该属性其实就是event_time, 我们可以通过以下方式直接获取
           Schema.newBuilder()
               .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") // 获取元数据中的rowtime属性,其实就是event_time
               .watermark("rowtime", "SOURCE_WATERMARK()")  // 意思是获取DataStream的水位线传播到Table Api
               .build()
  1. 如果DatasStream 没有定义事件事件和水位线,此时需要在转换的时候重新定义水位线
    假设DataStream中数据有个字段叫做my_time是个long类型的时间戳,则通过以下方式定义新的水位线
        Schema s1=Schema.newBuilder()
                .columnByExpression("myrowtime", "CAST(my_time AS TIMESTAMP_LTZ(0))")
               // .columnByExpression("myrowtime", "TO_TIMESTAMP(my_time)")
              //  .columnByExpression("myrowtime", "TO_TIMESTAMP_LTZ(my_time,3)") 
                .watermark("myrowtime", "myrowtime- INTERVAL '10' SECOND")
                .build();
        string类型也可以用TO_TIMESTAMP(my_time) 转换
        long类型也可以用TO_TIMESTAMP_LTZ(my_time,3)转换
        CAST那种写法比较暴力,直接是类型转换

六. process_time 时间字段转化

比较简单,直接用 PROCTIME() 函数就行了

  • DDL

    	CREATE TABLE user_actions (
    		  process_time AS PROCTIME() 
    		) WITH (
    		  ...
    		);
    	```
    
    
    
  • fromDataStream

      Schema schema = Schema.newBuilder()
              .columnByExpression("proctime", "PROCTIME()")
              .build();
    

七.参考链接


flink-doc-time-attr
time-zone


http://www.kler.cn/a/375783.html

相关文章:

  • 如何学习网络安全?有哪些小窍门?
  • AI刷题-小R的随机播放顺序、不同整数的计数问题
  • Redis系列之底层数据结构字典Dict
  • 2025.1.16——六、BabySQL 双写绕过|联合注入
  • 春秋杯-WEB
  • PyTorch 神经协同过滤 (NCF) 推荐系统教程
  • 互联网技术比游戏后端技术领先十年吗?
  • Android Pair
  • Yocto中的DISTRO和MACHINE的含义与机制
  • 使用 Java 实现从搜索引擎批量下载图片
  • 【STM32】内存管理
  • D55【python 接口自动化学习】- python基础之模块与标准库
  • solidity中的Error和Modifier详解
  • 构建高效信息学科平台:Spring Boot实践
  • JQuery基本介绍和使用方法
  • Docker-微服务项目部署
  • android浏览器源码 可输入地址或关键词搜索 android studio 2024 可开发可改地址
  • 回归与分类中的过拟合问题探讨与解决
  • 人工智能图谱
  • 2-140 基于Solidworks和Matlab Simulink Simscape仿真的机器人手臂仿真
  • ES海量数据插入如何优化性能?
  • 八、快速入门Kubernetes之service
  • 【Matlab】基础操作汇总
  • 【系统集成项目管理工程师教程】第1章 信息化发展
  • Tensor做为索引
  • 庭田科技参与第四届计算机辅助焊接工程与增材制造国际研讨会