当前位置: 首页 > article >正文

FlinkSql-Temporal Joins-Lookup Join

说明

在 Flink SQL 中,Temporal Joins 是一种常见的数据关联操作,特别适用于处理包含时间维度的数据。Lookup Join 是 Temporal Joins 的一种类型,它允许将流数据与维表数据进行关联。使用场景如下:

  1. 实时维度关联: 当您有一个实时的流数据流,并且需要与维表进行关联,以获取维度信息时,Lookup Join 是一个很有用的工具。例如,在电商领域,您可以将实时的订单流与商品维表进行关联,以获取商品的详细信息,如名称、价格、类别等。

  2. 动态数据关联: 如果您的维表数据是动态变化的,例如产品信息或用户配置信息,而且您希望在流数据处理过程中及时地获取最新的维度信息,Lookup Join 可以帮助您实现这一点。您可以将流数据与动态更新的维表进行关联,以确保关联的维度信息始终是最新的。

  3. 事件时间关联: Lookup Join 支持基于事件时间的关联操作,这意味着您可以根据事件发生的时间点来进行关联。这在需要处理时间窗口或事件序列的场景中特别有用。例如,您可以将实时的用户行为数据与用户配置信息进行关联,以便根据用户行为的时间戳获取相应的用户配置。

  4. 高效的维度查询: Lookup Join 通过将维表数据加载到内存中进行索引,提供了高效的维度查询能力。这使得在流数据处理过程中通过内存索引快速查找和关联维度数据成为可能,而无需频繁地访问外部存储系统。

总的来说,Lookup Join 适用于需要实时、动态和高效地关联流数据与维度数据的场景。它可以帮助您获取最新的维度信息,并在流数据处理过程中进行高效的维度查询和关联操作。

假设您有以下两个数据流:

  1. 订单流(Orders Stream)包含实时生成的订单数据,其中每个订单都包含商品ID(productId)和订单数量(quantity)。
  2. 商品维表(Products Dimension Table)包含商品的详细信息,包括商品ID(productId)、商品名称(productName)和商品价格(price)。

您可以使用 Lookup Join 将订单流与商品维表进行关联,以获取订单中商品的详细信息。以下是一个使用 Flink SQL 的示例:

-- 创建订单流表
CREATE TABLE orders (
  productId INT,
  quantity INT,
  orderTime TIMESTAMP(3),
  WATERMARK FOR orderTime AS orderTime - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'orders_topic',
  'connector.properties.bootstrap.servers' = 'kafka:9092',
  'format.type' = 'json'
);

-- 创建商品维表
CREATE TABLE products (
  productId INT,
  productName STRING,
  price DECIMAL(10, 2),
  PRIMARY KEY (productId) NOT ENFORCED
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://mysql:3306/my_database',
  'connector.table' = 'products',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'username',
  'connector.password' = 'password'
);

-- 执行 Lookup Join 操作
CREATE TABLE enrichedOrders AS
SELECT o.*, p.productName, p.price
FROM orders AS o
JOIN products FOR SYSTEM_TIME AS OF o.orderTime AS p
ON o.productId = p.productId;

在上述示例中,我们首先创建了订单流表和商品维表。订单流表从 Kafka 主题中读取实时订单数据,商品维表通过 JDBC 连接到 MySQL 数据库中的商品表。

然后,我们执行 Lookup Join 操作,将订单流表 orders 与商品维表 products 关联起来。通过 JOIN products FOR SYSTEM_TIME AS OF o.orderTime,我们将商品维表与订单流进行关联,并根据订单的事件时间 orderTime 来获取相应时间点的维度信息。

最后,我们将关联后的结果存储在 enrichedOrders 表中,其中包含了订单流的所有字段以及关联的商品名称和价格。

通过这个示例,您可以看到如何使用 Lookup Join 将流数据与维度数据进行关联,以获取实时的维度信息,丰富您的数据分析和处理过程。在实际应用中,您需要根据具体的数据源和业务需求进行相应的配置和调整。

实例demo

--模拟stream表
CREATE view kafka_mock as
select '123' as key, proctime() as _proc; -- proctime()作为处理时间-1,proctime()数据类型为TIMESTAMP_LTZ(3)

--可以直接查询的外部系统
CREATE TABLE es_dim(
    p_key     STRING,
    p_type    STRING
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'index01',
    'document-type' = 'type01',
    'hosts' = 'http://xxx:9200',
    'format' = 'json'
);

SELECT
    a.key,
    a._proc,
    CAST(a._proc AS TIMESTAMP(3)) as _proc_local
FROM kafka_mock a
join es_dim FOR SYSTEM_TIME AS OF a._proc as b --利用时态表,关联stream表-2
on a.key = b.p_key

时态表join-查找join
参考:Lookup Join


http://www.kler.cn/a/146786.html

相关文章:

  • kafka原理和实践
  • Golang 设计模式
  • LeetCode 热题 100_从前序与中序遍历序列构造二叉树(47_105_中等_C++)(二叉树;递归)
  • 第27章 汇编语言--- 设备驱动开发基础
  • 长安“战疫”网络安全公益赛的一些随想
  • 服务器数据恢复—EMC存储POOL中数据卷被删除的数据恢复案例
  • 基于官方YOLOv4-u5【yolov5风格实现】开发构建目标检测模型超详细实战教程【以自建缺陷检测数据集为例】
  • 力扣hot100 滑动窗口最大值 单调队列
  • C/C++ 常用加密与解密算法
  • 自己动手写编译器:golex 和 flex 比较研究 2
  • Java之面向对象《ATM自动取款机》
  • Arkts http数据请求
  • 每日一题--寻找重复数
  • opencv-python读取的图像分辨率太大不能完全显示
  • 优秀软件设计特征与原则
  • 买饮料问题
  • 【华为OD】B\C卷真题 100%通过:需要打开多少监控器 C/C++实现
  • java集合,ArrayList、LinkedList和Vector,多线程场景下如何使用 ArrayList
  • SQL Server:流程控制语言详解
  • leetcode 不同的二叉搜索树
  • java基础-运算符
  • virtualList 封装使用 虚拟列表 列表优化
  • 第四节HarmonyOS 熟知开发工具DevEco Studio
  • 深入解析:如何开发抖音票务小程序
  • CANdelaStudio 中 Bese Variant 和 Variant区别
  • JavaScript WebApi(二) 详解