PostgreSQL 插件的事务回调机制
在 PostgreSQL 插件开发中,我们经常需要对数据库事务的不同阶段进行监控或控制;例如,在事务开始、提交、回滚时执行特定逻辑,这时就可以借助 PostgreSQL 提供的 XactCallback 机制来实现事务回调事件的钩入(hook)。本文将介绍事务回调机制的基础原理,以及在实际开发中如何实现一个简单的事务回调事件 Hook 插件。
01 事务的基础概念
1.1 事务
事务(Transaction)是数据库操作的基本单元,它确保一组操作可以作为一个整体被执行,事务的四个关键特性 ACID :
- 原子性(Atomicity):事务中的操作要么全部执行成功,要么全部回滚,不会处于部分完成状态
- 一致性(Consistency):事务完成后,数据库必须从一个一致状态转到另一个一致状态
- 隔离性(Isolation):并发事务之间相互隔离,互不影响
- 持久性(Durability):事务一旦提交,数据的更改会永久保存到数据库中
在逻辑层面用户通过事务块的方式描述事务,并通过事务控制命令管理事务的生命周期,包括开始事务、执行事务中的操作、以及结束事务:
BEGIN
或START TRANSACTION
:开始一个新的事务块,在 PG 中,事务默认是自动开始的,但可以使用这个命令显式地开始一个事务COMMIT
或END
:提交当前事务,使所有更改永久生效,并释放事务中使用的锁ROLLBACK
:回滚当前事务,撤销所有更改,并释放事务中使用的锁SAVEPOINT
名称:(子事务)在当前事务中创建一个保存点,可以用于部分回滚事务RELEASE SAVEPOINT
名称:(子事务)释放一个之前创建的保存点ROLLBACK TO SAVEPOINT
名称:(子事务)回滚到指定的保存点,撤销该保存点之后的所有更改PREPARE TRANSACTION <gid>
:(2PC)准备当前事务,以便在分布式事务中进行两阶段提交COMMIT PREPARED <gid>
:(2PC)提交一个已经准备好的事务ROLLBACK PREPARED <gid>
:(2PC)回滚一个已经准备好的事务
1.2 子事务
子事务(SubTransaction)是事务的一种特殊形式,它允许在父事务的上下文中创建嵌套事务;PG 中通常通过保存点 SAVEPOINT
机制实现,并把保存点作为子事务定义的标记
子事务在源码实现上使用一个链栈实现:
tips:这种链栈层次结构处理方法可以应用到所有涉及事务和子事务处理逻辑中
- 栈顶元素拥有一个指向其父事务的指针
struct TransactionStateData *parent
;- 当启动一个新的子事务时将该描述该子事务的事务状态
TransactionState
结构变量 Push 到链栈中,并将当前事务状态CurrentTransactionState
切换到新创建的事务状态;- 当子事务执行结束时,将该子事务的事务状态 Pop 出链栈,并把当前事务状态
CurrentTransactionState
切换到父事务的事务状态,同时切换内存上下文 。
这种实现方式使得子事务可以在事务执行过程中进行提交或回滚,而不会影响父事务的整体状态
SAVEPOINT
:子事务的起点,可以在需要时回滚到该点。- 子事务与父事务共享同一个事务环境,但子事务的回滚不会影响父事务的整体执行状态。
1.3 并行事务
PostgreSQL 计划器 Planner 能设计出利用多 CPU 让查询更快的查询计划,这种特性被称为并行查询;对于那些可以从并行查询获益的查询来说,并行查询显著提升了查询速度,如下例子是一个并行查询
查询语句 SELECT COUNT(*) FROM people WHERE inpgconn2018 = 'Y';
在没有开并行时的查询计划如下:
Aggregate (cost=169324.73..169324.74 rows=1 width=8) (actual time=983.729..983.730 rows=1 loops=1)
-> Seq Scan on people (cost=0.00..169307.23 rows=7001 width=0) (actual time=981.723..983.051 rows=9999 loops=1)
Filter: (atpgconn2018 = 'Y'::bpchar)
Rows Removed by Filter: 9990001
Planning Time: 0.066 ms
Execution Time: 983.760 ms
开了并行之后,启动两个 worker 进程(即 Workers Launched: 2)并行执行,且执行时间(Execution Time)仅为不并行的 40%
Finalize Aggregate (cost=97389.77..97389.78 rows=1 width=8) (actual time=384.848..384.848 rows=1 loops=1)
-> Gather (cost=97389.55..97389.76 rows=2 width=8) (actual time=384.708..386.486 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=96389.55..96389.56 rows=1 width=8) (actual time=379.597..379.597 rows=1 loops=3)
-> Parallel Seq Scan on people (cost=0.00..96382.26 rows=2917 width=0)
(actual time=378.831..379.341 rows=3333 loops=3)
Filter: (atpgconn2018 = 'Y'::bpchar)
Rows Removed by Filter: 3330000
Planning Time: 0.063 ms
Execution Time: 386.532 ms
例子出处:https://developer.aliyun.com/article/684431
通过上面的例子,想引出是在并行查询中会创建多个工作进程(Workers)来并行处理查询的不同部分;这些工作进程会有一个特殊的事务块状态 TBLOCK_PARALLEL_INPROGRESS
来帮助在并行执行查询时管理事务的并发性和一致性;通过这种状态,PG 能够确保并行查询的各个部分在事务控制下正确执行,同时避免了在并行环境中可能引起的复杂事务问题
并行查询很复杂,我们回到事务回调机制上来,下面进一步探究并行查询的事务是怎么提交的,这也是事务回调机制关注点
并行查询的事务提交是一个涉及主进程和多个并行工作进程的复杂过程:在并行查询中,主进程负责协调整个事务的提交流程,而每个工作进程则负责执行查询的一部分,并在完成后将结果返回给主进程;查询完成后并行事务提交过程如下
- 准备提交:主进程首先会检查所有并行工作进程是否已经完成了它们的任务;一旦所有工作进程都完成了它们的任务,主进程会开始调用
CommitTransaction
函数执行事务提交的逻辑,包括触发任何相关的触发器,以及释放在事务过程中占用的资源- 标记事务为已提交:主进程调用
RecordTransactionCommit
函数,将事务的 XID 标记为已提交,确保事务更改持久化到数据库中;对于并行工作进程,则不需要在 pg_xact 中标记自己的 XID 为已提交,但需要确保主进程知道它们写入的 WAL 记录,这样在主进程提交之前,所有的更改都能够被记录下来- 执行回调函数:在事务提交之后,主进程会调用
CallXactCallbacks
执行回调函数;对于并行工作进程,这可能包括XACT_EVENT_PARALLEL_PRE_COMMIT
和XACT_EVENT_PARALLEL_COMMIT
事件,这些事件允许在事务提交前后执行额外的清理或验证工作/- 清理操作:提交完成后,调用
AtEOXact
相关函数执行一系列的清理操作,包括关闭打开的文件描述符、释放内存和其他资源;在所有清理操作完成后,主进程会将事务状态重置为TRANS_DEFAULT
,标志着事务的正式结束
1.4 两阶段提交事务(2PC)
两阶段提交(Two-Phase Commit,2PC)是一种在分布式系统中确保事务原子性的协议;在 PostgreSQL 中,两阶段提交允许多个分布式系统以事务方式协同工作,这两个阶段分别为:Prepare 和 Commit
- Prepare:
- 事务协调者(Coordinator)发送 prepare 请求给所有参与节点,并等待它们的响应
- 参与者节点收到 prepare 请求之后开启一个本地事务完成分布式事务任务,本地事务是否进行提交或者取消由参与节点自行决定,并将结果反馈给事务协调者;当参与节点决定提交本地事务时,便进入了预提交阶段
- Commit:
- 如果所有参与节点都反馈了准备就绪的响应,事务协调者发送 commit 请求给所有参与者节点,要求它们进行最终提交;参与节点持久化成功则返回成功,最后事务协调者将整个分布式事务执行成功的消息返回给客户端
- 如果在提交过程中,事务协调者没有收到消息,或者任何一个参与节点在 Prepare 阶段返回了未准备好的响应,或者提交阶段收到了“取消”消息,事务协调者会发送回滚请求给所有节点,进行全局性的取消动作
假设事务协调节点为 A,两个参与者节点分别为 B 和 C,那么一个成的两阶段提交事务过程如下图所示
- prepare 阶段 A 将 prepare request 发送给各个参与者, 等待他们反馈是否 ok
- B 和 C 全部反馈 ok 之后,进入 commit 阶段,A 发送 commit request 给参与者,B 和 C 持久化成功则返回成功 commit ok,A 再将成功的消息反馈给 client 完成最终提交
图片来源,如有侵权请告知:https://picx.zhimg.com/v2-a74b8c3691524066bb441d869b15ea63_1440w.jpg
02 事务回调机制
2.1 事务回调的触发
2.1.1 触发函数
PostgreSQL 提供事务回调(XactCallback)触发通过 CallXactCallbacks
内部函数实现,该函数根据事务回调事件在事务的不同阶段调用注册的回调函数
在 CallXactCallbacks
函数中遍历所有注册的回调函数,并为每个回调函数传递相同的 event
参数;由于在回调函数执行期间回调链表可能会发生变化(例如,某个回调函数可能会注销自身),所以需要先保存了下一个回调项的指针 next
,以确保即使当前回调项被移除,循环也能继续执行下一个回调项
// src/backend/access/transam/xact.c
static void
CallXactCallbacks(XactEvent event)
{
XactCallbackItem *item;
XactCallbackItem *next;
for (item = Xact_callbacks; item; item = next)
{
/* allow callbacks to unregister themselves when called */
next = item->next;
item->callback(event, item->arg);
}
}
对于子事务,PG 提供了单独的用于触发子事务的回调函数调用的内部函数 CallSubXactCallbacks
,同样根据子事务的特定事件触发,处理子事务生命周期中的回调事件;该函数的实现,除了使用的事务回调事件类型是子事务回调事件,实现和 CallXactCallbacks
几乎一致
// src/backend/access/transam/xact.c
static void
CallSubXactCallbacks(SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid)
{
SubXactCallbackItem *item;
SubXactCallbackItem *next;
for (item = SubXact_callbacks; item; item = next)
{
/* allow callbacks to unregister themselves when called */
next = item->next;
item->callback(event, mySubid, parentSubid, item->arg);
}
}
2.1.2 回调事件类型
结合第一节中介绍的事务基础概念,可以更好的理解本节中各项回调事件类型的含义
事务回调事件通过 CallXactCallbacks
函数给每个回调函数传递相应的 XactEvent
事件类型;在 PostgreSQL 中,XactEvent
是一个枚举类型,它定义了事务生命周期中的关键事件点,允许注册的回调函数在这些事件点被触发,其定义如下:
// src/include/access/xact.h
typedef enum
{
XACT_EVENT_COMMIT,
XACT_EVENT_PARALLEL_COMMIT,
XACT_EVENT_ABORT,
XACT_EVENT_PARALLEL_ABORT,
XACT_EVENT_PREPARE,
XACT_EVENT_PRE_COMMIT,
XACT_EVENT_PARALLEL_PRE_COMMIT,
XACT_EVENT_PRE_PREPARE,
} XactEvent;
XACT_EVENT_COMMIT
:事务提交事件,当一个事务成功提交时被触发XACT_EVENT_PARALLEL_COMMIT
:并行事务提交事件,在并行事务中,当并行工作进程提交事务时被触发XACT_EVENT_ABORT
:事务中止事件,当一个事务由于错误或用户请求被中止时被触发XACT_EVENT_PARALLEL_ABORT
:并行事务中止事件,在并行事务中,当并行工作进程中止事务时被触发XACT_EVENT_PREPARE
:在两阶段提交事务中,参与者节点进入准备阶段时触发,即在事务执行PREPARE TRANSACTION
命令时被触发XACT_EVENT_PRE_COMMIT
:事务预提交事件,事务已经准备好提交但在实际提交操作之前被触发,即在执行COMMIT
命令之前XACT_EVENT_PRE_PARALLEL_COMMIT
:并发事务预提交事件,在并行事务中,当并行工作进程的事务已经准备好提交但在实际提交操作之前被触发,即在执行COMMIT
命令之前XACT_EVENT_PRE_PREPARE
:在两阶段提交事务中,参与者节点在事务准备阶段之前被触发,是事务准备阶段的前奏,即在事务执行PREPARE TRANSACTION
命令之前被触发
SubXactEvent
用于标识子事务的回调事件,并通过 CallSubXactCallbacks
函数传递给每个回调函数传,以下是 SubXactEvent
枚举中包含的事件类型:
// src/include/access/xact.h
typedef enum
{
SUBXACT_EVENT_START_SUB,
SUBXACT_EVENT_COMMIT_SUB,
SUBXACT_EVENT_ABORT_SUB,
SUBXACT_EVENT_PRE_COMMIT_SUB,
} SubXactEvent;
SUBXACT_EVENT_START_SUB
:当一个新的子事务开始时触发的事件SUBXACT_EVENT_COMMIT_SUB
:当一个子事务提交时触发的事件SUBXACT_EVENT_ABORT_SUB
:当一个子事务中止时触发的事件SUBXACT_EVENT_PRE_COMMIT_SUB
:在子事务提交之前,但在实际提交操作之前触发的事件
2.2 事务回调的注册
同样的针对普通事务和子事务 PostgreSQL 分别提供了两类回调函数机制;通过这两个回调机制,开发者可以精确地在事务和子事务的各个阶段执行自定义逻辑,从而实现事务监控、资源管理等功能
1. RegisterXactCallback/UnregisterXactCallback
用于注册与注销事务的回调函数,在事务的不同阶段被调用,回调函数会在事务的不同阶段触发,常用于监控事务的生命周期
函数原型:
// src/include/access/xact.h
void RegisterXactCallback(XactCallback callback, void *arg);
void UnregisterXactCallback(XactCallback callback, void *arg);
callback
:用户自定义的回调函数arg
:传递给回调函数的上下文数据
2. RegisterSubXactCallback/RegisterSubXactCallback
用于注册与注销子事务的回调函数,在子事务回调事件发生时被调用,子事务是通过 SAVEPOINT
实现的,回调函数会在子事务开始、提交或回滚时触发
函数原型:
// src/include/access/xact.h
void RegisterSubXactCallback(SubXactCallback callback, void *arg);
void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
- callback:用户自定义的子事务回调函数
- arg:传递给回调函数的上下文数据
03 插件中使用事务回调
通过事务回调机制,我们可以轻松地 Hook 到事务的不同阶段,并实现自定义的逻辑,可以实现以下功能:
- 事务监控与审计:监控事务的开始、提交和回滚,记录日志或审计信息
- 安全策略控制:在事务阶段执行安全检查,阻止不合规操作
- 资源管理:在事务提交或回滚时释放或回收资源
- 缓存更新:在事务提交时同步更新缓存数据
下面我们理论结合实际,开发一个使用事务回调机制的 PostgreSQL 插件,快速理解如何在事务回调事件中使用回调函数,分以下几个步骤介绍实践过程:
- 定义模块的入口函数
_PG_init
和退出函数_PG_fini
,并注册和注销事务回调函数 - 实现回调函数来处理不同的事务回调事件
- 编译和安装插件,并验证插件功能
其实现方式与 Hook 使用相似,更加详细步骤可以参考笔者的这篇文章:https://randy.blog.csdn.net/article/details/144457221
3.1 事务回调注册与注销
创建一个名为 my_xact_callback 插件目录并新建 my_xact_callback.c
源码文件,定义模块的入口函数 _PG_init 和退出函数 _PG_fini,并注册和注销事务回调函数,代码如下:
// src/contrib/my_xact_callback/my_xact_callback.c
#include "postgres.h" // PostgreSQL 基本定义
#include "access/xact.h" // 包含事务相关的函数和事件定义
#include "miscadmin.h" // 包含 PostgreSQL 管理功能
#include "utils/guc.h" // 包含 GUC 变量接口
PG_MODULE_MAGIC; // PostgreSQL 插件必须包含的宏
/* 回调函数声明 */
static void my_xact_callback(XactEvent event, void *arg);
static void my_sub_xact_callback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg);
/* 插件加载和卸载时的入口函数 */
void _PG_init(void);
void _PG_fini(void);
/* 模块初始化 */
void
_PG_init(void)
{
/* 注册事务回调函数 */
RegisterXactCallback(my_xact_callback, NULL);
RegisterSubXactCallback(my_sub_xact_callback, NULL);
}
/* 模块卸载 */
void
_PG_fini(void)
{
/* 取消注册事务回调函数 */
UnregisterXactCallback(my_xact_callback, NULL);
UnregisterSubXactCallback(my_sub_xact_callback, NULL);
}
3.2 事务回调函数实现
接着,实现前面声明的回调函数 my_xact_callback
和 my_sub_xact_callback
,这两个函数都只是简单的根据部分回调事件类型打印相应的日志信息
// src/contrib/my_xact_callback/my_xact_callback.c
#include "postgres.h" // PostgreSQL 基本定义
#include "access/xact.h" // 包含事务相关的函数和事件定义
#include "miscadmin.h" // 包含 PostgreSQL 管理功能
#include "utils/guc.h" // 包含 GUC 变量接口
PG_MODULE_MAGIC; // PostgreSQL 插件必须包含的宏
/* 回调函数声明 */
static void my_xact_callback(XactEvent event, void *arg);
static void my_sub_xact_callback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg);
/* 插件加载和卸载时的入口函数 */
void _PG_init(void);
void _PG_fini(void);
/* 模块初始化 */
void
_PG_init(void)
{
/* 注册事务回调函数 */
RegisterXactCallback(my_xact_callback, NULL);
RegisterSubXactCallback(my_sub_xact_callback, NULL);
}
/* 模块卸载 */
void
_PG_fini(void)
{
/* 取消注册事务回调函数 */
UnregisterXactCallback(my_xact_callback, NULL);
UnregisterSubXactCallback(my_sub_xact_callback, NULL);
}
/* 事务回调函数实现 */
static void
my_xact_callback(XactEvent event, void *arg)
{
switch (event)
{
case XACT_EVENT_COMMIT:
elog(INFO, "[my_xact_callback] Transaction committed.");
break;
case XACT_EVENT_ABORT:
elog(INFO, "[my_xact_callback] Transaction aborted.");
break;
case XACT_EVENT_PRE_COMMIT:
elog(INFO, "[my_xact_callback] Transaction pre commit.");
break;
default:
elog(INFO, "[my_xact_callback] Unknown transaction event.");
break;
}
}
static void
my_sub_xact_callback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg)
{
switch (event)
{
case SUBXACT_EVENT_START_SUB:
elog(INFO, "[my_xact_callback] SubTransaction started.");
break;
case SUBXACT_EVENT_COMMIT_SUB:
elog(INFO, "[my_xact_callback] SubTransaction committed.");
break;
case SUBXACT_EVENT_ABORT_SUB:
elog(INFO, "[my_xact_callback] SubTransaction aborted.");
break;
case SUBXACT_EVENT_PRE_COMMIT_SUB:
elog(INFO, "[my_xact_callback] SubTransaction pre commit.");
break;
default:
elog(INFO, "[my_xact_callback] Unknown subtransaction event.");
break;
}
}
3.3 插件安装与验证
要完成插件的安装和验证,还需要为 my_xact_callback 插件项目增加编译文件 Makefile
和控制文件 my_xact_callback.control
,此处不再赘述
详细插件实践过程同样可参考 https://randy.blog.csdn.net/article/details/144457221
完成编译安装后,生成的插件库文件 my_xact_callback.so
会被安装到 PostgreSQL 的 shared 目录中
和 hook 使用一样,由于需要在初始化时加载,所以需要编辑 postgresql.conf
配置文件,添加插件到 shared_preload_libraries
选项中,并重启 PostgreSQL 生效
shared_preload_libraries = 'my_xact_callback'
加载插件后,执行以下 SQL 命令以测试插件的功能
postgres=# BEGIN;
BEGIN
postgres=*# SAVEPOINT s1;
INFO: [my_xact_callback] SubTransaction started.
SAVEPOINT
postgres=*# ROLLBACK TO SAVEPOINT s1;
INFO: [my_xact_callback] SubTransaction aborted.
INFO: [my_xact_callback] SubTransaction started.
ROLLBACK
postgres=*# SAVEPOINT s2;
INFO: [my_xact_callback] SubTransaction started.
SAVEPOINT
postgres=*# COMMIT;
INFO: [my_xact_callback] SubTransaction pre commit.
INFO: [my_xact_callback] SubTransaction committed.
INFO: [my_xact_callback] SubTransaction pre commit.
INFO: [my_xact_callback] SubTransaction committed.
INFO: [my_xact_callback] Transaction pre commit.
INFO: [my_xact_callback] Transaction committed.
COMMIT
postgres=# BEGIN;
BEGIN
postgres=*# ROLLBACK;
INFO: [my_xact_callback] Transaction aborted.
ROLLBACK
参考资料
https://zhuanlan.zhihu.com/p/147605189?d=1601190486925
PostgreSQL 并行查询概述-阿里云开发者社区
https://zhuanlan.zhihu.com/p/614592806