当前位置: 首页 > article >正文

flink cdc2.2.1同步postgresql表

目录

  • 简要说明
  • 前置条件
  • maven依赖
  • 样例代码

简要说明

在flink1.14.4 和 flink cdc2.2.1下,采用flink sql方式,postgresql同步表数据,本文采用的是上传jar包,利用flink REST api的方式进行sql执行。

前置条件

1.开启logical
确保你的 postgresql.conf 文件中的相关设置允许逻辑复制和插件的使用。特别是下面几个配置项:
wal_level 应该设置为 logical。
max_replication_slots 需要大于0。
配置文件修改完毕后,重启 PostgreSQL 服务
SHOW wal_level; 命令查看日志等级是否修改
2.创建逻辑复制槽
SELECT * FROM pg_create_logical_replication_slot(‘flink_slot’, ‘pgoutput’);
flink_slot 为槽名
pgoutput 是从PostgreSQL 10开始提供的一个内置输出插件,用于逻辑解码
验证逻辑复制槽:SELECT * FROM pg_replication_slots;
查询逻辑复制状态:SELECT * FROM pg_stat_replication;
3.更改复制标识包含更新和删除之前值(目的是为了确保表 xxxx(tableName) 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 xxxx 表可能无法实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE xxxx REPLICA IDENTITY FULL;
4.修改类加载机制
在flink的flink-conf.yaml文件,classloader.resolve-order: child-first,将 child-first 改为 parent-first

maven依赖

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.4</flink.version>
        <flink-cdc.version>2.2.1</flink-cdc.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
<dependencies>
        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.14.4</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <!-- flink cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>

        <!-- database driver -->
        <!-- postgresql -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>

        <!-- json -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.9.3</version>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <!-- log -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <!-- junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

样例代码

sql:
CREATE TABLE `new_table1_37877` (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'debezium.database.tablename.case.insensitive'='false',
'debezium.log.mining.continuous.mine'='true',
'password'='*****',
'hostname'='***.**.**.***',
'debezium.log.mining.strategy'='online_catalog',
'connector'='postgres-cdc',
'port'='5432',
'schema-name'='public',
'database-name'='test',
'table-name'='new_table1',
'username'='******',
'slot.name'='flink_slot',
'decoding.plugin.name'='pgoutput'
);
CREATE TABLE `new_table1_bak_37877` (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'password'='*****',
'connector'='jdbc',
'table-name'='public.new_table1_bak',
'url'='jdbc:postgresql://地址:5432/test',
'username'='用户'
);
insert into new_table1_bak_37877 select * from new_table1_37877;
参数类:
@Data
public class InputOutputParams {

    /**
     * 作业名称
     */
    private String jobName;
    
    /**
     * 代码文本,分号分隔的flink sql语句
     */
    private String codeText;
    
}
main方法:
public class FlinkMain {

    /**
     * flink job 运行入口
     *
     * @param args 运行参数
     */
    public static void main(String[] args) throws IOException {
        if (args == null || args.length == 0) {
            throw new RuntimeException("运行参数为空");
        }

        // 取第一个参数(必须是json字符串)为运行参数
        String json = args[0];
        ObjectMapper objectMapper =
                new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        InputOutputParams params = objectMapper.readValue(json, InputOutputParams.class);

        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启快照点,每 3 * 60秒保存一次快照
        env.enableCheckpointing(3 * 60 * 1000L);

        //检查点可容忍失败阈值
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        //检查点超时时间
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
        // 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 开启在 job 中止后仍然保留的 externalized checkpoints
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 重启策略,最多尝试重启3次,每次重启的时间间隔为20秒
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(20L, TimeUnit.SECONDS)));
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        // 获取表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        tEnv.getConfig().getConfiguration().setString("pipeline.name", params.getJobName());

         // 执行操作sql
        String codeText = params.getCodeText();
        if (codeText == null || codeText.trim().isEmpty()) {
            throw new RuntimeException("flink sql is empty");
        }
        String[] flinkSqlArr = codeText.split(";");
        for (String flinkSql : flinkSqlArr) {
            if (flinkSql != null && !flinkSql.trim().isEmpty()) {
                tEnv.executeSql(flinkSql);
            }
        }

    }
}

将项目打包成不带依赖的jar

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <!-- 复制依赖jar包 -->
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- 依赖jar包输出目录 -->
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- main方法所在主类 -->
                            <mainClass>com.test.FlinkMain</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

然后将lib下的依赖全部拷贝到flink的lib下,将刚才打包好的jar界面上传
在这里插入图片描述
然后通过postman调用flink的REST api接口提交sql,接口文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/
在这里插入图片描述


http://www.kler.cn/a/542402.html

相关文章:

  • 运用 LangChain 编排任务处理流水线,实现多轮对话场景
  • C++智能指针的使用
  • 在Uniapp中使用阿里云OSS插件实现文件上传
  • Android笔记【snippet】
  • SQL Server安装流程
  • 【Docker】
  • k8s中Network Policy的设计原理和实现方式?
  • 拾取丢弃物品(结构体/数组/子UI/事件分发器)
  • Python 面向对象(类,对象,方法,属性,魔术方法)
  • 提升LCP(Largest Contentful Paint)
  • LogicFlow自定义节点:矩形、HTML(vue3)
  • 【愚公系列】《循序渐进Vue.js 3.x前端开发实践》068-商业项目:电商后台管理系统实战(主页模块开发)
  • kafka的架构和工作原理
  • NO.14十六届蓝桥杯备战|switch语句|break|default|2道练习(C++)
  • Java的直接内存(Direct Memory)是什么意思?
  • 计算机毕业设计Spark+大模型知网文献论文推荐系统 知识图谱 知网爬虫 知网数据分析 知网大数据 知网可视化 预测系统 大数据毕业设计 机器学习
  • 工具-screen-管理终端会话(服务器长时间运行任务)
  • Redis存储⑥Redis五大数据类型之 Zset
  • MFC线程安全案例
  • 对JVM的错误理解与纠正
  • 解决虚幻Unreal Engine手动配置安卓打包环境后无法识别SDK问题
  • Express 中间件
  • VUE3项目结构说明
  • android studio开发科大讯飞最新版
  • 深入理解x86汇编:GNU格式的全面指南
  • 深入理解Java对接DeepSeek