当前位置: 首页 > article >正文

PostgreSQL 插件的事务回调机制

背景图片

在 PostgreSQL 插件开发中,我们经常需要对数据库事务的不同阶段进行监控或控制;例如,在事务开始、提交、回滚时执行特定逻辑,这时就可以借助 PostgreSQL 提供的 XactCallback 机制来实现事务回调事件的钩入(hook)。本文将介绍事务回调机制的基础原理,以及在实际开发中如何实现一个简单的事务回调事件 Hook 插件。

01 事务的基础概念


1.1 事务

事务(Transaction)是数据库操作的基本单元,它确保一组操作可以作为一个整体被执行,事务的四个关键特性 ACID :

  • 原子性(Atomicity):事务中的操作要么全部执行成功,要么全部回滚,不会处于部分完成状态
  • 一致性(Consistency):事务完成后,数据库必须从一个一致状态转到另一个一致状态
  • 隔离性(Isolation):并发事务之间相互隔离,互不影响
  • 持久性(Durability):事务一旦提交,数据的更改会永久保存到数据库中

在逻辑层面用户通过事务块的方式描述事务,并通过事务控制命令管理事务的生命周期,包括开始事务、执行事务中的操作、以及结束事务:

  • BEGINSTART TRANSACTION:开始一个新的事务块,在 PG 中,事务默认是自动开始的,但可以使用这个命令显式地开始一个事务
  • COMMITEND:提交当前事务,使所有更改永久生效,并释放事务中使用的锁
  • ROLLBACK:回滚当前事务,撤销所有更改,并释放事务中使用的锁
  • SAVEPOINT 名称:(子事务)在当前事务中创建一个保存点,可以用于部分回滚事务
  • RELEASE SAVEPOINT 名称:(子事务)释放一个之前创建的保存点
  • ROLLBACK TO SAVEPOINT 名称:(子事务)回滚到指定的保存点,撤销该保存点之后的所有更改
  • PREPARE TRANSACTION <gid>:(2PC)准备当前事务,以便在分布式事务中进行两阶段提交
  • COMMIT PREPARED <gid>:(2PC)提交一个已经准备好的事务
  • ROLLBACK PREPARED <gid>:(2PC)回滚一个已经准备好的事务

1.2 子事务

子事务(SubTransaction)是事务的一种特殊形式,它允许在父事务的上下文中创建嵌套事务;PG 中通常通过保存点 SAVEPOINT 机制实现,并把保存点作为子事务定义的标记

子事务在源码实现上使用一个链栈实现:

tips这种链栈层次结构处理方法可以应用到所有涉及事务和子事务处理逻辑中

  • 栈顶元素拥有一个指向其父事务的指针 struct TransactionStateData *parent
  • 当启动一个新的子事务时将该描述该子事务的事务状态 TransactionState 结构变量 Push 到链栈中,并将当前事务状态 CurrentTransactionState 切换到新创建的事务状态;
  • 当子事务执行结束时,将该子事务的事务状态 Pop 出链栈,并把当前事务状态 CurrentTransactionState 切换到父事务的事务状态,同时切换内存上下文 。

这种实现方式使得子事务可以在事务执行过程中进行提交或回滚,而不会影响父事务的整体状态

  • SAVEPOINT:子事务的起点,可以在需要时回滚到该点。
  • 子事务与父事务共享同一个事务环境,但子事务的回滚不会影响父事务的整体执行状态。

1.3 并行事务

PostgreSQL 计划器 Planner 能设计出利用多 CPU 让查询更快的查询计划,这种特性被称为并行查询;对于那些可以从并行查询获益的查询来说,并行查询显著提升了查询速度,如下例子是一个并行查询

查询语句 SELECT COUNT(*) FROM people WHERE inpgconn2018 = 'Y'; 在没有开并行时的查询计划如下:

Aggregate  (cost=169324.73..169324.74 rows=1 width=8) (actual time=983.729..983.730 rows=1 loops=1)
   ->  Seq Scan on people  (cost=0.00..169307.23 rows=7001 width=0) (actual time=981.723..983.051 rows=9999 loops=1)
         Filter: (atpgconn2018 = 'Y'::bpchar)
         Rows Removed by Filter: 9990001
 Planning Time: 0.066 ms
 Execution Time: 983.760 ms

开了并行之后,启动两个 worker 进程(即 Workers Launched: 2)并行执行,且执行时间(Execution Time)仅为不并行的 40%

Finalize Aggregate  (cost=97389.77..97389.78 rows=1 width=8) (actual time=384.848..384.848 rows=1 loops=1)
   ->  Gather  (cost=97389.55..97389.76 rows=2 width=8) (actual time=384.708..386.486 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=96389.55..96389.56 rows=1 width=8) (actual time=379.597..379.597 rows=1 loops=3)
               ->  Parallel Seq Scan on people  (cost=0.00..96382.26 rows=2917 width=0)
                 (actual time=378.831..379.341 rows=3333 loops=3)
                     Filter: (atpgconn2018 = 'Y'::bpchar)
                     Rows Removed by Filter: 3330000
 Planning Time: 0.063 ms
 Execution Time: 386.532 ms

例子出处:https://developer.aliyun.com/article/684431

通过上面的例子,想引出是在并行查询中会创建多个工作进程(Workers)来并行处理查询的不同部分;这些工作进程会有一个特殊的事务块状态 TBLOCK_PARALLEL_INPROGRESS 来帮助在并行执行查询时管理事务的并发性和一致性;通过这种状态,PG 能够确保并行查询的各个部分在事务控制下正确执行,同时避免了在并行环境中可能引起的复杂事务问题

并行查询很复杂,我们回到事务回调机制上来,下面进一步探究并行查询的事务是怎么提交的,这也是事务回调机制关注点

并行查询的事务提交是一个涉及主进程和多个并行工作进程的复杂过程:在并行查询中,主进程负责协调整个事务的提交流程,而每个工作进程则负责执行查询的一部分,并在完成后将结果返回给主进程;查询完成后并行事务提交过程如下

  1. 准备提交:主进程首先会检查所有并行工作进程是否已经完成了它们的任务;一旦所有工作进程都完成了它们的任务,主进程会开始调用 CommitTransaction 函数执行事务提交的逻辑,包括触发任何相关的触发器,以及释放在事务过程中占用的资源
  2. 标记事务为已提交:主进程调用 RecordTransactionCommit 函数,将事务的 XID 标记为已提交,确保事务更改持久化到数据库中;对于并行工作进程,则不需要在 pg_xact 中标记自己的 XID 为已提交,但需要确保主进程知道它们写入的 WAL 记录,这样在主进程提交之前,所有的更改都能够被记录下来
  3. 执行回调函数:在事务提交之后,主进程会调用 CallXactCallbacks 执行回调函数;对于并行工作进程,这可能包括 XACT_EVENT_PARALLEL_PRE_COMMITXACT_EVENT_PARALLEL_COMMIT 事件,这些事件允许在事务提交前后执行额外的清理或验证工作/
  4. 清理操作:提交完成后,调用 AtEOXact 相关函数执行一系列的清理操作,包括关闭打开的文件描述符、释放内存和其他资源;在所有清理操作完成后,主进程会将事务状态重置为 TRANS_DEFAULT,标志着事务的正式结束

1.4 两阶段提交事务(2PC)

两阶段提交(Two-Phase Commit,2PC)是一种在分布式系统中确保事务原子性的协议;在 PostgreSQL 中,两阶段提交允许多个分布式系统以事务方式协同工作,这两个阶段分别为:PrepareCommit

  1. Prepare
  • 事务协调者(Coordinator)发送 prepare 请求给所有参与节点,并等待它们的响应
  • 参与者节点收到 prepare 请求之后开启一个本地事务完成分布式事务任务,本地事务是否进行提交或者取消由参与节点自行决定,并将结果反馈给事务协调者;当参与节点决定提交本地事务时,便进入了预提交阶段
  1. Commit
  • 如果所有参与节点都反馈了准备就绪的响应,事务协调者发送 commit 请求给所有参与者节点,要求它们进行最终提交;参与节点持久化成功则返回成功,最后事务协调者将整个分布式事务执行成功的消息返回给客户端
  • 如果在提交过程中,事务协调者没有收到消息,或者任何一个参与节点在 Prepare 阶段返回了未准备好的响应,或者提交阶段收到了“取消”消息,事务协调者会发送回滚请求给所有节点,进行全局性的取消动作

假设事务协调节点为 A,两个参与者节点分别为 B 和 C,那么一个成的两阶段提交事务过程如下图所示

  • prepare 阶段 A 将 prepare request 发送给各个参与者, 等待他们反馈是否 ok
  • B 和 C 全部反馈 ok 之后,进入 commit 阶段,A 发送 commit request 给参与者,B 和 C 持久化成功则返回成功 commit ok,A 再将成功的消息反馈给 client 完成最终提交

在这里插入图片描述

图片来源,如有侵权请告知:https://picx.zhimg.com/v2-a74b8c3691524066bb441d869b15ea63_1440w.jpg

02 事务回调机制


2.1 事务回调的触发

2.1.1 触发函数

PostgreSQL 提供事务回调(XactCallback)触发通过 CallXactCallbacks 内部函数实现,该函数根据事务回调事件在事务的不同阶段调用注册的回调函数

CallXactCallbacks 函数中遍历所有注册的回调函数,并为每个回调函数传递相同的 event 参数;由于在回调函数执行期间回调链表可能会发生变化(例如,某个回调函数可能会注销自身),所以需要先保存了下一个回调项的指针 next,以确保即使当前回调项被移除,循环也能继续执行下一个回调项

// src/backend/access/transam/xact.c
static void
CallXactCallbacks(XactEvent event)
{
    XactCallbackItem *item;
    XactCallbackItem *next;

    for (item = Xact_callbacks; item; item = next)
    {
        /* allow callbacks to unregister themselves when called */
        next = item->next;
        item->callback(event, item->arg);
    }
}

对于子事务,PG 提供了单独的用于触发子事务的回调函数调用的内部函数 CallSubXactCallbacks,同样根据子事务的特定事件触发,处理子事务生命周期中的回调事件;该函数的实现,除了使用的事务回调事件类型是子事务回调事件,实现和 CallXactCallbacks 几乎一致

// src/backend/access/transam/xact.c
static void
CallSubXactCallbacks(SubXactEvent event,
                     SubTransactionId mySubid,
                     SubTransactionId parentSubid)
{
    SubXactCallbackItem *item;
    SubXactCallbackItem *next;

    for (item = SubXact_callbacks; item; item = next)
    {
        /* allow callbacks to unregister themselves when called */
        next = item->next;
        item->callback(event, mySubid, parentSubid, item->arg);
    }
}

2.1.2 回调事件类型

结合第一节中介绍的事务基础概念,可以更好的理解本节中各项回调事件类型的含义

事务回调事件通过 CallXactCallbacks 函数给每个回调函数传递相应的 XactEvent 事件类型;在 PostgreSQL 中,XactEvent 是一个枚举类型,它定义了事务生命周期中的关键事件点,允许注册的回调函数在这些事件点被触发,其定义如下:

// src/include/access/xact.h
typedef enum
{
    XACT_EVENT_COMMIT,
    XACT_EVENT_PARALLEL_COMMIT,
    XACT_EVENT_ABORT,
    XACT_EVENT_PARALLEL_ABORT,
    XACT_EVENT_PREPARE,
    XACT_EVENT_PRE_COMMIT,
    XACT_EVENT_PARALLEL_PRE_COMMIT,
    XACT_EVENT_PRE_PREPARE,
} XactEvent; 
  • XACT_EVENT_COMMIT:事务提交事件,当一个事务成功提交时被触发
  • XACT_EVENT_PARALLEL_COMMIT:并行事务提交事件,在并行事务中,当并行工作进程提交事务时被触发
  • XACT_EVENT_ABORT:事务中止事件,当一个事务由于错误或用户请求被中止时被触发
  • XACT_EVENT_PARALLEL_ABORT:并行事务中止事件,在并行事务中,当并行工作进程中止事务时被触发
  • XACT_EVENT_PREPARE:在两阶段提交事务中,参与者节点进入准备阶段时触发,即在事务执行 PREPARE TRANSACTION 命令时被触发
  • XACT_EVENT_PRE_COMMIT:事务预提交事件,事务已经准备好提交但在实际提交操作之前被触发,即在执行 COMMIT 命令之前
  • XACT_EVENT_PRE_PARALLEL_COMMIT:并发事务预提交事件,在并行事务中,当并行工作进程的事务已经准备好提交但在实际提交操作之前被触发,即在执行 COMMIT 命令之前
  • XACT_EVENT_PRE_PREPARE:在两阶段提交事务中,参与者节点在事务准备阶段之前被触发,是事务准备阶段的前奏,即在事务执行 PREPARE TRANSACTION 命令之前被触发

SubXactEvent 用于标识子事务的回调事件,并通过 CallSubXactCallbacks 函数传递给每个回调函数传,以下是 SubXactEvent 枚举中包含的事件类型:

// src/include/access/xact.h
typedef enum
{
    SUBXACT_EVENT_START_SUB,
    SUBXACT_EVENT_COMMIT_SUB,
    SUBXACT_EVENT_ABORT_SUB,
    SUBXACT_EVENT_PRE_COMMIT_SUB,
} SubXactEvent;
  • SUBXACT_EVENT_START_SUB:当一个新的子事务开始时触发的事件
  • SUBXACT_EVENT_COMMIT_SUB:当一个子事务提交时触发的事件
  • SUBXACT_EVENT_ABORT_SUB:当一个子事务中止时触发的事件
  • SUBXACT_EVENT_PRE_COMMIT_SUB:在子事务提交之前,但在实际提交操作之前触发的事件

2.2 事务回调的注册

同样的针对普通事务和子事务 PostgreSQL 分别提供了两类回调函数机制;通过这两个回调机制,开发者可以精确地在事务和子事务的各个阶段执行自定义逻辑,从而实现事务监控、资源管理等功能

1. RegisterXactCallback/UnregisterXactCallback

用于注册与注销事务的回调函数,在事务的不同阶段被调用,回调函数会在事务的不同阶段触发,常用于监控事务的生命周期

函数原型

// src/include/access/xact.h
void RegisterXactCallback(XactCallback callback, void *arg);
void UnregisterXactCallback(XactCallback callback, void *arg);
  • callback:用户自定义的回调函数
  • arg:传递给回调函数的上下文数据

2. RegisterSubXactCallback/RegisterSubXactCallback

用于注册与注销子事务的回调函数,在子事务回调事件发生时被调用,子事务是通过 SAVEPOINT 实现的,回调函数会在子事务开始、提交或回滚时触发

函数原型

// src/include/access/xact.h
void RegisterSubXactCallback(SubXactCallback callback, void *arg);
void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
  • callback:用户自定义的子事务回调函数
  • arg:传递给回调函数的上下文数据

03 插件中使用事务回调


通过事务回调机制,我们可以轻松地 Hook 到事务的不同阶段,并实现自定义的逻辑,可以实现以下功能:

  • 事务监控与审计:监控事务的开始、提交和回滚,记录日志或审计信息
  • 安全策略控制:在事务阶段执行安全检查,阻止不合规操作
  • 资源管理:在事务提交或回滚时释放或回收资源
  • 缓存更新:在事务提交时同步更新缓存数据

下面我们理论结合实际,开发一个使用事务回调机制的 PostgreSQL 插件,快速理解如何在事务回调事件中使用回调函数,分以下几个步骤介绍实践过程:

  1. 定义模块的入口函数 _PG_init 和退出函数 _PG_fini,并注册和注销事务回调函数
  2. 实现回调函数来处理不同的事务回调事件
  3. 编译和安装插件,并验证插件功能

其实现方式与 Hook 使用相似,更加详细步骤可以参考笔者的这篇文章:https://randy.blog.csdn.net/article/details/144457221

3.1 事务回调注册与注销

创建一个名为 my_xact_callback 插件目录并新建 my_xact_callback.c 源码文件,定义模块的入口函数 _PG_init 和退出函数 _PG_fini,并注册和注销事务回调函数,代码如下:

// src/contrib/my_xact_callback/my_xact_callback.c
#include "postgres.h"           // PostgreSQL 基本定义
#include "access/xact.h"        // 包含事务相关的函数和事件定义
#include "miscadmin.h"          // 包含 PostgreSQL 管理功能
#include "utils/guc.h"          // 包含 GUC 变量接口

PG_MODULE_MAGIC;                // PostgreSQL 插件必须包含的宏

/* 回调函数声明 */
static void my_xact_callback(XactEvent event, void *arg);
static void my_sub_xact_callback(SubXactEvent event, SubTransactionId mySubid,
                                 SubTransactionId parentSubid, void *arg);

/* 插件加载和卸载时的入口函数 */
void _PG_init(void);
void _PG_fini(void);

/* 模块初始化 */
void
_PG_init(void)
{
    /* 注册事务回调函数 */
    RegisterXactCallback(my_xact_callback, NULL);
    RegisterSubXactCallback(my_sub_xact_callback, NULL);
}

/* 模块卸载 */
void
_PG_fini(void)
{
    /* 取消注册事务回调函数 */
    UnregisterXactCallback(my_xact_callback, NULL);
    UnregisterSubXactCallback(my_sub_xact_callback, NULL);
}

3.2 事务回调函数实现

接着,实现前面声明的回调函数 my_xact_callbackmy_sub_xact_callback,这两个函数都只是简单的根据部分回调事件类型打印相应的日志信息

// src/contrib/my_xact_callback/my_xact_callback.c
#include "postgres.h"           // PostgreSQL 基本定义
#include "access/xact.h"        // 包含事务相关的函数和事件定义
#include "miscadmin.h"          // 包含 PostgreSQL 管理功能
#include "utils/guc.h"          // 包含 GUC 变量接口

PG_MODULE_MAGIC;                // PostgreSQL 插件必须包含的宏

/* 回调函数声明 */
static void my_xact_callback(XactEvent event, void *arg);
static void my_sub_xact_callback(SubXactEvent event, SubTransactionId mySubid,
                                 SubTransactionId parentSubid, void *arg);

/* 插件加载和卸载时的入口函数 */
void _PG_init(void);
void _PG_fini(void);

/* 模块初始化 */
void
_PG_init(void)
{
    /* 注册事务回调函数 */
    RegisterXactCallback(my_xact_callback, NULL);
    RegisterSubXactCallback(my_sub_xact_callback, NULL);
}

/* 模块卸载 */
void
_PG_fini(void)
{
    /* 取消注册事务回调函数 */
    UnregisterXactCallback(my_xact_callback, NULL);
    UnregisterSubXactCallback(my_sub_xact_callback, NULL);
}

/* 事务回调函数实现 */
static void
my_xact_callback(XactEvent event, void *arg)
{
    switch (event)
    {
        case XACT_EVENT_COMMIT:
            elog(INFO, "[my_xact_callback] Transaction committed.");
            break;
        case XACT_EVENT_ABORT:
            elog(INFO, "[my_xact_callback] Transaction aborted.");
            break;
        case XACT_EVENT_PRE_COMMIT:
            elog(INFO, "[my_xact_callback] Transaction pre commit.");
            break;
        default:
            elog(INFO, "[my_xact_callback] Unknown transaction event.");
            break;
    }
}

static void
my_sub_xact_callback(SubXactEvent event, SubTransactionId mySubid,
                     SubTransactionId parentSubid, void *arg)
{
    switch (event)
    {
        case SUBXACT_EVENT_START_SUB:
            elog(INFO, "[my_xact_callback] SubTransaction started.");
            break;
        case SUBXACT_EVENT_COMMIT_SUB:
            elog(INFO, "[my_xact_callback] SubTransaction committed.");
            break;
        case SUBXACT_EVENT_ABORT_SUB:
            elog(INFO, "[my_xact_callback] SubTransaction aborted.");
            break;
        case SUBXACT_EVENT_PRE_COMMIT_SUB:
            elog(INFO, "[my_xact_callback] SubTransaction pre commit.");
            break;
        default:
            elog(INFO, "[my_xact_callback] Unknown subtransaction event.");
            break;
    }
}

3.3 插件安装与验证

要完成插件的安装和验证,还需要为 my_xact_callback 插件项目增加编译文件 Makefile 和控制文件 my_xact_callback.control,此处不再赘述

详细插件实践过程同样可参考 https://randy.blog.csdn.net/article/details/144457221

完成编译安装后,生成的插件库文件 my_xact_callback.so 会被安装到 PostgreSQL 的 shared 目录中

和 hook 使用一样,由于需要在初始化时加载,所以需要编辑 postgresql.conf 配置文件,添加插件到 shared_preload_libraries 选项中,并重启 PostgreSQL 生效

shared_preload_libraries = 'my_xact_callback'

加载插件后,执行以下 SQL 命令以测试插件的功能

postgres=# BEGIN;
BEGIN
postgres=*# SAVEPOINT s1;
INFO:  [my_xact_callback] SubTransaction started.
SAVEPOINT
postgres=*# ROLLBACK TO SAVEPOINT s1;
INFO:  [my_xact_callback] SubTransaction aborted.
INFO:  [my_xact_callback] SubTransaction started.
ROLLBACK
postgres=*# SAVEPOINT s2;
INFO:  [my_xact_callback] SubTransaction started.
SAVEPOINT
postgres=*# COMMIT;
INFO:  [my_xact_callback] SubTransaction pre commit.
INFO:  [my_xact_callback] SubTransaction committed.
INFO:  [my_xact_callback] SubTransaction pre commit.
INFO:  [my_xact_callback] SubTransaction committed.
INFO:  [my_xact_callback] Transaction pre commit.
INFO:  [my_xact_callback] Transaction committed.
COMMIT
postgres=# BEGIN;
BEGIN
postgres=*# ROLLBACK;
INFO:  [my_xact_callback] Transaction aborted.
ROLLBACK

如果文章对你有帮助,欢迎 点赞收藏 👍 ⭐️ 💬,如果还能够 点击关注,那真的是对我最大的鼓励 🔥 🔥 🔥

参考资料

https://zhuanlan.zhihu.com/p/147605189?d=1601190486925

PostgreSQL 并行查询概述-阿里云开发者社区

https://zhuanlan.zhihu.com/p/614592806


http://www.kler.cn/a/472702.html

相关文章:

  • SpringBoot环境和Maven配置
  • 【CSS】设置滚动条样式
  • 【网络安全 | 漏洞挖掘】通过监控调试模式实现价值$15k的RCE
  • 自动驾驶控制与规划——Project 6: A* Route Planning
  • 力扣刷题:数组OJ篇(下)
  • flink cdc oceanbase(binlog模式)
  • 怎么把word试题转成excel?
  • 在windows系统上安装docker并自定义安装和数据存储位置
  • No Homebrew ruby 2.6.3_2 available for arm64 processors!
  • 微软Office存在的意义是什么?
  • 【深度学习量化交易12】基于miniQMT的量化交易框架总体构建思路——回测、模拟、实盘通吃的系统架构
  • 显示器太薄怎么用屏幕挂灯?2025最新屏幕挂灯什么牌子好
  • 如何在 LobeChat 中使用 Ollama
  • 《网络安全里的Linux基础:构建安全网络的关键基石》
  • Python爬虫基础——数据清洗
  • python虚拟环境的使用
  • 【2024华为OD-E卷-200分-会议接待】(题目+思路+JavaC++Python解析)
  • Pytorch学习12_最大池化的使用
  • Elastic-Job相关
  • 案例解读 | 香港某多元化综合金融企业基础监控+网管平台建设实践
  • 微信小程序动态更改富文本的css样式
  • Jenkins-持续集成、交付、构建、部署、测试
  • 腾讯云AI代码助手编程挑战赛——贪吃蛇小游戏
  • QT学习二十一天 Quick 应用程序主窗口
  • 了解SQL
  • MongoDB的部署和操作