当前位置: 首页 > article >正文

[实时计算flink]IntervalJoin语句

本文为您介绍如何使用IntervalJoin语句。

背景信息

IntervalJoin语句可以让两个流进行JOIN时,左流和右流中每条记录只关联另外一条流上满足定义的时间范围内的数据,且进行完JOIN后,仍然保留输入流上的时间列,让您继续进行基于Event Time的操作。

语法格式

SELECT column-names
FROM table1  [AS <alias1>]
[INNER | LEFT | RIGHT |FULL ] JOIN table2 
ON table1.column-name1 = table2.key-name1 AND TIMEBOUND_EXPRESSION

说明

  • 支持INNER JOIN、LEFT JOIN、RIGHT JOIN和FULL JOIN,如果直接使用JOIN,默认为INNER JOIN。

  • 暂不支持SEMI JOIN和ANTI JOIN。

  • TIMEBOUND_EXPRESSION为左右两个流时间属性列上的区间条件表达式,支持以下三种条件表达式:

    • ltime = rtime

    • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

    • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

示例1(基于Event Time)

统计下单后4个小时内的物流信息。

  • 测试数据

    • 订单表(Orders)

      id

      productName

      orderTime

      1

      iphone

      2020-04-01 10:00:00.0

      2

      mac

      2020-04-01 10:02:00.0

      3

      huawei

      2020-04-01 10:03:00.0

      4

      pad

      2020-04-01 10:05:00.0

    • 物流表(Shipments)

      shipId

      orderId

      status

      shiptime

      0

      1

      shipped

      2020-04-01 11:00:00.0

      1

      2

      delivered

      2020-04-01 17:00:00.0

      2

      3

      shipped

      2020-04-01 12:00:00.0

      3

      4

      shipped

      2020-04-01 11:30:00.0

  • 测试语句

    CREATE TEMPORARY TABLE Orders(
      id BIGINT,
      productName VARCHAR,
      orderTime TIMESTAMP,
      WATERMARK wk FOR orderTime as withOffset(orderTime, 2000)  --为rowtime定义Watermark。
    ) WITH (
      type='datahub',
      endpoint='<yourEndpoint>',
      accessId='<yourAccessID>',
      accessKey='<yourAccessSecret>',
      projectName='<yourProjectName>',
      topic='<yourTopic>',
      project='<yourProjectName>'
    );
    
    CREATE TEMPORARY TABLE Shipments(
      shipId BIGINT,
      orderId BIGINT,
      status VARCHAR,
      shiptime TIMESTAMP,
      WATERMARK wk FOR shiptime as withOffset(shiptime, 2000)  --为rowtime定义Watermark。
    ) WITH (
      type='datahub',
      endpoint='<yourEndpoint>',
      accessId='<yourAccessID>',
      accessKey='<yourAccessSecret>',
      projectName='<yourProjectName>',
      topic='<yourTopic>',
      project='<yourProjectName>'
    );
    
    --使用RDS作为结果表。
    CREATE TEMPORARY TABLE rds_output(
      id BIGINT,
      productName VARCHAR,
      status VARCHAR
    ) WITH (
      type='rds',
      url='<yourDatabaseURL>',
      tableName='<yourDatabaseTablename>',
      userName='<yourDatabaseUserName>',
      password='<yourDatabasePassword>'
    );
    
    INSERT INTO rds_output
    SELECT id, productName, status
    FROM Orders AS o
    JOIN Shipments AS s on o.id = s.orderId AND
         o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
  • 测试结果

    id(bigint)

    productName(varchar)

    status(varchar)

    1

    iphone

    shipped

    3

    huawei

    shipped

    4

    pad

    shipped

示例2(基于Processing Time)

  • 测试数据

    • datahub_stream1

      k1

      v1

      1

      val1

      2

      val2

      3

      val3

    • datahub_stream2

      k1

      v1

      1

      val1

      2

      val2

      3

      val3

  • 测试语句

    CREATE TEMPORARY TABLE datahub_stream1 (
      k1 BIGINT,
      v1 VARCHAR,
      d AS PROCTIME()
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '<yourAccessId>',
      'accessKey' = '<yourAccessKey>'
    );
    
    CREATE TEMPORARY TABLE datahub_stream2 (
      k2 BIGINT,
      v2 VARCHAR,
      e AS PROCTIME()
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '<yourAccessId>',
      'accessKey' = '<yourAccessKey>'
    );
    
    --使用RDS作为结果表。
    CREATE TEMPORARY TABLE rds_output(
      k1 BIGINT,
      v1 VARCHAR,
      v2 VARCHAR
    ) WITH (
      'connector'='rds',
      'tableName'='your-table-name',
      'userName'='your-user-name',
      'password'='your-password',
      'url'='your-url'
    );
    
    INSERT INTO rds_output
    SELECT k1, v1, v2
    FROM datahub_stream1 AS o
    JOIN datahub_stream2 AS s on o.k1 = s.k2 AND
         o.d BETWEEN s.e - INTERVAL '4' MINUTE AND s.e;

说明

由于结果取决于两个流里每条数据进入系统的时间,具有不确定性,因此该示例暂不提供预期结果。

Flink SQL支持添加提示(Hints),允许您手动影响执行计划的生成结果,从而优化SQL作业的执行。本文为您介绍如何使用SQL提示。

动态表选项提示

从VVR 4.x开始,您可以通过动态表选项来动态地制定或覆盖表选项,从而让这些表参数仅在Query级别的范围内生效。动态表选项的语法和示例,请参见动态表(dynamic table)选项。

查询提示

维表联接提示

维表联接提示(Lookup Join Hints)详情,请参见维表JOIN Hints 。

算子状态生命周期(State TTL)提示

通过为状态设置状态生命周期,可以显著减少算子状态的大小,从而增强系统稳定性和可靠性。算子状态生命周期设置的方式详情如下:

  • 状态生命周期提示:为特殊的有状态算子设置算子粒度的状态生命周期。当前该方式仅适用于双流联接算子和分组聚合算子,具体的设置方式如下:

    • 双流联接算子

      • VVR 8.0.7及以上版本:可以通过状态生命周期提示或双流JOIN hints的提示来实现。

      • VVR 8.0.1~VVR 8.0.6版本:仅支持通过双流JOIN hints的提示来实现。

    • 分组聚合算子

      • VVR 8.0.7及以上版本:通过分组聚合状态生命周期提示实现,更多语法和示例请参见状态生命周期提示。

      • VVR 8.0.6及以前版本:不支持配置算子状态生命周期提示。

  • 作业参数配置:设置作业参数来设置全局作业级别的状态生命周期,详情请参见table.exec.state.ttl。

  • 专家模式:为有状态算子配置算子粒度的状态生命周期,详情请参见配置算子并发、Chain策略和TTL。

说明

算子状态生命周期设置生效的优先级从高到低依次是:专家模式、状态生命周期提示、作业参数配置。


http://www.kler.cn/news/365641.html

相关文章:

  • 创建型模式-----建造者模式
  • 【Jenkins】解决在Jenkins Agent节点容器内无法访问物理机的docker和docker compose的问题
  • 分布式光伏发电系统电气一次部分设计(开题报告3)
  • LSTM反向传播及公式推导
  • win10怎么卸载软件干净?电脑彻底删除软件的方法介绍,一键清理卸载残留!
  • Python的NumPy库简介
  • 查找算法和排序算法
  • Django+Vue全栈开发项目入门(二)
  • Git版本控制
  • 前沿技术与未来发展第一节:C++与机器学习
  • less 命令无法正确显示中文字符问题
  • 探索 DevOps:从概念到实践
  • java中使用redis的方法
  • ClickHouse在百度MEG数据中台的落地和优化
  • disabled_button
  • 死锁(Deadlock)C#
  • 什么是js中的入口函数
  • Apache HttpClient 和 OkHttpClient 的使用
  • 青少年编程与数学 02-002 Sql Server 数据库应用 13课题、函数的编写
  • Mac电脑:资源库Library里找不到WebServer问题的解决
  • Appium中的api(三)
  • AIGC:开启智能创造的璀璨新篇章
  • uni-app 获取 android 手机 IMEI码
  • 算法笔记day06
  • 【Jenkins】解决使用容器化部署的Jenkins Agent节点时出现的git检查报错
  • 24.redis高性能