当前位置: 首页 > article >正文

FlinkCDC-MYSQL批量写入

一、运行环境

(1)Flink:1.17.2

(2)Scala:2.12.20

(3)Mysql:5.7.43     ##开启binlog

二、代码示例

         思路:通过滚动窗口收集一批数据推给sink消费。binlog日志对于dataStream是没有key的,那么需要给每条数据造一个key。两种方式:(1)通过UUID创建一个全局key。(2)解析数据中的时间字段作为key。

package org.example;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.val;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.example.sink.MyJdbcSinkFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

public class FLCDCToMySql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("snapshot.locking.mode", "none");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("root321")
                .databaseList("test")
                .tableList("test.t_class")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.earliest())
                .build();

        DataStreamSource<String> dataStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql Source");
        // 10s一批数据推给sink
        SingleOutputStreamOperator<List<String>> outputStream = dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(UUID.randomUUID().toString(), s);
                    }
                }).keyBy(x -> x.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow>.Context context, Iterable<Tuple2<String, String>> iterable, Collector<List<String>> collector) throws Exception {
                        List datalist = new ArrayList();
                        for(Tuple2<String, String> element : iterable){
                            datalist.add(element.f1);
                        }
                        collector.collect(datalist);
                    }
                });
//        outputStream.print();
        outputStream.addSink(new MyJdbcSinkFunction());
        env.execute("flink cdc demo ");

    }
}

        批次写入思路:使用executeBatch方式提交,url中需添加rewriteBatchedStatements=true 。

package org.example.sink;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.example.RecordData;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

public class MyJdbcSinkFunction extends RichSinkFunction<List<String>> {

    private int batchSize = 10;
    private Connection connection = null;
    private PreparedStatement statement = null;
    private String jbcUrl = String.format("jdbc:mysql://localhost:3306/test?&useSSL=false&rewriteBatchedStatements=true");
    private String insertSql = String.format("insert into %s values (?,?,?,?,?,?)","t_demo");

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(jbcUrl,"root","xxxxxx");
        connection.setAutoCommit(false);

        statement = connection.prepareStatement(insertSql);
    }

    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        super.invoke(value, context);
        System.out.println("dataInfo: "+ value);
        int flag = 0;          //处理数据条数的标记
        int batchSize = 1000;  //批次commit
        int dataSize = value.size();

        String before = "";
        String after = "";
        String op = "";
        String dbname = "";
        String tablename = "";
        String ts_ms = "";

        for (int i = 0; i < dataSize; i++){
            String data = value.get(i);
            JSONObject object = JSONObject.parseObject(data);

            op = String.valueOf(object.get("op"));
            if("c".equals(op)){
                after = String.valueOf(object.get("after"));
            }else if("d".equals(op)){
                before = String.valueOf(object.get("before"));
            }else if("u".equals(op)){
                before = String.valueOf(object.get("before"));
                after = String.valueOf(object.get("after"));
            }

            JSONObject sourceObj = JSONObject.parseObject(object.get("source").toString());
            dbname = String.valueOf(sourceObj.get("db"));
            tablename = String.valueOf(sourceObj.get("table"));
            ts_ms = String.valueOf(sourceObj.get("ts_ms"));

            statement.setString(1, before);
            statement.setString(2, after);
            statement.setString(3, op);
            statement.setString(4, dbname);
            statement.setString(5, tablename);
            statement.setString(6, ts_ms);
            statement.addBatch();

            flag = flag + 1;
            if(i % batchSize == 0 ){
                statement.executeBatch();
                connection.commit();
                statement.clearBatch();
                flag = 0;  //批次提交后重置
            }
        }
        if(flag > 0){   //不满批次的提交
            statement.executeBatch();
            connection.commit();
            statement.clearBatch();
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        connection.close();
    }
}

三、插入目标表结果

 


http://www.kler.cn/a/380476.html

相关文章:

  • 重温设计模式-外观模式和适配器模式的异同
  • EMS(energy managment system)从0到1
  • 在跨平台开发环境中构建高效的C++项目:从基础到最佳实践20241225
  • 16_HTML5 语义元素 --[HTML5 API 学习之旅]
  • ROS1入门教程6:复杂行为处理
  • 《智启新材:人工智能重塑分子结构设计蓝图》
  • OceanBase V4.3.3,首个面向实时分析场景的GA版本发布
  • 【漏洞复现】某最新版快递微信小程序系统存在前台任意文件读取漏洞
  • HTML 标签属性——<a>、<img>、<form>、<input>、<table> 标签属性详解
  • 基于Python爬虫与文本挖掘的网络舆情监控系统【附源码】
  • JAVA题目笔记(十二) 拼图游戏
  • 【Linux】深入理解进程控制:从创建到终止和进程等待
  • 如何安装 Vue.js:适合不同场景的方案
  • c++基础12比较/逻辑运算符
  • 11月1日笔记(UAC绕过)
  • Golang | Leetcode Golang题解之第535题TinyURL的加密与解密
  • PHP单商户多门店会员管理系统小程序源码
  • react jsx基本语法,脚手架,父子传参,refs等详解
  • 【系统架构设计师】2023年真题论文: 论软件可靠性评价的设计与实现(包括和素材和论文)
  • 大厂面试真题-说说redis的分片方式
  • 深入研究 RAG 流程中的关键组件
  • Kubernetes架构及核心组件
  • Soul App创始人张璐团队自研多模态大模型,亮相GITEX GLOBAL获好评
  • 【Arch Linux 上安装 Appium 】
  • 进程介绍!
  • AcWing 1073 树的中心 树形dp (详解)