当前位置: 首页 > article >正文

[实战-11] FlinkSql设置时区(table.local-time-zone)

table.local-time-zone

  • table.local-time-zone
  • 代码测试
  • flink代码
  • 执行结果截图
    • 1. Asia/Shanghai 结果如下
    • 2. UTC结果如下

table.local-time-zone

设置flink程序运行期间得时区,flink的内置数据类型TIMESTAMP(3), 我们设置水位线都是基于TIMESTAMP(3)类型,
当我们flinkSql 查询flink表的时候,因为时区不同 TIMESTAMP 展示给我们的字符串也是不同的。

代码测试

mysql时区是Asia/Shanghai

CREATE TABLE `versioned_rates` (
  `operation_code` int DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `product_id` varchar(255) DEFAULT NULL,
  `product_name` varchar(255) DEFAULT NULL,
  `price` float DEFAULT NULL,
  `time_long` bigint NOT NULL DEFAULT '0'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
INSERT INTO flink.versioned_rates (operation_code,update_time,product_id,product_name,price,time_long) VALUES 
(1,'2024-01-01 00:01:00.0','p_001','scooter',11.11,1730346179000)
,(1,'2024-01-01 00:02:00.0','p_002','basketball',23.11,1730346179000)
,(2,'2024-01-01 12:00:00.0','p_001','scooter',11.11,1730346179000)
,(3,'2024-01-01 12:00:00.0','p_001','scooter',12.99,1730346179000)
,(2,'2024-01-01 12:00:00.0','p_002','basketball',23.11,1730346179000)
,(3,'2024-01-01 12:00:00.0','p_002','basketball',19.99,1730346179000)
,(4,'2024-01-01 18:00:00.0','p_001','scooter',12.99,1730346179000)
;

flink代码

package com.pg.TableAndDataStreamApi;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;


/*
*
* */
public class version_table {
    private static final String SOURCE="CREATE TABLE mysql_table(\n" +
            "\toperation_code int,\n" +
            "\tupdate_time TIMESTAMP(0),\n" +
            "\tproduct_id string,\n" +
            "\tproduct_name string,\n" +
            "\tprice float,\n" +
            "\ttime_long bigint,\n" +
            "\tts AS TO_TIMESTAMP_LTZ(time_long,3)\n" +
            "    ) WITH (\n" +
            "   'connector' = 'jdbc',\n" +
            "   'url' = 'jdbc:mysql://ip:3306/flink',\n" +
            "   'driver'='com.mysql.cj.jdbc.Driver',\n "+
            "   'username'='root',\n"+
            "   'password'='密码',\n"+
            "   'table-name' = 'versioned_rates'\n" +
            ")";
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(SOURCE);
        Configuration configuration = new Configuration();
//        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");
        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");

        tableEnv.getConfig().addConfiguration(configuration);

        // 从 MySQL 表中选择所有行
        Table t = tableEnv.sqlQuery("SELECT * FROM mysql_table");

        t.execute().print();


    }
}

执行结果截图

1. Asia/Shanghai 结果如下

在这里插入图片描述

2. UTC结果如下

在这里插入图片描述


http://www.kler.cn/a/374429.html

相关文章:

  • shodan7,shodan参数使用,常用端口,Google语法
  • 【Mac】安装 F5-TTS
  • ROS(快速初步入门)
  • 分布式数据库的发展历程与大规模应用的历史
  • 【HarmonyOS NEXT】使用 Navigation 对折叠屏设备页面进行分栏展示,优化 UI 交互
  • Linux上本地部署KubeSphere与cpolar实现远程管理和监控集群
  • MySQL 的 select * 会用到事务吗?
  • Ethernet 系列(6)-- 基础学习::OSI Model
  • 金融小白两周完成一个量化系统 (二)项目进度以及数据获取
  • 数据分析可视化:散点图矩阵与雷达图的生成
  • 时序数据分析:工业时序数据的特点
  • 【python ASR】win11-从0到1使用funasr实现本地离线音频转文本
  • golang的多表联合orm
  • 关于回溯与分支限界的一些介绍
  • 【第一个qt项目的实现和介绍以及程序分析】【正点原子】嵌入式Qt5 C++开发视频
  • 神经网络:解析人工智能的智慧基石
  • WPF+MVVM案例实战(十四)- 封装一个自定义消息弹窗控件(下)
  • 二:Linux学习笔记(第一阶段)-- Linux命令
  • 记一次:Clickhouse同步mysql数据库
  • 基于Spring Boot的医疗陪护系统设计与实现(源码+定制+开发)病患陪护管理平台、医疗服务管理系统、医疗陪护信息平台
  • 【PCL】vs2022配置PCL环境
  • RHCE第三次实验
  • 强大的文本编辑器Notepad++8.4.6 最新版
  • 【回溯】力扣 77.组合
  • [数组基础] 0238. 除自身以外数组的乘积
  • 「Mac畅玩鸿蒙与硬件11」鸿蒙UI组件篇1 - Text 和 Button 组件详解