当前位置: 首页 > article >正文

Flink cdc同步增量数据timestamp字段相差八小时(分析|解决)不是粘贴复制的!

问题

我使用flink cdc同步mysql到mysql遇到了timestamp字段缺少八小时的问题。很少无语,flink ,cdc,debezium时区都设置了,没有任何效果!

分析

问题出现在mysql binlog身上!!!
因为默认mysql会使用UTC来存储binlog,你可以使用下方的sql验证:

mysqlbinlog --base64-output=DECODE-ROWS -v --start-datetime="2024-11-26 16:20:59" --stop-datetime="2024-11-26 16:30:59" /path/to/binlog-file

设置起止时间和binlog位置

而我们存储mysql数据的时候使用的时区大概率是上海,你也可以查看:

SELECT @@global.time_zone, @@session.time_zone;

如果都是system,而你就在中国大陆那就没错了

验证分析:

而这个时间相差你会发现只在同步增量数据的时候才出现!因为.startupOptions(StartupOptions.initial())会同步历史数据,这些都是从数据库读取的,所以两边都是上海时区就不会有问题!

解决

其实官方给了解决的方案,但是说的非常的模糊,如果对flink cdc不是很熟悉的朋友大概率会云里雾里!
这是官方的常见问题汇总:
以下是原话:
Q2: 使用 MySQL CDC,增量阶段读取出来的 timestamp 字段时区相差8小时,怎么回事呢?

#在解析binlog数据中的timestamp字段时,cdc 会使用到作业里配置的server-time-zone信息,也就是MySQL服务器的时区,如果这个时区没有和你的MySQL服务器时区一致,就会出现这个问题。

此外,如果是在DataStream作业中自定义列化器如 MyDeserializer implements DebeziumDeserializationSchema, 自定义的序列化器在解析 timestamp 类型数据时,需要参考下 RowDataDebeziumDeserializeSchema 中对 timestamp 类型的解析,用时给定的时区信息。


private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Long) {
            switch (schema.name()) {
                case Timestamp.SCHEMA_NAME:
                    return TimestampData.fromEpochMillis((Long) dbzObj);
                case MicroTimestamp.SCHEMA_NAME:
                    long micro = (long) dbzObj;
                    return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
                case NanoTimestamp.SCHEMA_NAME:
                    long nano = (long) dbzObj;
                    return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
            }
        }
        LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
        return TimestampData.fromLocalDateTime(localDateTime);
    }

其实意思很简单,就是自定义序列化器(实现接口CustomConverter)并且对timestamp字段进行单独的处理就可以!

总结!!!

关于序列化,flink有一个官方的序列化器,是debezium的,源码下载链接:
在这里插入图片描述
你只需要在这个方法里面手动修改时区就可以了!
在这里插入图片描述

注意:你要观察下按照你的环境版本timestamp字段映射的对象类型是不是 ZonedDateTime!!!

使用也很简单(好人做到底):

MySqlSource<DataChangeInfo> mySqlSource = MySqlSource.<DataChangeInfo>builder()
                .hostname("192.168.10.14")
                .port(3306)
                .databaseList("xcode")
                .tableList("xcode.temp_flink")
                .username("root")
                .password("123456")
//                .serverId("5401-5404")
                .debeziumProperties(getProperties())
                .deserializer(new MysqlDeserialization())
//                .scanNewlyAddedTableEnabled(true)
//                .includeSchemaChanges(true) // Configure here and output DDL events
                .startupOptions(StartupOptions.initial())
//                .serverTimeZone("Asia/Shanghai")
                .build();

// 关键代码在这里!!!!!!!!!
private static Properties getProperties() {
    Properties properties = new Properties();
    properties.setProperty("converters", "dateConverters");
    //这里!这里!!这里!!!(这是官方的,用上面的源码自己修改完填你的全路径)
    properties.setProperty("dateConverters.type", "io.debezium.connector.mysql.converters.MysqlDebeziumTimeConverter");
    properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
    properties.setProperty("dateConverters.format.time", "HH:mm:ss");
    properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
    properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
    // timestamp没用。。。
//        properties.setProperty("dateConverters.format.timestamp.zone", "UTC");
    //全局读写锁,可能会影响在线业务,跳过锁设置
    properties.setProperty("debezium.snapshot.locking.mode","none");
    properties.setProperty("include.schema.changes", "true");
    properties.setProperty("bigint.unsigned.handling.mode","long");
    properties.setProperty("decimal.handling.mode","double");
    return properties;
}

http://www.kler.cn/a/411455.html

相关文章:

  • 自由学习记录(25)
  • 嵌入式系统与OpenCV
  • NLP论文速读(剑桥大学出品)|分解和利用专家模型中的偏好进行改进视觉模型的可信度
  • 零基础学安全--云技术基础
  • HttpServletRequest req和前端的关系,req.getParameter详细解释,req.getParameter和前端的关系
  • 在win10环境部署opengauss数据库(包含各种可能遇到的问题解决)
  • 2024APMCM亚太杯数学建模C题【宠物行业】原创论文分享
  • kali Linux中foremost安装
  • 实现乱序函数?(面试常考)
  • 计算(a+b)/c的值
  • [STM32]从零开始的STM32 FreeRTOS移植教程
  • 运维面试整理总结
  • 2024年11月22日Github流行趋势
  • 基于Java+SpringBoot+Mysql在线简单拍卖竞价拍卖竞拍系统功能设计与实现九
  • html转成图片
  • 「Mac玩转仓颉内测版29」基础篇9 - 数组类型详解
  • 【论文解读】CVPR 2024 DSL-FIQA :全新人脸面部图像质量评估算法(附论文地址)
  • HPA - k8s自动伸缩机制
  • 2024年11月26日Github流行趋势
  • 推荐一款龙迅HDMI2.0转LVDS芯片 LT6211UX LT6211UXC
  • 1 ISP一键下载
  • ffmpeg 生成白噪声,粉噪声,棕噪声,蓝噪声,紫噪声,丝绒噪声
  • SVG无功补偿装置MATLAB仿真模型
  • java虚拟机——频繁发生Full GC的原因有哪些?如何避免发生Full GC
  • 快速理解微服务中Fegin的概念
  • 241126学习日志——[CSDIY] [ByteDance] 后端训练营 [19]