当前位置: 首页 > article >正文

2.阿里云flinkselectdb-jar作业

1.概述

本文继续介绍使用阿里云实时计算flink把数据从自建mysql同步到阿里云selectdb的过程。上一节使用sql作业,不够强大,有如下问题:

  • 不支持自动创建结果表(selectdb表)。同步前需要手动在selectdb创建结果表;
  • 不支持源表(mysql表)的ddl语句。源表增加/修改字段,需要先手动在结果表(selectdb表)执行,然后重启sql作业;
  • 不支持添加新的源表。添加新表源表需要重新从全量同步阶段开始运行(flink cdc作业分为全量同步和增量同步两个阶段);
  • 不支持连接复用。sql作业里面的每个insert语句都需要一个源表(mysql表)的连接,当同步的源表比较多时,会占用大量的数据库连接;

本节使用jar作业,通过写代码的方式,解决sql作业存在的问题;

2.目标

把自建mysql的约100张表准实时同步到云服务selectdb。数据量不大,约5个G左右;

源表flink结果表
自建mysql实时计算flink云服务selectdb

3.步骤(重点)

对问题和过程没兴趣的同学,可以直接看这里。本章节记录了阿里云flink与selectdb集成时,使用jar作业的实现方式;

3.1.创建作业

  • JAR作业开发需要使用JDK 1.8版本;
  • JAR作业需要线下完成开发,然后打成jar包,上传到在Flink全托管控制台上部署并运行;
  • JAR作业不支持在Main函数中读取本地配置,读取配置文件需要可通过以下方式;
    • 部署作业所添加附加依赖文件将会加载到作业所运行Pod的/flink/usrlib目录下。配置文件以作业附加依赖文件上传,然后通过代码读取;
    • 上传到其它可访问地址,通过网络读取(注意Flink版默认不能访问公网,需要额外操作开通);
  • JAR作业依赖的其它jar包,可通过直接打进JAR作业的方式 ,也可以通过部署作业时添加附加依赖文件的方式;
@Slf4j
public class CdcMysqlToDorisStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String database = "test";
        Map<String, String> mysqlConfig = new HashMap<>();
            mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), "bpms");
            mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), "127.0.0.1");
            mysqlConfig.put(MySqlSourceOptions.PORT.key(), "3306");
            mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "test");
            mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "test");
            mysqlConfig.put("jdbc.properties.use_ssl", "false");
            mysqlConfig.put("sink.properties.format", "json");
            //**支持在作业运行到增量同步阶段后,动态添加新的源表
            mysqlConfig.put(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
        Configuration config = Configuration.fromMap(mysqlConfig);

        Map<String, String> sinkConfig = new HashMap<>();
            sinkConfig.put(DorisConfigOptions.FENODES.key(), "test.selectdbfe.rds.aliyuncs.com:8080");
            sinkConfig.put(DorisConfigOptions.USERNAME.key(), "test");
            sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "test");
            sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://test.selectdbfe.rds.aliyuncs.com:9030");
            sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
            sinkConfig.put("sink.enable-delete", "false");
        Configuration sinkConf = Configuration.fromMap(sinkConfig);

        Map<String, String> tableConfig = new HashMap<>();
            tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
            tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, ".*:1");
        String includingTables = getTables();
        String excludingTables = "";
        boolean ignoreDefaultValue = false;
        boolean useNewSchemaChange = true;
        String schemaChangeMode = SchemaChangeMode.DEBEZIUM_STRUCTURE.getName();
        boolean singleSink = false;
        boolean ignoreIncompatible = false;
        DatabaseSync databaseSync = new MysqlDatabaseSync();
        databaseSync.setEnv(env)
                .setDatabase(database)
                .setConfig(config)
                .setIncludingTables(includingTables)
                .setExcludingTables(excludingTables)
                .setIgnoreDefaultValue(ignoreDefaultValue)
                .setSinkConfig(sinkConf)
                .setTableConfig(new DorisTableConfig(tableConfig))
                .setCreateTableOnly(false)
                .setNewSchemaChange(useNewSchemaChange)
                .setSchemaChangeMode(schemaChangeMode)
                .setSingleSink(singleSink)
                .setIgnoreIncompatible(ignoreIncompatible)
                .create();
        databaseSync.build();
        env.execute(String.format("mysql-doris数据库同步,database=%s", database));
    }

    //**读取配置文件里面,获取需要同步的表
    @SneakyThrows
    private static String getTables() {
        String rst;
        //**Flink JAR作业不支持在Main函数中读取本地配置
        //**在作业运行时,部署作业所添加附加依赖文件将会加载到作业所运行Pod的/flink/usrlib目录下
        try (Stream<String> stream = Files.lines(Paths.get("/flink/usrlib/mysql-to-doris-tables"))) {
            rst = stream.map(String::trim).filter(StringUtils::isNotBlank).collect(joining("|"));
        }

        log.info("读取同步的表成功,tables={}", rst);
        Assert.notBlank(rst, "同步的表不能为空");
        return rst;
    }
}

3.2.部署作业

阿里云flink-jar作业部署

4.遇到的问题

全托管flink怎么获取上传的文件;

我们使用的是阿里云全托管flink,文件都上传到了阿里云管理的oss。查询文档发现地址如下:

oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>/文件名

如果还是不知道是多少,可以先创建一个jar作业,然后在作业的基础信配置->JAR Uri里面查看;

运行报错java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;

  • 原因: 编译的jdk版本和运行的jdk版本不一致。jdk8和jdk11的此方法不兼容;
  • 解决: 查看打包编译的jdk版本,需使用jdk8;

http://www.kler.cn/a/459476.html

相关文章:

  • 用Tkinter制作一个用于合并PDF文件的小程序
  • 前端编码技巧与规范
  • 右值引用全面剖析
  • Java 操作 PDF:从零开始创建功能丰富的PDF文档
  • 图文教程:使用PowerDesigner导出数据库表结构为Word/Html文档
  • Linux-mac地址
  • 【React】- 跨域PDF预览、下载(改文件名)、打印
  • Flink如何处理迟到数据?
  • Python毕业设计选题:基于Hadoop 的国产电影数据分析与可视化_django+spider
  • C++ 函数式编程Lambda表达式
  • 磁编码器(Magnetic Encoder)
  • 【每日学点鸿蒙知识】Web嵌套滚动体验、拷贝传递 ArrayBuffer异常问题、ObjectLink 的属性传递、构建读取参数
  • 【高阶数据结构】红黑树封装map、set
  • leetcode hot100 tire前缀树
  • go语言中zero框架项目日志收集与配置
  • 【2024年-7月-6日-开源社区openEuler实践记录】探秘 Qingzhou:开启高效开发与运维新旅程
  • 012-spring的注解开发、bean的属性、IOC实现原理
  • 【服务器】上传文件到服务器并训练深度学习模型下载服务器文件到本地
  • 基于GA遗传优化TCN-LSTM时间卷积神经网络时间序列预测算法matlab仿真
  • EL表达式与JSTL
  • Quo Vadis, Anomaly Detection? LLMs and VLMs in the Spotlight 论文阅读
  • Java基础(三):桌球案例
  • Qt https请求报错SSL handshake failed 解决思路方法
  • AI大模型-提示工程学习笔记0
  • 进程通信(8)读写锁
  • LabVIEW手部运动机能实验系统