当前位置: 首页 > article >正文

关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题

flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型

  • 问题
  • 解决
    • 1.自定义转换器类
    • 2.代码引用
  • 结果

问题

flink版本:1.18.1,mysql版本:8.0.40
使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段,Flink CDC 会自动将 datetime 类型的字段转换为时间戳(BIGINT 类型)。如:2020-10-21 18:49:12 变成 1603306152000
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

解决

1.自定义转换器类

创建 MyDateToStringConverter 类 实现 CustomConverter 接口,并重写


import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;

/**
 * 日期时间类型转换成字符串
 */
public class MyDateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
    private static final Logger log = LoggerFactory.getLogger(MyDateToStringConverter.class);
    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    public static final String CONVERTERS = "converters";
    public static final String DATE = "date";
    public static final String DATE_TYPE = "date.type";
    public static final String DATE_FORMAT_DATE = "date.format.date";
    public static final String DATE_FORMAT_DATETIME = "date.format.datetime";
    public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";
    public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";
    public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";
    public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
    public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";
    public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";
    public static final String TIME_ZONE_UTC_8 = "UTC+8";
    public static final String FORMAT_DATE = "format.date";
    public static final String FORMAT_TIME = "format.time";
    public static final String FORMAT_DATETIME = "format.datetime";
    public static final String FORMAT_TIMESTAMP = "format.timestamp";
    public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";
    public static final String UPPERCASE_DATE = "DATE";
    public static final String TIME = "TIME";
    public static final String DATETIME = "DATETIME";
    public static final String TIMESTAMP = "TIMESTAMP";
    public static final String SMALLDATETIME = "SMALLDATETIME";
    public static final String DATETIME2 = "DATETIME2";
    

    public static final Properties DEFAULT_PROPS = new Properties();

    static {
        DEFAULT_PROPS.setProperty(CONVERTERS, DATE);
        DEFAULT_PROPS.setProperty(DATE_TYPE, "com.flink.test.MyDateToStringConverter"); // 需要设置本类的全类名引用,具体根据自己的类设置
        DEFAULT_PROPS.setProperty(DATE_FORMAT_DATE, YEAR_MONTH_DAY_FORMAT);
        DEFAULT_PROPS.setProperty(DATE_FORMAT_DATETIME, DATE_TIME_FORMAT);
        DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP, DATE_TIME_FORMAT);
        DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP_ZONE, TIME_ZONE_UTC_8);
    }

    @Override
    public void configure(Properties props) {
        readProps(props, FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z));
    }

    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
        String settingValue = (String) properties.get(settingKey);
        if (settingValue == null || settingValue.length() == 0) {
            return;
        }
        try {
            callback.accept(settingValue.trim());
        } catch (IllegalArgumentException | DateTimeException e) {
            log.error("setting {} is illegal:{}", settingKey, settingValue);
            throw e;
        }
    }

    @Override
    public void converterFor(
            RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        if (UPPERCASE_DATE.equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional();
            converter = this::convertDate;
        }
        if (TIME.equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional();
            converter = this::convertTime;
        }
        if (DATETIME.equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional();
            converter = this::convertDateTime;
        }
        if (TIMESTAMP.equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional();
            converter = this::convertTimestamp;
        }
        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
        }
    }

    private String convertDate(Object input) {
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        } else if (input instanceof Integer) {
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return null;
    }

    private String convertTime(Object input) {
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            int nano = duration.getNano();
            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
            return timeFormatter.format(time);
        }
        return null;
    }

    private String convertDateTime(Object input) {
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input);
        } else if (input instanceof Timestamp) {
            return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());
        }
        return null;
    }

    private String convertTimestamp(Object input) {
        if (input instanceof ZonedDateTime) {
            // MySQL中的TIMESTAMP数据类型会被转换为UTC时间进行存储,
            //而在程序中处理这个zonedDatetime时,它表示的是UTC时间
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime =
                    zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime);
        } else if (input instanceof Timestamp) {
            return timestampFormatter.format(
                    ((Timestamp) input).toInstant().atZone(timestampZoneId).toLocalDateTime());
        }
        return null;
    }
}

2.代码引用

//获取Flink执行环境
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(2);

// 连接mysql
   MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("junbo-bigdata01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall")
                .tableList("gmall.activity_info")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .includeSchemaChanges(true)
                // 日期时间类型转换成字符串 的设置引用
                .debeziumProperties(MyDateToStringConverter.DEFAULT_PROPS) 
                .build();

//读取数据
  DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "mysql-source");
  dataStreamSource.print();
  env.execute();

结果

datetime 类型 成功转换成 字符串 类型
在这里插入图片描述


http://www.kler.cn/a/463144.html

相关文章:

  • seata分布式事务详解(AT)
  • RAG实战:本地部署ragflow+ollama(linux)
  • VSCode设置ctrl或alt+mouse(left)跳转
  • Ansys Discovery 中的网格划分方法:探索模式
  • 蓝耘平台使用InstantMesh‌生成高质量的三维网格模型!3D内容创作!小白入门必看!!!
  • 大模型系列——旋转位置编码和长度外推
  • Kali 自动化换源脚本编写与使用
  • Mac M2 Pro安装MySQL 8.4.3
  • Django中创建自定义命令发送钉钉通知
  • ARM架构服务器安装部署KVM虚拟化环境
  • LLaMA 2开放基础和微调聊天模型
  • 自定义luacheck校验规则
  • spring boot通过文件配置yaml里面的属性
  • 从数据映射到文件生成:一个R语言实践案例
  • 自己电脑搭建个人知识库,一般电脑也能玩(支持通义千问、GPT等)。
  • VSCode 插件开发实战(十六):详解插件生命周期
  • selenium(三)
  • Midjourney技术浅析(三):文本编码
  • .NET | 详解通过Win32函数实现本地提权
  • 计算机网络—————考研复试
  • WOFOST作物模型(2.1):模型参数介绍
  • Python基于Django的web漏洞挖掘扫描技术的实现与研究(附源码,文档说明)
  • 数据库在大数据领域的探索与实践:动态存储与查询优化
  • 二叉树的基本数据结构类型(c语言)
  • OpenCV 图像处理之形态学转换
  • 数据结构(Java)—— 栈(Stack)