关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题
flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型
- 问题
- 解决
- 1.自定义转换器类
- 2.代码引用
- 结果
问题
flink版本:1.18.1,mysql版本:8.0.40
使用 Flink CDC 的 MySqlSource 连接 MySQL 时,datetime 类型的字段会被自动转换为毫秒级时间戳(BIGINT 类型)。例如:2020-10-21 18:49:12 变成 1603306152000
解决
1.自定义转换器类
创建 MyDateToStringConverter 类,实现 CustomConverter 接口,并重写 configure 和 converterFor 两个方法
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Consumer;
/**
 * Debezium custom converter that emits DATE, TIME, DATETIME and TIMESTAMP columns as
 * formatted strings instead of the numeric values Debezium produces by default.
 *
 * <p>The output patterns and the time zone used for TIMESTAMP columns are read in
 * {@link #configure(Properties)} from the {@code format.*} options. {@link #DEFAULT_PROPS}
 * is a ready-made set of Debezium properties that registers this converter under the name
 * {@code date}. NOTE(review): the {@code date.format.*} keys in {@link #DEFAULT_PROPS} are
 * expected to reach {@code configure} with the {@code date.} prefix removed by Debezium;
 * confirm against the Debezium custom-converter documentation.
 */
public class MyDateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
    private static final Logger log = LoggerFactory.getLogger(MyDateToStringConverter.class);

    // Formatters start with ISO defaults and may be replaced in configure().
    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    // Property keys used when registering the converter with Debezium.
    public static final String CONVERTERS = "converters";
    public static final String DATE = "date";
    public static final String DATE_TYPE = "date.type";
    public static final String DATE_FORMAT_DATE = "date.format.date";
    public static final String DATE_FORMAT_DATETIME = "date.format.datetime";
    public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";
    public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";

    // Commonly used patterns and zone ids.
    public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";
    public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
    public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";
    public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";
    public static final String TIME_ZONE_UTC_8 = "UTC+8";

    // Option keys read by configure().
    public static final String FORMAT_DATE = "format.date";
    public static final String FORMAT_TIME = "format.time";
    public static final String FORMAT_DATETIME = "format.datetime";
    public static final String FORMAT_TIMESTAMP = "format.timestamp";
    public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";

    // Upper-cased SQL type names matched in converterFor().
    public static final String UPPERCASE_DATE = "DATE";
    public static final String TIME = "TIME";
    public static final String DATETIME = "DATETIME";
    public static final String TIMESTAMP = "TIMESTAMP";
    public static final String SMALLDATETIME = "SMALLDATETIME";
    public static final String DATETIME2 = "DATETIME2";

    /** Debezium properties that register this converter with "yyyy-MM-dd[ HH:mm:ss]" output in UTC+8. */
    public static final Properties DEFAULT_PROPS = new Properties();

    static {
        DEFAULT_PROPS.setProperty(CONVERTERS, DATE);
        // Derive the fully-qualified name from the class itself so the registration stays
        // correct whichever package this class is placed in.
        DEFAULT_PROPS.setProperty(DATE_TYPE, MyDateToStringConverter.class.getName());
        DEFAULT_PROPS.setProperty(DATE_FORMAT_DATE, YEAR_MONTH_DAY_FORMAT);
        DEFAULT_PROPS.setProperty(DATE_FORMAT_DATETIME, DATE_TIME_FORMAT);
        DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP, DATE_TIME_FORMAT);
        DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP_ZONE, TIME_ZONE_UTC_8);
    }

    /**
     * Applies the optional {@code format.*} settings; any setting that is absent or blank
     * leaves the corresponding default untouched.
     *
     * @param props converter options
     * @throws IllegalArgumentException if a pattern is invalid
     * @throws DateTimeException if the zone id is invalid
     */
    @Override
    public void configure(Properties props) {
        readProps(props, FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z));
    }

    /**
     * Reads one setting, trims it, and passes it to {@code callback} when it is non-blank.
     * Failures from the callback are logged and rethrown.
     */
    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
        String rawValue = properties.getProperty(settingKey);
        if (rawValue == null) {
            return;
        }
        // Trim BEFORE the emptiness check: a whitespace-only value must be treated as
        // "not set" rather than being turned into an empty pattern / zone id.
        String settingValue = rawValue.trim();
        if (settingValue.isEmpty()) {
            return;
        }
        try {
            callback.accept(settingValue);
        } catch (IllegalArgumentException | DateTimeException e) {
            log.error("setting {} is illegal:{}", settingKey, settingValue);
            throw e;
        }
    }

    /**
     * Registers a string-producing converter for DATE, TIME, DATETIME and TIMESTAMP columns;
     * columns of any other type are left to Debezium's default handling.
     *
     * @param column the column being inspected
     * @param registration callback used to register the converter for that column
     */
    @Override
    public void converterFor(
            RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // Locale.ROOT keeps the comparison stable under locales with special casing rules
        // (e.g. Turkish, where 'i' does not upper-case to 'I').
        String sqlType = column.typeName().toUpperCase(Locale.ROOT);
        Converter converter;
        switch (sqlType) {
            case UPPERCASE_DATE:
                converter = this::convertDate;
                break;
            case TIME:
                converter = this::convertTime;
                break;
            case DATETIME:
                converter = this::convertDateTime;
                break;
            case TIMESTAMP:
                converter = this::convertTimestamp;
                break;
            default:
                return;
        }
        registration.register(SchemaBuilder.string().optional(), converter);
    }

    /** Formats a {@link LocalDate} or an epoch-day {@link Integer}; returns null for anything else. */
    private String convertDate(Object input) {
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        } else if (input instanceof Integer) {
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return null;
    }

    /**
     * Formats a {@link Duration} as a time of day; returns null for anything else.
     *
     * <p>A duration that is negative or 24 hours and longer cannot be represented as a
     * {@link LocalTime} (MySQL TIME allows such values), so it is rendered as
     * {@code [-]HH:mm:ss[.ffffff]} instead of failing the whole record.
     */
    private String convertTime(Object input) {
        if (!(input instanceof Duration)) {
            return null;
        }
        Duration duration = (Duration) input;
        boolean fitsInOneDay =
                !duration.isNegative() && duration.compareTo(Duration.ofDays(1)) < 0;
        if (fitsInOneDay) {
            LocalTime time =
                    LocalTime.ofSecondOfDay(duration.getSeconds()).withNano(duration.getNano());
            return timeFormatter.format(time);
        }
        return formatElapsedTime(duration);
    }

    /** Renders a duration outside the 00:00:00-23:59:59 range as {@code [-]HH:mm:ss[.ffffff]}. */
    private static String formatElapsedTime(Duration duration) {
        Duration magnitude = duration.abs();
        long totalSeconds = magnitude.getSeconds();
        long micros = magnitude.getNano() / 1_000L;
        StringBuilder text = new StringBuilder();
        if (duration.isNegative()) {
            text.append('-');
        }
        text.append(String.format(
                Locale.ROOT,
                "%02d:%02d:%02d",
                totalSeconds / 3600,
                (totalSeconds % 3600) / 60,
                totalSeconds % 60));
        if (micros != 0) {
            text.append(String.format(Locale.ROOT, ".%06d", micros));
        }
        return text.toString();
    }

    /** Formats a {@link LocalDateTime} or {@link Timestamp}; returns null for anything else. */
    private String convertDateTime(Object input) {
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input);
        } else if (input instanceof Timestamp) {
            return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());
        }
        return null;
    }

    /**
     * Formats a {@link ZonedDateTime} or {@link Timestamp} after shifting it to the configured
     * time zone; returns null for anything else.
     */
    private String convertTimestamp(Object input) {
        if (input instanceof ZonedDateTime) {
            // MySQL stores TIMESTAMP values in UTC, so the ZonedDateTime handled here
            // represents a UTC time and must be shifted to the configured zone.
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime =
                    zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime);
        } else if (input instanceof Timestamp) {
            return timestampFormatter.format(
                    ((Timestamp) input).toInstant().atZone(timestampZoneId).toLocalDateTime());
        }
        return null;
    }
}
2.代码引用
// Obtain the Flink execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// Build the MySQL CDC source.
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("junbo-bigdata01")
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("gmall")
        .tableList("gmall.activity_info")
        .startupOptions(StartupOptions.initial())
        .deserializer(new JsonDebeziumDeserializationSchema())
        .serverTimeZone("Asia/Shanghai")
        .includeSchemaChanges(true)
        // Register the date/time-to-string converter through the Debezium properties.
        .debeziumProperties(MyDateToStringConverter.DEFAULT_PROPS)
        .build();
// Read the change stream from the source.
DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");
// Print the stream created above; the variable is mysqlDS (no dataStreamSource is declared).
mysqlDS.print();
env.execute();
结果
datetime 类型 成功转换成 字符串 类型