当前位置: 首页 > article >正文

flink-connector-mysql-cdc:02 mysql-cdc高级扩展

  • flink-connector-mysql-cdc:
  • 01 mysql-cdc基础配置代码演示
  • 02 mysql-cdc高级扩展
  • 03 mysql-cdc常见问题汇总
  • 04 mysql-cdc-kafka生产级代码分享
  • 05 flink-kafka-doris生产级代码分享
  • 06 flink-kafka-hudi生产级代码分享

  • flink-cdc版本:3.2.0
  • flink版本:flink-1.18.0
  • mysql版本:8.0.26
  • java版本:1.8
  • maven版本:3.8.4

  • mysql-cdc同步从库数据

  • 从库需要配置 log-slave-updates = 1 使从实例也能将从主实例同步的数据写入从库的 binlog 文件中,如果主库开启了gtid mode,从库也需要开启。
log-slave-updates = 1
gtid_mode = on 
enforce_gtid_consistency = on 

mysql-cdc同步分库分表的表

mysql cdc 的表名和库名均支持正则配置,比如 ’.tableList("cdc_demo.flink_cdc_.*")’ 可以匹配表名 cdc_demo.flink_cdc_01, cdc_demo.flink_cdc_02,cdc_demo.flink_cdc_a表.

注意正则匹配任意字符是’.’ 而不是 ‘*’, 其中点号表示任意字符,星号表示0个或多个,databaseList也如此。

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("flinkcdc")
                .password("123456")
                // 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02")
                .databaseList("cdc_.*")
                // 设置需要捕获日志的表名,注意需要配置库名,大小敏感
                .tableList("cdc_demo.flink_cdc_.*")
                .serverTimeZone("Asia/Shanghai")
                // 将 SourceRecord 转换为 JSON 字符串。
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

 自定义类型转器

3.3.1 实现类型转换类

package com.toroidal.utils;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Date;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class DebeziumConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
    private static final Logger log = LoggerFactory.getLogger(DebeziumConverter.class);
    private static final String DATE_FORMAT = "yyyy-MM-dd";
    private static final String TIME_FORMAT = "HH:mm:ss";
    private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter;
    private DateTimeFormatter datetimeFormatter;
    private SchemaBuilder schemaBuilder;
    private String databaseType;
    private String schemaNamePrefix;
    // 获取默认时区
    private final ZoneId zoneId = ZoneOffset.systemDefault();


    @Override
    public void configure(Properties properties) {
        // 必填参数:database.type。获取数据库的类型,暂时支持mysql、sqlserver
        this.databaseType = properties.getProperty("database.type");
        // 如果未设置,或者设置的不是mysql、sqlserver,则抛出异常。
        if (this.databaseType == null || (!this.databaseType.equals("mysql") && !this.databaseType.equals("oracle") && !this.databaseType.equals("sqlserver"))) {
            throw new IllegalArgumentException("database.type 必须设置为: mysql、sqlserver或oracle");
        }
        // 选填参数:format.date、format.time、format.datetime。获取时间格式化的格式
        String dateFormat = properties.getProperty("format.date", DATE_FORMAT);
        String timeFormat = properties.getProperty("format.time", TIME_FORMAT);
        String datetimeFormat = properties.getProperty("format.datetime", DATETIME_FORMAT);
        // 获取自身类的包名+数据库类型为默认schema.name
        String className = this.getClass().getName();
        // 查看是否设置schema.name.prefix
        this.schemaNamePrefix = properties.getProperty("schema.name.prefix", className + "." + this.databaseType);
        // 初始化时间格式化器
        dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
        timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
        datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat);
    }

    // mysql的转换器
    public void registerMysqlConverter(String columnType, ConverterRegistration<SchemaBuilder> converterRegistration) {
        String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
        schemaBuilder = SchemaBuilder.string().name(schemaName);
        switch (columnType) {
            case "DATE":
                converterRegistration.register(schemaBuilder, value -> {
                    if (value == null) {
                        return null;
                    } else if (value instanceof LocalDate) {
                        return dateFormatter.format((LocalDate) value);
                    } else if (value instanceof Integer) {
                        LocalDate date = LocalDate.ofEpochDay((Integer) value);
                        return dateFormatter.format(date);
                    } else {
                        return String.valueOf(value);
                    }
                });
                break;
            case "TIME":
                converterRegistration.register(schemaBuilder, value -> {
                    if (value == null) {
                        return null;
                    } else if (value instanceof java.time.Duration) {
                        return timeFormatter.format(
                                java.time.LocalTime.
                                        ofNanoOfDay(((java.time.Duration) value)
                                                .toNanos()));
                    } else if (value instanceof Integer) {
                        LocalDate date = LocalDate.ofEpochDay((Integer) value);
                        return dateFormatter.format(date);
                    } else {
                        return String.valueOf(value);
                    }
                });
                break;
            case "DATETIME":
            case "TIMESTAMP":
                converterRegistration.register(schemaBuilder, value -> {
                    if (value == null) {
                        return null;
                    } else if (value instanceof java.time.LocalDateTime) {
                        return datetimeFormatter.format((java.time.LocalDateTime) value);
                    } else if (value instanceof java.time.ZonedDateTime) {
                        // 获取系统默认时区
//                        ZoneOffset zoneOffset = java.time.ZoneId.systemDefault().getRules().getOffset(java.time.Instant.now());
//                        return datetimeFormatter.format(((java.time.ZonedDateTime) value).withZoneSameInstant(zoneOffset).toLocalDateTime());
                        return datetimeFormatter.format(((java.time.ZonedDateTime) value).withZoneSameInstant(zoneId).toLocalDateTime());

                    } else if (value instanceof java.sql.Timestamp) {
                        return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());
                    } else if (value instanceof String) {
                        // 初始化出现1970-01-01T00:00:00Zd的值,需要转换
                        Instant instant = Instant.parse((String) value);
                        java.time.LocalDateTime dateTime = java.time.LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
                        return datetimeFormatter.format(dateTime);
                    } else {
                        return String.valueOf(value);
                    }
                });
                break;
            default:
                schemaBuilder = null;
                break;
        }
    }

    // oracle的转换器
    public void registerSqlserverConverter(String columnType, ConverterRegistration<SchemaBuilder> converterRegistration) {
        String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
        schemaBuilder = SchemaBuilder.string().name(schemaName);
        switch (columnType) {
            case "DATE":
                converterRegistration.register(schemaBuilder, value -> {
                    System.out.println("120 value: " + value + " columnType: " + columnType);
                    if (value == null) {
                        return null;
                    } else if (value instanceof Date) {
                        return dateFormatter.format(((Date) value).toLocalDate());
                    } else if (value instanceof Integer) {
                        LocalDate date = LocalDate.ofEpochDay((Integer) value);
                        return dateFormatter.format(date);
                    } else {
                        return String.valueOf(value).replace("TO_DATE('", "").replace("', 'YYYY-MM-DD HH24:MI:SS')", "");
                    }
                });
                break;
            case "TIMESTAMP":
            case "TIMESTAMP(3)":
            case "TIMESTAMP(6)":
            case "TIMESTAMP(9)":
                converterRegistration.register(schemaBuilder, value -> {
                    System.out.println("137 value: " + value + " columnType: " + columnType);
                    if (value == null) {
                        return null;
                    } else if (value instanceof java.sql.Timestamp) {
                        return timeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime().toLocalTime());
                    } else {
                        return String.valueOf(value).replace("TO_TIMESTAMP('", "").replace("')", "");
                    }
                });
                break;
            default:
                schemaBuilder = null;
                break;
        }
    }

    @Override
    public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
        // 获取字段类型
        String columnType = relationalColumn.typeName().toUpperCase();
        log.info("数据库:{},字段名称:{},字段类型:{},jdbc type :{}", this.databaseType, relationalColumn.name(), columnType, relationalColumn.jdbcType());
        // 根据数据库类型调用不同的转换器
        if (this.databaseType.equals("mysql")) {
            this.registerMysqlConverter(columnType, converterRegistration);
        } else if (this.databaseType.equals("oracle")) {
            this.registerSqlserverConverter(columnType, converterRegistration);
        } else {
            log.warn("===failed 不支持的数据库类型: {}", this.databaseType);
            schemaBuilder = null;
        }
    }

    private String getClassName(Object value) {
        if (value == null) {
            return null;
        }
        return value.getClass().getName();
    }
}

3.3.2 配置自定义类型转换器

Properties prop = new Properties();
        prop.setProperty("converters", "dateConverters");
        prop.setProperty("dateConverters.type", "com.toroidal.utils.DebeziumConverter");
        prop.setProperty("dateConverters.database.type", "mysql");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                // 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02")
                .databaseList("cdc")
                // 设置需要捕获日志的表名,注意需要配置库名,大小敏感
                .tableList("cdc.flink_cdc_test")
                .username("flinkcdc")
                .serverTimeZone("Asia/Shanghai")
                .serverId("flink-cdc-01")
                .password("123456")
                .debeziumProperties(prop)
                // 将 SourceRecord 转换为 JSON 字符串。
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();


http://www.kler.cn/a/429920.html

相关文章:

  • 无监督目标检测最新CVPR解读
  • 【网络安全资料文档】网络安全空间态势感知系统建设方案,网络安全数据采集建设方案(word原件)
  • scala的正则表达式的特殊规则
  • 深入探索Redis:数据结构解析与Spring Boot实战应用
  • 介绍8款开源网络安全产品
  • python数据分析之爬虫基础:requests详解
  • 消息队列(MQ):系统解耦与异步通信的利器
  • C#中LinkedList与List的对比及应用实例
  • Swagger四种定义UI界面
  • LoViT: 用于手术阶段识别的长视频Transformer|文献速递-生成式模型与transformer在医学影像中的应用
  • canny算子解析
  • python+docx+docx2python:多文件合并,提取docx文件文本、表格及图片数据
  • AI开发: 知识图谱的初识,学会制作知识图谱- Python 机器学习
  • 基于python的自动化的sql延时注入脚本
  • 高级排序算法(一):快速排序详解
  • vue-resizable插件运用
  • vite+vue3 配置ip和端口以及自动打开浏览器
  • 【Linux】开机进入grub/怎么办?
  • 【Python】无法将“pip”项识别为 cmdlet、函数、脚本文件或可运行程序的名称解决方案
  • java中的数组(2)