[实时计算flink]IntervalJoin语句
本文为您介绍如何使用IntervalJoin语句。
背景信息
IntervalJoin语句可以让两个流进行JOIN时,左流和右流中每条记录只关联另外一条流上满足定义的时间范围内的数据,且进行完JOIN后,仍然保留输入流上的时间列,让您继续进行基于Event Time的操作。
语法格式
SELECT column-names
FROM table1 [AS <alias1>]
[INNER | LEFT | RIGHT |FULL ] JOIN table2
ON table1.column-name1 = table2.key-name1 AND TIMEBOUND_EXPRESSION
说明
-
支持INNER JOIN、LEFT JOIN、RIGHT JOIN和FULL JOIN,如果直接使用JOIN,默认为INNER JOIN。
-
暂不支持SEMI JOIN和ANTI JOIN。
-
TIMEBOUND_EXPRESSION为左右两个流时间属性列上的区间条件表达式,支持以下三种条件表达式:
-
ltime = rtime
-
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
-
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
-
示例1(基于Event Time)
统计下单后4个小时内的物流信息。
-
测试数据
-
订单表(Orders)
id
productName
orderTime
1
iphone
2020-04-01 10:00:00.0
2
mac
2020-04-01 10:02:00.0
3
huawei
2020-04-01 10:03:00.0
4
pad
2020-04-01 10:05:00.0
-
物流表(Shipments)
shipId
orderId
status
shiptime
0
1
shipped
2020-04-01 11:00:00.0
1
2
delivered
2020-04-01 17:00:00.0
2
3
shipped
2020-04-01 12:00:00.0
3
4
shipped
2020-04-01 11:30:00.0
-
-
测试语句
CREATE TEMPORARY TABLE Orders( id BIGINT, productName VARCHAR, orderTime TIMESTAMP, WATERMARK wk FOR orderTime as withOffset(orderTime, 2000) --为rowtime定义Watermark。 ) WITH ( type='datahub', endpoint='<yourEndpoint>', accessId='<yourAccessID>', accessKey='<yourAccessSecret>', projectName='<yourProjectName>', topic='<yourTopic>', project='<yourProjectName>' ); CREATE TEMPORARY TABLE Shipments( shipId BIGINT, orderId BIGINT, status VARCHAR, shiptime TIMESTAMP, WATERMARK wk FOR shiptime as withOffset(shiptime, 2000) --为rowtime定义Watermark。 ) WITH ( type='datahub', endpoint='<yourEndpoint>', accessId='<yourAccessID>', accessKey='<yourAccessSecret>', projectName='<yourProjectName>', topic='<yourTopic>', project='<yourProjectName>' ); --使用RDS作为结果表。 CREATE TEMPORARY TABLE rds_output( id BIGINT, productName VARCHAR, status VARCHAR ) WITH ( type='rds', url='<yourDatabaseURL>', tableName='<yourDatabaseTablename>', userName='<yourDatabaseUserName>', password='<yourDatabasePassword>' ); INSERT INTO rds_output SELECT id, productName, status FROM Orders AS o JOIN Shipments AS s on o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
-
测试结果
id(bigint)
productName(varchar)
status(varchar)
1
iphone
shipped
3
huawei
shipped
4
pad
shipped
示例2(基于Processing Time)
-
测试数据
-
datahub_stream1
k1
v1
1
val1
2
val2
3
val3
-
datahub_stream2
k1
v1
1
val1
2
val2
3
val3
-
-
测试语句
CREATE TEMPORARY TABLE datahub_stream1 ( k1 BIGINT, v1 VARCHAR, d AS PROCTIME() ) WITH ( 'connector' = 'datahub', 'subId' = '<yourSubId>', 'endPoint' = '<yourEndPoint>', 'project' = '<yourProjectName>', 'topic' = '<yourTopicName>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessKey>' ); CREATE TEMPORARY TABLE datahub_stream2 ( k2 BIGINT, v2 VARCHAR, e AS PROCTIME() ) WITH ( 'connector' = 'datahub', 'subId' = '<yourSubId>', 'endPoint' = '<yourEndPoint>', 'project' = '<yourProjectName>', 'topic' = '<yourTopicName>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessKey>' ); --使用RDS作为结果表。 CREATE TEMPORARY TABLE rds_output( k1 BIGINT, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' ); INSERT INTO rds_output SELECT k1, v1, v2 FROM datahub_stream1 AS o JOIN datahub_stream2 AS s on o.k1 = s.k2 AND o.d BETWEEN s.e - INTERVAL '4' MINUTE AND s.e;
说明
由于结果取决于两个流里每条数据进入系统的时间,具有不确定性,因此该示例暂不提供预期结果。
Flink SQL支持添加提示(Hints),允许您手动影响执行计划的生成结果,从而优化SQL作业的执行。本文为您介绍如何使用SQL提示。
动态表选项提示
从VVR 4.x开始,您可以通过动态表选项来动态地制定或覆盖表选项,从而让这些表参数仅在Query级别的范围内生效。动态表选项的语法和示例,请参见动态表(dynamic table)选项。
查询提示
维表联接提示
维表联接提示(Lookup Join Hints)详情,请参见维表JOIN Hints 。
算子状态生命周期(State TTL)提示
通过为状态设置状态生命周期,可以显著减少算子状态的大小,从而增强系统稳定性和可靠性。算子状态生命周期设置的方式详情如下:
-
状态生命周期提示:为特殊的有状态算子设置算子粒度的状态生命周期。当前该方式仅适用于双流联接算子和分组聚合算子,具体的设置方式如下:
-
双流联接算子
-
VVR 8.0.7及以上版本:可以通过状态生命周期提示或双流JOIN hints的提示来实现。
-
VVR 8.0.1~VVR 8.0.6版本:仅支持通过双流JOIN hints的提示来实现。
-
-
分组聚合算子
-
VVR 8.0.7及以上版本:通过分组聚合状态生命周期提示实现,更多语法和示例请参见状态生命周期提示。
-
VVR 8.0.6及以前版本:不支持配置算子状态生命周期提示。
-
-
-
作业参数配置:设置作业参数来设置全局作业级别的状态生命周期,详情请参见table.exec.state.ttl。
-
专家模式:为有状态算子配置算子粒度的状态生命周期,详情请参见配置算子并发、Chain策略和TTL。
说明
算子状态生命周期设置生效的优先级从高到低依次是:专家模式、状态生命周期提示、作业参数配置。