当前位置: 首页 > article >正文

大数据SQL调优专题——Flink执行原理

引入

上一篇我们了解了Spark,相比起MapReduce来说,它确实已经快了超级多了,但是人类的欲望是没有止境的,这也是推动人类进步的动力。

Flink就是为了满足实时响应的场景需求诞生的。

其实在Flink之前,实时处理其实已经有Storm和Spark Streaming了,它们都是老牌流处理引擎,那么为什么Flink会后来居上呢?

流处理场景中常用的两个标准,分别是数据处理延迟和产出数据质量:

  • 数据处理延迟:流处理的核心要求是数据处理快,因此数据处理延迟是衡量流处理引擎优劣的重要指标。
  • 产出数据质量:流处理作业一般都是7×24小时运行的,这期间很难保证作业不发生故障,数据质量主要指流处理作业在发生故障后,通过自身的异常容错功能是否能够保证数据计算结果的准确性。

我们以上面的两个标准来看看这三个引擎各自的优缺点:

    引擎数据处理延迟产出数据质量
    Apache Storm处理数据是一条一条进行的,可以做到毫秒级别延迟作业在发生故障时,只能保证数据不会丢失,但是有可能重复计算数据,只能保证至少一次(At-least-once)的数据处理。以一个案例说明至少一次数据处理的结果:流处理作业需要计算输入数据的条目数,假如有10条输入数据,理论上得到的正确结果为10,但是作业一旦发生故障,在异常容错恢复后,计算得到的结果会大于或等于10
    Spark StreamingFlink认为数据是以流的形式存在的,批是一种有界流;而Spark Streaming则相反,它认为数据是以批的形式存在的,流只是划分得非常细的批,流是批的一种特殊形式。基于该理论,Spark Streaming在处理数据时是按照微批进行的,一批数据统一处理一次,因此延迟相对比较高作业在发生故障时,基于Spark Streaming的检查点机制可以实现数据计算不重不丢,保证精确一次(Exactly-once)的数据处理。以一个案例说明精确一次数据处理的结果:流处理作业计算输入数据的条目数,有10条输入数据,在作业发生故障时,异常容错恢复后算出的结果依然等于10
    Apache Flink和Storm一样,处理数据是一条条进行的,可以做到毫秒级别延迟作业在发生故障时,基于Flink的检查点机制可以做到数据计算不重不丢,保证精确一次的数据处理

    可以看到,Flink在正是通过数据处理延迟低和产出数据质量高的优势,才得以后来居上的。

    Flink的核心概念

    先看看Flink官网对Flink的定义:

    翻译:

    有状态的流计算
    Apache Flink 是一个用于对有界无界数据流进行有状态计算的框架和分布式处理引擎。Flink 被设计用于在所有常见的集群环境中运行,以内存速度执行计算,并且能够处理任意规模的数据。

    下面我们看看里面的一些重点概念

    处理无界和有界数据流

    什么是流处理?

    流处理是对运动中的数据的处理,换句话说,在生成或接收数据时直接计算数据。

    任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录等等数据,大多都是连续的流,都是随着时间的推移而创建的。

    在流处理之前,这些数据通常存储在数据库、文件系统或其他形式的大容量存储中。应用程序将根据需要查询数据或计算数据。

    流处理改变了这种模式:让应用逻辑,分析和查询始终存在,让数据不断地流经它们。

    在从流中接收到事件时,流处理应用程序对该事件作出反应。它可以触发动作,更新聚合或其他统计,或“记住”该事件以供将来参考。以及连接多个数据流,并产生数据流。

    什么是有界和无界流?

    流也拥有着多种特征。这些特征决定了流如何以及何时被处理。

    在Flink的设计理念中,数据可以被作为无界或者有界流来处理。(当然本质上来说是不存在有界流,这只是一个逻辑概念,因为流就是无界的。所谓的有界流就是通过人为截取一段数据,这样所有数据就是确定的,可以被排序了。)

    • 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

    • 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理

    所有的数据都是以流的方式产生,但用户通常会使用两种截然不同的方法处理数据。或是在数据生成时进行实时的处理;亦或是先将数据流持久化到存储系统中——例如文件系统或对象存储,然后再进行批处理。

    简单理解有界无界数据流处理,可以以老师批改作业为例子,老师考试完统一收起卷子来批改就是有界流处理也叫批处理,老师让学生做完就交卷,来一张批改一张,这就是无界数据流处理,也就是实时流处理。

    流与时间

    说到流处理,不得不提到时间的概念。时间是流处理应用另一个重要的组成部分。

    真实世界的系统、网络和通信渠道远非完美,流数据经常被推迟或无序(乱序)到达。如何在这种条件下提供准确和确定的结果是至关重要的。

    处理时间

    处理时间是处理流的应用程序的机器的本地时钟的时间。

    事件时间

    事件事件是流中的事件实际发生的时间。事件时间基于流中的事件所包含的时间戳。通常情况下,在事件进入流处理程序前,事件数据就已经包含了时间戳。

    事件时间使得计算结果的过程不需要依赖处理数据的速度。基于事件时间的操作是可以预测的,而计算结果也是确定的。

    无论流处理程序处理流数据的速度快或是慢,事件时间窗口的计算结果都是一样的。

    水位线Watermarks

    因为处理时间和事件时间总是不能完全一致,我们如何知道事件是迟到的?我们需要确定一个时间点之前的所有事件都已经到达之前,需要等待多久。

    水位线是全局进度的度量标准。系统可以确信在一个时间点之后,不会有早于这个时间点发生的事件到来了。

    当一个算子接收到含有时间T的水位线时,这个算子会认为早于时间T的发生的事件已经全部都到达了。

    对于事件时间窗口和乱序事件的处理,水位线非常重要。算子一旦接收到水位线,算子会认为一段时间内发生的所有事件都已经观察到,可以触发针对这段时间内所有事件的计算了。

    水位线提供了一种结果可信度和延时之间的妥协。激进的水位线设置可以保证低延迟,但结果的准确性不够。如果水位线设置得过于宽松,计算的结果准确性会很高,但可能会增加流处理程序不必要的延时。

    时间语义

    因为事件总是在特定时间点发生,所以大多数的事件流都拥有事件本身所固有的时间语义。进一步而言,许多常见的流计算都基于时间语义,例如窗口聚合、会话计算、模式检测和基于时间的 join。流处理的一个重要方面是应用程序如何衡量时间,即区分事件时间(event-time)和处理时间(processing-time)。

    Flink 提供了丰富的时间语义支持。

    • 事件时间模式:使用事件时间语义的流处理应用根据事件本身自带的时间戳进行结果的计算。因此,无论处理的是历史记录的事件还是实时的事件,事件时间模式的处理总能保证结果的准确性和一致性。

    • Watermark 支持:Flink 引入了 watermark 的概念,用以衡量事件时间进展。Watermark 也是一种平衡处理延时和完整性的灵活机制。

    • 迟到数据处理:当以带有 watermark 的事件时间模式处理数据流时,在计算完成之后仍会有相关数据到达。这样的事件被称为迟到事件。Flink 提供了多种处理迟到数据的选项,例如将这些数据重定向到旁路输出(side output)或者更新之前完成计算的结果。

    • 处理时间模式:除了事件时间模式,Flink 还支持处理时间语义。处理时间模式根据处理引擎的机器时钟触发计算,一般适用于有着严格的低延迟需求,并且能够容忍近似结果的流处理应用。

    有状态的计算

    Flink的有状态计算,本质就是把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态。

    为了加快访问速度,我们可以直接将状态保存在本地内存。当应用收到一个新事件时,它可以从状态中读取数据,也可以更新状态。而当状态是从内存中读写的时候,这就和访问本地变量没什么区别了,实时性可以得到极大的提升。

    另外,数据规模增大时,我们也不需要做重构,只需要构建分布式集群,各自在本地计算就可以了,可扩展性也变得更好。

    因为底层是分布式系统,所以还需要保护本地状态,防止在故障时数据丢失。我们可以定期地将应用状态的一致性检查点(checkpoint)存盘,写入远程的持久化存储,遇到故障时再去读取进行恢复,这样就保证了更好的容错性。

    有状态的流处理是一种通用而且灵活的设计架构,可用于许多不同的场景。其处理架构上其实并不复杂,很多用户基于这种思想开发出了自己的流处理系统,这就是第一代流处理器。Apache Storm就是其中的代表。Storm 提供了低延迟的流处理,但很难实现高吞吐,而且无法保证结果的正确性,所以目前已经被Flink替代了。

    只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

    有状态流处理是流处理的子集。此状态用于存储从先前看到的事件派生的信息。

    大多数的流处理都需要处理状态:

    • 防欺诈应用程序会保留信用卡的最后交易。将新的状态与保留的状态中进行比较,标记为有效或欺诈,并更新状态。

    • 在线推荐应用程序将保留描述用户偏好的参数。

    有状态流处理需要支持状态管理的流处理器。

    状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:

    • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。

    • 插件化的State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。

    • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。

    • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。

    • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

    核心特性

    下面我们总结一下Flink的核心特性

    1.真正的流处理

    在Flink的流处理模式下,Flink处理数据的粒度是事件粒度或者说数据粒度,也就是来一条数据就处理一条数据。这个特性反馈到日常的业务中就是数据处理延迟极低,一般是毫秒级别,基于该特性,在实时大屏场景中,我们可以实现大屏指标的快速更新。

    2.强大的性能

    Flink是分布式的计算引擎,处理数据的吞吐能力能够轻松达到百万、千万级别QPS(Queries Per Second,每秒处理数据的条目)​。Flink处理数据的吞吐量在很多场景下甚至可以做到和物理资源呈线性关系,因此在面对大流量数据时,Flink无所畏惧。

    3.时间语义丰富

    Flink预置了多种时间语义的API,包括事件时间、处理时间和摄入时间语义,我们可以通过这些预置API实现时间窗口上的高效数据处理。

    4.高可用的有状态计算

    Flink不但提供了丰富的状态类型及状态操作API,而且提供了Checkpoint、Savepoint这样的快照机制来保障精确一次的数据处理,即使作业发生异常,我们也无须担心数据丢失或者重复。同时,Flink支持TB级别的状态数据存储能力。

    5.流批一体

    Flink不仅是流处理的好手,目前在批处理方面也在大力发展,我们通过同一条SQL语句就可以同时完成流处理、批处理,这可以显著降低开发、维护和资源使用的成本。

    Flink的执行原理

    由于目前使用最广泛的是1.13版本,这里我们重点就以1.13版本源码,来看看FlinkSQL的执行过程。既然是FLinkSQL,自然要看flink-table里面的内容啦,我们先看看这个里面有些什么:

    flink-sql-client:提供了 Flink SQL 的命令行客户端工具,用户可以通过该客户端连接到 Flink 集群,执行 SQL 脚本,进行交互式的数据查询和操作。它封装了与 Flink 集群通信的逻辑,支持提交 SQL 作业、查看作业状态等功能,方便用户在命令行环境下使用 Flink SQL 进行数据处理任务。
     

    flink-sql-parser:负责解析 Flink SQL 语句,将其转换为抽象语法树(AST)。在解析过程中,会对 SQL 语句进行词法分析和语法分析,检查语句的语法正确性,识别出 SQL 语句中的关键字、标识符、表达式等元素,并构建出相应的 AST 结构,为后续的查询优化和执行打下基础。
     

    flink-sql-parser-hive:提供了对 Hive SQL 语法的支持,使得 Flink 能够解析和执行 Hive SQL 语句。它扩展了 Flink 的 SQL 解析器,增加了对 Hive 特有语法和函数的解析能力,使得用户可以使用熟悉的 Hive SQL 语法来操作 Flink,降低了从 Hive 迁移到 Flink 的学习成本和开发成本。
     

    flink-table-api-java:提供了 Java 语言的 Table API,允许开发者使用 Java 代码以面向对象的方式进行表数据的操作和处理。通过该 API,开发者可以方便地进行表的创建、查询、转换、连接等操作,实现复杂的数据处理逻辑,并且可以与 Flink 的其他 API(如 DataStream API)无缝集成。
     

    flink-table-api-java-bridge:作为 Java 语言的 Table API 与其他 Flink API 之间的桥梁,使得开发者可以在 Java 代码中方便地在 Table API 和其他 API(如 DataStream API)之间进行切换和交互。它提供了一些转换方法和工具类,帮助开发者将 Table 转换为 DataStream 或将 DataStream 转换为 Table,从而实现不同 API 之间的数据共享和协同处理。
     

    flink-table-api-scala:提供了 Scala 语言的 Table API,允许开发者使用 Scala 代码以面向对象的方式进行表数据的操作和处理。Scala 语言的 Table API 在功能上与 Java 版本类似,但由于 Scala 语言的特性,其语法更加简洁和灵活,能够更好地利用 Scala 的函数式编程特性进行数据处理。
     

    flink-table-api-scala-bridge:作为 Scala 语言的 Table API 与其他 Flink API 之间的桥梁,使得开发者可以在 Scala 代码中方便地在 Table API 和其他 API(如 DataStream API)之间进行切换和交互。它提供了一些转换方法和工具类,帮助开发者将 Table 转换为 DataStream 或将 DataStream 转换为 Table,从而实现不同 API 之间的数据共享和协同处理。
     

    flink-table-common:包含了 Flink Table 模块中的一些通用类和接口,如表结构定义、数据类型定义、函数定义等。这些通用类和接口在 Flink Table 模块的各个子模块中被广泛使用,为 Table API、SQL 解析、查询优化和执行等提供了基础的构建块。
     

    flink-table-planner:负责 Flink Table 的查询规划,包括逻辑查询计划的生成和优化。它将 Table API 或 SQL 查询转换为逻辑查询计划,然后对逻辑查询计划进行优化,如应用各种查询规则、重写查询、选择最优的执行计划等,以提高查询的执行效率。
     

    flink-table-planner-blink:Blink Planner 是 Flink 的一种查询规划器,该包实现了 Blink Planner 的相关功能。Blink Planner 引入了一些新的查询优化技术,如动态分区裁剪、谓词下推等,能够生成更高效的执行计划,提高查询性能。
     

    flink-table-runtime-blink:Blink Runtime 是 Flink 的一种运行时环境,该包实现了 Blink Runtime 的相关功能。它负责执行由 Blink Planner 生成的执行计划,包括任务的调度、数据的传输、算子的执行等,提供了高效的运行时支持。

    假如我们使用Java开发,编写一条简单的SQL查询语句,就会进入到flink-table-api-java模块里,由其中的org.apache.flink.table.api.internal.TableEnvironmentImpl类的sqlQuery(String query)方法接收,并返回Table类型的对象,可用于进一步的SQL查询或变换。

    sqlQuery()

    sqlQuery()方法对应源码如下:

        /**
         * 执行单个 SQL 查询并返回一个 Table 对象。
         * 此方法仅支持单个 SQL 查询,且查询类型必须为 SELECT、UNION、INTERSECT、EXCEPT、VALUES 或 ORDER_BY。
         *
         * @param query 要执行的 SQL 查询语句。
         * @return 包含查询结果的 Table 对象。
         * @throws ValidationException 如果查询不是单个 SQL 查询,或者查询类型不被支持。
         */
        public Table sqlQuery(String query) {
            // 使用解析器,解析 SQL 查询语句,得到Operation列表
            List<Operation> operations = getParser().parse(query);
    
            // 如果解析出来的Operation多余1个,说明填写了多个SQL,不支持这样使用
            if (operations.size() != 1) {
                throw new ValidationException(
                        "Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
            }
    
            // 获取解析到的Operation
            Operation operation = operations.get(0);
    
            // 仅支持查询语句,这里会检查Operation是否为 QueryOperation 类型,且不是 ModifyOperation 类型
            if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
                // 如果满足条件,则将Operation转换为 QueryOperation 类型并创建 Table 对象
                return createTable((QueryOperation) operation);
            } else {
                // 如果不满足条件,则抛出异常
                throw new ValidationException(
                        "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type "
                                + "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
            }
        }
    

    parse()

    其中的parse()方法在1.13以前是如下实现的:

    protected final Parser parser;
    ... ...
    List<Operation> operations = parser.parse(query);

    1.13迭代主要基于以下几个方面的考虑

    • 设计模式:由直接访问变量转变为通过接口获取对象,遵循了依赖倒置接口隔离原则。

    • 灵活性:接口允许更容易地扩展、切换不同的解析器实现或策略。

    • 可测试性:隔离核心逻辑,让测试更易模拟和介入。

    • 解耦:减少内部依赖,明确职责边界,使系统更清晰、健壮。

    这样可以更方便集成更多SQL解析规则,并能支撑更多样化的输入来源,有助于推动Flink整体架构向更稳定、更高效的分布式数据处理框架发展。

    我们通过parse()方法可追溯到 flink-table-planner 模块里的org.apache.flink.table.planner.ParserImpl类的parse()方法,其源码注释如下:

    When parsing statement, it first uses {@link ExtendedParser} to parse statements. If {@link ExtendedParser} fails to parse statement, it uses the {@link CalciteParser} to parse statements.

    翻译:在解析语句时,它首先使用扩展解析器(ExtendedParser)来解析语句。如果扩展解析器(ExtendedParser)无法解析语句,它将使用Calcite解析器(CalciteParser)来解析语句。

    对应源码如下:

        /**
         * 解析输入的 SQL 语句,将其转换为 Operation 列表。
         * 首先尝试使用扩展解析器解析语句,如果失败则使用 Calcite 解析器。
         *
         * @param statement 输入的 SQL 语句。
         * @return 解析后的操作列表。
         */
        @Override
        public List<Operation> parse(String statement) {
            // 获取 Calcite 解析器实例
            CalciteParser parser = calciteParserSupplier.get();
            // 使用 FlinkPlannerImpl 作为 validator
            FlinkPlannerImpl planner = validatorSupplier.get();
            // 解析 SQL 查询
            // 对于一些特殊的写法,例如SET key=value,CalciteParser是不支持这种写法的
            // 为了避免在Calcite中引入过多的关键字,这里定义了一组扩展解析器,专门用于在CalciteParser 之前,解析这些特殊的语句
            // 尝试使用扩展解析器解析语句
            Optional<Operation> command = EXTENDED_PARSER.parse(statement);
            // 如果扩展解析器成功解析出操作,则直接返回该操作
            if (command.isPresent()) {
                return Collections.singletonList(command.get());
            }
    
            // 使用 Calcite 解析器解析 SQL 为语法树
            SqlNode parsed = parser.parse(statement);
    
            // 将解析后的 SqlNode 转换为 Operation
            Operation operation =
                    SqlToOperationConverter.convert(planner, catalogManager, parsed)
                            .orElseThrow(
                                    () ->
                                            // 如果转换失败,抛出不支持的 SQL 查询异常
                                            new TableException(
                                                    "Unsupported SQL query! parse() only accepts SQL queries of type "
                                                            + "SELECT, UNION, INTERSECT, EXCEPT, VALUES, ORDER_BY or INSERT;"
                                                            + "and SQL DDLs of type "
                                                            + "CREATE TABLE"));
            // 返回包含单个操作的列表
            return Collections.singletonList(operation);
        }
    

    其中ExtendedParser.parse(statement)是1.13中优化迭代新增的,在1.13之前是没有的。这个设计,是为了在不增加CalciteParser复杂性的前提下,可以让Flink SQL支持更多专用的语法。

    这里我们可以看一下,ExtendedParser包含的解析策略:

    /**
     * ExtendedParser 用于解析一些 CalciteParser 不支持的特殊命令,例如 SET key=value,其中键和值标识符包含特殊字符。
     * 将一些解析逻辑移到这里也有助于避免引入新的保留关键字。
     * 该类提供了单例模式的实例,可用于解析命令并生成相应的操作。
     */
    public class ExtendedParser {
    
        /**
         * 单例模式的 ExtendedParser 实例。
         * 使用单例模式确保在整个应用程序中只有一个 ExtendedParser 实例被创建和使用。
         */
        public static final ExtendedParser INSTANCE = new ExtendedParser();
    
        /**
         * 存储所有扩展解析策略的列表。
         * 这些策略用于匹配和转换特定的命令,每个策略对应一种特殊命令的解析方式。
         */
        private static final List<AbstractRegexParseStrategy> PARSE_STRATEGIES =
                Arrays.asList(
                        // 清空输出
                        ClearOperationParseStrategy.INSTANCE,
                        // 打印帮助信息
                        HelpOperationParseStrategy.INSTANCE,
                        // 退出执行环境
                        QuitOperationParseStrategy.INSTANCE,
                        // 重置一个变量的值
                        ResetOperationParseStrategy.INSTANCE,
                        // 设置一个变量的值
                        SetOperationParseStrategy.INSTANCE);
    

    而正常对于标准的SQL语句,则由org.apache.flink.table.parse.CalciteParser类的parse()方法负责解析,其对应源码如下:

        /**
         * 解析一个 SQL 语句为 {@link SqlNode}。这个 {@link SqlNode} 尚未经过验证。
         *
         * @param sql 要解析的 SQL 字符串
         * @return 解析后的 SQL 节点
         * @throws SqlParserException 如果在解析语句时抛出异常
         */
        public SqlNode parse(String sql) {
            try {
                // 创建一个 SQL 解析器实例,使用传入的 SQL 字符串和配置
                SqlParser parser = SqlParser.create(sql, config);
                // 解析 SQL 语句并返回解析后的 SqlNode
                return parser.parseStmt();
            } catch (SqlParseException e) {
                // 如果解析过程中出现异常,抛出 SqlParserException 并附带详细错误信息
                throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);
            }
        }
    

    getSqlParserConfig()

    Flink的SQL方言与标准SQL相比有很大差别,那么Flink是如何借助Calcite实现Flink SQL专用的解析器呢?

    我们可以通过SqlParser.Config入手,可以从下面源码看到,在构造SqlParser类的配置类SqlParser.Config时,需要传入FlinkSqlParserImpl.FACTORY:

        /**
         * 获取SQL解析器的配置。
         *
         * 此方法首先尝试从表配置对应的Calcite配置中获取SQL解析器配置。
         * 如果未找到,则使用默认的构建器创建一个新的配置。
         * 默认配置使用Java词法,因为反引号比双引号在编程中更方便,并且保留大小写。
         *
         * @return SQL解析器的配置对象
         */
        public SqlParser.Config getSqlParserConfig() {
            // 尝试从Calcite配置中获取SQL解析器配置
            return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig())
                    .orElseGet(
                            () ->
                                    // 如果Calcite配置中没有SQL解析器配置,则使用默认配置
                                    // 我们使用Java词法,因为反引号比双引号在编程中更方便,并且保留大小写
                                    SqlParser.configBuilder()
                                            // 设置解析器工厂为Flink SQL解析器实现工厂
                                            .setParserFactory(FlinkSqlParserImpl.FACTORY)
                                            // 设置SQL兼容性
                                            .setConformance(getSqlConformance())
                                            // 设置词法为Java词法
                                            .setLex(Lex.JAVA)
                                            // 构建配置对象
                                            .build());
        }
    

    其中的类FlinkSqlParserImpl,是通过编译Flink SQL的语法描述文件(包含Calcite内置的Parser.jj与Flink定制好的Freemarker模板)生成,最终在generated-sources目录下生成了FlinkSqlParserImpl及其附属的类,Calcite会利用它们进行Flink SQL的解析。codegen目录下则是语法描述文件的本体。

    Flink借助了Calcite实现Flink SQL专用的解析器,具体实现过程如下:

    1. 引入Calcite
      Flink通过Maven依赖引入Calcite,并使用Maven插件将Calcite的解析器模板解压到Flink项目的构建目录下。
    2. 生成Parser.jj文件
      Flink使用FreeMarker模板引擎(FMPP)生成Parser.jj文件,该文件定义了Flink SQL的语法。
    3. 使用JavaCC生成解析器
      Flink利用JavaCC工具根据Parser.jj文件生成解析器代码,生成的解析器能够将SQL语句解析为SqlNode对象。
    4. 扩展Calcite的解析器
      Flink通过扩展Calcite的解析器,实现了对Flink SQL方言的支持。具体来说,Flink在Calcite的基础上进行了二次开发,增加了对Flink特定语法的支持,例如对SET key=value语句的支持。
    5. 实现FlinkSqlParserImpl
      最终,Flink生成了FlinkSqlParserImpl,这是Flink专用的解析器实现。它在解析过程中首先尝试使用ExtendedParser解析特殊命令,如果失败,则使用CalciteParser进行解析。

    convert()

    好了,下面我们回到主线剧情主流程,SQL解析完成后,前面的ParserImpl.parse()方法紧接着就会调用验证逻辑。SqlToOperationConverter.convert()方法负责校验SQL语句,并将它转换为Flink对应的Operation,同时还会将SqlNode转化成RelNode,从单纯的SQL语句转化为对数据的处理逻辑,即关系代数的具体操作,如Scan、Project、Filter、Join等。

    其对应源码如下:

        /**
         * 将 SQL 节点转换为对应的操作对象。
         *
         * 该方法接收一个 FlinkPlannerImpl 对象、一个 CatalogManager 对象和一个 SqlNode 对象,
         * 首先对 SQL 节点进行验证,然后根据验证后的 SQL 节点类型,调用相应的转换方法将其转换为操作对象。
         * 如果 SQL 节点类型不支持,则返回一个空的 Optional 对象。
         *
         * @param flinkPlanner Flink 规划器实现,用于将 SQL 节点转换为关系节点
         * @param catalogManager 目录管理器,用于管理目录和数据库
         * @param sqlNode 要执行的 SQL 节点
         * @return 一个包含操作对象的 Optional 对象,如果 SQL 节点类型不支持,则返回空的 Optional 对象
         */
        public static Optional<Operation> convert(
                FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
    
            // 校验解析后的SQL语法树
            final SqlNode validated = flinkPlanner.validate(sqlNode);
    
            // 将SqlNode转化成Operation
            SqlToOperationConverter converter =
                    new SqlToOperationConverter(flinkPlanner, catalogManager);
    
            // 根据验证后的 SQL 节点类型进行转换
            if (validated instanceof SqlUseCatalog) {
                // 如果是使用目录的 SQL 节点,调用 convertUseCatalog 方法进行转换
                return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));
            } else if (validated instanceof SqlShowCatalogs) {
                // 如果是显示目录的 SQL 节点,调用 convertShowCatalogs 方法进行转换
                return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated));
            } else if (validated instanceof SqlShowCurrentCatalog) {
                // 如果是显示当前目录的 SQL 节点,调用 convertShowCurrentCatalog 方法进行转换
                return Optional.of(
                        converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));
            }
            if (validated instanceof SqlCreateDatabase) {
                // 如果是创建数据库的 SQL 节点,调用 convertCreateDatabase 方法进行转换
                return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated));
            } else if (validated instanceof SqlDropDatabase) {
                // 如果是删除数据库的 SQL 节点,调用 convertDropDatabase 方法进行转换
                return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));
            } else if (validated instanceof SqlAlterDatabase) {
                // 如果是修改数据库的 SQL 节点,调用 convertAlterDatabase 方法进行转换
                return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));
            } else if (validated instanceof SqlShowDatabases) {
                // 如果是显示数据库的 SQL 节点,调用 convertShowDatabases 方法进行转换
                return Optional.of(converter.convertShowDatabases((SqlShowDatabases) validated));
            } else if (validated instanceof SqlShowCurrentDatabase) {
                // 如果是显示当前数据库的 SQL 节点,调用 convertShowCurrentDatabase 方法进行转换
                return Optional.of(
                        converter.convertShowCurrentDatabase((SqlShowCurrentDatabase) validated));
            } else if (validated instanceof SqlUseDatabase) {
                // 如果是使用数据库的 SQL 节点,调用 convertUseDatabase 方法进行转换
                return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated));
            } else if (validated instanceof SqlCreateTable) {
                // 如果是创建表的 SQL 节点,调用 convertCreateTable 方法进行转换
                return Optional.of(converter.convertCreateTable((SqlCreateTable) validated));
            } else if (validated instanceof SqlDropTable) {
                // 如果是删除表的 SQL 节点,调用 convertDropTable 方法进行转换
                return Optional.of(converter.convertDropTable((SqlDropTable) validated));
            } else if (validated instanceof SqlAlterTable) {
                // 如果是修改表的 SQL 节点,调用 convertAlterTable 方法进行转换
                return Optional.of(converter.convertAlterTable((SqlAlterTable) validated));
            } else if (validated instanceof SqlShowTables) {
                // 如果是显示表的 SQL 节点,调用 convertShowTables 方法进行转换
                return Optional.of(converter.convertShowTables((SqlShowTables) validated));
            } else if (validated instanceof SqlCreateView) {
                // 如果是创建视图的 SQL 节点,调用 convertCreateView 方法进行转换
                return Optional.of(converter.convertCreateView((SqlCreateView) validated));
            } else if (validated instanceof SqlDropView) {
                // 如果是删除视图的 SQL 节点,调用 convertDropView 方法进行转换
                return Optional.of(converter.convertDropView((SqlDropView) validated));
            } else if (validated instanceof SqlShowViews) {
                // 如果是显示视图的 SQL 节点,调用 convertShowViews 方法进行转换
                return Optional.of(converter.convertShowViews((SqlShowViews) validated));
            } else if (validated instanceof SqlCreateFunction) {
                // 如果是创建函数的 SQL 节点,调用 convertCreateFunction 方法进行转换
                return Optional.of(converter.convertCreateFunction((SqlCreateFunction) validated));
            } else if (validated instanceof SqlAlterFunction) {
                // 如果是修改函数的 SQL 节点,调用 convertAlterFunction 方法进行转换
                return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated));
            } else if (validated instanceof SqlDropFunction) {
                // 如果是删除函数的 SQL 节点,调用 convertDropFunction 方法进行转换
                return Optional.of(converter.convertDropFunction((SqlDropFunction) validated));
            } else if (validated instanceof SqlShowFunctions) {
                // 如果是显示函数的 SQL 节点,调用 convertShowFunctions 方法进行转换
                return Optional.of(converter.convertShowFunctions((SqlShowFunctions) validated));
            } else if (validated instanceof SqlRichExplain) {
                // 如果是解释 SQL 语句的 SQL 节点,调用 convertRichExplain 方法进行转换
                return Optional.of(converter.convertRichExplain((SqlRichExplain) validated));
            } else if (validated instanceof SqlRichDescribeTable) {
                // 如果是描述表的 SQL 节点,调用 convertDescribeTable 方法进行转换
                return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
            } else if (validated instanceof RichSqlInsert) {
                // 如果是插入数据的 SQL 节点,检查是否为部分插入
                SqlNodeList targetColumnList = ((RichSqlInsert) validated).getTargetColumnList();
                if (targetColumnList != null && targetColumnList.size() != 0) {
                    // 如果是部分插入,抛出异常
                    throw new ValidationException("Partial inserts are not supported");
                }
                // 如果不是部分插入,调用 convertSqlInsert 方法进行转换
                return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
            } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
                // 如果是查询语句的 SQL 节点,调用 convertSqlQuery 方法进行转换
                return Optional.of(converter.convertSqlQuery(validated));
            } else {
                // 如果 SQL 节点类型不支持,返回空的 Optional 对象
                return Optional.empty();
            }
        }
    

    其中的validate()方法中,会基于FlinkCalciteSqlValidator(继承了Calcite的默认验证器SqlValidatorImpl)采用访问者模式,递归访问每个SqlCall节点,并额外规定了对字面量和Join的验证逻辑。在这个过程中会同时连接Catalog,主要的功能就是匹配表的Scheme和基本函数信息,例如表的基本定义(列名、数据类型)和函数名等。最后会将语法树重写为标准形式,以便其余的验证逻辑可以更方便地执行。

    我们可以从源码看到,其中使用多个if-else判断验证之后的SqlNode属于何种类型,再分别调用不同的方法触发转换为RelNode的操作。

    其中对于SELECT语句,会调用 convertSqlQuery() 方法进行转换,其对应源码如下:

        /**
         * 将 SQL 查询节点转换为查询操作。
         *
         * @param node 待转换的 SQL 节点
         * @return 转换后的查询操作
         */
        private Operation convertSqlQuery(SqlNode node) {
            return toQueryOperation(flinkPlanner, node);
        }
    

    继续查看toQueryOperation()方法源码如下:

        /**
         * 将经过验证的 SQL 节点转换为查询操作。
         *
         * @param planner 用于将 SQL 转换为关系代数的 Flink 规划器实例。
         * @param validated 经过验证的 SQL 节点。
         * @return 表示查询操作的 PlannerQueryOperation 对象。
         */
        private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
            // transform to a relational tree
            // 将验证后的 SQL 节点转换为关系树
            RelRoot relational = planner.rel(validated);
            // 创建一个新的 PlannerQueryOperation 对象,传入关系树的根节点
            return new PlannerQueryOperation(relational.rel);
        }
    

    该方法最终生成一个PlannerQueryOperation,将Calcite转换成的RelNode包装进去。其中生成RelNodede的过程则是由Calcite的SqlToRelConverter完成,在这个过程中,会基于Flink定制的优化规则以及Calcite自身的一些规则进行优化。

    execute()

    最后通过org.apache.flink.table.api.internal.TableImpl类的execute()方法执行SQL查询,返回一个TableResult对象,其源码如下:

        /**
         * 执行当前表的查询操作,并返回执行结果。
         * 该方法调用 TableEnvironment 的 executeInternal 方法,传入当前表的查询操作对象。
         *
         * @return 表示查询执行结果的 TableResult 对象。
         */
        @Override
        public TableResult execute() {
            return tableEnvironment.executeInternal(getQueryOperation());
        }
    

    executeInternal()方法的整体逻辑是判断Operation的类型,不同的Operation类型执行不同的操作,比如创建表、删除表、修改表、查询表、创建数据库、删除数据库、创建视图、删除视图、创建函数、删除函数,分页操作等。

    其源码如下:

        /**
         * 执行单个操作并返回操作结果。
         *
         * @param operation 要执行的操作
         * @return 操作结果
         * @throws ValidationException 如果操作过程中出现验证错误
         * @throws TableException 如果操作过程中出现表相关的错误
         */
        @Override
        public TableResult executeInternal(Operation operation) {
        // 根据操作类型分发到不同的处理逻辑
        if (operation instanceof ModifyOperation) {
            // 如果传入的是ModifyOperation,则调用批量执行方法
            return executeInternal(Collections.singletonList((ModifyOperation) operation));
        } else if (operation instanceof CreateTableOperation) {
            // 处理CreateTableOperation,包括创建临时表或普通表
            CreateTableOperation createTableOperation = (CreateTableOperation) operation;
            if (createTableOperation.isTemporary()) {
                // 创建临时表
                catalogManager.createTemporaryTable(
                        createTableOperation.getCatalogTable(),
                        createTableOperation.getTableIdentifier(),
                        createTableOperation.isIgnoreIfExists());
            } else {
                // 创建普通表
                catalogManager.createTable(
                        createTableOperation.getCatalogTable(),
                        createTableOperation.getTableIdentifier(),
                        createTableOperation.isIgnoreIfExists());
            }
            return TableResultImpl.TABLE_RESULT_OK;
        } else if (operation instanceof DropTableOperation) {
            // 处理DropTableOperation,包括删除临时表或普通表
            DropTableOperation dropTableOperation = (DropTableOperation) operation;
            if (dropTableOperation.isTemporary()) {
                // 删除临时表
                catalogManager.dropTemporaryTable(
                        dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
            } else {
                // 删除普通表
                catalogManager.dropTable(
                        dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
            }
            return TableResultImpl.TABLE_RESULT_OK;
        } else if (operation instanceof AlterTableOperation) {
            // 处理各种表修改操作(AlterTableOperation),包括重命名表、修改表选项、修改表约束等
            AlterTableOperation alterTableOperation = (AlterTableOperation) operation;
            Catalog catalog = getCatalogOrThrowException(alterTableOperation.getTableIdentifier().getCatalogName());
            String exMsg = getDDLOpExecuteErrorMsg(alterTableOperation.asSummaryString());
            try {
                // 根据不同的AlterTableOperation类型执行相应的操作
                if (alterTableOperation instanceof AlterTableRenameOperation) {
                    AlterTableRenameOperation alterTableRenameOp = (AlterTableRenameOperation) operation;
                    catalog.renameTable(
                            alterTableRenameOp.getTableIdentifier().toObjectPath(),
                            alterTableRenameOp.getNewTableIdentifier().getObjectName(),
                            false);
                } else if (alterTableOperation instanceof AlterTableOptionsOperation) {
                    AlterTableOptionsOperation alterTablePropertiesOp = (AlterTableOptionsOperation) operation;
                    catalogManager.alterTable(
                            alterTablePropertiesOp.getCatalogTable(),
                            alterTablePropertiesOp.getTableIdentifier(),
                            false);
                } else if (alterTableOperation instanceof AlterTableAddConstraintOperation) {
                    AlterTableAddConstraintOperation addConstraintOP = (AlterTableAddConstraintOperation) operation;
                    // 更新表结构,添加主键约束
                    CatalogTable oriTable = (CatalogTable) catalogManager.getTable(addConstraintOP.getTableIdentifier()).get().getTable();
                    TableSchema.Builder builder = TableSchemaUtils.builderWithGivenSchema(oriTable.getSchema());
                    if (addConstraintOP.getConstraintName().isPresent()) {
                        builder.primaryKey(
                                addConstraintOP.getConstraintName().get(),
                                addConstraintOP.getColumnNames());
                    } else {
                        builder.primaryKey(addConstraintOP.getColumnNames());
                    }
                    CatalogTable newTable = new CatalogTableImpl(
                            builder.build(),
                            oriTable.getPartitionKeys(),
                            oriTable.getOptions(),
                            oriTable.getComment());
                    catalogManager.alterTable(
                            newTable, addConstraintOP.getTableIdentifier(), false);
                } else if (alterTableOperation instanceof AlterTableDropConstraintOperation) {
                    AlterTableDropConstraintOperation dropConstraintOperation = (AlterTableDropConstraintOperation) operation;
                    // 更新表结构,移除主键约束
                    CatalogTable oriTable = (CatalogTable) catalogManager.getTable(dropConstraintOperation.getTableIdentifier()).get().getTable();
                    CatalogTable newTable = new CatalogTableImpl(
                            TableSchemaUtils.dropConstraint(
                                    oriTable.getSchema(),
                                    dropConstraintOperation.getConstraintName()),
                            oriTable.getPartitionKeys(),
                            oriTable.getOptions(),
                            oriTable.getComment());
                    catalogManager.alterTable(
                            newTable, dropConstraintOperation.getTableIdentifier(), false);
                } else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {
                    AlterPartitionPropertiesOperation alterPartPropsOp = (AlterPartitionPropertiesOperation) operation;
                    // 修改分区属性
                    catalog.alterPartition(
                            alterPartPropsOp.getTableIdentifier().toObjectPath(),
                            alterPartPropsOp.getPartitionSpec(),
                            alterPartPropsOp.getCatalogPartition(),
                            false);
                } else if (alterTableOperation instanceof AlterTableSchemaOperation) {
                    AlterTableSchemaOperation alterTableSchemaOperation = (AlterTableSchemaOperation) alterTableOperation;
                    catalogManager.alterTable(
                            alterTableSchemaOperation.getCatalogTable(),
                            alterTableSchemaOperation.getTableIdentifier(),
                            false);
                } else if (alterTableOperation instanceof AddPartitionsOperation) {
                    AddPartitionsOperation addPartitionsOperation = (AddPartitionsOperation) alterTableOperation;
                    // 添加多个分区
                    List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs();
                    List<CatalogPartition> partitions = addPartitionsOperation.getCatalogPartitions();
                    boolean ifNotExists = addPartitionsOperation.ifNotExists();
                    ObjectPath tablePath = addPartitionsOperation.getTableIdentifier().toObjectPath();
                    for (int i = 0; i < specs.size(); i++) {
                        catalog.createPartition(
                                tablePath, specs.get(i), partitions.get(i), ifNotExists);
                    }
                } else if (alterTableOperation instanceof DropPartitionsOperation) {
                    DropPartitionsOperation dropPartitionsOperation = (DropPartitionsOperation) alterTableOperation;
                    // 删除多个分区
                    ObjectPath tablePath = dropPartitionsOperation.getTableIdentifier().toObjectPath();
                    boolean ifExists = dropPartitionsOperation.ifExists();
                    for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
                        catalog.dropPartition(tablePath, spec, ifExists);
                    }
                }
                return TableResultImpl.TABLE_RESULT_OK;
            } catch (TableAlreadyExistException | TableNotExistException e) {
                throw new ValidationException(exMsg, e);
            } catch (Exception e) {
                throw new TableException(exMsg, e);
            }
        } else if (operation instanceof CreateViewOperation) {
            // 处理创建视图操作
            CreateViewOperation createViewOperation = (CreateViewOperation) operation;
            if (createViewOperation.isTemporary()) {
                catalogManager.createTemporaryTable(
                        createViewOperation.getCatalogView(),
                        createViewOperation.getViewIdentifier(),
                        createViewOperation.isIgnoreIfExists());
            } else {
                catalogManager.createTable(
                        createViewOperation.getCatalogView(),
                        createViewOperation.getViewIdentifier(),
                        createViewOperation.isIgnoreIfExists());
            }
            return TableResultImpl.TABLE_RESULT_OK;
        } else if (operation instanceof DropViewOperation) {
            // 处理删除视图操作
            DropViewOperation dropViewOperation = (DropViewOperation) operation;
            if (dropViewOperation.isTemporary()) {
                catalogManager.dropTemporaryView(
                        dropViewOperation.getViewIdentifier(), dropViewOperation.isIfExists());
            } else {
                catalogManager.dropView(
                        dropViewOperation.getViewIdentifier(), dropViewOperation.isIfExists());
            }
            return TableResultImpl.TABLE_RESULT_OK;
        } else if (operation instanceof AlterViewOperation) {
            // 处理视图修改操作
            AlterViewOperation alterViewOperation = (AlterViewOperation) operation;
            Catalog catalog = getCatalogOrThrowException(alterViewOperation.getViewIdentifier().getCatalogName());
            String exMsg = getDDLOpExecuteErrorMsg(alterViewOperation.asSummaryString());
            try {
                if (alterViewOperation instanceof AlterViewRenameOperation) {
                    AlterViewRenameOperation alterTableRenameOp = (AlterViewRenameOperation) operation;
                    catalog.renameTable(
                            alterTableRenameOp.getViewIdentifier().toObjectPath(),
                            alterTableRenameOp.getNewViewIdentifier().getObjectName(),
                            false);
                } else if (alterViewOperation instanceof AlterViewPropertiesOperation) {
                    AlterViewPropertiesOperation alterTablePropertiesOp = (AlterViewPropertiesOperation) operation;
                    catalogManager.alterTable(
                            alterTablePropertiesOp.getCatalogView(),
                            alterTablePropertiesOp.getViewIdentifier(),
                            false);
                } else if (alterViewOperation instanceof AlterViewAsOperation) {
                    AlterViewAsOperation alterViewAsOperation = (AlterViewAsOperation) alterViewOperation;
                    catalogManager.alterTable(
                            alterViewAsOperation.getNewView(),
                            alterViewAsOperation.getViewIdentifier(),
                            false);
                }
                return TableResultImpl.TABLE_RESULT_OK;
            } catch (TableAlreadyExistException | TableNotExistException e) {
                throw new ValidationException(exMsg, e);
            } catch (Exception e) {
                throw new TableException(exMsg, e);
            }
        } else if (operation instanceof CreateDatabaseOperation) {
            // 处理创建数据库操作
            CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation;
            Catalog catalog = getCatalogOrThrowException(createDatabaseOperation.getCatalogName());
            String exMsg = getDDLOpExecuteErrorMsg(createDatabaseOperation.asSummaryString());
            try {
                catalog.createDatabase(
                        createDatabaseOperation.getDatabaseName(),
                        createDatabaseOperation.getCatalogDatabase(),
                        createDatabaseOperation.isIgnoreIfExists());
                return TableResultImpl.TABLE_RESULT_OK;
            } catch (DatabaseAlreadyExistException e) {
                throw new ValidationException(exMsg, e);
            } catch (Exception e) {
                throw new TableException(exMsg, e);
            }
        } else if (operation instanceof DropDatabaseOperation) {
            // 处理删除数据库操作
            DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;
            Catalog catalog = getCatalogOrThrowException(dropDatabaseOperation.getCatalogName());
            String exMsg = getDDLOpExecuteErrorMsg(dropDatabaseOperation.asSummaryString());
            try {
                catalog.dropDatabase(
                        dropDatabaseOperation.getDatabaseName(),
                        dropDatabaseOperation.isIfExists(),
                        dropDatabaseOperation.isCascade());
                return TableResultImpl.TABLE_RESULT_OK;
            } catch (DatabaseNotExistException | DatabaseNotEmptyException e) {
                throw new ValidationException(exMsg, e);
            } catch (Exception e) {
                throw new TableException(exMsg, e);
            }
        } else if (operation instanceof AlterDatabaseOperation) {
            // 处理修改数据库操作
            AlterDatabaseOperation alterDatabaseOperation = (AlterDatabaseOperation) operation;
            Catalog catalog = getCatalogOrThrowException(alterDatabaseOperation.getCatalogName());
            String exMsg = getDDLOpExecuteErrorMsg(alterDatabaseOperation.asSummaryString());
            try {
                catalog.alterDatabase(
                        alterDatabaseOperation.getDatabaseName(),
                        alterDatabaseOperation.getCatalogDatabase(),
                        false);
                return TableResultImpl.TABLE_RESULT_OK;
            } catch (DatabaseNotExistException e) {
                throw new ValidationException(exMsg, e);
            } catch (Exception e) {
                throw new TableException(exMsg, e);
            }
        } else if (operation instanceof CreateCatalogFunctionOperation) {
            // 处理创建目录函数操作
            return createCatalogFunction((CreateCatalogFunctionOperation) operation);
        } else if (operation instanceof CreateTempSystemFunctionOperation) {
            // 处理创建临时系统函数操作
            return createSystemFunction((CreateTempSystemFunctionOperation) operation);
        } else if (operation instanceof DropCatalogFunctionOperation) {
            // 处理删除目录函数操作
            return dropCatalogFunction((DropCatalogFunctionOperation) operation);
        } else if (operation instanceof DropTempSystemFunctionOperation) {
            // 处理删除临时系统函数操作
            return dropSystemFunction((DropTempSystemFunctionOperation) operation);
        } else if (operation instanceof AlterCatalogFunctionOperation) {
            // 处理修改目录函数操作
            return alterCatalogFunction((AlterCatalogFunctionOperation) operation);
        } else if (operation instanceof CreateCatalogOperation) {
            // 处理创建目录操作
            return createCatalog((CreateCatalogOperation) operation);
        } else if (operation instanceof DropCatalogOperation) {
            // 处理删除目录操作
            DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation;
            String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());
            try {
                catalogManager.unregisterCatalog(
                        dropCatalogOperation.getCatalogName(), dropCatalogOperation.isIfExists());
                return TableResultImpl.TABLE_RESULT_OK;
            } catch (CatalogException e) {
                throw new ValidationException(exMsg, e);
            }
        } else if (operation instanceof LoadModuleOperation) {
            // 处理加载模块操作
            return loadModule((LoadModuleOperation) operation);
        } else if (operation instanceof UnloadModuleOperation) {
            // 处理卸载模块操作
            return unloadModule((UnloadModuleOperation) operation);
        } else if (operation instanceof UseModulesOperation) {
            // 处理使用模块操作
            return useModules((UseModulesOperation) operation);
        } else if (operation instanceof UseCatalogOperation) {
            // 处理切换当前目录操作
            UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
            catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
            return TableResultImpl.TABLE_RESULT_OK;
        } else if (operation instanceof UseDatabaseOperation) {
            // 处理切换当前数据库操作
            UseDatabaseOperation useDatabaseOperation = (UseDatabaseOperation) operation;
            catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName());
            catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName());
            return TableResultImpl.TABLE_RESULT_OK;
        } else if (operation instanceof ShowCatalogsOperation) {
            // 处理显示所有目录操作
            return buildShowResult("catalog name", listCatalogs());
        } else if (operation instanceof ShowCurrentCatalogOperation) {
            // 处理显示当前目录操作
            return buildShowResult(
                    "current catalog name", new String[] {catalogManager.getCurrentCatalog()});
        } else if (operation instanceof ShowDatabasesOperation) {
            // 处理显示所有数据库操作
            return buildShowResult("database name", listDatabases());
        } else if (operation instanceof ShowCurrentDatabaseOperation) {
            // 处理显示当前数据库操作
            return buildShowResult(
                    "current database name", new String[] {catalogManager.getCurrentDatabase()});
        } else if (operation instanceof ShowModulesOperation) {
            // 处理显示模块操作,支持简略和完整格式
            ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
            if (showModulesOperation.requireFull()) {
                return buildShowFullModulesResult(listFullModules());
            } else {
                return buildShowResult("module name", listModules());
            }
        } else if (operation instanceof ShowTablesOperation) {
            // 处理显示所有表操作
            return buildShowResult("table name", listTables());
        } else if (operation instanceof ShowFunctionsOperation) {
            // 处理显示函数操作,支持显示用户定义函数和所有函数
            ShowFunctionsOperation showFunctionsOperation = (ShowFunctionsOperation) operation;
            String[] functionNames = null;
            switch (showFunctionsOperation.getFunctionScope()) {
                case USER:
                    functionNames = listUserDefinedFunctions();
                    break;
                case ALL:
                    functionNames = listFunctions();
                    break;
                default:
                    throw new UnsupportedOperationException(
                            String.format(
                                    "SHOW FUNCTIONS with %s scope is not supported.",
                                    showFunctionsOperation.getFunctionScope()));
            }
            return buildShowResult("function name", functionNames);
        } else if (operation instanceof ShowViewsOperation) {
            // 处理显示所有视图操作
            return buildShowResult("view name", listViews());
        } else if (operation instanceof ShowPartitionsOperation) {
            // 处理显示分区操作
            String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
            try {
                ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
                Catalog catalog = getCatalogOrThrowException(showPartitionsOperation.getTableIdentifier().getCatalogName());
                ObjectPath tablePath = showPartitionsOperation.getTableIdentifier().toObjectPath();
                CatalogPartitionSpec partitionSpec = showPartitionsOperation.getPartitionSpec();
                List<CatalogPartitionSpec> partitionSpecs =
                        partitionSpec == null
                                ? catalog.listPartitions(tablePath)
                                : catalog.listPartitions(tablePath, partitionSpec);
                List<String> partitionNames = new ArrayList<>(partitionSpecs.size());
                for (CatalogPartitionSpec spec : partitionSpecs) {
                    List<String> partitionKVs = new ArrayList<>(spec.getPartitionSpec().size());
                    for (Map.Entry<String, String> partitionKV : spec.getPartitionSpec().entrySet()) {
                        partitionKVs.add(partitionKV.getKey() + "=" + partitionKV.getValue());
                    }
                    partitionNames.add(String.join("/", partitionKVs));
                }
                return buildShowResult("partition name", partitionNames.toArray(new String[0]));
            } catch (TableNotExistException e) {
                throw new ValidationException(exMsg, e);
            } catch (Exception e) {
                throw new TableException(exMsg, e);
            }
        } else if (operation instanceof ExplainOperation) {
            // 处理EXPLAIN操作,返回SQL的执行计划
            String explanation =
                    explainInternal(
                            Collections.singletonList(((ExplainOperation) operation).getChild()));
            return TableResultImpl.builder()
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
                    .data(Collections.singletonList(Row.of(explanation)))
                    .setPrintStyle(TableResultImpl.PrintStyle.rawContent())
                    .setSessionTimeZone(getConfig().getLocalTimeZone())
                    .build();
        } else if (operation instanceof DescribeTableOperation) {
            // 处理DESCRIBE TABLE操作,返回表的详细信息
            DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
            Optional<CatalogManager.TableLookupResult> result =
                    catalogManager.getTable(describeTableOperation.getSqlIdentifier());
            if (result.isPresent()) {
                return buildDescribeResult(result.get().getResolvedSchema());
            } else {
                throw new ValidationException(
                        String.format(
                                "Tables or views with the identifier '%s' doesn't exist",
                                describeTableOperation.getSqlIdentifier().asSummaryString()));
            }
        } else if (operation instanceof QueryOperation) {
            // 处理查询操作
            return executeQueryOperation((QueryOperation) operation);
        } else if (operation instanceof CreateTableASOperation) {
            // 处理CREATE TABLE AS操作,包括创建表和插入数据
            executeInternal(((CreateTableASOperation) operation).getCreateTableOperation());
            return executeInternal(((CreateTableASOperation) operation).getInsertOperation());
        } else if (operation instanceof NopOperation) {
            // 处理空操作
            return TableResultImpl.TABLE_RESULT_OK;
        } else {
            // 如果操作类型不支持,则抛出异常
            throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
        }
    }

    其中会通过executeQueryOperation()来执行查询操作,其源码如下:

        /**
         * 执行查询操作并返回表结果。
         * 此方法将查询操作包装在一个收集修改操作中,并异步执行该操作。
         * 它创建一个未注册的收集接收器,将查询操作转换为转换列表,
         * 然后使用执行环境创建并执行管道。
         *
         * @param operation 要执行的查询操作
         * @return 包含查询结果的表结果
         * @throws TableException 如果执行SQL时发生错误
         */
        private TableResult executeQueryOperation(QueryOperation operation) {
            // 创建一个未解析的标识符,用于表示未注册的收集接收器
            final UnresolvedIdentifier unresolvedIdentifier =
                    UnresolvedIdentifier.of(
                            "Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId());
            // 将未解析的标识符转换为合格的对象标识符
            final ObjectIdentifier objectIdentifier =
                    catalogManager.qualifyIdentifier(unresolvedIdentifier);
    
            // 创建一个本地收集ModifyOperation结果的Operation
            CollectModifyOperation sinkOperation =
                    new CollectModifyOperation(objectIdentifier, operation);
            // 将上一步的 sinkOperation 翻译为Flink的transformation
            List<Transformation<?>> transformations =
                    translate(Collections.singletonList(sinkOperation));
            // 设置作业名称
            String jobName = getJobName("collect");
            // 根据transformation,生成StreamGraph
            Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
            try {
                // 代表作业异步执行过程
                JobClient jobClient = execEnv.executeAsync(pipeline);
                // 用于帮助jobClient获取执行结果
                CollectResultProvider resultProvider = sinkOperation.getSelectResultProvider();
                resultProvider.setJobClient(jobClient);
                // 构建TableResultImpl对象
                return TableResultImpl.builder()
                        .jobClient(jobClient)
                        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                        .schema(operation.getResolvedSchema())
                        .data(resultProvider.getResultIterator())
                        .setPrintStyle(
                                TableResultImpl.PrintStyle.tableau(
                                        PrintUtils.MAX_COLUMN_WIDTH,
                                        PrintUtils.NULL_COLUMN,
                                        true,
                                        isStreamingMode))
                        .setSessionTimeZone(getConfig().getLocalTimeZone())
                        .build();
            } catch (Exception e) {
                // 如果执行过程中发生异常,抛出表异常
                throw new TableException("Failed to execute sql", e);
            }
        }
    

    总结

    本文介绍了Flink,并通过源码梳理了Flink SQL的执行原理。

    Flink SQL从提交查询到任务执行,可以分为以下过程:

    1. 语法解析
      利用Calcite将SQL语句转换成一棵抽象语法树,在Calcite中用SqlNode来表示。
    2. 语法校验
      根据元数据信息进行验证,例如查询的表、使用的函数是否存在等,校验之后仍然是由SqlNode构成的语法树。
    3. 查询计划优化
      首先将SqlNode语法树转换成由关系表达式RelNode构成的逻辑树,然后使用优化器基于规则进行等价变换。
    4. 物理执行
      逻辑查询计划翻译成物理执行计划,生成对应的可执行代码并提交运行。

    http://www.kler.cn/a/552521.html

    相关文章:

  • 安装海康威视相机SDK后,catkin_make其他项目时,出现“libusb_set_option”错误的解决方法
  • DeepSeek R1生成图片总结2(虽然本身是不能直接生成图片,但是可以想办法利用别的工具一起实现)
  • word$deepseep
  • 用deepseek学大模型03-数学基础 概率论 最大似然估计(MLE)最大后验估计(MAP)
  • DeepSeek告别服务器繁忙
  • 应急响应(linux 篇,以centos 7为例)
  • Linux、Docker、Redis常见面试题
  • MybatisMybatisPllus公共字段填充与配置逻辑删除
  • LLaMa-Factory部署及llamafactory-cli webui命令无法打开ui界面问题解决记录
  • 【触想智能】工业显示器和普通显示器的区别以及工业显示器的主要应用领域分析
  • Leetcode2080:区间内查询数字的频率
  • SpringCloud面试题----为什么会产生Eureka的自我保护, 如何关闭自我保护机制
  • docker 安装的open-webui链接ollama出现网络错误
  • 阿里云大文件ossutil工具进行上传下载,该工具支持断点续传
  • mysql开启gtid并配置主从
  • eNSP防火墙综合实验
  • AI与大数据:双剑合璧的智能革命
  • 什么是UV环形光源
  • Flutter 状态管理:详细分析与实战
  • python爬虫系列课程2:如何下载Xpath Helper