[实战-11] FlinkSql设置时区(table.local-time-zone)
table.local-time-zone
- table.local-time-zone
- 代码测试
- flink代码
- 执行结果截图
- 1. Asia/Shanghai 结果如下
- 2. UTC结果如下
table.local-time-zone
设置flink程序运行期间得时区,flink的内置数据类型TIMESTAMP(3), 我们设置水位线都是基于TIMESTAMP(3)类型,
当我们flinkSql 查询flink表的时候,因为时区不同 TIMESTAMP 展示给我们的字符串也是不同的。
代码测试
mysql时区是Asia/Shanghai
CREATE TABLE `versioned_rates` (
`operation_code` int DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`product_id` varchar(255) DEFAULT NULL,
`product_name` varchar(255) DEFAULT NULL,
`price` float DEFAULT NULL,
`time_long` bigint NOT NULL DEFAULT '0'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
INSERT INTO flink.versioned_rates (operation_code,update_time,product_id,product_name,price,time_long) VALUES
(1,'2024-01-01 00:01:00.0','p_001','scooter',11.11,1730346179000)
,(1,'2024-01-01 00:02:00.0','p_002','basketball',23.11,1730346179000)
,(2,'2024-01-01 12:00:00.0','p_001','scooter',11.11,1730346179000)
,(3,'2024-01-01 12:00:00.0','p_001','scooter',12.99,1730346179000)
,(2,'2024-01-01 12:00:00.0','p_002','basketball',23.11,1730346179000)
,(3,'2024-01-01 12:00:00.0','p_002','basketball',19.99,1730346179000)
,(4,'2024-01-01 18:00:00.0','p_001','scooter',12.99,1730346179000)
;
flink代码
package com.pg.TableAndDataStreamApi;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
/*
*
* */
public class version_table {
private static final String SOURCE="CREATE TABLE mysql_table(\n" +
"\toperation_code int,\n" +
"\tupdate_time TIMESTAMP(0),\n" +
"\tproduct_id string,\n" +
"\tproduct_name string,\n" +
"\tprice float,\n" +
"\ttime_long bigint,\n" +
"\tts AS TO_TIMESTAMP_LTZ(time_long,3)\n" +
" ) WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://ip:3306/flink',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n "+
" 'username'='root',\n"+
" 'password'='密码',\n"+
" 'table-name' = 'versioned_rates'\n" +
")";
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(SOURCE);
Configuration configuration = new Configuration();
// configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");
configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");
tableEnv.getConfig().addConfiguration(configuration);
// 从 MySQL 表中选择所有行
Table t = tableEnv.sqlQuery("SELECT * FROM mysql_table");
t.execute().print();
}
}