当前位置: 首页 > article >正文

【PostgreSQL内核学习 —— (sort算子)】

sort算子

  • 概述
  • ExecInitSort 函数
  • ExecSort 函数
  • ExecEndSort 函数
  • tuplesort_performsort 函数
  • tuplestore_puttuple_common 函数
  • tuplestore_gettuple 函数

声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书

概述

  在PostgreSQL中,Sort 算子用于对输入数据进行排序以满足查询的ORDER BY要求,其执行流程与执行器框架紧密关联。当执行器启动时,Sort算子会初始化内存缓冲区(由work_mem参数控制),通过下层算子(如SeqScan)逐行拉取数据并存储为SortTuple结构。若内存不足,数据会被分块排序后写入磁盘临时文件(称为“tape”),采用外部归并排序External Merge Sort)完成最终排序。具体流程包括:

  1. 内存排序阶段: 优先使用快速排序(quicksort)对内存中的数据块进行排序,并支持增量排序(Incremental Sort)以利用索引预排序优化多字段排序。

参考文章:PostgreSQL 17增量排序、PostgreSQL中的sort算子及所用到的外排实现原理探究(二)

  1. 外部排序阶段: 当数据量超出work_mem限制时,将数据分段(run)写入磁盘,每段单独排序后通过多路归并(k-way merge)生成全局有序结果。

参考文章:PostgreSQL中的sort算子所用到的外排实现原理探究(一)、PostgreSQL中的sort算子及所用到的外排实现原理探究(二)

  1. 结果输出: 最终有序数据通过执行器的Pull模型逐行返回给上层算子(如LimitMerge Join)。

参考文章:PostgreSQL技术内幕(十六)如何写一个执行器算子?、理解PG如何执行一个查询

Sort算子作为执行器的算子,其执行框架遵循四阶段模型:

  • ​初始化​(ExecutorStart):分配内存并初始化排序状态。
  • ​运行​(ExecutorRun):触发下层算子的数据拉取,执行排序逻辑。
  • ​统计与清理​(ExecutorFinish):收集执行统计信息并释放临时资源。
  • ​ 终止​(ExecutorEnd):释放内存和文件句柄。
      与其他算子(如HashJoinIndexScan)的协作通过统一的执行器接口实现,所有算子均嵌入查询计划树中,由Portal结构统一调度,通过递归调用ExecProcNode驱动数据流动。这种设计使得Sort算子既能独立处理排序任务,又能与其他算子高效协同,形成完整的查询执行流水线。

ExecInitSort 函数

  ExecInitSort函数是PostgreSQL执行器中Sort算子的初始化函数(ExecInitSort),属于执行器四阶段模型中的初始化阶段(ExecutorStart)​。其主要功能包括:

  1. 状态初始化: 创建SortState结构体,关联查询计划节点Sort *node)和全局执行状态EState *estate),并设置核心处理函数ExecSort(后续实际排序逻辑的入口)。
  2. 随机访问控制: 根据执行标志eflags)决定是否支持结果集回退标记/恢复等操作,若需要则启用randomAccess并强制物化排序结果。
  3. ​子节点初始化: 通过递归调用ExecInitNode初始化下层算子(如Scan节点),并屏蔽其不需要处理的执行标志(如REWIND)以简化逻辑。
  4. ​元组处理配置:
    • 创建扫描槽从子节点拉取数据,使用虚拟元组格式TTSOpsVirtual)减少内存拷贝。
    • 初始化结果槽为最小化元组格式(TTSOpsMinimalTuple),避免冗余数据存储。
  5. ​排序类型选择: 根据子节点输出的元组描述符(outerTupDesc)判断是否采用高效的Datum排序(单列按值传递场景),否则使用通用元组排序。

  此函数为Sort算子的执行阶段(如内存/磁盘排序、结果输出)完成准备工作,确保执行器能够通过统一的接口(如ExecProcNode)驱动数据流动,并与其他算子(如Limit、Merge Join)协作形成完整的查询流水线。其设计体现了PostgreSQL执行器框架的核心思想:通过分层初始化和状态封装,将复杂操作(如排序)抽象为独立的阻塞型算子,同时保持执行过程的高效与可控性。

// 函数定义:初始化Sort节点,返回SortState结构体指针
SortState *
ExecInitSort(Sort *node, EState *estate, int eflags)
{
    SortState  *sortstate;
    TupleDesc   outerTupDesc;

    // 调试输出,标记初始化开始
    SO1_printf("ExecInitSort: %s\n", "initializing sort node");

    /* 
     * 创建Sort节点的运行时状态结构体SortState
     */
    sortstate = makeNode(SortState);  // 分配内存
    sortstate->ss.ps.plan = (Plan *) node;  // 关联查询计划中的Sort节点
    sortstate->ss.ps.state = estate;        // 关联执行器全局状态
    sortstate->ss.ps.ExecProcNode = ExecSort; // 设置处理函数为ExecSort(实际排序逻辑的入口)

    /*
     * 根据执行标志(eflags)判断是否需要支持随机访问(如回退扫描、标记/恢复操作)
     * 若需要,则启用`randomAccess`,并强制物化排序结果以支持多次回放
     */
    sortstate->randomAccess = (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) != 0;

    // 初始化状态变量:非边界排序、未完成排序、暂未初始化排序状态
    sortstate->bounded = false;         // 不限制排序结果数量(与LIMIT无关)
    sortstate->sort_Done = false;       // 标记排序是否完成
    sortstate->tuplesortstate = NULL;   // 排序状态机(后续由tuplesort模块初始化)

    /*
     * 跳过表达式上下文(ExprContext)初始化,因为Sort节点不涉及条件过滤(ExecQual)或投影(ExecProject)
     */

    /*
     * 初始化子节点(下层算子,如SeqScan或IndexScan)
     * 通过屏蔽`eflags`中的REWIND、BACKWARD、MARK标志,避免子节点处理这些复杂逻辑
     */
    eflags &= ~(EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK);
    outerPlanState(sortstate) = ExecInitNode(outerPlan(node), estate, eflags);  // 递归初始化子节点

    /*
     * 创建扫描槽(ScanSlot),用于从子节点拉取数据
     * 使用虚拟元组表(TTSOpsVirtual)作为默认存储格式
     */
    ExecCreateScanSlotFromOuterPlan(estate, &sortstate->ss, &TTSOpsVirtual);

    /*
     * 初始化结果槽(ResultSlot),用于向上层返回排序后的数据
     * 使用最小化元组格式(TTSOpsMinimalTuple)以节省内存
     * 不设置投影信息(ps_ProjInfo),因为Sort节点不修改数据内容
     */
    ExecInitResultTupleSlotTL(&sortstate->ss.ps, &TTSOpsMinimalTuple);
    sortstate->ss.ps.ps_ProjInfo = NULL;

    // 获取子节点的输出元组描述符(用于判断排序类型)
    outerTupDesc = ExecGetResultType(outerPlanState(sortstate));

    /*
     * 判断是否启用Datum排序(仅当子节点输出单个按值传递的列时适用)
     * 否则使用元组排序(通用场景)
     */
    if (outerTupDesc->natts == 1 && TupleDescAttr(outerTupDesc, 0)->attbyval)
        sortstate->datumSort = true;   // 单列按值排序(效率更高)
    else
        sortstate->datumSort = false;  // 多列或按引用排序

    // 调试输出,标记初始化完成
    SO1_printf("ExecInitSort: %s\n", "sort node initialized");

    return sortstate;  // 返回初始化完成的SortState
}

ExecSort 函数

  ExecSort 函数是PostgreSQL执行器中Sort算子的核心执行函数(ExecSort),属于执行器四阶段模型中的运行阶段ExecutorRun)​,其核心作用是通过tuplesort模块对输入数据进行排序,并按需向上层返回有序元组。具体流程如下:

  1. 首次执行逻辑排序初始化):
    • 子节点数据拉取: 通过ExecProcNode递归调用下层算子(如SeqScan)逐行获取所有元组。
    • ​​排序器初始化: 根据排序类型(单列Datum排序或多列元组排序)选择tuplesort_begin_datumtuplesort_begin_heap,并配置排序选项(如内存限制随机访问支持)。
    • ​​数据填充: 将子节点数据存入排序器tuplesort_putdatumtuplesort_puttupleslot)。
    • ​​排序触发: 调用tuplesort_performsort触发排序逻辑(可能涉及内存快速排序或外部归并排序)。
    • ​​状态更新: 标记排序完成,处理并行场景的统计信息收集。
  2. ​后续执行逻辑(结果返回)​
    • ​​​元组提取: 根据当前扫描方向(由ORDER BY决定)从排序器中提取有序元组
      • ​​​Datum排序: 手动构造虚拟元组槽,将Datum值和NULL标记填充到槽中。
      • ​元组排序: 直接通过tuplesort_gettupleslot获取完整元组。
  3. 关键特性:
    • ​阻塞型算子: 首次调用需拉取并排序所有数据,后续调用仅返回结果,符合执行器的Pull模型。
    • 多场景支持:
      • ​​支持正向/反向扫描(如ORDER BY ... ASC/DESC)。
      • ​​支持有限排序LIMIT优化)和并行排序统计
    • 性能优化: 单列Datum排序避免元组开销,提升效率。
  4. 与执行器框架的集成:
    • 通过统一的ExecProcNode接口与上下层算子(如SeqScan、Limit)交互。
    • 依赖tuplesort模块实现跨内存与磁盘的高效排序,屏蔽底层复杂性。

  此函数体现了PostgreSQL执行器的核心设计原则:通过模块化(如tuplesort)和状态机管理(SortState),将复杂操作(如排序)封装为可重用的算子,同时保持执行过程的高效性与灵活性。

// 函数定义:执行排序操作,返回排序后的元组槽
static TupleTableSlot *
ExecSort(PlanState *pstate)
{
    SortState  *node = castNode(SortState, pstate);  // 强制转换为SortState类型
    EState     *estate;                              // 全局执行状态
    ScanDirection dir;                               // 当前扫描方向(正向/反向)
    Tuplesortstate *tuplesortstate;                  // 排序状态机(由tuplesort模块管理)
    TupleTableSlot *slot;                            // 元组槽(用于接收子节点数据或返回结果)

    CHECK_FOR_INTERRUPTS();  // 检查查询是否被中断(如用户取消操作)

    /* 
     * 从SortState节点中获取状态信息 
     */
    SO1_printf("ExecSort: %s\n", "entering routine");  // 调试输出,标记进入函数

    estate = node->ss.ps.state;          // 获取全局执行状态(EState)
    dir = estate->es_direction;          // 当前扫描方向(由查询的ORDER BY决定)
    tuplesortstate = (Tuplesortstate *) node->tuplesortstate;  // 排序状态机指针

    /*
     * 首次执行时,从子节点拉取所有元组并排序;后续调用直接返回已排序的元组
     */
    if (!node->sort_Done)  // 检查是否已完成排序初始化
    {
        Sort       *plannode = (Sort *) node->ss.ps.plan;  // 获取查询计划中的Sort节点
        PlanState  *outerNode;          // 子节点(如SeqScan)
        TupleDesc   tupDesc;            // 子节点输出的元组描述符
        int         tuplesortopts = TUPLESORT_NONE;  // 排序选项(默认为无特殊选项)

        SO1_printf("ExecSort: %s\n", "sorting subplan");  // 调试输出,标记开始排序

        /* 
         * 设置子节点扫描方向为正向(确保数据按顺序拉取) 
         */
        estate->es_direction = ForwardScanDirection;

        /* 
         * 初始化tuplesort模块(核心排序逻辑) 
         */
        SO1_printf("ExecSort: %s\n", "calling tuplesort_begin");  // 调试输出

        outerNode = outerPlanState(node);  // 获取子节点状态
        tupDesc = ExecGetResultType(outerNode);  // 获取子节点输出的元组描述符

        // 根据SortState配置排序选项
        if (node->randomAccess)
            tuplesortopts |= TUPLESORT_RANDOMACCESS;  // 启用随机访问(支持回退等操作)
        if (node->bounded)
            tuplesortopts |= TUPLESORT_ALLOWBOUNDED;   // 启用有限排序(如LIMIT优化)

        // 根据是否单列按值排序(datumSort),选择不同的排序初始化方法
        if (node->datumSort)
            tuplesortstate = tuplesort_begin_datum(
                TupleDescAttr(tupDesc, 0)->atttypid,   // 列的数据类型OID
                plannode->sortOperators[0],            // 排序操作符(如>或<)
                plannode->collations[0],              // 排序规则(如COLLATE)
                plannode->nullsFirst[0],               // NULL值排序位置(FIRST/LAST)
                work_mem,                             // 排序内存限制(由work_mem参数控制)
                NULL,                                 // 无自定义比较函数
                tuplesortopts                         // 排序选项
            );
        else
            tuplesortstate = tuplesort_begin_heap(
                tupDesc,                              // 元组描述符
                plannode->numCols,                    // 排序列数量
                plannode->sortColIdx,                 // 排序列索引数组
                plannode->sortOperators,              // 排序操作符数组
                plannode->collations,                 // 排序规则数组
                plannode->nullsFirst,                 // NULL值排序位置数组
                work_mem,                            // 内存限制
                NULL,                                // 无自定义比较函数
                tuplesortopts                        // 排序选项
            );

        // 如果启用有限排序(如LIMIT),设置最大元组数
        if (node->bounded)
            tuplesort_set_bound(tuplesortstate, node->bound);

        node->tuplesortstate = (void *) tuplesortstate;  // 将排序状态机保存到SortState

        /*
         * 从子节点拉取所有元组,根据排序类型(datumSort或元组排序)存入tuplesort
         */
        if (node->datumSort)  // 单列按值排序(Datum)
        {
            for (;;)  // 无限循环,直到子节点返回空槽
            {
                slot = ExecProcNode(outerNode);  // 从子节点获取元组

                if (TupIsNull(slot))  // 若子节点无数据,结束循环
                    break;

                slot_getsomeattrs(slot, 1);  // 确保第一列数据已加载到槽中
                // 将Datum值和是否为NULL存入排序器
                tuplesort_putdatum(tuplesortstate, slot->tts_values[0], slot->tts_isnull[0]);
            }
        }
        else  // 多列或引用类型排序(元组)
        {
            for (;;)
            {
                slot = ExecProcNode(outerNode);

                if (TupIsNull(slot))
                    break;

                // 将完整元组存入排序器
                tuplesort_puttupleslot(tuplesortstate, slot);
            }
        }

        /*
         * 执行最终排序(触发内存/磁盘排序逻辑)
         */
        tuplesort_performsort(tuplesortstate);

        /*
         * 恢复用户指定的扫描方向(可能为反向,如ORDER BY ... DESC)
         */
        estate->es_direction = dir;

        /*
         * 标记排序已完成,更新状态
         */
        node->sort_Done = true;      // 排序完成标志
        node->bounded_Done = node->bounded;  // 保存有限排序状态
        node->bound_Done = node->bound;      // 保存LIMIT值

        // 如果是并行工作线程,收集排序统计信息
        if (node->shared_info && node->am_worker)
        {
            TuplesortInstrumentation *si;

            Assert(IsParallelWorker());
            Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
            si = &node->shared_info->sinstrument[ParallelWorkerNumber];
            tuplesort_get_stats(tuplesortstate, si);  // 获取排序统计信息(如内存使用、I/O次数)
        }

        SO1_printf("ExecSort: %s\n", "sorting done");  // 调试输出,标记排序完成
    }

    /*
     * 从排序器中获取已排序的元组
     */
    SO1_printf("ExecSort: %s\n", "retrieving tuple from tuplesort");  // 调试输出

    slot = node->ss.ps.ps_ResultTupleSlot;  // 获取结果槽(用于返回数据)

    /*
     * 根据排序类型调用不同的获取函数
     */
    if (node->datumSort)  // 单列按值排序(Datum)
    {
        ExecClearTuple(slot);  // 清空结果槽
        // 从排序器获取Datum值并填充到虚拟元组槽中
        if (tuplesort_getdatum(tuplesortstate, ScanDirectionIsForward(dir),
                               &(slot->tts_values[0]), &(slot->tts_isnull[0]), NULL))
            ExecStoreVirtualTuple(slot);  // 将Datum转换为虚拟元组
    }
    else  // 元组排序
    {
        // 直接从排序器获取元组槽(tuplesort管理槽的生命周期)
        (void) tuplesort_gettupleslot(tuplesortstate, ScanDirectionIsForward(dir),
                                      false, slot, NULL);
    }

    return slot;  // 返回排序后的元组槽(可能为空表示无更多数据)
}

ExecEndSort 函数

  此函数是PostgreSQL执行器中Sort算子的资源清理函数,属于执行器四阶段模型中的终止阶段ExecutorEnd)​,核心作用包括:

  1. 元组槽清理清空扫描槽ScanTupleSlot)和结果槽ResultTupleSlot),释放其占用的内存。
  2. 排序资源释放
    • 若已初始化排序状态机(tuplesortstate),调用tuplesort_end()释放其内存缓冲区及临时磁盘文件(如外部排序生成的tape文件)。
    • tuplesortstate指针置空,确保后续操作不会误用已释放资源。
  3. ​递归关闭子节点:通过ExecEndNode()递归调用下层算子(如SeqScan)的清理函数,释放整个查询计划树的资源。
/*
 * ----------------------------------------------------------------
 *		ExecEndSort
 * ----------------------------------------------------------------
 */
void
ExecEndSort(SortState *node)
{
    // 调试输出:标记开始关闭Sort节点
    SO1_printf("ExecEndSort: %s\n", "shutting down sort node");

    /*
     * 清理元组表(TupleTable)中的扫描槽和结果槽
     */
    ExecClearTuple(node->ss.ss_ScanTupleSlot);      // 清空扫描槽(用于从子节点拉取数据)
    ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); // 清空结果槽(用于向上层返回排序后的数据)

    /*
     * 释放tuplesort模块占用的资源
     */
    if (node->tuplesortstate != NULL)
    {
        tuplesort_end((Tuplesortstate *) node->tuplesortstate);  // 调用tuplesort的清理函数,释放内存和临时文件
        node->tuplesortstate = NULL;  // 将排序状态机指针置空,防止悬空引用
    }

    /*
     * 递归关闭子节点(如下层SeqScan或IndexScan)
     */
    ExecEndNode(outerPlanState(node));  // 调用子节点的ExecEnd函数链式释放资源

    // 调试输出:标记Sort节点关闭完成
    SO1_printf("ExecEndSort: %s\n", "sort node shutdown");
}

tuplesort_performsort 函数

  此函数是PostgreSQL排序模块的核心调度器,负责根据当前排序状态(state->status)选择不同的处理策略。主要功能包括:

  1. 多模式支持

    • 串行模式:直接使用快速排序处理内存数据
    • ​​并行Worker模式:将数据写入磁带文件,避免资源竞争
    • ​​并行Leader模式:整合Worker的中间结果执行多路归并
  2. 状态机管理

    • ​​处理三种核心状态转换:
      • TSS_INITIAL → 根据并行配置选择排序策略
      • TSS_BOUNDED → 转换有界堆为有序数组
      • TSS_BUILDRUNS → 完成磁带数据合并
  3. 资源管理

    • ​​通过MemoryContextSwitchTo确保内存分配在专用排序上下文
    • ​​使用inittapes/dumptuples管理磁盘临时文件
    • ​​通过mergeruns实现高效的多路归并算法
  4. 调试支持

    • ​​集成TRACE_SORT日志模块,可输出详细排序过程指标(如CPU时间、I/O次数)

关键设计特点

  • ​​分层处理:根据数据量自动选择内存排序或外部排序,当数据超过work_mem时触发磁带写入
  • ​​并行优化Worker仅负责局部排序,Leader集中合并,充分利用多核性能
  • ​​​状态一致性:每次状态转换后重置迭代器位置(current, markpos等),保证后续读取的正确性
  • ​​​资源隔离:通过专用内存上下文防止内存泄漏,确保异常时能正确清理资源
/*
 * 执行排序的主入口函数,处理不同排序阶段的逻辑
 */
void
tuplesort_performsort(Tuplesortstate *state)
{
    // 切换到排序专用的内存上下文,确保内存分配在正确环境中进行
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT  // 调试跟踪模块
    if (trace_sort)
        elog(LOG, "performsort of worker %d starting: %s",
             state->worker, pg_rusage_show(&state->ru_start));  // 记录排序开始时的资源使用情况
#endif

    // 根据当前排序状态选择处理逻辑
    switch (state->status)
    {
        case TSS_INITIAL:  // 初始状态(未开始排序)
            if (SERIAL(state)) {  // 串行模式
                /* 单线程快速排序 */
                tuplesort_sort_memtuples(state);  // 对内存中的元组数组执行快速排序
                state->status = TSS_SORTEDINMEM;  // 更新状态为"内存已排序"
            } else if (WORKER(state)) {  // 并行工作线程模式
                /* 
                 * 并行Worker将数据写入磁带:
                 * 1. 初始化磁带存储(inittapes)
                 * 2. 将内存数据转储到磁带(dumptuples)
                 * 3. 标记不需要合并运行(worker_nomergeruns)
                 */
                inittapes(state, false);          // 初始化磁带文件
                dumptuples(state, true);          // 将内存元组写入磁带
                worker_nomergeruns(state);        // 设置Worker不参与合并
                state->status = TSS_SORTEDONTAPE;  // 更新状态为"磁带已排序"
            } else {  // 并行Leader模式
                /* 
                 * Leader接管Worker的磁带并执行多路归并:
                 * 1. 接管磁带资源(leader_takeover_tapes)
                 * 2. 执行多路归并排序(mergeruns)
                 */
                leader_takeover_tapes(state);     // 接管Worker的磁带资源
                mergeruns(state);                 // 执行多路归并排序
            }
            // 重置迭代器状态
            state->current = 0;                   // 重置当前读取位置
            state->eof_reached = false;           // 清除EOF标记
            state->markpos_block = 0L;            // 重置标记位置块号
            state->markpos_offset = 0;            // 重置标记位置偏移量
            state->markpos_eof = false;           // 清除标记EOF状态
            break;

        case TSS_BOUNDED:  // 有界堆排序状态
            /* 
             * 转换有界堆为有序数组:
             * 1. 调用sort_bounded_heap处理堆结构
             * 2. 更新状态为"内存已排序"
             */
            sort_bounded_heap(state);             // 将有界堆转换为有序数组
            state->current = 0;
            state->eof_reached = false;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            state->status = TSS_SORTEDINMEM;      // 更新状态
            break;

        case TSS_BUILDRUNS:  // 磁带构建运行状态
            /*
             * 完成磁带排序:
             * 1. 将内存剩余元组写入磁带(dumptuples)
             * 2. 执行多路归并(mergeruns)
             */
            dumptuples(state, true);              // 最终转储内存数据
            mergeruns(state);                     // 执行磁带合并
            state->eof_reached = false;
            state->markpos_block = 0L;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            break;

        default:  // 无效状态处理
            elog(ERROR, "invalid tuplesort state");  // 抛出错误
            break;
    }

#ifdef TRACE_SORT  // 调试跟踪模块
    if (trace_sort) {
        if (state->status == TSS_FINALMERGE)  // 最终合并阶段日志
            elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
                 state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start));
        else  // 常规完成日志
            elog(LOG, "performsort of worker %d done: %s",
                 state->worker, pg_rusage_show(&state->ru_start));
    }
#endif

    // 恢复原始内存上下文
    MemoryContextSwitchTo(oldcontext);
}

tuplestore_puttuple_common 函数

  该函数是PostgreSQLtuplestore机制的核心数据写入方法负责在三种存储状态(内存模式、文件写入模式、文件读取模式)下管理元组的存储。主要功能包括:

  1. 在内存模式下动态扩容存储空间,当内存不足时自动切换至文件存储模式
  2. 维护多个读取指针的位置状态,确保在模式切换或数据追加时保持读取一致性
  3. 处理不同存储介质(内存/磁盘)的写入操作,实现透明的存储层切换。

  通过资源所有者(ResourceOwner)管理临时文件生命周期,确保事务安全。函数包含完整的错误检测机制,在文件操作失败时抛出标准数据库错误。该实现支持正向和反向扫描,通过dumptuples实现内存数据批量转储优化,是多阶段查询处理(如窗口函数、哈希连接等)中大数据集暂存的关键基础组件。

static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
	TSReadPointer *readptr;  // 定义读取指针
	int			i;            // 循环计数器
	ResourceOwner oldowner;  // 临时保存当前资源所有者

	state->tuples++;  // 增加总元组计数器

	switch (state->status)  // 根据当前存储状态处理
	{
		case TSS_INMEM:  // 内存模式处理
			/* 
			 * 更新所有需要重置的读取指针:
			 * 当某个读指针已到达EOF且不是当前活动指针时,
			 * 将其重置到最新元组位置
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->current = state->memtupcount;
				}
			}

			/* 
			 * 内存扩展策略:
			 * 当内存槽即将用尽时尝试扩容(保留最后一个槽位)
			 * 扩容失败仍能存储当前元组,之后切换为文件模式
			 */
			if (state->memtupcount >= state->memtupsize - 1)
			{
				(void) grow_memtuples(state);  // 执行内存扩容
				Assert(state->memtupcount < state->memtupsize);
			}

			// 将元组存入内存数组
			state->memtuples[state->memtupcount++] = tuple;

			/* 
			 * 如果仍有可用内存且未超过容量限制,
			 * 直接返回保持内存模式
			 */
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;

			/* 
			 * 内存不足处理:切换到文件模式
			 * 1. 确保临时文件创建在正确的表空间
			 * 2. 关联资源所有者管理文件生命周期
			 */
			PrepareTempTablespaces();  // 准备临时表空间

			// 切换资源所有者上下文
			oldowner = CurrentResourceOwner;
			CurrentResourceOwner = state->resowner;

			// 创建临时文件句柄(跨事务处理)
			state->myfile = BufFileCreateTemp(state->interXact);

			CurrentResourceOwner = oldowner;  // 恢复原资源所有者

			/* 
			 * 确定存储格式:
			 * 根据执行方向标志决定是否存储元组长度信息
			 * 此设置一旦确定不可更改
			 */
			state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
			state->status = TSS_WRITEFILE;  // 切换为文件写入模式
			dumptuples(state);  // 将内存数据转储到文件
			break;

		case TSS_WRITEFILE:  // 文件写入模式
			/* 
			 * 更新读取指针位置:
			 * 使用低成本的BufFileTell获取当前文件位置,
			 * 用于后续恢复读取点
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					BufFileTell(state->myfile,
								&readptr->file,
								&readptr->offset);
				}
			}

			WRITETUP(state, tuple);  // 执行实际的元组写入操作
			break;

		case TSS_READFILE:  // 文件读取模式
			/* 
			 * 模式切换处理:
			 * 1. 保存当前活动指针的读取位置
			 * 2. 将文件指针定位到写入位置
			 */
			if (!state->readptrs[state->activeptr].eof_reached)
				BufFileTell(state->myfile,
							&state->readptrs[state->activeptr].file,
							&state->readptrs[state->activeptr].offset);
			// 定位到文件写入位置
			if (BufFileSeek(state->myfile,
							state->writepos_file, state->writepos_offset,
							SEEK_SET) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek in tuplestore temporary file")));
			state->status = TSS_WRITEFILE;  // 切换为写入模式

			// 更新所有非活动读指针的位置为当前写入点
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->file = state->writepos_file;
					readptr->offset = state->writepos_offset;
				}
			}

			WRITETUP(state, tuple);  // 写入元组
			break;

		default:
			elog(ERROR, "invalid tuplestore state");  // 无效状态异常
			break;
	}
}

tuplestore_gettuple 函数

  这个函数是 PostgreSQLtuplestore 模块的核心函数之一,用于tuplestore 中获取下一个元组,支持正向和反向扫描。该函数的核心目标是从 tuplestore 中按顺序(正向或反向)读取元组,支持内存和文件两种存储模式,处理状态切换与多指针管理,确保数据访问的一致性和高效性。

static void *
tuplestore_gettuple(Tuplestorestate *state, bool forward, bool *should_free)
{
    TSReadPointer *readptr = &state->readptrs[state->activeptr]; // 当前活动读取指针
    unsigned int tuplen;    // 元组长度
    void       *tup;        // 元组数据

    Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD)); // 反向扫描需预置标志

    switch (state->status)  // 根据存储状态处理
    {
        case TSS_INMEM:     // 内存模式
            *should_free = false;  // 内存元组无需调用者释放
            if (forward)    // 正向扫描
            {
                if (readptr->eof_reached) // 已到末尾则返回NULL
                    return NULL;
                if (readptr->current < state->memtupcount) // 存在未读元组
                    return state->memtuples[readptr->current++]; // 返回当前元组并后移指针
                readptr->eof_reached = true; // 标记EOF
                return NULL;
            }
            else    // 反向扫描
            {
                if (readptr->eof_reached) // 若之前已到EOF
                {
                    readptr->current = state->memtupcount;  // 重置指针到末尾
                    readptr->eof_reached = false;
                }
                else    // 正常反向遍历
                {
                    if (readptr->current <= state->memtupdeleted) // 指针越界(已删除区域)
                    {
                        Assert(!state->truncated);
                        return NULL;
                    }
                    readptr->current--; // 回退指针到上一个元组
                }
                if (readptr->current <= state->memtupdeleted) // 再次检查越界
                {
                    Assert(!state->truncated);
                    return NULL;
                }
                return state->memtuples[readptr->current - 1]; // 返回前一元组
            }
            break;

        case TSS_WRITEFILE: // 文件写入模式
            if (readptr->eof_reached && forward) // 正向且已到EOF则直接返回
                return NULL;

            // 切换到读取模式:保存写入位置,定位到读指针位置
            BufFileTell(state->myfile, &state->writepos_file, &state->writepos_offset);
            if (!readptr->eof_reached) // 若未到EOF,定位读指针
                if (BufFileSeek(state->myfile, readptr->file, readptr->offset, SEEK_SET) != 0)
                    ereport(ERROR, (errcode_for_file_access(),
                                    errmsg("could not seek in tuplestore temporary file")));
            state->status = TSS_READFILE; // 切换为读取模式
            /* FALLTHROUGH */           // 继续执行下一case

        case TSS_READFILE:  // 文件读取模式
            *should_free = true; // 文件元组需调用者释放内存
            if (forward)    // 正向读取
            {
                if ((tuplen = getlen(state, true)) != 0) // 读取元组长度
                {
                    tup = READTUP(state, tuplen); // 从文件读取元组
                    return tup;
                }
                else        // 长度为0表示EOF
                {
                    readptr->eof_reached = true;
                    return NULL;
                }
            }

            // 反向读取逻辑
            if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int), SEEK_CUR) != 0)
            {
                // 定位失败(如文件头),清除EOF状态并返回NULL
                readptr->eof_reached = false;
                Assert(!state->truncated);
                return NULL;
            }
            tuplen = getlen(state, false); // 读取前一元组长度

            if (readptr->eof_reached) // 之前处于EOF状态
            {
                readptr->eof_reached = false; // 重置状态
                // 返回最后一个元组
            }
            else
            {
                // 定位到前一元组的起始位置
                if (BufFileSeek(state->myfile, 0, -(long)(tuplen + 2 * sizeof(unsigned int)), SEEK_CUR) != 0)
                {
                    // 处理边界情况(如第一个元组)
                    if (BufFileSeek(state->myfile, 0, -(long)(tuplen + sizeof(unsigned int)), SEEK_CUR) != 0)
                        ereport(ERROR, (errcode_for_file_access(),
                                        errmsg("could not seek in tuplestore temporary file")));
                    Assert(!state->truncated);
                    return NULL;
                }
                tuplen = getlen(state, false); // 再次读取长度
            }

            // 定位到元组数据起始点并读取
            if (BufFileSeek(state->myfile, 0, -(long)tuplen, SEEK_CUR) != 0)
                ereport(ERROR, (errcode_for_file_access(),
                                errmsg("could not seek in tuplestore temporary file")));
            tup = READTUP(state, tuplen); // 读取元组
            return tup;

        default:
            elog(ERROR, "invalid tuplestore state"); // 非法状态异常
            return NULL;
    }
}

http://www.kler.cn/a/613091.html

相关文章:

  • 数据库同步中间件PanguSync:如何跳过初始数据直接进行增量同步
  • HCIP VRRP MSTP 交换综合实验
  • 5.Matplotlib:高级绘图
  • SvelteKit 最新中文文档教程(13)—— Hooks
  • RHCA核心课程技术解析4:红帽服务管理与自动化深度实践
  • Java EE 进阶:MyBatis案例练习
  • 有价值的面试问题
  • 端到端自动驾驶VLM模型:LMDrive: Closed-Loop End-to-End Driving with Large Language Models
  • 通过Bioconductor/BiocManager安装生物r包详解(问题汇总)
  • 01 相机标定与相机模型介绍
  • 25大唐杯赛道一本科B组大纲总结(上)
  • 【OCR】技术
  • 【新手初学】读取数据库数据
  • VMware 安装 Ubuntu 实战分享
  • 2025美国网络专线国内服务商推荐
  • Qt SQL-2
  • 陪伴就诊 APP 功能架构:如何通过特定模块筛选优秀陪诊师
  • vscode在使用 alt + tab 切换程序窗口时,输入法总是自动变为中文模式
  • 电脑切换不同WiFi时,ip地址会自动切换吗?‌
  • Spring Boot添加全局异常处理器捕捉异常 跳转登录页面