【PostgreSQL内核学习 —— (WindowAgg(一))】
WindowAgg
- 窗口函数介绍
- WindowAgg
- 理论层面
- 源码层面
- WindowObjectData 结构体
- WindowStatePerFuncData 结构体
- WindowStatePerAggData 结构体
- eval_windowaggregates 函数
- update_frameheadpos 函数
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书
窗口函数介绍
首先,我将提供一个简单的 SQL 用例,并逐步解读窗口函数的使用过程。假设我们有一个名为 sales 的销售数据表,表结构如下:
CREATE TABLE sales (
id SERIAL PRIMARY KEY,
salesperson_id INT,
sale_date DATE,
sale_amount NUMERIC
);
假设 sales 表包含以下数据:
id | salesperson_id | sale_date | sale_amount |
---|---|---|---|
1 | 1 | 2024-01-01 | 1000 |
2 | 1 | 2024-01-02 | 1200 |
3 | 2 | 2024-01-01 | 800 |
4 | 2 | 2024-01-02 | 1100 |
5 | 3 | 2024-01-01 | 1500 |
6 | 3 | 2024-01-02 | 1300 |
SQL 用例:使用窗口函数计算每个销售人员的累计销售金额
我们希望计算每个销售人员在每个销售记录的日期上的累计销售金额。为了实现这一目标,我们可以使用 SUM() 函数,它会对每个销售人员的数据进行累计。
SQL 查询如下:
SELECT
id,
salesperson_id,
sale_date,
sale_amount,
SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales
FROM
sales
ORDER BY
salesperson_id, sale_date;
详细解读:
SUM(sale_amount)
这是一个聚合函数,通常用于对某个列的值进行汇总。在这个查询中,SUM(sale_amount) 用于计算销售额的累计值。
OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
这是一个窗口函数的关键部分,指定了如何对结果进行分区、排序和聚合。具体来说:
PARTITION BY salesperson_id
:这是窗口函数的分区操作,将数据按 salesperson_id(销售人员 ID)分区。也就是说,每个销售人员的数据将分别计算,不同销售人员的累计销售是独立的。ORDER BY sale_date
:对每个分区内的数据按销售日期 (sale_date) 进行排序,确保累计计算是按时间顺序进行的。ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
:这是一个窗口帧的定义,意味着每个分区的累计值从该分区的第一行开始计算,一直到当前行。UNBOUNDED PRECEDING
表示从分区的第一行开始,CURRENT ROW
表示包括当前行。
- 结果分析:
查询结果将会返回每个销售人员的每笔销售记录,并在 cumulative_sales 列显示该销售人员的累计销售金额。例如:
id | salesperson_id | sale_date | sale_amount | cumulative_sales |
---|---|---|---|---|
1 | 1 | 2024-01-01 | 1000 | 1000 |
2 | 1 | 2024-01-02 | 1200 | 2200 |
3 | 2 | 2024-01-01 | 800 | 800 |
4 | 2 | 2024-01-02 | 1100 | 1900 |
5 | 3 | 2024-01-01 | 1500 | 1500 |
6 | 3 | 2024-01-02 | 1300 | 2800 |
- 对于销售人员 1,第一个销售记录的累计销售金额为 1000,第二个销售记录的累计销售金额为
1000 + 1200 = 2200
。- 对于销售人员 2,第一个销售记录的累计销售金额为 800,第二个销售记录的累计销售金额为
800 + 1100 = 1900
。- 对于销售人员 3,第一个销售记录的累计销售金额为 1500,第二个销售记录的累计销售金额为
1500 + 1300 = 2800
。
窗口函数的工作机制:
- 分区:窗口函数首先会根据
PARTITION BY
子句将数据分成不同的分区。这里,数据按salesperson_id
分区,每个销售人员的记录组成一个分区。 - 排序:在每个分区内,数据会根据
ORDER BY
子句进行排序。在这个例子中,按sale_date
对每个销售人员的销售记录按时间顺序进行排序。 - 累计:
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
确保了每个销售人员从分区的第一行开始,直到当前行的所有销售记录都会被累加,形成一个累积的结果。
更多详细的窗口函数使用教程可以参阅:GaussDB(DWS) SQL进阶之SQL操作之窗口函数
WindowAgg
理论层面
下面我们来了解一下 WindowAgg 算子,先看看书中的描述:
书中详细描述了 WindowAgg 节点在 PostgreSQL 中处理窗口函数时的执行过程,包括如何管理分区、排序、聚合等。通过 WindowAggState 和相关的数据结构,窗口聚合可以高效地计算多个窗口函数,同时保持对数据的完整性。性能优化方面,窗口函数的排序和缓存机制也起到了关键作用,帮助提升计算效率。
源码层面
WindowObjectData 结构体
WindowObjectData 结构体用于在窗口函数调用过程中保存与窗口聚合操作相关的状态信息。在 PostgreSQL 中,窗口函数用于基于窗口进行计算,而每个窗口函数可能需要不同的上下文状态来处理其数据。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c
)
/*
* 所有窗口函数的API都通过这个对象进行调用,该对象会作为fcinfo->context传递给窗口函数。
*/
typedef struct WindowObjectData
{
NodeTag type; /* 类型标识符,用于区分不同的节点类型 */
WindowAggState *winstate; /* 指向父级窗口聚合状态的指针,用于获取窗口聚合的上下文状态 */
List *argstates; /* 窗口函数参数的表达式状态树 */
void *localmem; /* 当前窗口函数在执行过程中使用的局部内存,由WinGetPartitionLocalMemory分配 */
int markptr; /* 用于标记当前窗口函数状态的tuplestore标记指针 */
int readptr; /* 读取指针,指向当前正在处理的行位置 */
int64 markpos; /* 标记指针所指向的行号 */
int64 seekpos; /* 读取指针所指向的行号 */
} WindowObjectData;
WindowStatePerFuncData 结构体
WindowStatePerFuncData 结构体用于存储与窗口函数和窗口聚合操作相关的工作状态和数据。它包含了窗口函数执行时需要的各种信息,如参数数量、排序规则、结果类型、是否为聚合函数等。这些信息对于在窗口函数计算过程中正确管理和执行窗口函数非常重要。在 PostgreSQL 中,窗口函数的执行涉及多次状态保存和计算,而这个结构体便用于管理这些窗口函数的具体执行细节。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c
)
/*
* 为每个由该节点处理的窗口函数和窗口聚合创建一个 WindowStatePerFunc 结构体。
*/
typedef struct WindowStatePerFuncData
{
/* 链接到与此工作状态相关的 WindowFunc 表达式和状态节点 */
WindowFuncExprState *wfuncstate; /* 当前窗口函数的表达式状态 */
WindowFunc *wfunc; /* 当前窗口函数的定义(结构体) */
int numArguments; /* 窗口函数的参数数量 */
FmgrInfo flinfo; /* 用于窗口函数的 fmgr 查找数据,存储有关函数的信息 */
Oid winCollation; /* 窗口函数的排序规则,由当前函数派生 */
/*
* 我们需要窗口函数结果的长度和 byval 信息,以便知道如何复制/删除值。
*/
int16 resulttypeLen; /* 窗口函数返回值类型的长度 */
bool resulttypeByVal; /* 窗口函数返回值类型是否为按值传递 */
bool plain_agg; /* 是否仅为普通的聚合函数? */
int aggno; /* 如果是,指明其对应的 WindowStatePerAggData 的索引 */
WindowObject winobj; /* 用于窗口函数 API 的对象 */
} WindowStatePerFuncData;
WindowStatePerAggData 结构体
WindowStatePerAggData 结构体主要用于保存窗口聚合过程中普通聚合函数的工作状态。它包含了有关过渡函数、最终函数、初始值、当前帧的聚合结果、过渡值等详细信息。通过这些信息,系统可以正确地计算窗口聚合函数的结果,处理每个聚合操作的中间状态,确保聚合计算按预期执行。此外,该结构体还考虑了内存管理和函数调用的效率,使得聚合操作在处理大数据量时能够高效执行。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c
)
/*
* 对于普通的聚合窗口函数,我们也有一个这样的结构体。
*/
typedef struct WindowStatePerAggData
{
/* 聚合函数的过渡函数 OID */
Oid transfn_oid; /* 聚合函数的过渡函数的 OID */
Oid invtransfn_oid; /* 反向过渡函数的 OID,可能是 InvalidOid */
Oid finalfn_oid; /* 最终函数的 OID,可能是 InvalidOid */
/*
* 聚合过渡函数的 fmgr 查找数据 --- 只有当对应的 OID 不为 InvalidOid 时才有效。
* 特别注意,函数的 fn_strict 标志在这里保存。
*/
FmgrInfo transfn; /* 聚合函数的过渡函数的 fmgr 查找数据 */
FmgrInfo invtransfn; /* 反向过渡函数的 fmgr 查找数据 */
FmgrInfo finalfn; /* 最终函数的 fmgr 查找数据 */
int numFinalArgs; /* 传递给最终函数的参数个数 */
/*
* 来自 pg_aggregate 入口的初始值
*/
Datum initValue; /* 初始值 */
bool initValueIsNull; /* 初始值是否为 NULL */
/*
* 当前帧边界的缓存值
*/
Datum resultValue; /* 当前计算帧的结果值 */
bool resultValueIsNull; /* 结果值是否为 NULL */
/*
* 需要输入、结果和过渡数据类型的长度和 byval 信息,
* 以便知道如何复制/删除值。
*/
int16 inputtypeLen, /* 输入类型的长度 */
resulttypeLen, /* 结果类型的长度 */
transtypeLen; /* 过渡数据类型的长度 */
bool inputtypeByVal, /* 输入类型是否按值传递 */
resulttypeByVal, /* 结果类型是否按值传递 */
transtypeByVal; /* 过渡数据类型是否按值传递 */
int wfuncno; /* 关联的 WindowStatePerFuncData 的索引 */
/* 持有过渡值和可能的其他附加数据的上下文 */
MemoryContext aggcontext; /* 聚合上下文,可能是私有的,或 winstate->aggcontext */
/* 当前的过渡值 */
Datum transValue; /* 当前过渡值 */
bool transValueIsNull; /* 过渡值是否为 NULL */
int64 transValueCount; /* 当前聚合的行数 */
/* eval_windowaggregates() 函数中使用的数据 */
bool restart; /* 是否需要在本轮聚合中重新启动此聚合? */
} WindowStatePerAggData;
eval_windowaggregates 函数
eval_windowaggregates 函数主要用于窗口聚合的计算,特别是普通聚合函数(如 SUM()
、COUNT()
等)。它在处理窗口时,根据窗口帧的位置和聚合的需求,优化了聚合操作。在帧起始位置为 UNBOUNDED_PRECEDING
时,采用增量计算策略,在窗口帧发生变化时,使用反向过渡函数或重新聚合数据。同时,它通过复用已计算的结果来提高性能,在需要时重启聚合并重置相应的状态。
此外,它还管理了不同聚合函数的上下文,确保在窗口帧的不同部分对每个聚合函数都进行正确的计算,并在计算结束后保存结果。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c
)
/*
* eval_windowaggregates
* 评估作为窗口函数的普通聚合函数
*
* 这与 nodeAgg.c 不同的地方在于:首先,如果窗口的帧开始位置发生变化,我们使用反向过渡函数(如果存在)从过渡值中删除行。其次,我们希望在将更多数据聚合到同一过渡值后,可以多次调用聚合最终函数。这是 nodeAgg.c 中不要求的行为。
*/
static void
eval_windowaggregates(WindowAggState *winstate)
{
WindowStatePerAgg peraggstate; /* 用于存储每个聚合函数的状态 */
int wfuncno, /* 窗口函数的索引 */
numaggs, /* 聚合函数的数量 */
numaggs_restart, /* 需要重启的聚合函数数量 */
i; /* 循环变量 */
int64 aggregatedupto_nonrestarted; /* 尚未聚合的行数 */
MemoryContext oldContext; /* 内存上下文的备份 */
ExprContext *econtext; /* 当前表达式上下文 */
WindowObject agg_winobj; /* 窗口函数对象 */
TupleTableSlot *agg_row_slot; /* 用于存储聚合数据的行槽 */
TupleTableSlot *temp_slot; /* 临时槽,用于存储中间结果 */
numaggs = winstate->numaggs; /* 获取窗口聚合函数的数量 */
if (numaggs == 0)
return; /* 如果没有聚合函数,直接返回 */
/* 获取执行上下文 */
econtext = winstate->ss.ps.ps_ExprContext;
agg_winobj = winstate->agg_winobj;
agg_row_slot = winstate->agg_row_slot;
temp_slot = winstate->temp_slot_1;
/*
* 如果窗口的帧起始位置为 UNBOUNDED_PRECEDING 且没有排除子句,
* 那么窗口帧由从分区开始处向前延伸的一组连续的行组成,随着当前行向前推进,行只进入帧内,而不会退出帧。
* 这样就可以使用增量策略来计算聚合值:我们为每个加入帧的行运行过渡函数,并在需要时运行最终函数来获取当前聚合值。
* 这种方法比每次处理当前行时都重新运行整个聚合计算更高效。前提是假设最终函数不会破坏正在运行的过渡值,这一点在 nodeAgg.c 中也有类似的假设。
*
* 如果帧起始位置有时会移动,我们仍然可以优化相邻的行,尽可能使用增量聚合策略,但如果帧头超出了上一个头,我们将尝试使用反向过渡函数删除这些行。
* 反向过渡函数会恢复聚合的当前状态,仿佛被移除的行从未被聚合过。如果反向过渡函数无法删除该行,或者根本没有反向过渡函数,我们需要重新计算所有位于新帧边界内的元组的聚合结果。
*
* 如果存在排除子句,我们可能需要在一个不连续的行集上聚合,因此需要重新计算每行的聚合。
*/
/*
* 更新帧头位置
*
* 窗口的帧头位置不应该向后移动,如果发生这种情况,代码将无法处理,因此在安全起见,我们会检查并报告错误。
*/
update_frameheadpos(winstate);
if (winstate->
frameheadpos < winstate->aggregatedbase)
elog(ERROR, "window frame head moved backward");
/*
* 如果帧没有变化,我们可以重用之前保存的结果值。
* 如果帧结束模式是 UNBOUNDED FOLLOWING 或 CURRENT ROW 且没有排除子句,并且当前行位于前一行的帧内,那么当前帧和前一帧的结束位置必须重合。
* 这意味着我们可以复用结果值。
*/
if (winstate->aggregatedbase == winstate->frameheadpos &&
(winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
FRAMEOPTION_END_CURRENT_ROW)) &&
!(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
winstate->aggregatedbase <= winstate->currentpos &&
winstate->aggregatedupto > winstate->currentpos)
{
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
}
return;
}
/* 初始化重启标志 */
numaggs_restart = 0;
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
/* 判断是否需要重启聚合函数 */
if (winstate->currentpos == 0 ||
(winstate->aggregatedbase != winstate->frameheadpos &&
!OidIsValid(peraggstate->invtransfn_oid)) ||
(winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
winstate->aggregatedupto <= winstate->frameheadpos)
{
peraggstate->restart = true;
numaggs_restart++;
}
else
peraggstate->restart = false;
}
/*
* 如果有任何可能需要移动的聚合函数,尝试通过删除从帧顶部掉落的输入行来将 aggregatedbase 向前推进。
* 如果失败(即 advance_windowaggregate_base 返回 false),则需要重启聚合。
*/
while (numaggs_restart < numaggs &&
winstate->aggregatedbase < winstate->frameheadpos)
{
/*
* 获取要删除的元组。这应该永远不会失败,因为我们应该已经处理过这些行。
*/
if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
temp_slot))
elog(ERROR, "could not re-fetch previously fetched frame row");
/* 设置元组上下文,用于计算聚合函数的参数 */
winstate->tmpcontext->ecxt_outertuple = temp_slot;
/*
* 为每个聚合函数执行反向过渡,除非该聚合已经标记为需要重启。
*/
for (i = 0; i < numaggs; i++)
{
bool ok;
peraggstate = &winstate->peragg[i];
if (peraggstate->restart)
continue;
wfuncno = peraggstate->wfuncno;
ok = advance_windowaggregate_base(winstate,
&winstate->perfunc[wfuncno],
peraggstate);
if (!ok)
{
/* 如果反向过渡函数失败,则需要重启聚合 */
peraggstate->restart = true;
numaggs_restart++;
}
}
/* 重置每个输入元组的上下文 */
ResetExprContext(winstate->tmpcontext);
/* 进展到下一个聚合行 */
winstate->aggregatedbase++;
ExecClearTuple(temp_slot);
}
/*
* 如果我们成功推进了所有聚合的基准行,aggregatedbase 现在应该等于 frameheadpos;
* 如果失败了,我们必须强制更新 aggregatedbase。
*/
winstate->aggregatedbase = winstate->frameheadpos;
/*
* 如果为聚合函数创建了标记指针,则将其推进到帧头,以便 tuplestore 可以丢弃不必要的行。
*/
if (agg_winobj->markptr >= 0)
WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
/*
* 现在重启需要重启的聚合函数。
*
* 如果任何聚合函数需要重启,我们假设使用共享上下文的聚合函数也需要重启,
* 并且在这种情况下我们会清理共享的 aggcontext。
*/
if (numaggs_restart > 0)
MemoryContextResetAndDeleteChildren(winstate->aggcontext);
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
/* 如果共享上下文的聚合函数需要重启,则重启所有需要重启的聚合 */
Assert(peraggstate->aggcontext != winstate->aggcontext ||
numaggs_restart == 0 ||
peraggstate->restart);
if (peraggstate->restart)
{
wfuncno = peraggstate->wfuncno;
initialize_windowaggregate(winstate,
&winstate->perfunc[wfuncno],
peraggstate);
}
else if (!peraggstate->resultValueIsNull)
{
if (!peraggstate->resulttypeByVal)
pfree(DatumGetPointer(peraggstate->resultValue));
peraggstate->resultValue = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
}
/*
* 非重启的聚合现在包含 aggregatedbase 和 aggregatedupto 之间的行,
* 而重启的聚合不包含任何行。如果有重启的聚合,我们必须从 frameheadpos 开始重新聚合,
* 否则可以从 aggregatedupto 开始继续聚合。
*/
aggregatedupto_nonrestarted = winstate->aggregatedupto;
if (numaggs_restart > 0 &&
winstate->aggregatedupto != winstate->frameheadpos)
{
winstate->aggregatedupto = winstate->frameheadpos;
ExecClearTuple(agg_row_slot);
}
/*
* 继续聚合直到遇到帧外的行(或分区结束)。
*/
for (;;)
{
int ret;
/* 如果没有获取行,获取下一行 */
if (TupIsNull(agg_row_slot))
{
if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
agg_row_slot))
break; /* 到达分区结束 */
}
/*
* 如果当前行不在帧内,跳过聚合。
*/
ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
if (ret < 0)
break;
if (ret == 0)
goto next_tuple;
/* 设置元组上下文 */
winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
/* 将行累加到聚合中 */
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
/* 跳过未重启的聚合 */
if (!peraggstate->restart &&
winstate->aggregatedupto < aggregatedupto_nonrestarted)
continue;
wfuncno = peraggstate->wfuncno;
advance_windowaggregate(winstate,
&winstate->perfunc[wfuncno],
peraggstate);
}
next_tuple:
/* 重置每个输入元组的上下文 */
ResetExprContext(winstate->tmpcontext);
/* 进展到下一个聚合行 */
winstate->aggregatedupto++;
ExecClearTuple(agg_row_slot);
}
/* 确保帧的结束位置不会向后移动 */
Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
/*
* 最终化聚合并填充结果和空值字段
*/
for (i = 0; i < numaggs; i++)
{
Datum *result;
bool *isnull;
peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
result = &econtext->ecxt_aggvalues[wfuncno];
isnull = &econtext->ecxt_aggnulls[wfuncno];
finalize_windowaggregate(winstate,
&winstate->perfunc[wfuncno],
peraggstate,
result, isnull);
/*
* 如果下一个行共享同一帧,保存结果值
*/
if (!peraggstate->resulttypeByVal && !*isnull)
{
oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
peraggstate->resultValue =
datumCopy(*result,
peraggstate->resulttypeByVal,
peraggstate->resulttypeLen);
MemoryContextSwitchTo(oldContext);
}
else
{
peraggstate->resultValue = *result;
}
peraggstate->resultValueIsNull = *isnull;
}
}
让我们通过一个具体的例子来分析 eval_windowaggregates 函数的每一步操作。假设我们有一个销售数据表 sales,包含以下数据:
salesperson_id | sale_date | sale_amount |
---|---|---|
1 | 2024-01-01 | 100 |
1 | 2024-01-02 | 200 |
1 | 2024-01-03 | 300 |
2 | 2024-01-01 | 150 |
2 | 2024-01-02 | 250 |
2 | 2024-01-03 | 350 |
假设我们希望计算每个销售人员的累计销售额,并且使用的是窗口聚合函数,按日期顺序(ORDER BY sale_date
)来计算累计销售额。我们的窗口框架将从 UNBOUNDED PRECEDING
开始,直到当前行结束。
1. 初始化和设置
在开始时,窗口函数会为每个聚合函数(在这个例子中是
SUM(sale_amount)
)创建一个WindowStatePerAggData
结构体来保存当前的聚合状态。假设我们有两个销售人员的销售数据。对于每个销售人员,eval_windowaggregates
将会处理每个销售记录,维护其当前的聚合状态。
初始化:numaggs = 1
,因为只有一个聚合函数SUM(sale_amount)
。aggregatedbase
和aggregatedupto
变量分别用于跟踪当前已聚合和尚未聚合的行。
2. 更新帧头位置
在窗口聚合中,
frameheadpos
表示窗口帧的起始位置。update_frameheadpos
会根据窗口的当前状态更新这一位置。例如,假设当前处理的销售人员是销售员 1,并且当前销售记录是 2024-01-03。
帧头位置更新:frameheadpos
会根据查询的PARTITION BY
和ORDER BY
规则进行调整。这里,frameheadpos
会指向销售员 1 在 2024-01-03 的行。
3. 优化增量计算
如果当前的窗口帧没有发生变化,我们就可以复用之前保存的聚合结果,而不必重新计算。例如,在销售员 1 的数据中,假设前两天(2024-01-01 和 2024-01-02)已经聚合完成。
复用结果:假设当前帧的结束位置是 2024-01-03,且没有排除子句(EXCLUSION
),那么程序会检查窗口帧是否变化。如果没有变化(即当前行仍然在上一帧内),则复用先前的聚合结果。
4. 处理帧的变化
如果窗口帧的头位置发生变化,我们需要做以下几步:
- 检查是否需要重启聚合:如果帧的头移动,或者窗口的范围发生变化(例如,加入了
EXCLUSION
子句),我们就需要重新聚合数据。eval_windowaggregates
会为每个聚合函数设置重启标志。- 更新聚合函数的状态:在此过程中,
advance_windowaggregate_base
函数会根据新的帧头位置和数据,调整聚合的基准状态(aggregatedbase
)。例如,如果帧的起始位置从 2024-01-01 移动到 2024-01-02,
eval_windowaggregates
将使用反向过渡函数(invtransfn
)删除帧头之前的行。
5. 重新聚合数据
如果
advance_windowaggregate_base
无法成功移动聚合的基准行(即删除掉帧头之前的行),或者没有反向过渡函数,系统就会重新开始聚合。例如,在 2024-01-02 之后的帧头位置,可能需要从新的帧开始重新计算聚合结果。
- 重启聚合:如果需要重启聚合(例如因为反向过渡失败),
restart
标志会被设置为true
,然后聚合函数的状态会被重新初始化。
6. 计算新行的聚合结果
如果当前的聚合状态已经准备好,且没有出现需要重启的情况,
eval_windowaggregates
会开始将新的一行数据添加到聚合中。
- 逐行聚合:每次计算新的聚合值时,
advance_windowaggregate
函数会根据当前行的数据更新聚合结果。例如,在2024-01-03
,销售员 1 的累计销售额将是100 + 200 + 300 = 600
。
7. 最终化聚合结果
当所有的行都被处理完后,
finalize_windowaggregate
会被调用来计算窗口聚合的最终结果。例如,计算销售员 1 和销售员 2 的最终累计销售额。
- 保存和返回结果:最终,
eval_windowaggregates
会保存每个聚合函数的结果,并更新相应的输出字段。如果存在共享上下文(即多个聚合函数使用同一个上下文),它会进行清理,以确保没有内存泄漏。
8. 返回结果
函数会返回每个窗口聚合函数的
在这里插入代码片
最终结果,在每一行的输出中返回正确的累计销售额。
示例执行:
假设我们在销售员 1 上执行上述操作:
初始时,销售员 1 在 2024-01-01 的销售额为 100,聚合值为 100。
接着,销售员 1 在 2024-01-02 的销售额为 200,聚合值为 100 + 200 = 300。
最后,在 2024-01-03,销售员 1 的销售额为 300,最终累计值为 100 + 200 + 300 = 600。
update_frameheadpos 函数
update_frameheadpos 函数的主要功能是更新窗口聚合的帧头位置 frameheadpos
,确保其对于当前行有效。帧头的位置是窗口聚合计算的关键,因为它决定了每个窗口函数计算时所依据的数据范围。下面是详细的逐行注释和对每个步骤的解释。(路径:src\backend\executor\nodeWindowAgg.c
)
/*
* update_frameheadpos
* 使 frameheadpos 对当前行有效
*
* 注意,frameheadpos 计算时不考虑任何窗口排除子句;当前行和/或其同组行即使在后续需要被排除时,也会被视为帧的一部分。
*
* 可能会覆盖 winstate->temp_slot_2。
*/
static void
update_frameheadpos(WindowAggState *winstate)
{
WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; /* 获取窗口聚合节点 */
int frameOptions = winstate->frameOptions; /* 获取当前的帧选项 */
MemoryContext oldcontext; /* 保存当前的内存上下文 */
/* 如果帧头已经有效,则不需要更新,直接返回 */
if (winstate->framehead_valid)
return;
/* 可能会在短生命周期的上下文中被调用,因此切换到合适的内存上下文 */
oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
/* 根据帧的起始选项来计算帧头 */
if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
{
/* 在 UNBOUNDED PRECEDING 模式下,帧头始终是分区的第一行 */
winstate->frameheadpos = 0;
winstate->framehead_valid = true;
}
else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
{
/* 如果是 CURRENT ROW 模式,根据排序模式计算帧头 */
if (frameOptions & FRAMEOPTION_ROWS)
{
/* 在 ROWS 模式下,帧头与当前行相同 */
winstate->frameheadpos = winstate->currentpos;
winstate->framehead_valid = true;
}
else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
{
/* 如果没有 ORDER BY,所有行是同行的 */
if (node->ordNumCols == 0)
{
winstate->frameheadpos = 0;
winstate->framehead_valid = true;
MemoryContextSwitchTo(oldcontext);
return;
}
/*
* 在 RANGE 或 GROUPS START_CURRENT_ROW 模式下,帧头是当前行的同组中的第一行。
* 我们保持帧头的最后已知位置,并根据需要前进。
*/
tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);
if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot))
{
/* 如果尚未获取第一行,则将其获取到 framehead_slot */
if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
elog(ERROR, "unexpected end of tuplestore");
}
/* 检查当前行是否是正确的帧头 */
while (!TupIsNull(winstate->framehead_slot))
{
if (are_peers(winstate, winstate->framehead_slot, winstate->ss.ss_ScanTupleSlot))
break; /* 该行是正确的帧头 */
/* 即使获取失败,仍然推进帧头位置 */
winstate->frameheadpos++;
spool_tuples(winstate, winstate->frameheadpos);
if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
break; /* 到达分区末尾 */
}
winstate->framehead_valid = true;
}
else
Assert(false); /* 如果既不是 RANGE 也不是 GROUPS,应该抛出异常 */
}
else if (frameOptions & FRAMEOPTION_START_OFFSET)
{
/* 在 OFFSET 模式下,帧头相对于当前行的位置是通过偏移量来决定的 */
if (frameOptions & FRAMEOPTION_ROWS)
{
int64 offset = DatumGetInt64(winstate->startOffsetValue);
if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
offset = -offset; /* 如果是 PRECEDING,则是负偏移量 */
winstate->frameheadpos = winstate->currentpos + offset;
/* 帧头不能小于第一行 */
if (winstate->frameheadpos < 0)
winstate->frameheadpos = 0;
/* 确保帧头不超出分区末尾 */
else if (winstate->frameheadpos > winstate->currentpos + 1)
{
spool_tuples(winstate, winstate->frameheadpos - 1);
if (winstate->frameheadpos > winstate->spooled_rows)
winstate->frameheadpos = winstate->spooled_rows;
}
winstate->framehead_valid = true;
}
else if (frameOptions & FRAMEOPTION_RANGE)
{
/*
* 在 RANGE START_OFFSET 模式下,帧头是满足范围约束的第一行。
* 我们保持帧头的最后已知位置,并根据需要推进。
*/
int sortCol = node->ordColIdx[0];
bool sub, less;
/* 确保有排序列 */
Assert(node->ordNumCols == 1);
/* 计算用于范围检查的标志 */
if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
sub = true;
else
sub = false;
less = false; /* 通常,帧头应满足 >= sum */
if (!winstate->inRangeAsc)
{
sub = !sub;
less = true;
}
tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);
if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot))
{
/* 如果尚未获取第一行,则将其获取到 framehead_slot */
if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
elog(ERROR, "unexpected end of tuplestore");
}
/* 逐行检查,直到找到满足范围条件的帧头行 */
while (!TupIsNull(winstate->framehead_slot))
{
Datum headval, currval;
bool headisnull, currisnull;
headval = slot_getattr(winstate->framehead_slot, sortCol, &headisnull);
currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, &currisnull);
if (headisnull || currisnull)
{
/* 如果其中一行的值为 NULL,按照 nulls_first 设置推进帧头 */
if (winstate->inRangeNullsFirst)
{
if (!headisnull || currisnull)
break;
}
else
{
if (headisnull || !currisnull)
break;
}
}
else
{
if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
winstate->inRangeColl,
headval,
currval,
winstate->startOffsetValue,
BoolGetDatum(sub),
BoolGetDatum(less))))
break; /* 该行是正确的帧头 */
}
/* 即使获取失败,仍然推进帧头位置 */
winstate->frameheadpos++;
spool_tuples(winstate, winstate->frameheadpos);
if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
break; /* 到达分区末尾 */
}
winstate->framehead_valid = true;
}
else if (frameOptions & FRAMEOPTION_GROUPS)
{
/*
* 在 GROUPS START_OFFSET 模式下,帧头是满足偏移量约束的第一组的第一行。
*/
int64 offset = DatumGetInt64(winstate->startOffsetValue);
int64 minheadgroup;
if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
minheadgroup = winstate->currentgroup - offset;
else
minheadgroup = winstate->currentgroup + offset;
tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);
if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot))
{
/* 如果尚未获取第一行,则将其获取到 framehead_slot */
if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
elog(ERROR, "unexpected end of tuplestore");
}
/* 逐组推进帧头 */
while (!TupIsNull(winstate->framehead_slot))
{
if (winstate->frameheadgroup >= minheadgroup)
break; /* 找到满足条件的帧头行 */
ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
winstate->frameheadpos++;
spool_tuples(winstate, winstate->frameheadpos);
if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
break; /* 到达分区末尾 */
if (!are_peers(winstate, winstate->temp_slot_2, winstate->framehead_slot))
winstate->frameheadgroup++;
}
ExecClearTuple(winstate->temp_slot_2);
winstate->framehead_valid = true;
}
else
Assert(false);
}
else
Assert(false);
/* 恢复原内存上下文 */
MemoryContextSwitchTo(oldcontext);
}
依旧通过一个具体的例子来分析该函数的具体执行过程,案例参考函数eval_windowaggregates。
案例背景:
我们希望计算每个销售员的累计销售额。使用窗口函数
SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
,即每个销售员的累计销售额是从该销售员的第一个销售日期开始,到当前行的销售额的累积。
SQL 查询:
SELECT salesperson_id, sale_date, sale_amount,
SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales
FROM sales;
这个查询会根据 sale_date
排序每个销售员的数据,并为每一行计算累计销售额。为了计算窗口函数,update_frameheadpos
会在内部被调用来更新每个窗口的帧头位置。
详细步骤和代码说明:
假设我们正在处理销售员 1 的数据,查询的当前行是 2024-01-02。
第一步:更新帧头位置
当函数 update_frameheadpos
被调用时,它的作用是更新 frameheadpos
,即计算当前帧的起始位置。帧头位置决定了窗口函数计算时应包括哪些行。
1. 检查是否已经计算了帧头位置:
if (winstate->framehead_valid)
return; /* 如果帧头已经有效,直接返回 */
如果帧头已经计算过了,就跳过计算,避免重复计算。
2. 切换到合适的内存上下文:
oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
这里,我们切换到合适的内存上下文,以确保计算不会泄漏内存。
3. 计算帧头位置: 接下来根据帧的选项 (frameOptions
),我们来决定帧头的位置。
- 如果是
UNBOUNDED PRECEDING
:
if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
{
winstate->frameheadpos = 0;
winstate->framehead_valid = true;
}
这里,UNBOUNDED PRECEDING
表示帧从分区的第一行开始。因此,帧头位置就是 0,即第一行。
- 如果是
CURRENT ROW
:
else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
{
if (frameOptions & FRAMEOPTION_ROWS)
{
winstate->frameheadpos = winstate->currentpos;
winstate->framehead_valid = true;
}
}
如果是 CURRENT ROW
,那么帧头就是当前行的位置。在我们的例子中,假设当前行是 2024-01-02,frameheadpos
就是当前行的位置。
第二步:处理 RANGE 或 GROUPS 模式
如果窗口定义了 RANGE
或 GROUPS
,我们需要根据排序规则找到当前行所在的组,并确定该组的第一行作为帧头。
4. 如果没有排序列(ORDER BY):
if (node->ordNumCols == 0)
{
winstate->frameheadpos = 0;
winstate->framehead_valid = true;
MemoryContextSwitchTo(oldcontext);
return;
}
如果没有定义排序列,那么所有行被认为是同一组,帧头位置就是 0,即分区的第一行。
5. 如果有排序列
如果有排序列,我们会根据当前行的值和分区内其他行的值,找到与当前行同组的第一行作为帧头。例如,如果是 2024-01-02 的数据,程序会查找销售员 1 中销售额最早的那一行(即 2024-01-01)。
5. 查找同组的第一行
while (!TupIsNull(winstate->framehead_slot))
{
if (are_peers(winstate, winstate->framehead_slot, winstate->ss.ss_ScanTupleSlot))
break; /* 找到当前行同组的第一行作为帧头 */
winstate->frameheadpos++;
spool_tuples(winstate, winstate->frameheadpos);
}
这里,我们通过检查每一行是否与当前行同组(are_peers
函数),找到属于同组的第一行,作为帧头。
第三步:更新帧头位置和返回
7. 设置帧头有效:
winstate->framehead_valid = true;
一旦计算出帧头位置,就将 framehead_valid
设置为 true
,表示帧头计算完成。
8. 恢复内存上下文:
MemoryContextSwitchTo(oldcontext);
最后,恢复之前的内存上下文,确保内存管理的正确性。
具体例子:
假设当前行是 2024-01-02,销售员 1。
查询的窗口帧使用的是 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW。
第一步:frameheadpos 将被设置为 0,即从 2024-01-01 开始。
第二步:在 RANGE 模式下,程序检查是否有排序列,并找到销售员 1 在 2024-01-01 的销售额作为帧头。
第三步:最终,帧头位置 frameheadpos 被设置为 0,并且标记为有效。
因此,当前行的累计销售额将从 2024-01-01 到 2024-01-02,依此类推。
窗口模式通过不同的帧定义方式,影响了窗口函数的计算范围,从而决定了聚合计算的结果。
- UNBOUNDED PRECEDING:帧从分区的第一行开始,适用于计算从分区开始到当前行的累计值。
- CURRENT ROW:帧仅包含当前行,适用于每行单独计算(如排名)。
- RANGE:帧的起始位置是当前行所在同组的第一行,适用于基于排序的聚合(如销售排名)。
- OFFSET:帧的起始位置是当前行位置的偏移,适用于计算行之间的偏移聚合。
- GROUPS:帧的起始位置是当前行所在组的第一行,适用于按组聚合。