当前位置: 首页 > article >正文

[实战-11] FlinkSql 设置时区对TIMESTAMP和TIMESTAMP_LTZ的影响

table.local-time-zone

  • table.local-time-zone
  • DataStream-to-Table Conversion(拓展知识)
  • 代码测试
  • flinksql代码
  • 执行结果截图
    • 1. Asia/Shanghai 结果如下
    • 2. UTC结果如下

table.local-time-zone

table.local-time-zone可用于设置flinksql的时区。
flink的内置数据类型TIMESTAMP(n)或者是TIMESTAMP_LTZ(n), 我们设置水位线都是基于这两种类型,不同的是前者本质是字符串形势,后者本质是long,也因此前者不受时区影响,后者受时区影响类型。(n指的毫秒级的精度取值范围是 0~9)
原始数据库如果不是时间类型,可能要用TO_TIMESTAMP(字符串格式的时间)或者TO_TIMESTAMP_LTZ(long数字,n)

如果原始数据库是string则需要用TO_TIMESTAMP(字符串格式的时间字段)转成TIMESTAMP(n)
如果原始数据库中是long则需要用TO_TIMESTAMP_LTZ(long数字,n) 转成TIMESTAMP_LTZ(n)

DataStream-to-Table Conversion(拓展知识)

datastream API到Table Api转换的时候,是以后string的形式传递event_time, 并且这个string在DataStream Api是以UTC时区转换的,如果你的原始数据中是long, 如果不做处理展示出来的string就是UTC字符串,为了在东八区展示,则需要将long再加上8小时

        // 水位线 允许乱序
        WatermarkStrategy<String> waterStrategy = WatermarkStrategy.<String>forMonotonousTimestamps() //ofSeconds(20)
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        try {
                            Mybook book= JSON.parseObject(element,Mybook.class);
                            return boo.time+8*60*60*1000  //转成东八区
                        }catch (Exception e){
                            return recordTimestamp;
                        }

                    }
                }).withIdleness(Duration.ofSeconds(timeWindowIdleness));

        SingleOutputStreamOperator<UserSlotGame> processStream = env
                .fromSource(source, waterStrategy, "readKafka")
                .process(new ProcessFunction<String, UserSlotGame>() {
                             @Override
                             public void processElement(String value, Context ctx, Collector<UserSlotGame> out) throws Exception {
                                // 省略

                             }
                    }) ;

代码测试

mysql时区是Asia/Shanghai

CREATE TABLE `versioned_rates` (
  `operation_code` int DEFAULT NULL,
  `update_time` varchar(255) DEFAULT NULL, -- 注意这是字符串
  `product_id` varchar(255) DEFAULT NULL,
  `product_name` varchar(255) DEFAULT NULL,
  `price` float DEFAULT NULL,
  `time_long` bigint NOT NULL DEFAULT '0' -- 注意这是long
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci



INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:01:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:02:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_001', 'scooter', 12.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_002', 'basketball', 19.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(4, '2024-01-01 18:00:00', 'p_001', 'scooter', 12.99, 1730346179000);

flinksql代码

package com.pg.TableAndDataStreamApi;



import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;


/*
*
* */
public class version_table {
    private static final String SOURCE="CREATE TABLE source_table(\n" +
            "\toperation_code int,\n" +
            "\tupdate_time string,\n" +
            "\tup_t AS TO_TIMESTAMP(update_time),\n" +
            "\ttime_long bigint,\n" +
            "\tbbb AS TO_TIMESTAMP_LTZ(time_long,3) \n" +
            "    ) WITH (\n" +
            "   'connector' = 'jdbc',\n" +
            "   'url' = 'jdbc:mysql://ip:3306/flink',\n" +
            "   'driver'='com.mysql.cj.jdbc.Driver',\n "+
            "   'username'='用户名',\n"+
            "   'password'='密码',\n"+
            "   'table-name' = 'versioned_rates'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(SOURCE);
        Configuration configuration = new Configuration();
//        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");
        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");

        tableEnv.getConfig().addConfiguration(configuration);
        // 从 MySQL 表中选择所有行
        Table t = tableEnv.sqlQuery("select * from source_table");
        t.execute().print();


    }
}

执行结果截图

TO_TIMESTAMP_LTZ 受时区影响
而TO_TIMESTAMP()意味着原始数据中本就是string, 是不会受到时区影响的

  1. 下方第一个红色列不管是UTC还是 Asia/Shanghai 我们看大的string都是一样的
  2. 下方第一个红色列UTC比 Asia/Shanghai 少了8个小时

1. Asia/Shanghai 结果如下

在这里插入图片描述

2. UTC结果如下

在这里插入图片描述


http://www.kler.cn/a/383132.html

相关文章:

  • 2.利用docker进行gitlab服务器迁移
  • 在vscode的ESP-IDF中使用自定义组件
  • CSS系列(27)- 图形与滤镜详解
  • 密码学期末考试笔记
  • Mysql数据究竟是如何存储的
  • springboot中Jackson库和jsonpath库的区别和联系。
  • linux node vue3 部署手册
  • 基于YOLO11/v10/v8/v5深度学习的煤矿传送带异物检测系统设计与实现【python源码+Pyqt5界面+数据集+训练代码】
  • 信息学科平台系统设计与实现:Spring Boot框架精讲
  • 【H2O2|全栈】JS进阶知识(四)Ajax
  • 3种最难学习和最容易学习的 3 种编程语言
  • 微服务架构面试内容整理-Zuul
  • 打印等边三角形和直角三角形(用循环)C语言
  • 将vscode的终端改为cygwin terminal
  • 杨中科 .Net Core 笔记 DI 依赖注入
  • 【时间之外】IT人求职和创业应知【27】
  • PC端查看历史消息,鼠标向上滚动加载数据时页面停留在上次查看的位置
  • 使用k8s管理应用以及java案例
  • Go API 多种响应的规范化处理和简化策略
  • 晚霞视频素材哪里下载 高清无水印素材推荐平台
  • 在Scrapy爬虫中应用Crawlera进行反爬虫策略
  • 背包九讲——背包问题求具体方案
  • java.lang.NoClassDefFoundError: kotlin/jvm/JvmInline
  • 无root权限在Linux虚拟环境安装指定版本python
  • [354]基于springboot的足球俱乐部管理系统
  • 第07章 运算符的使用