【PostgreSQL内核学习 —— (WindowAgg(二))】
WindowAgg
- WindowAggState 结构体
- 窗口聚合行为
- ExecInitWindowAgg 函数
- ExecWindowAgg 函数
- 代码逻辑解释:计算窗口偏移量
- 代码逻辑详细解释:
- 代码逻辑解释:窗口聚合分区初始化与行推进逻辑
- 代码逻辑详细解释:
- 代码逻辑解释:从Tuplestore读取当前行并处理分组和排除逻辑
- 代码逻辑详细解释:
- 代码逻辑解释:窗口聚合状态管理与元组过滤
- 代码逻辑详细解释:
- ExecEndWindowAgg 函数
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书
在【PostgreSQL内核学习 —— (WindowAgg(一))】中,我们介绍了窗口函数以及窗口聚合的核心计算过程。本文我们继续学习WindowAgg算子的具体实现逻辑。
WindowAggState 结构体
WindowAggState
结构体用于表示窗口聚合操作的执行状态。它存储了窗口聚合的相关数据和信息,包括窗口函数、聚合函数、分区信息、排序信息等。该结构体主要用于窗口聚合节点在查询执行过程中维护当前的执行状态和环境,确保窗口聚合操作按照指定的框架(例如,RANGE
、GROUPS
等)正确地执行。
typedef struct WindowAggState
{
ScanState ss; /* 结构体的第一个字段是NodeTag,表示此结构体属于ScanState类型 */
/* 以下字段由ExecInitExpr填充,用于窗口聚合操作的初始化 */
List *funcs; /* 存储所有窗口函数节点的列表,这些函数将应用于目标列表 */
int numfuncs; /* 窗口函数的总数量 */
int numaggs; /* 纯聚合函数的数量 */
WindowStatePerFunc perfunc; /* 存储每个窗口函数的状态信息 */
WindowStatePerAgg peragg; /* 存储每个纯聚合函数的状态信息 */
ExprState *partEqfunction; /* 分区列的相等性函数,用于分区的比较 */
ExprState *ordEqfunction; /* 排序列的相等性函数,用于排序列的比较 */
Tuplestorestate *buffer; /* 存储当前分区的元组行数据 */
int current_ptr; /* 当前分区中正在读取的行的指针位置 */
int framehead_ptr; /* 当前窗口框架头的指针位置(若使用了框架) */
int frametail_ptr; /* 当前窗口框架尾的指针位置(若使用了框架) */
int grouptail_ptr; /* 当前组尾的指针位置(若使用了分组模式) */
int64 spooled_rows; /* 当前分区中存储的行数 */
int64 currentpos; /* 当前元组在分区中的位置 */
int64 frameheadpos; /* 当前框架头的位置 */
int64 frametailpos; /* 当前框架尾的位置(框架的结束位置+1) */
/* 用于聚合获取窗口对象(聚合操作的处理) */
struct WindowObjectData *agg_winobj; /* 聚合窗口对象,供聚合操作使用 */
int64 aggregatedbase; /* 当前聚合的起始行 */
int64 aggregatedupto; /* 聚合处理到的行的上限 */
WindowAggStatus status; /* 窗口聚合状态,表示当前窗口的执行状态 */
int frameOptions; /* 窗口框架的选项,控制窗口的计算方式 */
ExprState *startOffset; /* 表示窗口起始偏移量的表达式 */
ExprState *endOffset; /* 表示窗口结束偏移量的表达式 */
Datum startOffsetValue; /* startOffset计算后的结果值 */
Datum endOffsetValue; /* endOffset计算后的结果值 */
/* 以下字段用于RANGE偏移的PRECEDING/FOLLOWING模式 */
FmgrInfo startInRangeFunc; /* 用于startOffset的in_range函数 */
FmgrInfo endInRangeFunc; /* 用于endOffset的in_range函数 */
Oid inRangeColl; /* 用于in_range测试的排序规则 */
bool inRangeAsc; /* 在in_range测试中,是否使用升序排序 */
bool inRangeNullsFirst; /* 在in_range测试中,null值是否排在前面 */
/* 以下字段用于GROUPS模式 */
int64 currentgroup; /* 当前元组所属的组号 */
int64 frameheadgroup; /* 当前窗口框架头所在的组号 */
int64 frametailgroup; /* 当前窗口框架尾所在的组号 */
int64 groupheadpos; /* 当前元组所在组的头部位置 */
int64 grouptailpos; /* 当前元组所在组的尾部位置(组的结束位置+1) */
/* 内存上下文,用于分区生命周期内的数据存储 */
MemoryContext partcontext; /* 存储分区数据的内存上下文 */
MemoryContext aggcontext; /* 聚合数据的共享内存上下文 */
MemoryContext curaggcontext; /* 当前聚合操作的内存上下文 */
ExprContext *tmpcontext; /* 短期的表达式计算上下文 */
ExprState *runcondition; /* 窗口聚合执行的条件,若不满足该条件,则停止执行 */
bool use_pass_through; /* 如果为false,表示当runcondition不满足时,停止执行窗口聚合;否则继续评估窗口函数 */
bool top_window; /* 如果为true,表示这是最顶层的窗口聚合,或是唯一的窗口聚合 */
bool all_first; /* 如果为true,表示这是第一次扫描分区,尚未处理任何行 */
bool partition_spooled; /* 如果为true,表示当前分区的所有元组已经被存储到tuplestore中 */
bool more_partitions; /* 如果为true,表示还有更多的分区需要处理 */
bool framehead_valid; /* 如果为true,表示当前行的框架头位置已经被更新 */
bool frametail_valid; /* 如果为true,表示当前行的框架尾位置已经被更新 */
bool grouptail_valid; /* 如果为true,表示当前行的组尾位置已经被更新 */
/* 当前分区的第一行元组,用于开始或处理下一个分区 */
TupleTableSlot *first_part_slot; /* 当前或下一个分区的第一个元组 */
TupleTableSlot *framehead_slot; /* 当前框架的头部元组 */
TupleTableSlot *frametail_slot; /* 当前框架的尾部元组 */
/* 临时槽,用于从tuplestore中恢复元组 */
TupleTableSlot *agg_row_slot;
TupleTableSlot *temp_slot_1;
TupleTableSlot *temp_slot_2;
} WindowAggState;
窗口聚合行为
以下代码段定义了许多与窗口函数相关的框架选项(frameOptions
),这些选项通过位运算来设置窗口聚合行为。具体来说,它们用于描述窗口框架的开始、结束、排除类型、以及是否使用默认行为等。
/*
* frameOptions 是这些位的按位 OR。NONDEFAULT 和 BETWEEN 位用于让 ruleutils.c
* 可以识别哪些属性是用户指定的,哪些是默认值;无论如何,必须设置正确的行为位。
* START_foo 和 END_foo 选项必须成对出现,以便 gram.y 方便处理,
* 尽管其中一些选项是无效或无用的。
*/
#define FRAMEOPTION_NONDEFAULT 0x00001 /* 是否有任何选项被指定? */
#define FRAMEOPTION_RANGE 0x00002 /* 使用 RANGE 行为:表示窗口是基于值范围来定义的 */
#define FRAMEOPTION_ROWS 0x00004 /* 使用 ROWS 行为:表示窗口是基于行数来定义的 */
#define FRAMEOPTION_GROUPS 0x00008 /* 使用 GROUPS 行为:表示窗口是基于分组来定义的 */
#define FRAMEOPTION_BETWEEN 0x00010 /* 是否给定了 BETWEEN 关键字? */
#define FRAMEOPTION_START_UNBOUNDED_PRECEDING 0x00020 /* 起始位置为 "UNBOUNDED PRECEDING" */
#define FRAMEOPTION_END_UNBOUNDED_PRECEDING 0x00040 /* 结束位置为 "UNBOUNDED PRECEDING"(不允许) */
#define FRAMEOPTION_START_UNBOUNDED_FOLLOWING 0x00080 /* 起始位置为 "UNBOUNDED FOLLOWING"(不允许) */
#define FRAMEOPTION_END_UNBOUNDED_FOLLOWING 0x00100 /* 结束位置为 "UNBOUNDED FOLLOWING" */
#define FRAMEOPTION_START_CURRENT_ROW 0x00200 /* 起始位置为 "CURRENT ROW" */
#define FRAMEOPTION_END_CURRENT_ROW 0x00400 /* 结束位置为 "CURRENT ROW" */
#define FRAMEOPTION_START_OFFSET_PRECEDING 0x00800 /* 起始位置为偏移量 "PRECEDING" */
#define FRAMEOPTION_END_OFFSET_PRECEDING 0x01000 /* 结束位置为偏移量 "PRECEDING" */
#define FRAMEOPTION_START_OFFSET_FOLLOWING 0x02000 /* 起始位置为偏移量 "FOLLOWING" */
#define FRAMEOPTION_END_OFFSET_FOLLOWING 0x04000 /* 结束位置为偏移量 "FOLLOWING" */
#define FRAMEOPTION_EXCLUDE_CURRENT_ROW 0x08000 /* 排除当前行(CURRENT ROW) */
#define FRAMEOPTION_EXCLUDE_GROUP 0x10000 /* 排除当前行及其同组行(peer group) */
#define FRAMEOPTION_EXCLUDE_TIES 0x20000 /* 排除当前行及其相同的同行(ties) */
/* 对于偏移量选项,起始和结束偏移量必须一起使用 */
#define FRAMEOPTION_START_OFFSET \
(FRAMEOPTION_START_OFFSET_PRECEDING | FRAMEOPTION_START_OFFSET_FOLLOWING)
#define FRAMEOPTION_END_OFFSET \
(FRAMEOPTION_END_OFFSET_PRECEDING | FRAMEOPTION_END_OFFSET_FOLLOWING)
/* 排除选项,表示排除当前行、同组行或相同值的同行 */
#define FRAMEOPTION_EXCLUSION \
(FRAMEOPTION_EXCLUDE_CURRENT_ROW | FRAMEOPTION_EXCLUDE_GROUP | \
FRAMEOPTION_EXCLUDE_TIES)
/* 默认行为选项:指定默认的窗口框架行为 */
#define FRAMEOPTION_DEFAULTS \
(FRAMEOPTION_RANGE | FRAMEOPTION_START_UNBOUNDED_PRECEDING | \
FRAMEOPTION_END_CURRENT_ROW)
ExecInitWindowAgg 函数
ExecInitWindowAgg 函数用于初始化窗口聚合节点的运行时信息。它负责创建窗口聚合状态结构体,设置与窗口相关的各类内存上下文、表达式上下文,以及管理窗口函数和聚合函数的状态。该函数还初始化了处理查询计划所需的各种资源,包括元组描述符、扫描槽、结果槽等。通过为每个窗口函数分配状态,设置权限检查,以及为聚合函数创建 WindowObject,它为窗口聚合操作做好了充分的准备。最终,窗口聚合节点的状态被设置为运行状态,准备执行窗口聚合计算。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c
)
WindowAggState *
ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
{
WindowAggState *winstate; // 声明一个 WindowAggState 指针,表示窗口聚合操作的状态
Plan *outerPlan; // 声明一个指向外部计划节点的指针
ExprContext *econtext; // 表达式上下文,用于存储执行中的中间结果
ExprContext *tmpcontext; // 临时的表达式上下文
WindowStatePerFunc perfunc; // 每个窗口函数的状态信息
WindowStatePerAgg peragg; // 每个聚合操作的状态信息
int frameOptions = node->frameOptions; // 获取窗口的帧选项
int numfuncs, // 窗口函数的数量
wfuncno, // 当前窗口函数的编号
numaggs, // 聚合操作的数量
aggno; // 当前聚合操作的编号
TupleDesc scanDesc; // 扫描描述符,表示扫描操作的元数据
ListCell *l; // 列表单元,用于遍历窗口函数
/* 检查不支持的标志 */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); // 断言不支持回溯和标记的标志
/*
* 创建状态结构
*/
winstate = makeNode(WindowAggState); // 创建并分配一个新的 WindowAggState 结构体
winstate->ss.ps.plan = (Plan *) node; // 将当前计划节点指针赋值给状态结构体中的计划字段
winstate->ss.ps.state = estate; // 将执行状态赋值给状态结构体中的状态字段
winstate->ss.ps.ExecProcNode = ExecWindowAgg; // 设置执行节点处理函数为 ExecWindowAgg
/* 将帧选项复制到状态节点以便于访问 */
winstate->frameOptions = frameOptions; // 将窗口帧选项复制到窗口聚合状态
/*
* 创建表达式上下文。我们需要两个,每个输入元组一个
* 处理和一个用于每个输出元组处理。我们有点作弊
* 通过使用ExecAssignExprContext()来构建两者。
*/
ExecAssignExprContext(estate, &winstate->ss.ps); // 为状态分配表达式上下文,用于输入元组处理
tmpcontext = winstate->ss.ps.ps_ExprContext; // 临时上下文用于存储中间结果
winstate->tmpcontext = tmpcontext; // 将临时上下文赋值给窗口状态
ExecAssignExprContext(estate, &winstate->ss.ps); // 为输出元组分配新的表达式上下文
/* 创建用于存储分区本地内存等的长期上下文 */
winstate->partcontext =
AllocSetContextCreate(CurrentMemoryContext,
"WindowAgg Partition",
ALLOCSET_DEFAULT_SIZES); // 创建一个长期存储上下文,用于保存分区相关的内存
/*
* 为聚合跨值等创建中期上下文。
*
* 请注意,移动聚合每个都使用自己的私有上下文,而不是这个上下文。
*/
winstate->aggcontext =
AllocSetContextCreate(CurrentMemoryContext,
"WindowAgg Aggregates",
ALLOCSET_DEFAULT_SIZES); // 创建一个中期存储上下文,用于保存聚合操作的临时值
/* 只有顶级WindowAgg可以具有qual */
Assert(node->plan.qual == NIL || node->topWindow); // 确保只有最顶层的窗口聚合节点才可能有 QUAL 子句
/* 初始化质量 */
winstate->ss.ps.qual = ExecInitQual(node->plan.qual,
(PlanState *) winstate); // 初始化查询条件(QUAL)
/*
* 如果我们从查询计划器收到运行条件,请设置运行条件。
* 设置后,这可能允许我们进入直通模式,以便我们
* 不必在中对WindowFuncs执行任何进一步的评估
* 当前分区或可能完全停止返回元组
* 元组位于同一分区中。
*/
winstate->runcondition = ExecInitQual(node->runCondition,
(PlanState *) winstate); // 设置窗口聚合的执行条件,决定是否进入通行模式
/*
* 当我们不是顶级WindowAgg节点,或者我们是但有一个
* PARTITION BY子句我们必须进入WINDOWAGG_PASSTHROUNG之一
* runCondition变为false时的模式。
*/
winstate->use_pass_through = !node->topWindow || node->partNumCols > 0; // 设置是否使用通行模式
/* 记住我们是顶窗还是顶窗下方 */
winstate->top_window = node->topWindow; // 记住当前是否为顶层窗口聚合
/*
* 初始化子节点
*/
outerPlan = outerPlan(node); // 获取窗口聚合节点的外部计划节点
outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags); // 初始化外部计划节点的状态
/*
* 初始化源元组类型(这也是我们将要初始化的元组类型
* 存储在元组存储中,并在我们所有的工作槽中使用)。
*/
ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple); // 创建扫描槽并初始化
scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; // 获取扫描槽的元数据描述符
/* 外部元组不是子元组,但始终是最小元组 */
winstate->ss.ps.outeropsset = true; // 设置外部操作槽已设置
winstate->ss.ps.outerops = &TTSOpsMinimalTuple; // 设置外部操作
winstate->ss.ps.outeropsfixed = true; // 设置外部操作已固定
/*
* 元组表初始化
*/
winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc,
&TTSOpsMinimalTuple); // 初始化额外的元组槽,用于第一部分
winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc,
&TTSOpsMinimalTuple); // 初始化额外的元组槽,用于聚合行
winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc,
&TTSOpsMinimalTuple); // 初始化临时槽 1
winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc,
&TTSOpsMinimalTuple); // 初始化临时槽 2
/*
* 仅在需要时创建帧头和帧尾插槽(必须在中创建插槽
* 与update_frameheadpos和update_frameailpos完全相同的情况需要他们)
*/
winstate->framehead_slot = winstate->frametail_slot = NULL; // 初始化帧头和帧尾槽为 NULL
if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) // 如果帧选项为 RANGE 或 GROUPS
{
if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
node->ordNumCols != 0) ||
(frameOptions & FRAMEOPTION_START_OFFSET)) // 判断是否需要创建帧头槽
winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc,
&TTSOpsMinimalTuple); // 初始化帧头槽
if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
node->ordNumCols != 0) ||
(frameOptions & FRAMEOPTION_END_OFFSET)) // 判断是否需要创建帧尾槽
winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc,
&TTSOpsMinimalTuple); // 初始化帧尾槽
}
/*
* 初始化结果槽、类型和投影。
*/
ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual); // 初始化结果槽
ExecAssignProjectionInfo(&winstate->ss.ps, NULL); // 设置投影信息
/* 设置用于比较元组的数据 */
if (node->partNumCols > 0) // 如果有分区列
winstate->partEqfunction =
execTuplesMatchPrepare(scanDesc,
node->partNumCols,
node->partColIdx,
node->partOperators,
node->partCollations,
&winstate->ss.ps); // 初始化分区列的匹配函数
if (node->ordNumCols > 0) // 如果有排序列
winstate->ordEqfunction =
execTuplesMatchPrepare(scanDesc,
node->ordNumCols,
node->ordColIdx,
node->ordOperators,
node->ordCollations,
&winstate->ss.ps); // 初始化排序列的匹配函数
/*
* 初始化将在执行过程中使用的函数。我们将为每个窗口函数创建独立的函数,
* 并使用 WindowAggState 作为每个函数的上下文。
*/
numfuncs = list_length(node->windowFuncs); // 获取窗口函数的数量
winstate->numWindowFuncs = numfuncs; // 将数量赋值给状态
winstate->windowFuncs = palloc0(sizeof(WindowStatePerFunc) * numfuncs); // 为窗口函数分配内存
wfuncno = 0;
foreach(l, node->windowFuncs) // 遍历窗口函数列表
{
WindowFunc *wf = (WindowFunc *) lfirst(l); // 获取当前窗口函数
/* 对于每个窗口函数,初始化它 */
perfunc = &winstate->windowFuncs[wfuncno]; // 获取当前窗口函数的状态
perfunc->winfunc = wf; // 将窗口函数赋值给状态结构
perfunc->wfuncno = wfuncno; // 记录当前窗口函数的编号
perfunc->framehead_slot = winstate->framehead_slot; // 为该函数设置帧头槽
perfunc->frametail_slot = winstate->frametail_slot; // 为该函数设置帧尾槽
/* 初始化窗口函数的每个函数状态 */
winstate->windowFuncs[wfuncno].state =
ExecInitWindowFunc(wf, estate, frameOptions); // 初始化每个窗口函数的状态
++wfuncno; // 递增窗口函数编号
}
/*
* 初始化聚合
*/
numaggs = list_length(node->aggFuncs); // 获取聚合操作的数量
winstate->numAggFuncs = numaggs; // 将聚合操作数量赋值给窗口状态
winstate->aggFuncs = palloc0(sizeof(WindowStatePerAgg) * numaggs); // 为聚合操作分配内存
aggno = 0;
foreach(l, node->aggFuncs) // 遍历聚合函数列表
{
Aggregate *agg = (Aggregate *) lfirst(l); // 获取当前聚合函数
/* 对于每个聚合,对其进行初始化 */
peragg = &winstate->aggFuncs[aggno]; // 获取当前聚合函数的状态
peragg->aggfunc = agg; // 将聚合函数赋值给状态结构
/* 初始化每个聚合的状态 */
winstate->aggFuncs[aggno].state =
ExecInitAggregate(agg, estate); // 初始化每个聚合操作的状态
++aggno; // 递增聚合操作编号
}
/* Done */
return winstate; // 返回初始化完成的窗口聚合状态
}
让我们通过一个具体的例子来分析 ExecInitWindowAgg 函数的每一步操作,构造一个简单的查询场景来演示其功能。假设我们有一个包含员工工资数据的表 employee
,查询计算每个部门的员工工资的滑动平均数(即窗口函数)。这个查询涉及窗口函数的初始化和运行。
假设的查询:
SELECT department_id, employee_id, salary,
AVG(salary) OVER (PARTITION BY department_id ORDER BY employee_id) AS avg_salary
FROM employee;
查询解析步骤:
- Plan Tree Generation:当执行查询时,查询计划生成器生成了一个执行计划,其中包含
WindowAgg
节点。WindowAgg
节点表示需要计算窗口聚合函数(如AVG
)的部分。 - 调用 ExecInitWindowAgg 函数:在执行查询时,
ExecInitWindowAgg
被调用来初始化WindowAgg
节点的执行环境。此函数的主要任务是为窗口函数设置运行时信息,包括内存上下文的分配、表达式上下文的初始化,以及窗口函数的相关状态。
具体的执行过程:
- 创建 WindowAggState 结构:
- 在查询执行期间,
ExecInitWindowAgg
创建一个WindowAggState
结构体,用于存储窗口聚合的运行时状态。- 该结构体包含对窗口函数所需数据的引用和内存上下文。
- 创建并分配内存上下文:
ExecInitWindowAgg
函数为每个窗口分配内存上下文。在该场景中,窗口函数会基于department_id
划分数据,因此需要为每个部门分配一个本地内存上下文。- 此外,创建了一个
aggcontext
,用于存储聚合操作的中间值。
- 初始化表达式上下文:
- 窗口聚合涉及的所有表达式(例如
AVG(salary)
)会有各自的上下文,ExecInitWindowAgg
为每个窗口函数分配一个表达式上下文。- 表达式上下文会存储评估窗口函数时所需的所有临时数据,如窗口的分区和排序列等。
- 初始化 WindowFunc:
- 这个查询包含了一个窗口函数
AVG(salary) OVER (PARTITION BY department_id ORDER BY employee_id)
,它将在窗口内计算salary
的平均值。ExecInitWindowAgg
为该窗口函数分配了一个WindowStatePerFunc
结构体,该结构体存储了该函数的执行状态、输入数据的类型、排序信息等。
- 初始化分区和排序列:
ExecInitWindowAgg
为department_id
和employee_id
列设置分区和排序操作。- 如果存在
PARTITION BY
或ORDER BY
子句,ExecInitWindowAgg
会设置必要的比较函数,以便在执行时能够正确地分区和排序数据。
- 初始化查询的其他部分:
- 在初始化窗口聚合时,
ExecInitWindowAgg
还会初始化额外的执行计划,如质检条件(qual
),分区边界的表达式等。- 它会为每个部门创建一个新的
WindowObject
,用于处理该部门的窗口聚合数据。
执行流程总结:
ExecInitWindowAgg
会通过初始化WindowAggState
,为每个窗口函数(如AVG(salary)
)分配执行状态和内存上下文。- 它会设置数据的分区和排序方式,以确保每个窗口函数能在正确的分区内工作,并按正确的顺序进行计算。
- 此外,它会为窗口聚合的每个阶段(如计算中间结果和最终结果)创建适当的上下文和资源。
- 最终,通过该函数初始化的状态结构,查询的执行计划能够在实际执行过程中评估每个窗口函数,并进行相应的聚合计算。
举例说明:
假设有以下表数据:
department_id | employee_id | salary |
---|---|---|
1 | 1 | 5000 |
1 | 2 | 6000 |
1 | 3 | 7000 |
2 | 1 | 8000 |
2 | 2 | 8500 |
窗口聚合查询通过 AVG(salary)
计算每个部门的滑动平均薪资。
对于 department_id = 1,AVG(salary) 会计算 5000, 6000, 7000 的平均值。
对于 department_id = 2,AVG(salary) 会计算 8000, 8500 的平均值。
在 ExecInitWindowAgg
的执行过程中,为每个部门创建了一个独立的内存上下文并设置了分区(department_id
)。然后在每个窗口函数中,根据分区的边界(如 department_id
),分别计算各自的平均薪资。
ExecWindowAgg 函数
ExecWindowAgg 函数,主要用于执行窗口聚合操作。它从子查询获取元组(行),并将它们存储到 tuplestore
(一个临时存储区),然后处理窗口函数。窗口聚合的计算方式允许在查询的每一行上对一组行进行操作,而不是只处理单独的行。以下是对代码每一行的详细中文注释以及这段代码的功能描述。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c
)
/* -----------------
* ExecWindowAgg
*
* ExecWindowAgg函数接收来自外部子查询的元组,
* 并将它们存储到tuplestore中,然后处理窗口函数。
* 该节点不会减少或筛选任何行,因此返回的行数与其外部子查询的结果完全相同。
* -----------------
*/
static TupleTableSlot *
ExecWindowAgg(PlanState *pstate)
{
// 将pstate转换为WindowAggState类型
WindowAggState *winstate = castNode(WindowAggState, pstate);
TupleTableSlot *slot;
ExprContext *econtext;
int i;
int numfuncs;
// 检查中断信号
CHECK_FOR_INTERRUPTS();
// 如果状态为WINDOWAGG_DONE,说明窗口聚合已经完成,返回NULL
if (winstate->status == WINDOWAGG_DONE)
return NULL;
/*
* 如果是第一次调用或执行了重新扫描,计算窗口框架的偏移值。
* 这些值在扫描过程中会保持不变。
* 如果偏移值为volatile表达式,我们只使用它的初始值。
*/
if (winstate->all_first)
{
int frameOptions = winstate->frameOptions;
ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
Datum value;
bool isnull;
int16 len;
bool byval;
// 计算开始偏移值
if (frameOptions & FRAMEOPTION_START_OFFSET)
{
Assert(winstate->startOffset != NULL);
value = ExecEvalExprSwitchContext(winstate->startOffset,
econtext,
&isnull);
if (isnull)
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("frame starting offset must not be null")));
// 将值复制到查询生命周期上下文中
get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
&len, &byval);
winstate->startOffsetValue = datumCopy(value, byval, len);
if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
{
// 偏移值应该是int8类型
int64 offset = DatumGetInt64(value);
if (offset < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
errmsg("frame starting offset must not be negative")));
}
}
// 计算结束偏移值
if (frameOptions & FRAMEOPTION_END_OFFSET)
{
Assert(winstate->endOffset != NULL);
value = ExecEvalExprSwitchContext(winstate->endOffset,
econtext,
&isnull);
if (isnull)
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("frame ending offset must not be null")));
// 将值复制到查询生命周期上下文中
get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
&len, &byval);
winstate->endOffsetValue = datumCopy(value, byval, len);
if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
{
// 偏移值应该是int8类型
int64 offset = DatumGetInt64(value);
if (offset < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
errmsg("frame ending offset must not be negative")));
}
}
// 标记为已计算偏移值
winstate->all_first = false;
}
// 处理每一行的数据
for (;;)
{
// 如果没有缓冲区,初始化新的分区,并将当前行位置设为0
if (winstate->buffer == NULL)
{
begin_partition(winstate);
}
else
{
// 否则,继续推进当前分区的行位置
winstate->currentpos++;
// 如果当前帧已被标记无效,需要重新计算
winstate->framehead_valid = false;
winstate->frametail_valid = false;
}
// 如果还没有将所有的元组读取到缓冲区中,继续读取
spool_tuples(winstate, winstate->currentpos);
// 如果当前分区的所有行已经读取完成,释放当前分区并继续处理下一个分区
if (winstate->partition_spooled &&
winstate->currentpos >= winstate->spooled_rows)
{
release_partition(winstate);
if (winstate->more_partitions)
{
begin_partition(winstate);
Assert(winstate->spooled_rows > 0);
// 切换到下一个分区时退出直通模式
winstate->status = WINDOWAGG_RUN;
}
else
{
// 如果没有更多的分区,说明处理完成
winstate->status = WINDOWAGG_DONE;
return NULL;
}
}
// 设置当前行的执行上下文
econtext = winstate->ss.ps.ps_ExprContext;
// 清除当前行的表达式上下文
ResetExprContext(econtext);
// 从tuplestore中读取当前行并保存到ScanTupleSlot中
tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
FRAMEOPTION_EXCLUDE_GROUP |
FRAMEOPTION_EXCLUDE_TIES)) &&
winstate->currentpos > 0)
{
// 如果是GROUPS模式或需要处理排除组的情况
ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
if (!tuplestore_gettupleslot(winstate->buffer, true, true,
winstate->ss.ss_ScanTupleSlot))
elog(ERROR, "unexpected end of tuplestore");
if (!are_peers(winstate, winstate->temp_slot_2,
winstate->ss.ss_ScanTupleSlot))
{
winstate->currentgroup++;
winstate->groupheadpos = winstate->currentpos;
winstate->grouptail_valid = false;
}
ExecClearTuple(winstate->temp_slot_2);
}
else
{
if (!tuplestore_gettupleslot(winstate->buffer, true, true,
winstate->ss.ss_ScanTupleSlot))
elog(ERROR, "unexpected end of tuplestore");
}
// 在窗口函数执行时跳过直通模式
if (winstate->status == WINDOWAGG_RUN)
{
// 评估窗口函数
numfuncs = winstate->numfuncs;
for (i = 0; i < numfuncs; i++)
{
WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
// 跳过纯聚合函数
if (perfuncstate->plain_agg)
continue;
eval_windowfunction(winstate, perfuncstate,
&(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
&(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
}
// 评估聚合函数
if (winstate->numaggs > 0)
eval_windowaggregates(winstate);
}
// 如果创建了辅助读指针用于框架或组边界,确保它们保持最新
if (winstate->framehead_ptr >= 0)
update_frameheadpos(winstate);
if (winstate->frametail_ptr >= 0)
update_frametailpos(winstate);
if (winstate->grouptail_ptr >= 0)
update_grouptailpos(winstate);
// 删除不再需要的行
tuplestore_trim(winstate->buffer);
// 生成最终输出元组
econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
slot = ExecProject(winstate->ss.ps.ps_ProjInfo);
// 如果窗口聚合仍在运行,检查是否需要进入直通模式
if (winstate->status == WINDOWAGG_RUN)
{
econtext->ecxt_scantuple = slot; // 设置扫描的元组为当前的slot
/*
* 现在评估运行条件,看看是否需要进入传递模式(pass-through),或者可能完全停止。
*/
if (!ExecQual(winstate->runcondition, econtext)) // 如果运行条件不满足
{
/*
* 判断进入哪种模式。如果没有 PARTITION BY 子句且我们是顶层 WindowAgg,那么我们已经完成了。
* 这个元组以及未来的元组将不可能匹配运行条件。如果有 PARTITION BY 子句或者我们不是顶层窗口,
* 那么不能立即停止,因为我们需要处理其他分区,或者确保上层的 WindowAgg 节点接收到
* 它们所需的所有元组,以处理它们的窗口函数。
*/
if (winstate->use_pass_through) // 如果使用传递模式
{
/*
* 在有 PARTITION BY 子句时,顶层窗口需要严格的传递模式(STRICT pass-through)。
* 否则,我们必须确保存储那些不匹配运行条件的元组,以便它们可供上层的 WindowAgg 使用。
*/
if (winstate->top_window) // 如果是顶层窗口
{
winstate->status = WINDOWAGG_PASSTHROUGH_STRICT; // 设置为严格的传递模式
continue; // 继续循环,跳过后续处理
}
else
{
winstate->status = WINDOWAGG_PASSTHROUGH; // 设置为普通传递模式
/*
* 如果我们不是顶层窗口,我们最好将聚合结果设为 NULL。
* 在传递模式下我们不再更新这些结果,这样可以避免旧的过时结果残留。
* 其中一些结果可能是引用类型(byref),所以不能指向已经释放的内存。
* 规划器保证运行条件中的条件是严格的,因此顶层 WindowAgg 会在过滤子句中筛除这些 NULL。
*/
numfuncs = winstate->numfuncs;
for (i = 0; i < numfuncs; i++)
{
econtext->ecxt_aggvalues[i] = (Datum) 0; // 将聚合值设为 NULL
econtext->ecxt_aggnulls[i] = true; // 将聚合 NULL 标志设为 true
}
}
}
else
{
/*
* 不需要传递模式,我们可以直接返回 NULL。
* 因为没有其他元组会匹配运行条件。
*/
winstate->status = WINDOWAGG_DONE; // 设置状态为完成
return NULL; // 返回 NULL,表示窗口聚合结束
}
}
/*
* 过滤掉不需要的元组,只保留顶层 WindowAgg 需要的元组。
*/
if (!ExecQual(winstate->ss.ps.qual, econtext)) // 如果当前元组不符合过滤条件
{
InstrCountFiltered1(winstate, 1); // 统计被过滤的元组数
continue; // 继续下一轮循环,跳过当前元组
}
break; // 通过过滤,当前元组满足条件,跳出循环,开始处理该元组
}
/*
* 如果不处于 WINDOWAGG_RUN 模式,只要不是顶层窗口,我们必须返回当前元组。
*/
else if (!winstate->top_window) // 如果不是顶层窗口
break; // 跳出循环,返回当前元组
// 如果结果集合为空,返回NULL
if (TupIsNull(slot))
return NULL;
// 返回窗口结果
return slot;
}
}
代码逻辑解释:计算窗口偏移量
/*
* 如果是第一次调用或执行了重新扫描,计算窗口框架的偏移值。
* 这些值在扫描过程中会保持不变。
* 如果偏移值为volatile表达式,我们只使用它的初始值。
*/
if (winstate->all_first)
{
int frameOptions = winstate->frameOptions;
ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
Datum value;
bool isnull;
int16 len;
bool byval;
// 计算开始偏移值
if (frameOptions & FRAMEOPTION_START_OFFSET)
{
Assert(winstate->startOffset != NULL);
value = ExecEvalExprSwitchContext(winstate->startOffset,
econtext,
&isnull);
if (isnull)
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("frame starting offset must not be null")));
// 将值复制到查询生命周期上下文中
get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
&len, &byval);
winstate->startOffsetValue = datumCopy(value, byval, len);
if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
{
// 偏移值应该是int8类型
int64 offset = DatumGetInt64(value);
if (offset < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
errmsg("frame starting offset must not be negative")));
}
}
// 计算结束偏移值
if (frameOptions & FRAMEOPTION_END_OFFSET)
{
Assert(winstate->endOffset != NULL);
value = ExecEvalExprSwitchContext(winstate->endOffset,
econtext,
&isnull);
if (isnull)
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("frame ending offset must not be null")));
// 将值复制到查询生命周期上下文中
get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
&len, &byval);
winstate->endOffsetValue = datumCopy(value, byval, len);
if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
{
// 偏移值应该是int8类型
int64 offset = DatumGetInt64(value);
if (offset < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
errmsg("frame ending offset must not be negative")));
}
}
// 标记为已计算偏移值
winstate->all_first = false;
}
这段代码的目的是在 第一次调用 或者 重新扫描 时,计算并确定窗口框架的偏移量(startOffset
和 endOffset
)。这两个偏移量在整个扫描过程中保持不变。如果表达式是可变的(例如,用户提供了动态的表达式),代码只会使用 初始值,确保偏移量不会在扫描过程中变化。
代码逻辑详细解释:
-
初始化:
winstate->all_first
是一个标记,用来指示这是 第一次执行 窗口聚合操作(或者在重新扫描时)。frameOptions
是一个标志,表示启用了哪些窗口框架选项,比如开始和结束偏移量。ExprContext *econtext
获取当前的执行上下文,便于在评估表达式时使用。Datum value
存储计算得到的偏移量。isnull
是一个标志,表示评估的结果是否为 NULL。len
,byval
是用于复制数据的类型信息。
-
计算
startOffset
(开始偏移量):- 如果启用了
FRAMEOPTION_START_OFFSET
,即窗口框架指定了一个开始偏移量,则通过ExecEvalExprSwitchContext
计算该偏移量的值。 - 如果计算结果为
NULL
,则抛出错误(偏移量不能为NULL
)。 - 偏移量值被复制到查询生命周期内的内存上下文中,确保其在整个查询过程中有效。
- 如果框架行为是
ROWS
或GROUPS
,则假设偏移量是一个整数(int64
),并检查其是否为负数。如果是负数,抛出错误(偏移量不能为负数)。
- 如果启用了
-
计算
endOffset
(结束偏移量):- 如果启用了
FRAMEOPTION_END_OFFSET
,即窗口框架指定了一个结束偏移量,则与startOffset
的处理类似,计算并存储结束偏移量的值。 - 如果结果为
NULL
,同样抛出错误。 - 如果偏移量为负数,同样抛出错误。
- 如果启用了
-
更新
winstate->all_first
:- 最后,将
winstate->all_first
设置为false
,表示不再是第一次扫描。
- 最后,将
代码逻辑解释:窗口聚合分区初始化与行推进逻辑
// 如果没有缓冲区,初始化新的分区,并将当前行位置设为0
if (winstate->buffer == NULL)
{
begin_partition(winstate);
}
else
{
// 否则,继续推进当前分区的行位置
winstate->currentpos++;
// 如果当前帧已被标记无效,需要重新计算
winstate->framehead_valid = false;
winstate->frametail_valid = false;
}
这段代码主要处理了窗口聚合操作中的数据分区和当前行的移动。它根据窗口的状态(是否已经初始化过缓冲区)来决定是初始化分区还是在已有分区中推进当前行。接下来,我将逐步解释这段代码的逻辑:
代码逻辑详细解释:
-
初始化分区:
- 这里的
winstate->buffer == NULL
表示当前窗口聚合状态还没有缓冲区,说明还没有处理任何分区。 begin_partition(winstate);
会调用一个函数来初始化当前的分区。通常,这个函数会为窗口聚合操作准备数据缓冲区,并将当前行的指针 (currentpos
) 设置为0
,即当前行指向分区中的第一个行。- 如果当前分区没有输入行,程序会在后续逻辑中检测到这一情况并退出。
- 这里的
-
推进当前行:
- 如果
winstate->buffer
已经存在,表示窗口聚合操作已经在处理某个分区。此时,程序推进当前行:winstate->currentpos++;
:将当前行指针加一,表示处理到了分区中的下一行数据。
- 推进当前行后,可能会导致窗口帧的位置发生变化,因此需要将
framehead_valid
和frametail_valid
标记为false
,表示这些位置的值不再有效,需要在后续重新计算。winstate->framehead_valid = false;
和winstate->frametail_valid = false;
将窗口帧头和帧尾的有效性标记为无效。
- 代码中有一个注释说明
grouptail
的有效性无需在这里无效化,具体逻辑将在后续的代码中处理。
- 如果
总结:
- 这段代码根据窗口聚合的状态决定是否初始化新的分区,或者推进当前行。
- 如果是新的分区,初始化分区并将当前行设置为
0
。- 如果是已经处理过的分区,程序会推进当前行,同时标记窗口帧的头部和尾部为无效,表示需要在后续重新计算帧的位置。
代码逻辑解释:从Tuplestore读取当前行并处理分组和排除逻辑
// 从tuplestore中读取当前行并保存到ScanTupleSlot中
tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
FRAMEOPTION_EXCLUDE_GROUP |
FRAMEOPTION_EXCLUDE_TIES)) &&
winstate->currentpos > 0)
{
// 如果是GROUPS模式或需要处理排除组的情况
ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
if (!tuplestore_gettupleslot(winstate->buffer, true, true,
winstate->ss.ss_ScanTupleSlot))
elog(ERROR, "unexpected end of tuplestore");
if (!are_peers(winstate, winstate->temp_slot_2,
winstate->ss.ss_ScanTupleSlot))
{
winstate->currentgroup++;
winstate->groupheadpos = winstate->currentpos;
winstate->grouptail_valid = false;
}
ExecClearTuple(winstate->temp_slot_2);
}
else
{
if (!tuplestore_gettupleslot(winstate->buffer, true, true,
winstate->ss.ss_ScanTupleSlot))
elog(ERROR, "unexpected end of tuplestore");
}
这段代码的核心功能是从tuplestore
中读取当前行,并根据窗口聚合的不同选项(如GROUPS
模式、排除组和排除相等值)进行相应的处理,确保处理过程中正确更新当前行的分组信息。
代码逻辑详细解释:
-
从tuplestore读取当前行:
- 使用
tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
设置读取指针,指向当前的读取位置。 - 然后调用
tuplestore_gettupleslot()
来从tuplestore
中获取当前元组,并将其保存到winstate->ss.ss_ScanTupleSlot
中。
- 使用
-
处理GROUPS模式及排除组/排除相等的情况:
- 如果
winstate->frameOptions
启用了FRAMEOPTION_GROUPS、FRAMEOPTION_EXCLUDE_GROUP
或FRAMEOPTION_EXCLUDE_TIES
,并且winstate->currentpos
大于0
(意味着当前已经不是分区中的第一个元组),则执行以下步骤:- 使用
ExecCopySlot()
将当前元组拷贝到临时槽winstate->temp_slot_2
。 - 再次从
tuplestore
中获取下一个元组,确保元组被成功读取。 - 调用
are_peers()
函数检查当前元组与临时槽中存储的上一个元组是否属于同一个分组。如果不是同一个分组(即are_peers()
返回false
),则:- 增加
winstate->currentgroup
计数器,表示当前分组已结束并开始新的分组。 - 更新
winstate->groupheadpos
为当前元组的位置,标记当前分组的起始位置。 - 设置
winstate->grouptail_valid = false
,表明当前分组尾部信息无效,需要重新计算。
- 增加
- 使用
ExecClearTuple()
清除临时槽中的数据,释放空间。
- 使用
- 如果
-
默认处理:
- 如果不需要处理
GROUPS
模式或排除逻辑(即上述条件不满足),直接从tuplestore
中读取当前元组并存储到winstate->ss.ss_ScanTupleSlot
中。 - 如果无法成功获取元组,报错
unexpected end of tuplestore
。
- 如果不需要处理
代码逻辑解释:窗口聚合状态管理与元组过滤
if (winstate->status == WINDOWAGG_RUN)
{
econtext->ecxt_scantuple = slot; // 设置扫描的元组为当前的slot
/*
* 现在评估运行条件,看看是否需要进入传递模式(pass-through),或者可能完全停止。
*/
if (!ExecQual(winstate->runcondition, econtext)) // 如果运行条件不满足
{
/*
* 判断进入哪种模式。如果没有 PARTITION BY 子句且我们是顶层 WindowAgg,那么我们已经完成了。
* 这个元组以及未来的元组将不可能匹配运行条件。如果有 PARTITION BY 子句或者我们不是顶层窗口,
* 那么不能立即停止,因为我们需要处理其他分区,或者确保上层的 WindowAgg 节点接收到
* 它们所需的所有元组,以处理它们的窗口函数。
*/
if (winstate->use_pass_through) // 如果使用传递模式
{
/*
* 在有 PARTITION BY 子句时,顶层窗口需要严格的传递模式(STRICT pass-through)。
* 否则,我们必须确保存储那些不匹配运行条件的元组,以便它们可供上层的 WindowAgg 使用。
*/
if (winstate->top_window) // 如果是顶层窗口
{
winstate->status = WINDOWAGG_PASSTHROUGH_STRICT; // 设置为严格的传递模式
continue; // 继续循环,跳过后续处理
}
else
{
winstate->status = WINDOWAGG_PASSTHROUGH; // 设置为普通传递模式
/*
* 如果我们不是顶层窗口,我们最好将聚合结果设为 NULL。
* 在传递模式下我们不再更新这些结果,这样可以避免旧的过时结果残留。
* 其中一些结果可能是引用类型(byref),所以不能指向已经释放的内存。
* 规划器保证运行条件中的条件是严格的,因此顶层 WindowAgg 会在过滤子句中筛除这些 NULL。
*/
numfuncs = winstate->numfuncs;
for (i = 0; i < numfuncs; i++)
{
econtext->ecxt_aggvalues[i] = (Datum) 0; // 将聚合值设为 NULL
econtext->ecxt_aggnulls[i] = true; // 将聚合 NULL 标志设为 true
}
}
}
else
{
/*
* 不需要传递模式,我们可以直接返回 NULL。
* 因为没有其他元组会匹配运行条件。
*/
winstate->status = WINDOWAGG_DONE; // 设置状态为完成
return NULL; // 返回 NULL,表示窗口聚合结束
}
}
/*
* 过滤掉不需要的元组,只保留顶层 WindowAgg 需要的元组。
*/
if (!ExecQual(winstate->ss.ps.qual, econtext)) // 如果当前元组不符合过滤条件
{
InstrCountFiltered1(winstate, 1); // 统计被过滤的元组数
continue; // 继续下一轮循环,跳过当前元组
}
break; // 通过过滤,当前元组满足条件,跳出循环,开始处理该元组
}
/*
* 如果不处于 WINDOWAGG_RUN 模式,只要不是顶层窗口,我们必须返回当前元组。
*/
else if (!winstate->top_window) // 如果不是顶层窗口
break; // 跳出循环,返回当前元组
这段代码逻辑主要是用来处理窗口聚合操作中的运行条件(runcondition
)和控制窗口的传递模式。在窗口聚合过程中,可能会切换到pass-through
模式(即跳过窗口函数计算的状态),并进行一系列条件判断。
代码逻辑详细解释:
-
检查当前状态:
- 代码首先检查
winstate->status
是否等于WINDOWAGG_RUN
,只有在窗口聚合处于运行模式时才会执行接下来的操作。
- 代码首先检查
-
运行条件判断:
ExecQual(winstate->runcondition, econtext)
:通过执行runcondition
来判断当前行是否满足窗口函数的运行条件。如果不满足条件,则需要切换到pass-through
模式或者结束处理。winstate->use_pass_through
控制是否使用pass-through
模式:- 如果是
top_window
且有PARTITION BY
子句,则进入严格的pass-through
模式:winstate->status = WINDOWAGG_PASSTHROUGH_STRICT
。 - 如果是非顶层窗口且不满足
runcondition
,则进入普通的pass-through
模式,窗口聚合的结果会被清空。 - 如果不是顶层窗口,且不进入
pass-through
模式,则会将窗口聚合的结果设置为NULL
,避免残留旧结果。
- 如果是
-
结束处理:
- 如果没有
pass-through
模式且没有满足条件的行,直接将winstate->status
设置为WINDOWAGG_DONE
,结束窗口聚合操作,返回NULL
。
- 如果没有
-
过滤不需要的元组:
- 通过
ExecQual(winstate->ss.ps.qual, econtext)
来判断当前行是否需要。在顶层窗口中,如果当前行不满足条件,则跳过该行。 InstrCountFiltered1(winstate, 1)
统计跳过的元组数。
- 通过
-
跳过条件不满足的元组:
- 如果运行条件不满足,则直接跳过当前元组,进入下一轮循环。
-
返回符合条件的元组:
- 在当前元组符合所有条件的情况下,跳出循环并继续处理。
ExecEndWindowAgg 函数
ExecEndWindowAgg 函数用于结束窗口聚合(WindowAgg
)节点的执行,释放与该节点相关的资源。该函数的作用是清理和释放内存,以确保在节点处理完成后不再占用任何多余的资源。(路径:src\backend\executor\nodeWindowAgg.c
)
/* -----------------
* ExecEndWindowAgg
* -----------------
*/
void
ExecEndWindowAgg(WindowAggState *node)
{
PlanState *outerPlan; // 用于保存外部计划节点的状态
int i;
// 释放分区相关资源
release_partition(node);
// 清理扫描元组槽中的数据
ExecClearTuple(node->ss.ss_ScanTupleSlot);
// 清理第一个分区槽中的数据
ExecClearTuple(node->first_part_slot);
// 清理聚合行槽中的数据
ExecClearTuple(node->agg_row_slot);
// 清理临时槽1中的数据
ExecClearTuple(node->temp_slot_1);
// 清理临时槽2中的数据
ExecClearTuple(node->temp_slot_2);
// 如果存在 framehead_slot,清理该槽中的数据
if (node->framehead_slot)
ExecClearTuple(node->framehead_slot);
// 如果存在 frametail_slot,清理该槽中的数据
if (node->frametail_slot)
ExecClearTuple(node->frametail_slot);
/*
* 释放两个表达式上下文(ExprContext)。
*/
// 释放当前节点的表达式上下文
ExecFreeExprContext(&node->ss.ps);
// 将临时上下文赋值给当前表达式上下文
node->ss.ps.ps_ExprContext = node->tmpcontext;
// 再次释放表达式上下文
ExecFreeExprContext(&node->ss.ps);
// 遍历每个聚合函数,释放其对应的上下文
for (i = 0; i < node->numaggs; i++)
{
// 如果当前聚合函数的上下文不是默认的聚合上下文,则释放该上下文
if (node->peragg[i].aggcontext != node->aggcontext)
MemoryContextDelete(node->peragg[i].aggcontext);
}
// 释放分区上下文
MemoryContextDelete(node->partcontext);
// 释放聚合上下文
MemoryContextDelete(node->aggcontext);
// 释放聚合函数和聚合状态数组的内存
pfree(node->perfunc);
pfree(node->peragg);
// 获取外部计划节点的状态
outerPlan = outerPlanState(node);
// 结束外部计划节点的执行
ExecEndNode(outerPlan);
}