当前位置: 首页 > article >正文

Spark 之 Aggregate

Aggregate

参考链接:

  • https://github.com/PZXWHU/SparkSQL-Kernel-Profiling

完整的聚合查询的关键字包括 group by、 cube、 grouping sets 和 rollup 4 种 。 分组语句 group by 后面可以是一个或多个分组表达式( groupingExpressions )。

聚合查询还支持 OLAP 场景下的多维分析,包括 rollup、 cube 和 grouping sets 3 种操作 。

逻辑节点 Aggregate

在这里插入图片描述

逻辑算子树节点通过分组表达式列表( groupingExpressions )、聚合表达式列表( aggregateExpressions )和子节点( child )构造而成,
其中分组表达式类型都是 Expression ,而聚合表达式类型都是 NamedExpression ,意味着聚合表达式一般都需要设置名字。
aggregateExpressions 对应聚合函数,而 resultExpressions 则包含了 Select 语句中选择的所有列信息 。

示例之 partial Aggregate 对应 logical plan

在这里插入图片描述
里面的mode 直接也是 Complete

示例之 final Aggregate 对应 logical plan

在这里插入图片描述

NamedExpression (这里对应的是Alias) 里 的child 是 AggregateFunction,里面的mode 直接就是 Complete

case class Alias(child: Expression, name: String)

case class Alias(child: Expression, name: String)(
    val exprId: ExprId = NamedExpression.newExprId,
    val qualifier: Seq[String] = Seq.empty,
    val explicitMetadata: Option[Metadata] = None,
    val nonInheritableMetadataKeys: Seq[String] = Seq.empty)
  extends UnaryExpression with NamedExpression {
物理 Aggregate

对于聚合查询,逻辑算子树转换为物理算子树,必不可少的是 Aggregation 转换策略 。 实际上, Aggregation 策略是基于 PhysicalAggregation 的 。 与 PhysicalOperation 类似,PhysicalAggregation 也是一种逻辑算子树的模式,用来匹配逻辑算子树中的 Aggregate 节点并提取该节点中的相关信息 。 PhysicalAggregation 在提取信息时会进行以下转换 。

在这里插入图片描述

select id, count(name) from student group by id

在这里插入图片描述

聚合模式

在 SparkSQL 中,聚合过程有 4 种模式,分别是 Partial 模式、 ParitialMerge 模式、 Final 模式 和 Complete 模式 。

在这里插入图片描述

上述聚合过程
中在 map 阶段的 sum 函数处于 Partial 模式,在 reduce 阶段的 sum 函数处于 Final 模式。

在这里插入图片描述

Complete 模式和Partial/Final 组合方式不一样,不进行局部聚合计算 。

在这里插入图片描述

ParitialMerge 主要应用在 distinct 语句中,如图 、所示 。聚合语句针对同一张表进行 sum 和 count (distinct)查询,最终的执行过程包含了 4 步聚合操作 。

  • 第 1 步按照( A,C)分组,对 sum 函数进行 Partial 模式聚合计算;
  • 第 2 步是 PartialMerge 模式,对上一步计算之后的聚合缓冲区进行合井,但此时仍然不是最终的结果;
  • 第 3 步分组的列发生变化,再一次进行 Partial 模式的 count 计算;
  • 第 4 步完成 Final 模式的最终计算 。
HashAggregate

常见的聚合查询语句通常采用 HashAggregate 方式,当存在以下几种情况时,会用 SortAggregate 方式来执行 。

  • 查询中存在不支持 Partial 方式的聚合函数:此时会调用 AggUtils 中的 planAggregateWithoutPartial 方法,直接生成 SortAggregateExec 聚合算子节点 。
  • 聚合函数结果不支持 Buffer 方式:如果结果类型不属于(NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DateType, TimestampType,DecimalType]集合中的任意一种,则需要执行 SortAggregateExec 方式,例如 collect_set和 collect_list 函数。
  • 内存不足:如果在 HashAggregate 执行过程中,内存空间己捕,那么聚合执行会切换到 SortAggregateExec 方式。

注意:
spark 2.2 之后去掉了planAggregateWithoutPartial
参见:
https://issues.apache.org/jira/browse/SPARK-19060
https://github.com/apache/spark/pull/16461

Expand

逻辑计划阶段:
GroupingSets 节点转换为 Aggregate+Expand+Pr付出t3 个节点的组合 。 顾名思义, Expand 表示“扩展”,多维分析在本质上相当于执行多种组合的 group by 操作,因此 Expand 所起的作用就是将一条数据扩展为特定形式的多条数据。

在这里插入图片描述

需要注意的是, Expand 方式执行多维分析虽然能够达到只读一次数据表的效果,但是在某些场景下容易造成中间数据的膨胀。 例如,数据的维度太高, Expand 会产生指数级别的数据量 。 针对这种情况,可以进行相应的优化。

AggregateMode

org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode

sealed trait AggregateMode

/**
 * An [[AggregateFunction]] with [[Partial]] mode is used for partial aggregation.
 * This function updates the given aggregation buffer with the original input of this
 * function. When it has processed all input rows, the aggregation buffer is returned.
 */
case object Partial extends AggregateMode

/**
 * An [[AggregateFunction]] with [[PartialMerge]] mode is used to merge aggregation buffers
 * containing intermediate results for this function.
 * This function updates the given aggregation buffer by merging multiple aggregation buffers.
 * When it has processed all input rows, the aggregation buffer is returned.
 */
case object PartialMerge extends AggregateMode

/**
 * An [[AggregateFunction]] with [[Final]] mode is used to merge aggregation buffers
 * containing intermediate results for this function and then generate final result.
 * This function updates the given aggregation buffer by merging multiple aggregation buffers.
 * When it has processed all input rows, the final result of this function is returned.
 */
case object Final extends AggregateMode

/**
 * An [[AggregateFunction]] with [[Complete]] mode is used to evaluate this function directly
 * from original input rows without any partial aggregation.
 * This function updates the given aggregation buffer with the original input of this
 * function. When it has processed all input rows, the final result of this function is returned.
 */
case object Complete extends AggregateMode
Aggregate 之 inputBufferOffset

org.apache.spark.sql.execution.aggregate.HashAggregateExec

case class HashAggregateExec(
    requiredChildDistributionExpressions: Option[Seq[Expression]],
    isStreaming: Boolean,
    numShufflePartitions: Option[Int],
    groupingExpressions: Seq[NamedExpression],
    aggregateExpressions: Seq[AggregateExpression],
    aggregateAttributes: Seq[Attribute],
    initialInputBufferOffset: Int,
    resultExpressions: Seq[NamedExpression],
    child: SparkPlan)
  extends AggregateCodegenSupport {
        val aggregationIterator =
          new TungstenAggregationIterator(
            partIndex,
            groupingExpressions,
            aggregateExpressions,
            aggregateAttributes,
            initialInputBufferOffset,
            resultExpressions,
            (expressions, inputSchema) =>
              MutableProjection.create(expressions, inputSchema),
            inputAttributes,
            iter,
            testFallbackStartsAt,
            numOutputRows,
            peakMemory,
            spillSize,
            avgHashProbe,
            numTasksFallBacked)

org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator

  extends AggregationIterator(
    partIndex,
    groupingExpressions,
    originalInputAttributes,
    aggregateExpressions,
    aggregateAttributes,
    initialInputBufferOffset,
    resultExpressions,
    newMutableProjection) with Logging {

org.apache.spark.sql.execution.aggregate.AggregationIterator

  protected val aggregateFunctions: Array[AggregateFunction] =
    initializeAggregateFunctions(aggregateExpressions, initialInputBufferOffset)
    for (expression <- expressions) {
      val func = expression.aggregateFunction
      val funcWithBoundReferences: AggregateFunction = expression.mode match {
        case Partial | Complete if func.isInstanceOf[ImperativeAggregate] =>
          // We need to create BoundReferences if the function is not an
          // expression-based aggregate function (it does not support code-gen) and the mode of
          // this function is Partial or Complete because we will call eval of this
          // function's children in the update method of this aggregate function.
          // Those eval calls require BoundReferences to work.
          BindReferences.bindReference(func, inputAttributeSeq)
        case _ =>
          // We only need to set inputBufferOffset for aggregate functions with mode
          // PartialMerge and Final.
          val updatedFunc = func match {
            case function: ImperativeAggregate =>
              function.withNewInputAggBufferOffset(inputBufferOffset)
            case function => function
          }
          inputBufferOffset += func.aggBufferSchema.length
          updatedFunc
      }

可见 inputBufferOffset 对 Partial | Complete 无效

ObjectHashAggregateExec

参考链接:

  • https://dataninjago.com/2022/01/09/spark-sql-query-engine-deep-dive-10-hashaggregateexec-objecthashaggregateexec/
  • https://blog.csdn.net/monkeyboy_tech/article/details/123759074

While the HashAggregateExec, backed by the Tungsten execution engine(基于Tungsten执行引擎), performs well for aggregation operations, it can only support the mutable primitive data type with a fixed size. For the user-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set), they are not supported by the HashAggregateExec. Prior Spark 2.2.0, they have to fall back to the less performant SortAggregateExec. Since Spark 2.2.0, the ObjectHashAggregateExec is released to fill this gap, which enables the performant hash-based aggregations on the data types that are not supported by HashAggregateExec.

在这里插入图片描述


http://www.kler.cn/a/406715.html

相关文章:

  • wend看源码-APISJON
  • Appium常用的使用方法(一)
  • mac安装Pytest、Allure、brew
  • Flume日志采集系统的部署,实现flume负载均衡,flume故障恢复
  • 【算法】计算程序执行时间(C/C++)
  • 菜鸟驿站二维码/一维码 取件识别功能
  • 深入探索Apache JMeter:HashTree结构解析与应用
  • AWTK 最新动态:支持鸿蒙系统(HarmonyOS Next)
  • 游戏盾 :在线游戏的终极防护屏障
  • 返回流类型接口的错误信息处理
  • java基础概念37:正则表达式2-爬虫
  • Xilinx 7 系列 FPGA的各引脚外围电路接法
  • SMO算法-核方法支持向量机
  • HTML常用表格与标签
  • 经典 AEC 论文解读
  • 基础自动化系统的任务
  • HTMLCSS:3D立方体loading
  • Vue3-小兔鲜项目出现问题及其解决方法(未写完)
  • 解决 Git 默认分支不一致问题:最佳实践与解决方案20241120
  • Zmap+python脚本+burp实现自动化Fuzzing测试
  • 【MySQL】避免执行SQl文件后自动转化表名为小写字母
  • 查手机号归属地免费API接口教程
  • R语言绘图过程中遇到图例的图块中出现字符“a“的解决方法
  • 基于单片机的煤气泄漏控制器设计
  • C++ —— string类(上)
  • MATLAB 2024a安装包下载及安装教程