【PostgreSQL内核学习 —— (sort算子)】
sort算子
- 概述
- ExecInitSort 函数
- ExecSort 函数
- ExecEndSort 函数
- tuplesort_performsort 函数
- tuplestore_puttuple_common 函数
- tuplestore_gettuple 函数
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书
概述
在PostgreSQL
中,Sort
算子用于对输入数据进行排序以满足查询的ORDER BY
要求,其执行流程与执行器框架紧密关联。当执行器启动时,Sort
算子会初始化内存缓冲区(由work_mem
参数控制),通过下层算子(如SeqScan
)逐行拉取数据并存储为SortTuple
结构。若内存不足,数据会被分块排序后写入磁盘临时文件(称为“tape
”),采用外部归并排序(External Merge Sort
)完成最终排序。具体流程包括:
- 内存排序阶段: 优先使用快速排序(
quicksort
)对内存中的数据块进行排序,并支持增量排序(Incremental Sort
)以利用索引预排序优化多字段排序。
参考文章:PostgreSQL 17增量排序、PostgreSQL中的sort算子及所用到的外排实现原理探究(二)
- 外部排序阶段: 当数据量超出
work_mem
限制时,将数据分段(run
)写入磁盘,每段单独排序后通过多路归并(k-way merge
)生成全局有序结果。
参考文章:PostgreSQL中的sort算子所用到的外排实现原理探究(一)、PostgreSQL中的sort算子及所用到的外排实现原理探究(二)
- 结果输出: 最终有序数据通过执行器的
Pull
模型逐行返回给上层算子(如Limit
或Merge
Join
)。
参考文章:PostgreSQL技术内幕(十六)如何写一个执行器算子?、理解PG如何执行一个查询
Sort
算子作为执行器的算子,其执行框架遵循四阶段模型:
- 初始化(
ExecutorStart
):分配内存并初始化排序状态。 - 运行(
ExecutorRun
):触发下层算子的数据拉取,执行排序逻辑。 - 统计与清理(
ExecutorFinish
):收集执行统计信息并释放临时资源。 - 终止(
ExecutorEnd
):释放内存和文件句柄。
与其他算子(如HashJoin
、IndexScan
)的协作通过统一的执行器接口实现,所有算子均嵌入查询计划树中,由Portal
结构统一调度,通过递归调用ExecProcNode
驱动数据流动。这种设计使得Sort
算子既能独立处理排序任务,又能与其他算子高效协同,形成完整的查询执行流水线。
ExecInitSort 函数
ExecInitSort
函数是PostgreSQL
执行器中Sort
算子的初始化函数(ExecInitSort
),属于执行器四阶段模型中的初始化阶段(ExecutorStart
)。其主要功能包括:
- 状态初始化: 创建
SortState
结构体,关联查询计划节点(Sort *node
)和全局执行状态(EState *estate
),并设置核心处理函数ExecSort
(后续实际排序逻辑的入口)。 - 随机访问控制: 根据执行标志(
eflags
)决定是否支持结果集回退、标记/恢复等操作,若需要则启用randomAccess
并强制物化排序结果。 - 子节点初始化: 通过递归调用
ExecInitNode
初始化下层算子(如Scan
节点),并屏蔽其不需要处理的执行标志(如REWIND
)以简化逻辑。 - 元组处理配置:
- 创建扫描槽从子节点拉取数据,使用虚拟元组格式(
TTSOpsVirtual
)减少内存拷贝。 - 初始化结果槽为最小化元组格式(
TTSOpsMinimalTuple
),避免冗余数据存储。
- 创建扫描槽从子节点拉取数据,使用虚拟元组格式(
- 排序类型选择: 根据子节点输出的元组描述符(
outerTupDesc
)判断是否采用高效的Datum
排序(单列按值传递场景),否则使用通用元组排序。
此函数为Sort
算子的执行阶段(如内存/磁盘排序、结果输出)完成准备工作,确保执行器能够通过统一的接口(如ExecProcNode
)驱动数据流动,并与其他算子(如Limit、Merge Join
)协作形成完整的查询流水线。其设计体现了PostgreSQL
执行器框架的核心思想:通过分层初始化和状态封装,将复杂操作(如排序)抽象为独立的阻塞型算子,同时保持执行过程的高效与可控性。
// 函数定义:初始化Sort节点,返回SortState结构体指针
SortState *
ExecInitSort(Sort *node, EState *estate, int eflags)
{
SortState *sortstate;
TupleDesc outerTupDesc;
// 调试输出,标记初始化开始
SO1_printf("ExecInitSort: %s\n", "initializing sort node");
/*
* 创建Sort节点的运行时状态结构体SortState
*/
sortstate = makeNode(SortState); // 分配内存
sortstate->ss.ps.plan = (Plan *) node; // 关联查询计划中的Sort节点
sortstate->ss.ps.state = estate; // 关联执行器全局状态
sortstate->ss.ps.ExecProcNode = ExecSort; // 设置处理函数为ExecSort(实际排序逻辑的入口)
/*
* 根据执行标志(eflags)判断是否需要支持随机访问(如回退扫描、标记/恢复操作)
* 若需要,则启用`randomAccess`,并强制物化排序结果以支持多次回放
*/
sortstate->randomAccess = (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) != 0;
// 初始化状态变量:非边界排序、未完成排序、暂未初始化排序状态
sortstate->bounded = false; // 不限制排序结果数量(与LIMIT无关)
sortstate->sort_Done = false; // 标记排序是否完成
sortstate->tuplesortstate = NULL; // 排序状态机(后续由tuplesort模块初始化)
/*
* 跳过表达式上下文(ExprContext)初始化,因为Sort节点不涉及条件过滤(ExecQual)或投影(ExecProject)
*/
/*
* 初始化子节点(下层算子,如SeqScan或IndexScan)
* 通过屏蔽`eflags`中的REWIND、BACKWARD、MARK标志,避免子节点处理这些复杂逻辑
*/
eflags &= ~(EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK);
outerPlanState(sortstate) = ExecInitNode(outerPlan(node), estate, eflags); // 递归初始化子节点
/*
* 创建扫描槽(ScanSlot),用于从子节点拉取数据
* 使用虚拟元组表(TTSOpsVirtual)作为默认存储格式
*/
ExecCreateScanSlotFromOuterPlan(estate, &sortstate->ss, &TTSOpsVirtual);
/*
* 初始化结果槽(ResultSlot),用于向上层返回排序后的数据
* 使用最小化元组格式(TTSOpsMinimalTuple)以节省内存
* 不设置投影信息(ps_ProjInfo),因为Sort节点不修改数据内容
*/
ExecInitResultTupleSlotTL(&sortstate->ss.ps, &TTSOpsMinimalTuple);
sortstate->ss.ps.ps_ProjInfo = NULL;
// 获取子节点的输出元组描述符(用于判断排序类型)
outerTupDesc = ExecGetResultType(outerPlanState(sortstate));
/*
* 判断是否启用Datum排序(仅当子节点输出单个按值传递的列时适用)
* 否则使用元组排序(通用场景)
*/
if (outerTupDesc->natts == 1 && TupleDescAttr(outerTupDesc, 0)->attbyval)
sortstate->datumSort = true; // 单列按值排序(效率更高)
else
sortstate->datumSort = false; // 多列或按引用排序
// 调试输出,标记初始化完成
SO1_printf("ExecInitSort: %s\n", "sort node initialized");
return sortstate; // 返回初始化完成的SortState
}
ExecSort 函数
ExecSort
函数是PostgreSQL
执行器中Sort
算子的核心执行函数(ExecSort
),属于执行器四阶段模型中的运行阶段(ExecutorRun
),其核心作用是通过tuplesort
模块对输入数据进行排序,并按需向上层返回有序元组。具体流程如下:
- 首次执行逻辑(排序初始化):
- 子节点数据拉取: 通过
ExecProcNode
递归调用下层算子(如SeqScan
)逐行获取所有元组。 - 排序器初始化: 根据排序类型(单列
Datum
排序或多列元组排序)选择tuplesort_begin_datum
或tuplesort_begin_heap
,并配置排序选项(如内存限制、随机访问支持)。 - 数据填充: 将子节点数据存入排序器(
tuplesort_putdatum
或tuplesort_puttupleslot
)。 - 排序触发: 调用
tuplesort_performsort
触发排序逻辑(可能涉及内存快速排序或外部归并排序)。 - 状态更新: 标记排序完成,处理并行场景的统计信息收集。
- 子节点数据拉取: 通过
- 后续执行逻辑(结果返回):
- 元组提取: 根据当前扫描方向(由
ORDER BY
决定)从排序器中提取有序元组。- Datum排序: 手动构造虚拟元组槽,将
Datum
值和NULL
标记填充到槽中。 - 元组排序: 直接通过
tuplesort_gettupleslot
获取完整元组。
- Datum排序: 手动构造虚拟元组槽,将
- 元组提取: 根据当前扫描方向(由
- 关键特性:
- 阻塞型算子: 首次调用需拉取并排序所有数据,后续调用仅返回结果,符合执行器的
Pull
模型。 - 多场景支持:
- 支持正向/反向扫描(如
ORDER BY ... ASC/DESC
)。 - 支持有限排序(
LIMIT
优化)和并行排序统计。
- 支持正向/反向扫描(如
- 性能优化: 单列
Datum
排序避免元组开销,提升效率。
- 阻塞型算子: 首次调用需拉取并排序所有数据,后续调用仅返回结果,符合执行器的
- 与执行器框架的集成:
- 通过统一的
ExecProcNode
接口与上下层算子(如SeqScan、Limit
)交互。 - 依赖
tuplesort
模块实现跨内存与磁盘的高效排序,屏蔽底层复杂性。
- 通过统一的
此函数体现了PostgreSQL
执行器的核心设计原则:通过模块化(如tuplesort
)和状态机管理(SortState
),将复杂操作(如排序)封装为可重用的算子,同时保持执行过程的高效性与灵活性。
// 函数定义:执行排序操作,返回排序后的元组槽
static TupleTableSlot *
ExecSort(PlanState *pstate)
{
SortState *node = castNode(SortState, pstate); // 强制转换为SortState类型
EState *estate; // 全局执行状态
ScanDirection dir; // 当前扫描方向(正向/反向)
Tuplesortstate *tuplesortstate; // 排序状态机(由tuplesort模块管理)
TupleTableSlot *slot; // 元组槽(用于接收子节点数据或返回结果)
CHECK_FOR_INTERRUPTS(); // 检查查询是否被中断(如用户取消操作)
/*
* 从SortState节点中获取状态信息
*/
SO1_printf("ExecSort: %s\n", "entering routine"); // 调试输出,标记进入函数
estate = node->ss.ps.state; // 获取全局执行状态(EState)
dir = estate->es_direction; // 当前扫描方向(由查询的ORDER BY决定)
tuplesortstate = (Tuplesortstate *) node->tuplesortstate; // 排序状态机指针
/*
* 首次执行时,从子节点拉取所有元组并排序;后续调用直接返回已排序的元组
*/
if (!node->sort_Done) // 检查是否已完成排序初始化
{
Sort *plannode = (Sort *) node->ss.ps.plan; // 获取查询计划中的Sort节点
PlanState *outerNode; // 子节点(如SeqScan)
TupleDesc tupDesc; // 子节点输出的元组描述符
int tuplesortopts = TUPLESORT_NONE; // 排序选项(默认为无特殊选项)
SO1_printf("ExecSort: %s\n", "sorting subplan"); // 调试输出,标记开始排序
/*
* 设置子节点扫描方向为正向(确保数据按顺序拉取)
*/
estate->es_direction = ForwardScanDirection;
/*
* 初始化tuplesort模块(核心排序逻辑)
*/
SO1_printf("ExecSort: %s\n", "calling tuplesort_begin"); // 调试输出
outerNode = outerPlanState(node); // 获取子节点状态
tupDesc = ExecGetResultType(outerNode); // 获取子节点输出的元组描述符
// 根据SortState配置排序选项
if (node->randomAccess)
tuplesortopts |= TUPLESORT_RANDOMACCESS; // 启用随机访问(支持回退等操作)
if (node->bounded)
tuplesortopts |= TUPLESORT_ALLOWBOUNDED; // 启用有限排序(如LIMIT优化)
// 根据是否单列按值排序(datumSort),选择不同的排序初始化方法
if (node->datumSort)
tuplesortstate = tuplesort_begin_datum(
TupleDescAttr(tupDesc, 0)->atttypid, // 列的数据类型OID
plannode->sortOperators[0], // 排序操作符(如>或<)
plannode->collations[0], // 排序规则(如COLLATE)
plannode->nullsFirst[0], // NULL值排序位置(FIRST/LAST)
work_mem, // 排序内存限制(由work_mem参数控制)
NULL, // 无自定义比较函数
tuplesortopts // 排序选项
);
else
tuplesortstate = tuplesort_begin_heap(
tupDesc, // 元组描述符
plannode->numCols, // 排序列数量
plannode->sortColIdx, // 排序列索引数组
plannode->sortOperators, // 排序操作符数组
plannode->collations, // 排序规则数组
plannode->nullsFirst, // NULL值排序位置数组
work_mem, // 内存限制
NULL, // 无自定义比较函数
tuplesortopts // 排序选项
);
// 如果启用有限排序(如LIMIT),设置最大元组数
if (node->bounded)
tuplesort_set_bound(tuplesortstate, node->bound);
node->tuplesortstate = (void *) tuplesortstate; // 将排序状态机保存到SortState
/*
* 从子节点拉取所有元组,根据排序类型(datumSort或元组排序)存入tuplesort
*/
if (node->datumSort) // 单列按值排序(Datum)
{
for (;;) // 无限循环,直到子节点返回空槽
{
slot = ExecProcNode(outerNode); // 从子节点获取元组
if (TupIsNull(slot)) // 若子节点无数据,结束循环
break;
slot_getsomeattrs(slot, 1); // 确保第一列数据已加载到槽中
// 将Datum值和是否为NULL存入排序器
tuplesort_putdatum(tuplesortstate, slot->tts_values[0], slot->tts_isnull[0]);
}
}
else // 多列或引用类型排序(元组)
{
for (;;)
{
slot = ExecProcNode(outerNode);
if (TupIsNull(slot))
break;
// 将完整元组存入排序器
tuplesort_puttupleslot(tuplesortstate, slot);
}
}
/*
* 执行最终排序(触发内存/磁盘排序逻辑)
*/
tuplesort_performsort(tuplesortstate);
/*
* 恢复用户指定的扫描方向(可能为反向,如ORDER BY ... DESC)
*/
estate->es_direction = dir;
/*
* 标记排序已完成,更新状态
*/
node->sort_Done = true; // 排序完成标志
node->bounded_Done = node->bounded; // 保存有限排序状态
node->bound_Done = node->bound; // 保存LIMIT值
// 如果是并行工作线程,收集排序统计信息
if (node->shared_info && node->am_worker)
{
TuplesortInstrumentation *si;
Assert(IsParallelWorker());
Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
si = &node->shared_info->sinstrument[ParallelWorkerNumber];
tuplesort_get_stats(tuplesortstate, si); // 获取排序统计信息(如内存使用、I/O次数)
}
SO1_printf("ExecSort: %s\n", "sorting done"); // 调试输出,标记排序完成
}
/*
* 从排序器中获取已排序的元组
*/
SO1_printf("ExecSort: %s\n", "retrieving tuple from tuplesort"); // 调试输出
slot = node->ss.ps.ps_ResultTupleSlot; // 获取结果槽(用于返回数据)
/*
* 根据排序类型调用不同的获取函数
*/
if (node->datumSort) // 单列按值排序(Datum)
{
ExecClearTuple(slot); // 清空结果槽
// 从排序器获取Datum值并填充到虚拟元组槽中
if (tuplesort_getdatum(tuplesortstate, ScanDirectionIsForward(dir),
&(slot->tts_values[0]), &(slot->tts_isnull[0]), NULL))
ExecStoreVirtualTuple(slot); // 将Datum转换为虚拟元组
}
else // 元组排序
{
// 直接从排序器获取元组槽(tuplesort管理槽的生命周期)
(void) tuplesort_gettupleslot(tuplesortstate, ScanDirectionIsForward(dir),
false, slot, NULL);
}
return slot; // 返回排序后的元组槽(可能为空表示无更多数据)
}
ExecEndSort 函数
此函数是PostgreSQL
执行器中Sort
算子的资源清理函数,属于执行器四阶段模型中的终止阶段(ExecutorEnd
),核心作用包括:
- 元组槽清理:清空扫描槽(
ScanTupleSlot
)和结果槽(ResultTupleSlot
),释放其占用的内存。 - 排序资源释放:
- 若已初始化排序状态机(
tuplesortstate
),调用tuplesort_end()
释放其内存缓冲区及临时磁盘文件(如外部排序生成的tape
文件)。 - 将
tuplesortstate
指针置空,确保后续操作不会误用已释放资源。
- 若已初始化排序状态机(
- 递归关闭子节点:通过
ExecEndNode()
递归调用下层算子(如SeqScan
)的清理函数,释放整个查询计划树的资源。
/*
* ----------------------------------------------------------------
* ExecEndSort
* ----------------------------------------------------------------
*/
void
ExecEndSort(SortState *node)
{
// 调试输出:标记开始关闭Sort节点
SO1_printf("ExecEndSort: %s\n", "shutting down sort node");
/*
* 清理元组表(TupleTable)中的扫描槽和结果槽
*/
ExecClearTuple(node->ss.ss_ScanTupleSlot); // 清空扫描槽(用于从子节点拉取数据)
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); // 清空结果槽(用于向上层返回排序后的数据)
/*
* 释放tuplesort模块占用的资源
*/
if (node->tuplesortstate != NULL)
{
tuplesort_end((Tuplesortstate *) node->tuplesortstate); // 调用tuplesort的清理函数,释放内存和临时文件
node->tuplesortstate = NULL; // 将排序状态机指针置空,防止悬空引用
}
/*
* 递归关闭子节点(如下层SeqScan或IndexScan)
*/
ExecEndNode(outerPlanState(node)); // 调用子节点的ExecEnd函数链式释放资源
// 调试输出:标记Sort节点关闭完成
SO1_printf("ExecEndSort: %s\n", "sort node shutdown");
}
tuplesort_performsort 函数
此函数是PostgreSQL
排序模块的核心调度器,负责根据当前排序状态(state->status
)选择不同的处理策略。主要功能包括:
-
多模式支持
- 串行模式:直接使用快速排序处理内存数据
- 并行
Worker
模式:将数据写入磁带文件,避免资源竞争 - 并行
Leader
模式:整合Worker
的中间结果执行多路归并
-
状态机管理
- 处理三种核心状态转换:
-
TSS_INITIAL
→ 根据并行配置选择排序策略 -
TSS_BOUNDED
→ 转换有界堆为有序数组 -
TSS_BUILDRUNS
→ 完成磁带数据合并
-
- 处理三种核心状态转换:
-
资源管理
- 通过
MemoryContextSwitchTo
确保内存分配在专用排序上下文 - 使用
inittapes/dumptuples
管理磁盘临时文件 - 通过
mergeruns
实现高效的多路归并算法
- 通过
-
调试支持
- 集成
TRACE_SORT
日志模块,可输出详细排序过程指标(如CPU
时间、I/O
次数)
- 集成
关键设计特点
- 分层处理:根据数据量自动选择内存排序或外部排序,当数据超过
work_mem
时触发磁带写入 - 并行优化:
Worker
仅负责局部排序,Leader
集中合并,充分利用多核性能 - 状态一致性:每次状态转换后重置迭代器位置(
current
,markpos
等),保证后续读取的正确性 - 资源隔离:通过专用内存上下文防止内存泄漏,确保异常时能正确清理资源
/*
* 执行排序的主入口函数,处理不同排序阶段的逻辑
*/
void
tuplesort_performsort(Tuplesortstate *state)
{
// 切换到排序专用的内存上下文,确保内存分配在正确环境中进行
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
#ifdef TRACE_SORT // 调试跟踪模块
if (trace_sort)
elog(LOG, "performsort of worker %d starting: %s",
state->worker, pg_rusage_show(&state->ru_start)); // 记录排序开始时的资源使用情况
#endif
// 根据当前排序状态选择处理逻辑
switch (state->status)
{
case TSS_INITIAL: // 初始状态(未开始排序)
if (SERIAL(state)) { // 串行模式
/* 单线程快速排序 */
tuplesort_sort_memtuples(state); // 对内存中的元组数组执行快速排序
state->status = TSS_SORTEDINMEM; // 更新状态为"内存已排序"
} else if (WORKER(state)) { // 并行工作线程模式
/*
* 并行Worker将数据写入磁带:
* 1. 初始化磁带存储(inittapes)
* 2. 将内存数据转储到磁带(dumptuples)
* 3. 标记不需要合并运行(worker_nomergeruns)
*/
inittapes(state, false); // 初始化磁带文件
dumptuples(state, true); // 将内存元组写入磁带
worker_nomergeruns(state); // 设置Worker不参与合并
state->status = TSS_SORTEDONTAPE; // 更新状态为"磁带已排序"
} else { // 并行Leader模式
/*
* Leader接管Worker的磁带并执行多路归并:
* 1. 接管磁带资源(leader_takeover_tapes)
* 2. 执行多路归并排序(mergeruns)
*/
leader_takeover_tapes(state); // 接管Worker的磁带资源
mergeruns(state); // 执行多路归并排序
}
// 重置迭代器状态
state->current = 0; // 重置当前读取位置
state->eof_reached = false; // 清除EOF标记
state->markpos_block = 0L; // 重置标记位置块号
state->markpos_offset = 0; // 重置标记位置偏移量
state->markpos_eof = false; // 清除标记EOF状态
break;
case TSS_BOUNDED: // 有界堆排序状态
/*
* 转换有界堆为有序数组:
* 1. 调用sort_bounded_heap处理堆结构
* 2. 更新状态为"内存已排序"
*/
sort_bounded_heap(state); // 将有界堆转换为有序数组
state->current = 0;
state->eof_reached = false;
state->markpos_offset = 0;
state->markpos_eof = false;
state->status = TSS_SORTEDINMEM; // 更新状态
break;
case TSS_BUILDRUNS: // 磁带构建运行状态
/*
* 完成磁带排序:
* 1. 将内存剩余元组写入磁带(dumptuples)
* 2. 执行多路归并(mergeruns)
*/
dumptuples(state, true); // 最终转储内存数据
mergeruns(state); // 执行磁带合并
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
break;
default: // 无效状态处理
elog(ERROR, "invalid tuplesort state"); // 抛出错误
break;
}
#ifdef TRACE_SORT // 调试跟踪模块
if (trace_sort) {
if (state->status == TSS_FINALMERGE) // 最终合并阶段日志
elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start));
else // 常规完成日志
elog(LOG, "performsort of worker %d done: %s",
state->worker, pg_rusage_show(&state->ru_start));
}
#endif
// 恢复原始内存上下文
MemoryContextSwitchTo(oldcontext);
}
tuplestore_puttuple_common 函数
该函数是PostgreSQL
中tuplestore
机制的核心数据写入方法,负责在三种存储状态(内存模式、文件写入模式、文件读取模式)下管理元组的存储。主要功能包括:
- 在内存模式下动态扩容存储空间,当内存不足时自动切换至文件存储模式;
- 维护多个读取指针的位置状态,确保在模式切换或数据追加时保持读取一致性;
- 处理不同存储介质(内存/磁盘)的写入操作,实现透明的存储层切换。
通过资源所有者(ResourceOwner
)管理临时文件生命周期,确保事务安全。函数包含完整的错误检测机制,在文件操作失败时抛出标准数据库错误。该实现支持正向和反向扫描,通过dumptuples
实现内存数据批量转储优化,是多阶段查询处理(如窗口函数、哈希连接等)中大数据集暂存的关键基础组件。
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
TSReadPointer *readptr; // 定义读取指针
int i; // 循环计数器
ResourceOwner oldowner; // 临时保存当前资源所有者
state->tuples++; // 增加总元组计数器
switch (state->status) // 根据当前存储状态处理
{
case TSS_INMEM: // 内存模式处理
/*
* 更新所有需要重置的读取指针:
* 当某个读指针已到达EOF且不是当前活动指针时,
* 将其重置到最新元组位置
*/
readptr = state->readptrs;
for (i = 0; i < state->readptrcount; readptr++, i++)
{
if (readptr->eof_reached && i != state->activeptr)
{
readptr->eof_reached = false;
readptr->current = state->memtupcount;
}
}
/*
* 内存扩展策略:
* 当内存槽即将用尽时尝试扩容(保留最后一个槽位)
* 扩容失败仍能存储当前元组,之后切换为文件模式
*/
if (state->memtupcount >= state->memtupsize - 1)
{
(void) grow_memtuples(state); // 执行内存扩容
Assert(state->memtupcount < state->memtupsize);
}
// 将元组存入内存数组
state->memtuples[state->memtupcount++] = tuple;
/*
* 如果仍有可用内存且未超过容量限制,
* 直接返回保持内存模式
*/
if (state->memtupcount < state->memtupsize && !LACKMEM(state))
return;
/*
* 内存不足处理:切换到文件模式
* 1. 确保临时文件创建在正确的表空间
* 2. 关联资源所有者管理文件生命周期
*/
PrepareTempTablespaces(); // 准备临时表空间
// 切换资源所有者上下文
oldowner = CurrentResourceOwner;
CurrentResourceOwner = state->resowner;
// 创建临时文件句柄(跨事务处理)
state->myfile = BufFileCreateTemp(state->interXact);
CurrentResourceOwner = oldowner; // 恢复原资源所有者
/*
* 确定存储格式:
* 根据执行方向标志决定是否存储元组长度信息
* 此设置一旦确定不可更改
*/
state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
state->status = TSS_WRITEFILE; // 切换为文件写入模式
dumptuples(state); // 将内存数据转储到文件
break;
case TSS_WRITEFILE: // 文件写入模式
/*
* 更新读取指针位置:
* 使用低成本的BufFileTell获取当前文件位置,
* 用于后续恢复读取点
*/
readptr = state->readptrs;
for (i = 0; i < state->readptrcount; readptr++, i++)
{
if (readptr->eof_reached && i != state->activeptr)
{
readptr->eof_reached = false;
BufFileTell(state->myfile,
&readptr->file,
&readptr->offset);
}
}
WRITETUP(state, tuple); // 执行实际的元组写入操作
break;
case TSS_READFILE: // 文件读取模式
/*
* 模式切换处理:
* 1. 保存当前活动指针的读取位置
* 2. 将文件指针定位到写入位置
*/
if (!state->readptrs[state->activeptr].eof_reached)
BufFileTell(state->myfile,
&state->readptrs[state->activeptr].file,
&state->readptrs[state->activeptr].offset);
// 定位到文件写入位置
if (BufFileSeek(state->myfile,
state->writepos_file, state->writepos_offset,
SEEK_SET) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in tuplestore temporary file")));
state->status = TSS_WRITEFILE; // 切换为写入模式
// 更新所有非活动读指针的位置为当前写入点
readptr = state->readptrs;
for (i = 0; i < state->readptrcount; readptr++, i++)
{
if (readptr->eof_reached && i != state->activeptr)
{
readptr->eof_reached = false;
readptr->file = state->writepos_file;
readptr->offset = state->writepos_offset;
}
}
WRITETUP(state, tuple); // 写入元组
break;
default:
elog(ERROR, "invalid tuplestore state"); // 无效状态异常
break;
}
}
tuplestore_gettuple 函数
这个函数是 PostgreSQL
中 tuplestore
模块的核心函数之一,用于从 tuplestore
中获取下一个元组,支持正向和反向扫描。该函数的核心目标是从 tuplestore
中按顺序(正向或反向)读取元组,支持内存和文件两种存储模式,处理状态切换与多指针管理,确保数据访问的一致性和高效性。
static void *
tuplestore_gettuple(Tuplestorestate *state, bool forward, bool *should_free)
{
TSReadPointer *readptr = &state->readptrs[state->activeptr]; // 当前活动读取指针
unsigned int tuplen; // 元组长度
void *tup; // 元组数据
Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD)); // 反向扫描需预置标志
switch (state->status) // 根据存储状态处理
{
case TSS_INMEM: // 内存模式
*should_free = false; // 内存元组无需调用者释放
if (forward) // 正向扫描
{
if (readptr->eof_reached) // 已到末尾则返回NULL
return NULL;
if (readptr->current < state->memtupcount) // 存在未读元组
return state->memtuples[readptr->current++]; // 返回当前元组并后移指针
readptr->eof_reached = true; // 标记EOF
return NULL;
}
else // 反向扫描
{
if (readptr->eof_reached) // 若之前已到EOF
{
readptr->current = state->memtupcount; // 重置指针到末尾
readptr->eof_reached = false;
}
else // 正常反向遍历
{
if (readptr->current <= state->memtupdeleted) // 指针越界(已删除区域)
{
Assert(!state->truncated);
return NULL;
}
readptr->current--; // 回退指针到上一个元组
}
if (readptr->current <= state->memtupdeleted) // 再次检查越界
{
Assert(!state->truncated);
return NULL;
}
return state->memtuples[readptr->current - 1]; // 返回前一元组
}
break;
case TSS_WRITEFILE: // 文件写入模式
if (readptr->eof_reached && forward) // 正向且已到EOF则直接返回
return NULL;
// 切换到读取模式:保存写入位置,定位到读指针位置
BufFileTell(state->myfile, &state->writepos_file, &state->writepos_offset);
if (!readptr->eof_reached) // 若未到EOF,定位读指针
if (BufFileSeek(state->myfile, readptr->file, readptr->offset, SEEK_SET) != 0)
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not seek in tuplestore temporary file")));
state->status = TSS_READFILE; // 切换为读取模式
/* FALLTHROUGH */ // 继续执行下一case
case TSS_READFILE: // 文件读取模式
*should_free = true; // 文件元组需调用者释放内存
if (forward) // 正向读取
{
if ((tuplen = getlen(state, true)) != 0) // 读取元组长度
{
tup = READTUP(state, tuplen); // 从文件读取元组
return tup;
}
else // 长度为0表示EOF
{
readptr->eof_reached = true;
return NULL;
}
}
// 反向读取逻辑
if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int), SEEK_CUR) != 0)
{
// 定位失败(如文件头),清除EOF状态并返回NULL
readptr->eof_reached = false;
Assert(!state->truncated);
return NULL;
}
tuplen = getlen(state, false); // 读取前一元组长度
if (readptr->eof_reached) // 之前处于EOF状态
{
readptr->eof_reached = false; // 重置状态
// 返回最后一个元组
}
else
{
// 定位到前一元组的起始位置
if (BufFileSeek(state->myfile, 0, -(long)(tuplen + 2 * sizeof(unsigned int)), SEEK_CUR) != 0)
{
// 处理边界情况(如第一个元组)
if (BufFileSeek(state->myfile, 0, -(long)(tuplen + sizeof(unsigned int)), SEEK_CUR) != 0)
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not seek in tuplestore temporary file")));
Assert(!state->truncated);
return NULL;
}
tuplen = getlen(state, false); // 再次读取长度
}
// 定位到元组数据起始点并读取
if (BufFileSeek(state->myfile, 0, -(long)tuplen, SEEK_CUR) != 0)
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not seek in tuplestore temporary file")));
tup = READTUP(state, tuplen); // 读取元组
return tup;
default:
elog(ERROR, "invalid tuplestore state"); // 非法状态异常
return NULL;
}
}