当前位置: 首页 > article >正文

Flink (十四) :Table API SQL (二) 流式概念

Flink 的Table API 和 SQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易懂。以下包含了概念、实际的限制,以及流式数据处理中的一些特定的配置。

1. 状态管理

流模式下运行的表程序利用了 Flink 作为有状态流处理器的所有能力。事实上,一个表程序(Table program)可以配置一个状态后端和多个不同的 checkpoint 选项以处理对不同状态大小和容错需求。这可以对正在运行的 Table API & SQL 管道(pipeline)生成 savepoint,并在这之后用其恢复应用程序的状态。

1.1 状态使用

由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果, 管道会被现有优化规则集优化成尽可能少地使用状态。从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表即动态表。 它们的状态取决于用到的操作。

1.1.1 状态算子

包含诸如JOIN,聚合和去重等操作的语句需要在 Flink 抽象的容错存储内保存中间结果。例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。一个计算词频的例子如下:

CREATE TABLE doc (
    word STRING
) WITH (
    'connector' = '...'
);
CREATE TABLE word_cnt (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt  BIGINT
) WITH (
    'connector' = '...'
);

INSERT INTO word_cnt
SELECT word, COUNT(1) AS cnt
FROM doc
GROUP BY word;

word 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 word 次数。 输入 word 的值随时间变化。由于这个查询一直持续,Flink 会为每个 word 维护一个中间状态来保存当前词频,因此总状态量会随着 word 的发现不断地增长。

形如 SELECT ... FROM ... WHERE 这种只包含字段映射或过滤器的查询语句通常是无状态的管道。 然而在某些情况下,根据输入数据的特征,状态算子可能会被隐式地推导出来。

下面的例子展示了使用 SELECT ... FROM 语句查询 upsert kafka 源表。

CREATE TABLE upsert_kakfa (
    id INT PRIMARY KEY NOT ENFORCED,
    message  STRING
) WITH (
    'connector' = 'upsert-kafka',
    ...
);

SELECT * FROM upsert_kakfa;

源表的消息类型只包含 INSERTUPDATE_AFTER 和 DELETE,然而下游要求完整的 changelog(包含 UPDATE_BEFORE)。 所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。

1.1.2 空闲状态维持时间

空闲状态维持时间参数 table.exec.state.ttl 定义了状态的键在被更新后要保持多长时间才被移除。 在之前的查询例子中,word 的数目会在配置的时间内未更新时立刻被移除。通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是对应键的第一条记录。上述例子中意味着 cnt 会再次从 0 开始计数。

1.1.3 指定状态生命周期的不同方式

配置方式TableAPI/SQL 支持生效范围优先级
SET 'table.exec.state.ttl' = '...'TableAPI SQL作业粒度,默认情况下所有状态算子都会使用该值控制状态生命周期默认配置,可被覆盖
SELECT /*+ STATE_TTL(...) */ ...SQL有限算子粒度,当前支持连接和分组聚合算子该值将会优先作用于相应算子的状态生命周期。查阅状态生命周期提示获取更多信息。
修改序列化为 JSON 的 CompiledPlanTableAPI SQL通用算子粒度, 可修改任一状态算子的生命周期table.exec.state.ttl 和 STATE_TTL 的值将会序列化到 CompiledPlan,如果作业使用 CompiledPlan 提交,则最终生效的生命周期由最后一次修改的状态元数据决定。

1.1.4 配置算子粒度的状态 TTL

从 Flink v1.18 开始,Table API & SQL 支持配置细粒度的状态 TTL 来优化状态使用,可配置粒度为每个状态算子的入边数。具体而言,OneInputStreamOperator 可以配置一个状态的 TTL,而 TwoInputStreamOperator(例如双流 join)则可以分别为左状态和右状态配置 TTL。更一般地,对于具有 K 个输入的 MultipleInputStreamOperator,可以配置 K 个状态 TTL。

一些典型的使用场景如下

  • 为双流Join的左右流配置不同 TTL。 双流 Join 会生成拥有两条输入边的 TwoInputStreamOperator 的状态算子,它用到了两个状态,分别来保存来自左流和右流的更新。
  • 在同一个作业中为不同的状态计算设置不同 TTL。 举例来说,假设一个 ETL 作业使用 ROW_NUMBER 进行去重操作后, 紧接着使用 GROUP BY 语句进行聚合操作。 该作业会分别生成两个拥有单条输入边的 OneInputStreamOperator 状态算子。您可以为去重算子和聚合算子的状态分别设置不同的 TTL。
1.1.4.1 生成 Compiled Plan

配置过程首先会使用 COMPILE PLAN 语句生成一个 JSON 文件,它表示了序列化后的执行计划。

COMPILE PLAN 不支持查询语句 SELECT... FROM... 。

  • 执行 COMPILE PLAN 语句
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.executeSql(
    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)");
tableEnv.executeSql(
    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)");
tableEnv.executeSql(
    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)");

// CompilePlan#writeToFile only supports a local file path, if you need to write to remote filesystem,
// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
CompiledPlan compiledPlan = 
    tableEnv.compilePlanSql(
        "INSERT INTO enriched_orders \n" 
       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
       + "FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id");

compiledPlan.writeToFile("/path/to/plan.json");
  • SQL 语法
COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR <insert_statement>|<statement_set>;

statement_set:
    EXECUTE STATEMENT SET
    BEGIN
    insert_statement;
    ...
    insert_statement;
    END;

insert_statement:
    <insert_from_select>|<insert_from_values>

该语句会在指定位置 /path/to/plan.json 生成一个 JSON 文件。

1.1.4.2 修改 Compiled Plan

每个状态算子会显式地生成一个名为 “state” 的 JSON 数组,具有如下结构。 理论上一个拥有 k 路输入的状态算子拥有 k 个状态。

"state": [
    {
      "index": 0,
      "ttl": "0 ms",
      "name": "${1st input state name}"
    },
    {
      "index": 1,
      "ttl": "0 ms",
      "name": "${2nd input state name}"
    },
    ...
  ]

找到您需要修改的状态算子,将 TTL 的值设置为一个正整数,注意需要带上时间单位毫秒。举例来说,如果想将当前状态算子的 TTL 设置为 1 小时,您可以按照如下格式修改 JSON:

{
  "index": 0,
  "ttl": "3600000 ms",
  "name": "${1st input state name}"
}

保存好文件,然后使用 EXECUTE PLAN 语句来提交作业。理论上,下游状态算子的 TTL 不应小于上游状态算子的 TTL。

1.1.4.3 执行 Compiled Plan

EXECUTE PLAN 语句将会反序列化上述 JSON 文件,进一步生成 JobGraph 并提交作业。 通过 EXECUTE PLAN 语句提交的作业,其状态算子的 TTL 的值将会从文件中读取,配置项 table.exec.state.ttl 的值将会被忽略。

  • 执行 EXECUTE PLAN 语句
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.executeSql(
    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)");
tableEnv.executeSql(
    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)");
tableEnv.executeSql(
    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)");

// PlanReference#fromFile only supports a local file path, if you need to read from remote filesystem,
// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await();
tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
  • SQL 语法

    EXECUTE PLAN [IF EXISTS] <plan_file_path>;
    

    该语句反序列化指定的 JSON 文件,并提交作业。

完整示例

下面的例子展示了一个通过双流 Join 计算订单明细的作业,并且如何为左右流设置不同的 TTL。

  • 生成 compiled plan

    -- left source table
    CREATE TABLE Orders (
        `order_id` INT,
        `line_order_id` INT
    ) WITH (
        'connector'='...'
    );
    
    -- right source table
    CREATE TABLE LineOrders (
        `line_order_id` INT,
        `ship_mode` STRING
    ) WITH (
        'connector'='...'
    );
    
    -- sink table
    CREATE TABLE OrdersShipInfo (
        `order_id` INT,
        `line_order_id` INT,
        `ship_mode` STRING
    ) WITH (
        'connector' = '...'
    );
    
    COMPILE PLAN '/path/to/plan.json' FOR
    INSERT INTO OrdersShipInfo
    SELECT a.order_id, a.line_order_id, b.ship_mode 
    FROM Orders a JOIN LineOrders b 
        ON a.line_order_id = b.line_order_id;
    

    生成的 JSON 文件内容如下:

    {
      "flinkVersion" : "1.18",
      "nodes" : [ {
        "id" : 1,
        "type" : "stream-exec-table-source-scan_1",
        "scanTableSource" : {
          "table" : {
            "identifier" : "`default_catalog`.`default_database`.`Orders`",
            "resolvedTable" : { ... }
          }
        },
        "outputType" : "ROW<`order_id` INT, `line_order_id` INT>",
        "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, line_order_id])",
        "inputProperties" : [ ]
      }, {
        "id" : 2,
        "type" : "stream-exec-exchange_1",
        "inputProperties" : [ ... ],
        "outputType" : "ROW<`order_id` INT, `line_order_id` INT>",
        "description" : "Exchange(distribution=[hash[line_order_id]])"
      }, {
        "id" : 3,
        "type" : "stream-exec-table-source-scan_1",
        "scanTableSource" : {
          "table" : {
            "identifier" : "`default_catalog`.`default_database`.`LineOrders`",
            "resolvedTable" : {...}
          }
        },
        "outputType" : "ROW<`line_order_id` INT, `ship_mode` VARCHAR(2147483647)>",
        "description" : "TableSourceScan(table=[[default_catalog, default_database, LineOrders]], fields=[line_order_id, ship_mode])",
        "inputProperties" : [ ]
      }, {
        "id" : 4,
        "type" : "stream-exec-exchange_1",
        "inputProperties" : [ ... ],
        "outputType" : "ROW<`line_order_id` INT, `ship_mode` VARCHAR(2147483647)>",
        "description" : "Exchange(distribution=[hash[line_order_id]])"
      }, {
        "id" : 5,
        "type" : "stream-exec-join_1",
        "joinSpec" : { ... },
        "state" : [ {
          "index" : 0,
          "ttl" : "0 ms",
          "name" : "leftState"
        }, {
          "index" : 1,
          "ttl" : "0 ms",
          "name" : "rightState"
        } ],
        "inputProperties" : [ ... ],
        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `line_order_id0` INT, `ship_mode` VARCHAR(2147483647)>",
        "description" : "Join(joinType=[InnerJoin], where=[(line_order_id = line_order_id0)], select=[order_id, line_order_id, line_order_id0, ship_mode], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])"
      }, {
        "id" : 6,
        "type" : "stream-exec-calc_1",
        "projection" : [ ... ],
        "condition" : null,
        "inputProperties" : [ ... ],
        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `ship_mode` VARCHAR(2147483647)>",
        "description" : "Calc(select=[order_id, line_order_id, ship_mode])"
      }, {
        "id" : 7,
        "type" : "stream-exec-sink_1",
        "configuration" : { ... },
        "dynamicTableSink" : {
          "table" : {
            "identifier" : "`default_catalog`.`default_database`.`OrdersShipInfo`",
            "resolvedTable" : { ... }
          }
        },
        "inputChangelogMode" : [ "INSERT" ],
        "inputProperties" : [ ... ],
        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `ship_mode` VARCHAR(2147483647)>",
        "description" : "Sink(table=[default_catalog.default_database.OrdersShipInfo], fields=[order_id, line_order_id, ship_mode])"
      } ],
      "edges" : [ ... ]
    }
    
  • 修改和执行 compiled plan

    如下 JSON 格式代表了 Join 算子的状态信息:

    "state": [
        {
          "index": 0,
          "ttl": "0 ms",
          "name": "leftState"
        },
        {
          "index": 1,
          "ttl": "0 ms",
          "name": "rightState"
        }
      ]
    

    其中 "index" 代表了当前状态属于算子的第几路输入,从 0 开始计数。 当前左右流的 TTL 值都是 "0 ms",表示此时 TTL 并未开启。 现在将左流的 TTL 设置为 "3000 ms",右流设置为 "9000 ms"。修改后的 JSON 如下所示。

    "state": [
        {
          "index": 0,
          "ttl": "3000 ms",
          "name": "leftState"
        },
        {
          "index": 1,
          "ttl": "9000 ms",
          "name": "rightState"
        }
      ]
    

    保存修改,紧接着使用 EXECUTE PLAN 语句提交作业,此时提交的作业中,Join 的左右流就使用了上述配置的不同 TTL。

    EXECUTE PLAN '/path/to/plan.json'

http://www.kler.cn/a/533246.html

相关文章:

  • 用python实现进度条
  • 【C++】STL——list的使用
  • 【PyQt】pyqt小案例实现简易文本编辑器
  • 深度求索DeepSeek横空出世
  • Golang 并发机制-5:详解syn包同步原语
  • git进阶--3---git pull和git fetch的区别与联系
  • 使用DeepSeek的技巧笔记
  • 【怎么用系列】短视频戒除—2—(移动端)抖音等短视频定时关闭方法
  • 视觉控件怎么去做以及过程,怎么让系统自动到保存电脑?
  • 配置@别名路径,把@/ 解析为 src/
  • 响应式编程_03响应式编程在Netflix Hystrix 、Spring Cloud Gateway、Spring WebFlux中的应用
  • java进阶文章链接
  • Page Assist - 本地Deepseek模型 Web UI 的安装和使用
  • SpringBoot开发(四)SpringBoot配置文件
  • 【数据可视化-14】Boss招聘数据分析岗位的可视化分析
  • C++11详解(二) -- 引用折叠和完美转发
  • 使用C++构建一个优先级队列
  • 代理模式的作用
  • 从 DeepSeek R1 中提取数学推理数据,使用 CAMEL
  • 华为手机nova9,鸿蒙系统版本4.2.0.159,智慧助手.今天版本是14.x,如何卸载智慧助手.今天?
  • Python进行模型优化与调参
  • SGlang 专为大模型设计的高效服务框架
  • DRGDIP 2.0时代下基于PostgreSQL的成本管理实践与探索(上)
  • AI透明化与全球政治格局的发展:如何避免AI被人为操控
  • 电商用户画像数据可视化分析
  • 基于MODIS/Landsat/Sentinel/国产卫星遥感数据与DSSAT作物模型同化的作物产量估算