【Flink实战】flink消费http数据并将数组展开多行
文章目录
- 一. 需求描述
- 二. 方案思路
- 1. 解决思路
- 2. flink json 解析
- 2.1. 通过json path解析非array数据
- 2.2. 通过json path解析array数据
- 3. CROSS JOIN逻辑
- 三. 方案实现
- 1. http json数据样例
- 2. flink sql 说明
一. 需求描述
Flink 消费 HTTP 接口返回的 JSON 数据,并将 JSON 中的数组展开为多行。
样例数据以及期望的处理结果如下:
{
"name": "John Doe",
"age": 30,
"address": {
"street": {
"street": "123 Main St",
"city": "New York",
"state": "NY"
},
"city": "New York",
"state": "NY"
},
"phoneNumbers": [
{
"type": "home",
"number": "212-555-1234"
},
{
"type": "fax",
"number": "646-555-4567"
}
],
"children": [],
"spouse": null
}
name | age | street | city | state | phone_type | phone_number |
---|---|---|---|---|---|---|
John Doe | 30 | 123 Main St | New York | NY | home | 212-555-1234 |
John Doe | 30 | 123 Main St | New York | NY | fax | 646-555-4567 |
二. 方案思路
1. 解决思路
- flink 消费http接口的数据(json),发送到下游
- 下游算子解析 JSON 数据:普通字段使用 JSON_VALUE 解析为字符串;遇到数组时,通过自定义 UDF 解析并返回 ARRAY 类型
- 通过使用CROSS JOIN 将数组数据拍平,如上表格展现
2. flink json 解析
2.1. 通过json path解析非array数据
如下通过flink内置函数:JSON_VALUE 进行数据解析,支持多种类型的输出,默认输出为string。
这里使用 cast转换,如下举例
-- SELECT-list fragment: JSON_VALUE returns STRING by default, so numeric fields are CAST to INT.
-- `real` is back-quoted because REAL is also a SQL type keyword.
cast(JSON_VALUE(json_string,'$.id') as int) as id ,
JSON_VALUE(json_string,'$.name') as name,
cast(JSON_VALUE(json_string,'$.details.age.real') as int) as `real` ,
JSON_VALUE(json_string,'$.details.address') as address,
2.2. 通过json path解析array数据
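根据 Flink 官方文档,(在本文所用的 Flink 版本中)JSON_QUERY 虽然可以通过 WRAPPER 选项把结果包装为数组,但返回值始终是 STRING 类型,无法直接用于 UNNEST 展开,因此不符合要求。
改用自定义 UDF(基于 json-path 库)解决,所需 Maven 依赖如下: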
<dependencies>
    <!-- JsonPath library used by the JsonArrayFieldExtractor UDF to evaluate array paths. -->
    <dependency>
        <groupId>com.jayway.jsonpath</groupId>
        <artifactId>json-path</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!-- Flink UDF base classes (ScalarFunction); "provided" scope: supplied by the Flink runtime. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
UDF 实现如下:
package com.dtstack.chunjun.local.test;
import com.jayway.jsonpath.JsonPath;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Flink scalar UDF that evaluates a JsonPath expression against a JSON string and returns every
 * matched value as a string, so the result can be expanded with {@code CROSS JOIN UNNEST(...)}.
 *
 * <p>How matched values are converted:
 *
 * <ul>
 *   <li>JSON objects and arrays are serialized back to JSON text (not Java {@code Map#toString},
 *       which yields the non-JSON form {@code {k=v}}), so they can be parsed again with
 *       {@code JSON_VALUE} or with this function;
 *   <li>JSON {@code null} elements become {@code null} entries instead of discarding the whole
 *       array;
 *   <li>strings, numbers and booleans use {@code toString()}.
 * </ul>
 *
 * <p>The function is deliberately best-effort: a null/empty document or path, malformed JSON, an
 * invalid or unmatched path, or a path that resolves to a single (non-array) value all return an
 * empty list, which makes UNNEST emit no rows instead of failing the job.
 */
public class JsonArrayFieldExtractor extends ScalarFunction {

    /**
     * Extracts the values matched by {@code jsonPath} from {@code jsonString}.
     *
     * @param jsonString the JSON document; may be {@code null} or empty
     * @param jsonPath the JsonPath expression, normally an indefinite path such as
     *     {@code $.details.grade[*].bb.rr.yy}
     * @return the matched values as strings; never {@code null}, empty when nothing usable matched
     */
    public List<String> eval(String jsonString, String jsonPath) {
        if (isNullOrEmpty(jsonString) || isNullOrEmpty(jsonPath)) {
            return new ArrayList<String>();
        }
        try {
            // Read as Object: a definite path may resolve to a scalar or an object, not a List.
            Object result = JsonPath.read(jsonString, jsonPath);
            if (!(result instanceof List)) {
                // Nothing array-like to expand.
                return new ArrayList<String>();
            }
            List<?> matches = (List<?>) result;
            List<String> values = new ArrayList<>(matches.size());
            for (Object match : matches) {
                values.add(toStringValue(match));
            }
            return values;
        } catch (Exception e) {
            // Best-effort by design: malformed JSON or an invalid/missing path must not fail the
            // job; the record simply contributes no expanded rows.
            return new ArrayList<String>();
        }
    }

    /** Returns true when the argument is null or the empty string. */
    private static boolean isNullOrEmpty(String value) {
        return value == null || value.isEmpty();
    }

    /** Converts one matched value to its string form; a JSON null stays null. */
    private static String toStringValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Map || value instanceof List) {
            // Re-serialize containers with the JsonPath provider so the output is valid JSON.
            return JsonPath.parse(value).jsonString();
        }
        return value.toString();
    }
}
3. CROSS JOIN逻辑
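Array Expansion(参考 Flink 官方文档 Joins 章节):使用 CROSS JOIN UNNEST 将数组中的每个元素展开为一行。
注意:CROSS JOIN 返回两个连接表的笛卡尔积,因此同一条记录中有多个数组时会产生笛卡尔积。比如:两个数组各有 100 个元素,使用两次 CROSS JOIN 会产生 100 × 100 = 1 万行数据。并且各数组的元素之间只是两两组合,并不会按所属的父元素一一对应(例如下文中的 zz 与 yy)。另外,某个数组为空或 JSON 路径不存在时,该条记录不会输出任何行。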
三. 方案实现
1. http json数据样例
{
"id": 1,
"name": "Alice",
"details": {
"age": {"real":11},
"address": "123Mainst",
"contacts": [
{
"type": "email",
"value": "alice@example.com"
},
{
"type": "phone",
"value": "123-456-7890"
}
],
"grade": [
{
"grade": [{"zz":11},{"zz":11}],
"bb": {"rr":{"yy":"alice@example.com"}}
},
{
"grade": [{"zz":22}],
"bb": {"rr":{"yy":"alice@example.com"}}
}
]
}
}
2. flink sql 说明
-- Register the Java UDF JsonArrayFieldExtractor under the SQL name get_json_array
-- (LANGUAGE JAVA is Flink's default and is spelled out only for readability).
CREATE TEMPORARY SYSTEM FUNCTION get_json_array
    AS 'com.dtstack.chunjun.local.test.JsonArrayFieldExtractor'
    LANGUAGE JAVA;
-- Source table: polls the HTTP endpoint through the ChunJun 'http-x' connector.
-- The whole payload lands in the single json_string column, which is parsed later with
-- JSON_VALUE / get_json_array.
CREATE TABLE source
(
json_string varchar
) WITH (
'connector' = 'http-x'
,'url' = 'http://localhost:8088/api/arraypage'
,'intervalTime'= '3000'
,'method'='get' -- request method: get or post
,'decode'='text' -- payload format. NOTE(review): the original note said only json mode is supported, yet 'text' is set; presumably 'text' passes the raw body into json_string — confirm against the connector docs
-- The following 4 paging options must be set together:
,'page-param-name'='pagenum' -- paging option 1: name of the page request parameter, e.g. pageNum
,'start-index'='1' -- paging option 2: first page value
,'end-index'='4' -- paging option 3: last page value
,'step'='1' -- paging option 4: increment between page values; defaults to 1
);
-- Sink table: one row per flattened (zz, yy) combination, emitted by Flink's 'print' connector.
CREATE TABLE sink (
    id      INT,
    name    VARCHAR,
    `real`  INT,      -- back-quoted because REAL is also a SQL type keyword
    address VARCHAR,
    zz      INT,
    yy      VARCHAR
) WITH (
    'connector' = 'print'
);
-- Flatten each HTTP record: scalar fields come from JSON_VALUE (STRING, CAST where needed),
-- array leaves come from get_json_array + UNNEST.
-- NOTE: the two UNNESTs are independent, so every zz value is paired with every yy value
-- (Cartesian product, e.g. 3 zz x 2 yy = 6 rows for the sample document) rather than with the
-- yy of its own parent grade element. A record whose array is empty or whose path is missing
-- produces no rows at all.
INSERT INTO sink
SELECT
    CAST(JSON_VALUE(json_string, '$.id') AS INT)               AS id,
    JSON_VALUE(json_string, '$.name')                          AS name,
    CAST(JSON_VALUE(json_string, '$.details.age.real') AS INT) AS `real`,
    JSON_VALUE(json_string, '$.details.address')               AS address,
    CAST(zz_values.zz_raw AS INT)                              AS zz,
    yy_values.yy_raw                                           AS yy
FROM source
CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].grade[*].zz')) AS zz_values (zz_raw)
CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].bb.rr.yy')) AS yy_values (yy_raw);
--{
-- "id": 1,
-- "name": "Alice",
-- "details": {
-- "age": {"real":11},
-- "address": "123Mainst",
-- "contacts": [
-- {
-- "type": "email",
-- "value": "alice@example.com"
-- },
-- {
-- "type": "phone",
-- "value": "123-456-7890"
-- }
-- ],
-- "grade": [
-- {
-- "grade": [{"zz":11},{"zz":11}],
-- "bb": {"rr":{"yy":"alice@example.com"}}
-- },
-- {
-- "grade": [{"zz":22}],
-- "bb": {"rr":{"yy":"alice@example.com"}}
-- }
-- ]
-- }
--}
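消费结果:针对上述样例数据,zz 展开为 3 个值(11、11、22),yy 展开为 2 个值,两次 CROSS JOIN 后每条记录输出 3 × 2 = 6 行,例如 +I[1, Alice, 11, 123Mainst, 11, alice@example.com]。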
具体逻辑描述:
- http 连接器消费 http 接口数据:具体使用 ChunJun 的 http 连接器,相关代码见我提交的 PR:[feature-DTStack#1775][connector][http] http supports offline mode
- 使用 JSON_VALUE、get_json_array 分别解析为 STRING 和 ARRAY<STRING>,之后使用 CAST 进行类型转换
- 使用 CROSS JOIN UNNEST 展开数组,多个数组之间生成笛卡尔积