当前位置: 首页 > article >正文

Flink CDC 自定义函数处理 SQLServer XML类型数据 映射 doris json字段方案

Flink CDC 自定义函数处理 SQLServer XML类型数据方案

1. 背景

因业务使用SQLServer数据库,CDC同步到doris 数仓。对于SQLServer xml类型,doris没有相应的字段对应,

可以使用json来存储xml数据。需要进行一步转换。从 flink 自定义函数入手。

2. 解决方案

  • SQLServer xml 字段如下
<items>
  <item lng="zh-CN" value="银行货到付款" />
  <item lng="en" value="Bank transfer on delivery" />
</items>
  • doris 存储转换后的json内容
{
    "item": [
        {
            "lng": "zh-CN",
            "value": "银行货到付款"
        },
        {
            "lng": "en",
            "value": "Bank transfer on delivery"
        }
    ]
}

在这里插入图片描述

  • flink 自定义函数代码

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 将XML转换为JSON
 */
public class XmlToJson extends ScalarFunction {
    private Logger log = LoggerFactory.getLogger(XmlToJson.class);
    /**
     * 创建XmlMapper对象用于解析XML
     */
    private final XmlMapper xmlMapper = new XmlMapper();

    public String eval(String xml) {
        // 将XML字符串解析为JsonNode对象
        JsonNode jsonNode = null;
        try {
            jsonNode = xmlMapper.readTree(xml);
        } catch (JsonProcessingException e) {
            log.error("XML解析失败", e);
        }
        // 将JsonNode对象转换为JSON字符串
        return jsonNode.toString();
    }
}
  • doris 表
-- GName 为json格式
CREATE TABLE `table1` (
  `ID` int(11) NOT NULL COMMENT '字典表统一ID',
  `Name` varchar(600) NULL COMMENT '统一进行字典命名',
  `GName` json NULL COMMENT '采用xml存储多语言',
) ENGINE=OLAP
UNIQUE KEY(`ID`)
COMMENT '测试表'
DISTRIBUTED BY HASH(`ID`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"is_being_synced" = "false",
"storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
  • 注册自定义函数 sql调用转换
create temporary function xml_to_json as 'com.zfb.flink.udf.XmlToJson';
              
 INSERT INTO flink_doris (`ID`,`Name`, `GName`)
SELECT 
`ID`,
    `Name`, 
    xml_to_json(`GName`), 
    `TypeID`
FROM 
    table1;
   
  • doris json使用
select
	json_extract_string(GName, '$.item[0].value') as cn_name,
	*
from
	table1;  

http://www.kler.cn/a/461036.html

相关文章:

  • Yocto项目 - 详解PACKAGECONFIG机制
  • 如何使用Python自动化发送消息:用pynput库批量输入并发送文本
  • ELK入门教程(超详细)
  • tcpdump的常见方法
  • 单区域OSPF配置实验
  • LabVIEW 使用 Resample Waveforms VI 实现降采样
  • 通过交叉实现数据触底分页效果new IntersectionObserver()(html、react、vue2、vue3)中使用
  • 【二】arcgis JavaScript api 实现加载不同坐标系的底图和三维服务
  • 工作中常用Vim的命令
  • MiFlash 线刷工具下载合集
  • 查看 GitHub 仓库的创建时间
  • Excel for Finance 06 `STOCKHISTORY` 函数
  • Vue.js前端框架教程15:Vue父子组件之间的通信ref、emits
  • HarmonyOS NEXT应用开发之工具安装
  • 初次使用Oracle存储过程,定时任务--记录
  • 产品经理2025年展望
  • 创新引领未来,科技照亮梦想
  • Spring Boot缓存
  • 【MySQL】第一弹----库的操作及数据类型
  • 网络安全问题解答
  • 尚硅谷Vue3入门到实战 —— 02 编写 App 组件
  • axios拦截器底层实现原理
  • 基于SpringBoot+Vue的旅游推荐系统
  • [pdf、epub]260道《软件方法》强化自测题业务建模需求分析共216页(202412更新)
  • Doris安装部署
  • 实现单例模式的五种方式