当前位置: 首页 > article >正文

Flink CDC 自定义反序列化

CDC读取Mysql数据 封装成JSONObject 

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {


    /**
     * 封装的数据格式  JSON
     * {
     *     "database":"",
     *     "tableName":"",
     *     "type":"c u d",
     *     "before":"{"id":"","name":"","":""...}",
     *     "after":"{"id":"","name":"","":""...}",
     *     // "ts":156489196115616
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //创建JSON对象用于存储最终数据
        JSONObject json = new JSONObject();

        //获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String databaes = fields[1];
        String tableName = fields[2];

        Struct value  = (Struct)sourceRecord.value();
        //获取before 数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        Schema beforeSchema = before.schema();
        List<Field> beforeFields = beforeSchema.fields();
        if(before != null){
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //获取after 数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        Schema afterSchema = after.schema();
        List<Field> afterFields = afterSchema.fields();
        if(after != null){
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        System.out.println(operation);


        //将字段写入JSON对象
        json.put("database",databaes);
        json.put("tableName",tableName);
        json.put("before",beforeJson);
        json.put("after",afterJson);
        json.put("type",operation);

        //输出数据
        collector.collect(json.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}


http://www.kler.cn/a/9455.html

相关文章:

  • JAVA:探索 EasyExcel 的技术指南
  • Tomcat 和 Netty 的区别及应用场景分析
  • SQL集合运算
  • 测试工程师简历「精选篇」
  • 深度学习中的感受野:从基础概念到多层次特征提取
  • 曹操为什么总是亲征
  • java 合作社交易系统Myeclipse开发mysql数据库mvc结构serlvet编程计算机网页项目
  • 多维时序 | MATLAB实现GRU门控循环单元多变量时间序列预测(多指标评价)
  • 【大数据开发运维解决方案】ssh: undefined symbol: EVP_KDF_ctrl, version OPENSSL_1_1_1b问题解决过程
  • 【Java面试八股文宝典之RabbitMQ篇】备战2023 查缺补漏 你越早准备 越早成功!!!——Day17
  • QT桌面的构建
  • es6和commonJs的区别
  • 医院设备管理的数字化转型:二维码巡检系统的实施与应用
  • 【Java数据结构】线性表-顺序表
  • gopls有没有什么很强大但是默认不开启的功能?
  • cursor.execute 执行两个结果并存储给变量
  • 【c语言】二维数组
  • SpringBoot案例
  • 从零开始学架构——高性能缓存架构
  • 【前端面试专题】【5】Vue3
  • sqlmap使用(以sqli-lab为例)
  • 【Leetcode之路 | Java Python】两数之和(暴力枚举哈希表)
  • 基于发票增值税OCR API设计自动识别应用系统,从此解放财务双手
  • 【图像分割】Segment Anything(Meta AI)论文解读
  • 经典文献阅读之--Bidirectional Camera-LiDAR Fusion(Camera-LiDAR双向融合新范式)
  • Java -- System类和冒泡排序