当前位置: 首页 > article >正文

【Flink实战】flink消费http数据并将数组展开多行

文章目录

  • 一. 需求描述
  • 二. 方案思路
    • 1. 解决思路
    • 2. flink json 解析
      • 2.1. 通过json path解析非array数据
      • 2.2. 通过json path解析array数据
    • 3. CROSS JOIN逻辑
  • 三. 方案实现
    • 1. http json数据样例
    • 2. flink sql 说明

一. 需求描述

flink消费http接口的数据,将json中的数组展开多行

如下样例数据以及要求处理的数据效果

{  
  "name": "John Doe",  
  "age": 30,  
  "address": {  
    "street": {  
      "street": "123 Main St",  
      "city": "New York",  
      "state": "NY"  
    },  
    "city": "New York",  
    "state": "NY"  
  },  
  "phoneNumbers": [  
    {  
      "type": "home",  
      "number": "212-555-1234"  
    },  
    {  
      "type": "fax",  
      "number": "646-555-4567"  
    }  
  ],  
  "children": [],  
  "spouse": null  
}

nameagestreetcitystatephone_typephone_number
John Doe30123 Main StNew YorkNYhome212-555-1234
John Doe30123 Main StNew YorkNYfax646-555-4567

二. 方案思路

1. 解决思路

  1. flink 消费http接口的数据(json),发送到下游
  2. 下游算子解析json数据,当遇到数组时,算子解析返回array
  3. 通过使用CROSS JOIN 将数组数据拍平,如上表格展现

2. flink json 解析

2.1. 通过json path解析非array数据

如下通过flink内置函数:JSON_VALUE 进行数据解析,支持多种类型的输出,默认输出为string。

在这里插入图片描述

这里使用 cast转换,如下举例

cast(JSON_VALUE(json_string,'$.id') as int) as id ,  
JSON_VALUE(json_string,'$.name')  as name,  
cast(JSON_VALUE(json_string,'$.details.age.real') as int) as  `real`  ,  
JSON_VALUE(json_string,'$.details.address') as address,

 

2.2. 通过json path解析array数据

官网:目前JSON_QUERY虽然能够包装为array但实际上总是会返回为string,不符合要求。

在这里插入图片描述

如下:

<dependencies>  
    <dependency>  
        <groupId>com.jayway.jsonpath</groupId>  
        <artifactId>json-path</artifactId>  
        <version>2.6.0</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-table-common</artifactId>  
        <version>${flink.version}</version>  
        <scope>provided</scope>  
    </dependency>

通过udf解决

package com.dtstack.chunjun.local.test;  
  
import com.jayway.jsonpath.JsonPath;  
import org.apache.flink.table.functions.ScalarFunction;  
  
import java.util.ArrayList;  
import java.util.List;  
  
public class JsonArrayFieldExtractor extends ScalarFunction {  
  
    public List<String> eval(String jsonString, String jsonPath) {  
        if (jsonString == null || jsonString.isEmpty()) {  
            return new ArrayList<String>();  
        }  
        try {  
            List<?> result = JsonPath.read(jsonString, jsonPath);   
            List<String> stringList = new ArrayList<>();  
            for (Object obj : result) {  
                stringList.add(obj.toString());  
            }  
            return stringList;  
        } catch (Exception e) {  
            return new ArrayList<String>();  
        }  
    }  
  
}

3. CROSS JOIN逻辑

Array Expansion

在这里插入图片描述

注意:CROSS JOIN 返回两个连接表的笛卡尔积,当有多个数组时会产生笛卡尔积。比如:两个数组,分别有100个元素,那么如果使用两次CROSS JOIN 则会产生1万行数据。

 

三. 方案实现

1. http json数据样例

{  
  "id": 1,  
  "name": "Alice",  
  "details": {  
    "age": {"real":11},  
    "address": "123Mainst",  
    "contacts": [  
      {  
        "type": "email",  
        "value": "alice@example.com"  
      },  
      {  
        "type": "phone",  
        "value": "123-456-7890"  
      }  
    ],  
    "grade": [  
      {  
        "grade": [{"zz":11},{"zz":11}],  
        "bb": {"rr":{"yy":"alice@example.com"}}  
      },  
      {  
        "grade": [{"zz":22}],  
        "bb": {"rr":{"yy":"alice@example.com"}}  
      }  
    ]  
  }  
}

 

2. flink sql 说明

CREATE TEMPORARY SYSTEM FUNCTION get_json_array AS 'com.dtstack.chunjun.local.test.JsonArrayFieldExtractor';

CREATE TABLE source
(
       json_string varchar
) WITH (
      'connector' = 'http-x'
      ,'url' = 'http://localhost:8088/api/arraypage'
      ,'intervalTime'= '3000'
      ,'method'='get'                              --请求方式:get 、post
      ,'decode'='text'                             -- 数据格式:只支持json模式
                                                   -- 以下4个参数要同时存在:
      ,'page-param-name'='pagenum'                          -- 多次请求参数1:分页参数名:例如:pageNum
      ,'start-index'='1'                             -- 多次请求参数2:开始的位置
      ,'end-index'='4'                               -- 多次请求参数3:结束的位置
      ,'step'='1'                                  -- 多次请求参数4:步长:默认值为1
      );

CREATE TABLE sink
(
    id               int,
    name             varchar,
    `real`               int,
    address                varchar,
    zz                int,
    yy                varchar
) WITH (
      'connector' = 'print'
      );


insert into sink   SELECT
                      cast(JSON_VALUE(json_string,'$.id') as int) as id ,
                       JSON_VALUE(json_string,'$.name')  as name,
                       cast(JSON_VALUE(json_string,'$.details.age.real') as int) as  `real`  ,
                        JSON_VALUE(json_string,'$.details.address') as address,
                        cast(`$.grade[*].grade[*].zz` as int ) as zz,
                        `$.details.grade[*].bb.rr.yy` as yy
                      FROM source
                    CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].grade[*].zz' )) AS T(`$.grade[*].grade[*].zz`)
                    CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].bb.rr.yy'   )) AS T1(`$.details.grade[*].bb.rr.yy`);





--{
--  "id": 1,
--  "name": "Alice",
--  "details": {
--    "age": {"real":11},
--    "address": "123Mainst",
--    "contacts": [
--      {
--        "type": "email",
--        "value": "alice@example.com"
--      },
--      {
--        "type": "phone",
--        "value": "123-456-7890"
--      }
--    ],
--    "grade": [
--      {
--        "grade": [{"zz":11},{"zz":11}],
--        "bb": {"rr":{"yy":"alice@example.com"}}
--      },
--      {
--        "grade": [{"zz":22}],
--        "bb": {"rr":{"yy":"alice@example.com"}}
--      }
--    ]
--  }
--}

消费结果
在这里插入图片描述

具体逻辑描述

  1. http连接器消费http接口数据 具体使用chunjun的http连接器,相关代码见:我提供的相关pr:
    [feature-DTStack#1775][connector][http] http supports offline mode

  2. 使用JSON_VALUE、get_json_array解析为string和array<string>,之后使用cast进行类型转换

  3. CROSS JOIN 生成笛卡尔积


http://www.kler.cn/a/314847.html

相关文章:

  • Kubernetes在容器编排中的应用
  • ISAAC SIM踩坑记录--ubuntu 22.04操作系统安装
  • 使用docker-compose单点搭建社区版seafile+onlyoffice在线word编辑平台
  • jQuery笔记
  • docker基础:搭建centos7(详见B站泷羽sec)
  • 【Java学习】电脑基础操作和编程环境配置
  • linux-虚拟化与容器化-虚拟化
  • 无法删除选定的端口,不支持请求【笔记】
  • Java流程控制语句——跳转语句详解:break 与 continue 有什么区别?
  • Go 并发模式:管道的妙用
  • biopython解析mmcif文件得到组装体、链、序列、原子坐标、变换矩阵等信息
  • 统信服务器操作系统【1050e版】安装手册
  • 十个服务器中毒的常见特征及其检测方法
  • elasticsearch学习与实战应用
  • 音视频生态下Unity3D和虚幻引擎(Unreal Engine)的区别
  • T4—猴痘识别
  • Qwen2-VL的微调及量化
  • React【1】【ref常用法】
  • 小程序地图展示poi帖子点击可跳转
  • 20240921在友善之臂的NanoPC-T6开发板上使用Rockchip原厂的Android12适配宸芯的数传模块CX6602N
  • 【监控】【Nginx】使用 ELK Stack 监控 Nginx
  • Docker Compose 启动 PostgreSQL 数据库
  • 《在华为交换机上配置防止 ARP 攻击》
  • 一个基于 Tauri、Vite 5、Vue 3 和 TypeScript 构建的即时通讯系统,牛啊牛啊!(附源码)
  • 无人机助力智慧农田除草新模式,基于YOLOv10全系列【n/s/m/b/l/x】参数模型开发构建无人机航拍场景下的农田杂草检测识别系统
  • 分布式变电站电力监控系统