10-pg内核之锁管理器(五)行锁
概念
数据库采用MVCC方式进行并发控制,读写并不会互相阻塞,但是写之间仍然存在冲突。如果还是采用常规锁那样加锁,则会耗费大量共享内存,进而影响性能。所以行锁通过元组级常规锁和xmax结合的方式实现。一般先通过xmax进行可见性判断,如果出现冲突,则等待的一方申请元组级常规锁,这样就能避免申请过多的锁。
行锁的加锁方式有两种:
- 对元组进行DELETE/UPDATE操作
- 显示指定加行锁
行锁加锁流程主要分以下两种:
- 如果没有其他事务在更新或对该元组加锁,则当前事务占有该元组,并将xmax置为当前的事务ID;如果已经有事务正在更新该元组,即xmax不为0,且对应事务还未结束,则当前事务等待xmax中的事务结束
- 如果已经有事务正在更新该元组,即xmax不为0,且对应事务还未结束,则当前事务需要等待xmax中的事务结束,并申请元组级常规锁;如果已经有事务持有该锁,那么还需要等待该锁释放,获取到元组级锁后再等待xmax中的事务释放。
显式加行锁
- FOR UPDATE: 针对查询到的元组添加排它锁,此时如果有其他事务修改元组,就需要等待行锁释放才行。
- FOR NO KEY UPDATE: 对元组加FOR KEY SHARE锁,对应LockTupleNoKey Exclusive
- FOR ShARE:对元组加共享锁,共享锁之间互不冲突,对应LockTupleShare
- FOR KEY SHARE: 它允许对非索引键值的列进行更新,但阻止对索引键值的更新操作,对应LockTuplekeyShare
行锁的相容性矩阵
类型 | FOR KEY SHARE | FOR SHARE | FOR NO KEY UPDATE | FOR UPDATE |
---|---|---|---|---|
FOR KEY SHARE | ✔ | |||
FOR SHARE | ✔ | ✔ | ||
FOR NO KEY UPDATE | ✔ | ✔ | ✔ | |
FOR UPDATE | ✔ | ✔ | ✔ | ✔ |
DELETE/UPDATE 加锁
状态类型 | 子操作 | 说明 |
---|---|---|
DELETE | 加行级排他锁,对应LockTupleExclusive | |
UPDATE | 不修改键值 | 加行级排他锁,对应LockTupleNoKeyExclusive |
UPDATE | 修改键值 | 加行级排他锁,对应LockTup了Exclusive |
结构体
LockTupleMode
行锁的模式,共4个
typedef enum LockTupleMode
{
/* SELECT FOR KEY SHARE */
LockTupleKeyShare,
/* SELECT FOR SHARE */
LockTupleShare,
/* SELECT FOR NO KEY UPDATE, and UPDATEs that don't modify key columns */
LockTupleNoKeyExclusive,
/* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
LockTupleExclusive
} LockTupleMode;
tupleLockExtraInfo
由于行级锁是由常规锁和xmax相结合实现的,对于常规锁来讲,它就需要建立LockTupleMode和常规锁之间的映射关系,映射关系由结构体tupleLockExtraInfo描述。
- hwlock: 行级锁对应的常规锁模式,因为行级锁模式只有4种,所以其对应的常规锁中的四种:AccessShareLock,RowShareLock,ExclusiveLock,AccessExclusiveLock
- lockstaus: 显式加锁时的锁模式,有4种,分别为:MultiXactStatusForKeyShare,MultiXactStatusForShare,MultiXactStatusForNoKeyUpdate,MultiXactStatusForUpdate
- updstatus: 通过DELETE/UPDATE加锁时的锁模式,主要有两种:MultiXactStatusNoKeyUpdate,MultiXactStatusUpdate
上述映射的行锁模式由枚举类型MultiXactStatus描述,类型内前4个对应的是lockstatus的值,后两个对应的是updstatus的值
typedef enum
{
MultiXactStatusForKeyShare = 0x00,
MultiXactStatusForShare = 0x01,
MultiXactStatusForNoKeyUpdate = 0x02,
MultiXactStatusForUpdate = 0x03,
/* an update that doesn't touch "key" columns */
MultiXactStatusNoKeyUpdate = 0x04,
/* other updates, and delete */
MultiXactStatusUpdate = 0x05
} MultiXactStatus;
tupleLockExtraInfo描述成员有4个,其结构及成员如下:
static const struct
{
LOCKMODE hwlock; //常规锁
int lockstatus; //通过显式加锁指定的锁模式
int updstatus; //通过DELETE/UPDATE加锁时的锁模式
}
tupleLockExtraInfo[MaxLockTupleMode + 1] =
{
{ /* LockTupleKeyShare对应的锁 */
AccessShareLock,
MultiXactStatusForKeyShare,
-1 /* KeyShare does not allow updating tuples */
},
{ /* LockTupleShare对应的锁 */
RowShareLock,
MultiXactStatusForShare,
-1 /* Share does not allow updating tuples */
},
{ /* LockTupleNoKeyExclusive对应的锁 */
ExclusiveLock,
MultiXactStatusForNoKeyUpdate,
MultiXactStatusNoKeyUpdate
},
{ /* LockTupleExclusive对应的锁 */
AccessExclusiveLock,
MultiXactStatusForUpdate,
MultiXactStatusUpdate
}
};
MultiXactID
tupleLockExtraInfo结构的行锁的锁模式都是由MultiXactStatus来表示,那我们首先要了解一下MultiXactID。
正常情况下,如果只有一个事务对某个元组加锁,这时是用不到上述的行锁的,只用将事务ID保存到xmax,并设置一下t_infomask的值即可。但是如果有多个事务同时对元组加锁,xmax就无法准确的记录加锁的事务了,此时可以将多个事务ID的映射关系保存都MultiXact日志中,并生成一个MultiXactID保存到xmax中,这个就能通过MultiXactID从MultiXact日志中找到对应的事务ID的信息。MultiXact日志的信息可以参考 MultiXact日志管理器
需要注意的是,使用MultiXactID时,元组的infomask要添加HEAP_XMAX_IS_MultiXact标记。
只有一个事务对元组加锁时,只需要xmax和t_infomask标记就足以表示。
t_infomask中行锁相关的标记位如下:
- HEAP_XMAX_EXCL_LOCK:表示加的有排他锁,FOR UPDATE和FOR NO KEY UPDATE共用该标记位。
- HEAP_XMAX_KEYSHR_LOCK:是一个KEY共享锁,FOR KEY SHARE子句对应该标记。
- HEAP_XMAX_LOCK_ONLY:只有显式加锁时对应该标记
- HEAP_XMAX_SHR_LOCK: 共享锁,FOR SHARE子句对于该标记
- HEAP_XMAX_IS_MULTI:表示多个事务对元组加锁
- HEAP_KEYS_UPDATED: 元组键值要被更新时添加此标记,DELETE/UPDATE操作加锁时对应该标记
每种行锁模式对应的标记位组合为:
- FOR UPDATE:HEAP_XMAX_EXCL_LOCK | HEAP_XMAX_LOCK_ONLY |HEAP_KEYS_UPDATED
- FOR NO KEY UPDATE:HEAP_XMAX_EXCL_LOCK | HEAP_XMAX_LOCK_ONLY
- FOR SHARE:HEAP_XMAX_SHR_LOCK | HEAP_XMAX_LOCK_ONLY
- FOR KEY SHARE:HEAP_XMAX_SHR_LOCK | HEAP_XMAX_LOCK_ONLY
- UPDATE:HEAP_XMAX_EXCL_LOCK | HEAP_KEYS_UPDATED
- DELETE:HEAP_XMAX_EXCL_LOCK | HEAP_KEYS_UPDATED
主要函数
行锁的使用流程,可通过元组的更新/删除操作时的可见性判断来说明,以更新流程为例,在元组更新前,需要通过MVCC可见性判断函数HeapTupleSatisfiesUpdate来判断元组是否满足可以更新的条件,其中就会判断元组的加锁情况,并给出对应的结果。MVCC机制可以参考[MVCC机制](pg内核之事务管理器(五) MVCC)
HeapTupleSatisfiesUpdate
该函数是利用MVCC机制判断元组是否满足更新的条件,判断的结果有下面几类:
- TM_OK: 元组可以更新
- TM_Invisible: 元组不可见,不能被更新
- TM_SelfModified: 元组已经被当前事务更新过
- TM_Updated:元组已经被其他事务更新过,且已提交
- TM_Deleted:元组已经被其他事务删除,且已提交,不能更新
- TM_BeingModified: 元组正在被其他事务更新
- TM_WouldBlock:支持行锁的Skip Lock功能
HeapTupleSatisfiesUpdate函数流程
TM_Result
HeapTupleSatisfiesUpdate(HeapTuple htup, CommandId curcid,
Buffer buffer)
{
HeapTupleHeader tuple = htup->t_data; //获取到元组头信息
Assert(ItemPointerIsValid(&htup->t_self));
Assert(htup->t_tableOid != InvalidOid);
if (!HeapTupleHeaderXminCommitted(tuple))//判断xmin是否提交
{
if (HeapTupleHeaderXminInvalid(tuple))//xmin无效,元组无效
return TM_Invisible;
else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))//如果xmin是为当前事务
{
if (HeapTupleHeaderGetCmin(tuple) >= curcid) //插入在快照之后,不可见
return TM_Invisible; /* inserted after scan started */
if (tuple->t_infomask & HEAP_XMAX_INVALID) /* 无其他事务更新或对该元组加锁 */
return TM_Ok;
if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))//该元组被加锁
{
TransactionId xmax;
xmax = HeapTupleHeaderGetRawXmax(tuple); //获取到xmax的值
if (tuple->t_infomask & HEAP_XMAX_IS_MULTI) //有多个事务对元组加锁
{
if (MultiXactIdIsRunning(xmax, true)) //加锁事务正在运行中
return TM_BeingModified;
else //加锁事务已结束
return TM_Ok;
}
if (!TransactionIdIsInProgress(xmax))//xmax已结束,可更新
return TM_Ok;
return TM_BeingModified; //正在被更新
}
if (tuple->t_infomask & HEAP_XMAX_IS_MULTI)//xmax是MultiXact
{
TransactionId xmax;
xmax = HeapTupleGetUpdateXid(tuple);//获取更新元组的事务ID
if (!TransactionIdIsCurrentTransactionId(xmax))//更新的事务不是当前事务
{
if (MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple),
false))//更新事务在运行中
return TM_BeingModified;
return TM_Ok;
}
else
{
if (HeapTupleHeaderGetCmax(tuple) >= curcid) //更新在快照后
return TM_SelfModified; /* updated after scan started */
else//更新在快照前
return TM_Invisible; /* updated before scan started */
}
}
if (!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))//xmax不是当前事务
{
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
InvalidTransactionId);
return TM_Ok;
}
if (HeapTupleHeaderGetCmax(tuple) >= curcid)//更新在快照后
return TM_SelfModified; /* updated after scan started */
else//更新在快照前
return TM_Invisible; /* updated before scan started */
}
else if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmin(tuple)))//xmin不是当前事务且仍在运行中
return TM_Invisible;
else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple))) //xmin不是当前事务且已提交
SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
HeapTupleHeaderGetRawXmin(tuple));
else//其他情况
{
/* it must have aborted or crashed */
SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
InvalidTransactionId);
return TM_Invisible;
}
}
//到这里说明插入元组的事务已经提交,下面是判断xmax的状态
if (tuple->t_infomask & HEAP_XMAX_INVALID) /* 元组还未被更新或加锁 */
return TM_Ok;
if (tuple->t_infomask & HEAP_XMAX_COMMITTED)//xmax已提交
{
if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))//xmax只是加锁
return TM_Ok;
if (!ItemPointerEquals(&htup->t_self, &tuple->t_ctid)) //被其他事务更新
return TM_Updated; /* updated by other */
else //被其他事务删除
return TM_Deleted; /* deleted by other */
}
if (tuple->t_infomask & HEAP_XMAX_IS_MULTI) //xmax是MultiXactID
{
TransactionId xmax;
if (HEAP_LOCKED_UPGRADED(tuple->t_infomask)) //是否是FOR SHARE锁
return TM_Ok;
if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask)) //xmax仅是加锁
{
if (MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple), true))//xmax正在运行中
return TM_BeingModified;
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID, InvalidTransactionId);
return TM_Ok;
}
xmax = HeapTupleGetUpdateXid(tuple);//获取更新的事务ID
if (!TransactionIdIsValid(xmax)) //xmax无效
{
if (MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple), false))//xmax正在运行中
return TM_BeingModified;
}
/* not LOCKED_ONLY, so it has to have an xmax */
Assert(TransactionIdIsValid(xmax));
if (TransactionIdIsCurrentTransactionId(xmax))//xmax是当前事务
{
if (HeapTupleHeaderGetCmax(tuple) >= curcid)//更新在快照后
return TM_SelfModified; /* updated after scan started */
else//更新在快照前
return TM_Invisible; /* updated before scan started */
}
if (MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple), false))//xmax正在运行中
return TM_BeingModified;
if (TransactionIdDidCommit(xmax))//xmax已提交
{
if (!ItemPointerEquals(&htup->t_self, &tuple->t_ctid)) //是UPDATE
return TM_Updated;
else //是DELETE
return TM_Deleted;
}
/* 到这里的话表名xmax已经异常终止了 */
if (!MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple), false))//xmax没有在运行
{
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
InvalidTransactionId);
return TM_Ok;
}
else//xmax在运行
{
return TM_BeingModified;
}
}
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))//xmax是当前事务
{
if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))//只有加锁
return TM_BeingModified;
if (HeapTupleHeaderGetCmax(tuple) >= curcid) //更新在快照后
return TM_SelfModified; /* updated after scan started */
else//更新在快照前
return TM_Invisible; /* updated before scan started */
}
if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmax(tuple)))//xmax正在运行中
return TM_BeingModified;
if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))//xmax未提交
{
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
InvalidTransactionId);
return TM_Ok;
}
/* xmax transaction committed */
//到这里xmax也已经提交
if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))
{
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
InvalidTransactionId);
return TM_Ok;
}
SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
HeapTupleHeaderGetRawXmax(tuple));
if (!ItemPointerEquals(&htup->t_self, &tuple->t_ctid))//是更新操作
return TM_Updated; /* updated by other */
else//是删除操作
return TM_Deleted; /* deleted by other */
}
heap_update
以heap_update为例,因为是执行的UPDATE操作,所以行锁的锁模式只有两种,流程如下:
- 根据是否有主键更新,选择不同的锁模式
//如果没有主键的更新的话,可以申请弱一点的行锁,否则申请级别更高的行锁
if (!bms_overlap(modified_attrs, key_attrs))
{
*lockmode = LockTupleNoKeyExclusive;
mxact_status = MultiXactStatusNoKeyUpdate;
key_intact = true;
MultiXactIdSetOldestMember();
}
else
{
*lockmode = LockTupleExclusive;
mxact_status = MultiXactStatusUpdate;
key_intact = false;
}
- 调用HeapTupleSatisfiesUpdate函数判断当前元组的状态是否可更新
result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer); //判断元组可见性,MVCC
- 如果元组状态为TM_BeingModified,说明元组正在被更新,需要执行下面操作进行加锁。
else if (result == TM_BeingModified && wait) //元组正在被修改,切wait为TRUE时,需要等待
- 如果是MultiXact,判断MultiXact是否与当前要等到的模式冲突,如果冲突,申请一把行锁,然后等待MultiXact结束(一直自旋),MultiXact结束后,如果元组状态发生了变化,则需要回到第二步继续判断。
if (infomask & HEAP_XMAX_IS_MULTI) //是multixact
{
TransactionId update_xact;
int remain;
bool current_is_member = false;
if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
*lockmode, ¤t_is_member)) //multi是否与当前要等待的模式冲突
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);//睡眠浅需要先释放buffer的锁
if (!current_is_member) //如果我们还没有持有锁,则申请一把行锁
heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
LockWaitBlock, &have_tuple_lock);
/* wait for multixact,等待multi结束,自旋查询,直到结束 */
MultiXactIdWait((MultiXactId) xwait, mxact_status, infomask,
relation, &oldtup.t_self, XLTW_Update,
&remain);
checked_lockers = true;
locker_remains = remain != 0;
//此时等待的xmax已经结束
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);//重新申请buffer的锁
/*
如果其他人又修改了这个元组,也就是说xmax活t_infomask发生了变化,那么需要重新进行检查
*/
if (xmax_infomask_changed(oldtup.t_data->t_infomask,
infomask) ||
!TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),
xwait))
goto l2;
}
if (!HEAP_XMAX_IS_LOCKED_ONLY(oldtup.t_data->t_infomask))
update_xact = HeapTupleGetUpdateXid(oldtup.t_data); //如果还是有元组在更新该元组,获取其xmax
else
update_xact = InvalidTransactionId; //没有
/*
此时已经没有update操作再multi中了,或者它已经终止了,可以继续
*/
if (!TransactionIdIsValid(update_xact) ||
TransactionIdDidAbort(update_xact))
can_continue = true;
}
- 如果正在更新该元组的事务就是当前事务,那么就不用重新申请锁了,可以继续操作
else if (TransactionIdIsCurrentTransactionId(xwait)) //如果正在更新该元组的事务就是当前事务,那么就不用重新申请锁了,可以继续往下操作了
{
checked_lockers = true;
locker_remains = true;
can_continue = true;
}
- 如果该元组当前存在的是key share锁,并且我们也不会修改主键行,说明不冲突,可以继续执行
else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) && key_intact) //如果只是存在key-share锁,并且我们也不会修改主键行,那么这里就也不需要等待了
{
checked_lockers = true;
locker_remains = true;
can_continue = true;
}
- 如果上面的情况都不是,说明当前只有一个事务在更新,我们只需要等待xmax事务结束就可以了,等待前需要先申请元组级常规锁,等待结束后,如果元组状态发生了变化,则需要重新回到第二步判断元组状态。
else
{
/*
这里就是常规等待xmax结束了,不过等待之前会先申请一把行锁
*/
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); //睡眠之前先释放buffer上的锁
heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
LockWaitBlock, &have_tuple_lock); //申请一把行锁
XactLockTableWait(xwait, relation, &oldtup.t_self,
XLTW_Update);//等待xmax结束
checked_lockers = true;
//xmax已经结束,重新申请buffer的锁
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/*
如果元组的xmax和infomask已经发生改变,重新检查
*/
if (xmax_infomask_changed(oldtup.t_data->t_infomask, infomask) ||
!TransactionIdEquals(xwait,
HeapTupleHeaderGetRawXmax(oldtup.t_data)))
goto l2;
/* 如果存在其他情况,检查是否已经提交了*/
UpdateXmaxHintBits(oldtup.t_data, buffer, xwait);
if (oldtup.t_data->t_infomask & HEAP_XMAX_INVALID)
can_continue = true;
}
- 满足更新元组的条件,进行元组更新操作,更新后释放元组锁
if (have_tuple_lock)
UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
XactLockTableWait
等待指定的事务结束,会以死循环的方式等待,等待间隔1000us,直到要等待的事务结束为止。
for (;;)
{
SET_LOCKTAG_TRANSACTION(tag, xid);
(void) LockAcquire(&tag, ShareLock, false, false);//以共享模式申请一把事务锁
LockRelease(&tag, ShareLock, false);//释放事务锁
if (!TransactionIdIsInProgress(xid))//如果事务结束,则退出循环
break;
if (!first)
pg_usleep(1000L);
first = false;
xid = SubTransGetTopmostTransaction(xid);//如果是子事务,就获取其顶层的父事务
}
ConditionalXactLockTableWait
等待指定的事务结束,不会阻塞
bool
ConditionalXactLockTableWait(TransactionId xid)
{
LOCKTAG tag;
bool first = true;
for (;;)
{
SET_LOCKTAG_TRANSACTION(tag, xid);
if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)//尝试获取事务锁,成功与否都直接返回结果
return false;
LockRelease(&tag, ShareLock, false);//获取到锁,表示事务已经结束,释放锁。
if (!TransactionIdIsInProgress(xid))//判断是否已经结束
break;
if (!first)
pg_usleep(1000L);
first = false;
xid = SubTransGetTopmostTransaction(xid);//获取子事务顶层父事务
}
return true;
}
LockTupleTuplock/ConditionalLockTuple
根据行锁的模式获取对应的常规锁模式,并调用对应函数锁表
#define LockTupleTuplock(rel, tup, mode) \
LockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
#define UnlockTupleTuplock(rel, tup, mode) \
UnlockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
#define ConditionalLockTupleTuplock(rel, tup, mode) \
ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
LockTuple/ConditionalLockTuple
申请元组级常规锁
void
LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_TUPLE(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId,
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));//设置锁的tag
(void) LockAcquire(&tag, lockmode, false, false);//申请锁
}
bool
ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_TUPLE(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId,
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));//设置锁模式
return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);//非阻塞模式申请锁
}
DoesMultiXactIdConflict
判断MultiXact是否与当前的事务冲突。
static bool
DoesMultiXactIdConflict(MultiXactId multi, uint16 infomask,
LockTupleMode lockmode, bool *current_is_member)
{
int nmembers;
MultiXactMember *members;
bool result = false;
LOCKMODE wanted = tupleLockExtraInfo[lockmode].hwlock;
if (HEAP_LOCKED_UPGRADED(infomask))
return false;
nmembers = GetMultiXactIdMembers(multi, &members, false,
HEAP_XMAX_IS_LOCKED_ONLY(infomask));//获取MultiXact中的事务信息
if (nmembers >= 0)
{
int i;
for (i = 0; i < nmembers; i++)//遍历每个事务
{
TransactionId memxid;
LOCKMODE memlockmode;
if (result && (current_is_member == NULL || *current_is_member))
break;
memlockmode = LOCKMODE_from_mxstatus(members[i].status);//获取member的事务状态
memxid = members[i].xid;
if (TransactionIdIsCurrentTransactionId(memxid))//跳过当前事务
{
if (current_is_member != NULL)
*current_is_member = true;
continue;
}
else if (result)
continue;
if (!DoLockModesConflict(memlockmode, wanted))//与当前事务模式不冲突,跳过
continue;
if (ISUPDATE_from_mxstatus(members[i].status))//是更新操作
{
if (TransactionIdDidAbort(memxid))//member事务已经abort
continue;
}
else
{
if (!TransactionIdIsInProgress(memxid))//member事务进程已经结束
continue;
}
result = true;
}
pfree(members);
}
return result;
}
Do_MultiXactIdWait
等待MultiXact包含的所有事务结束
tatic bool
Do_MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
uint16 infomask, bool nowait,
Relation rel, ItemPointer ctid, XLTW_Oper oper,
int *remaining)
{
bool result = true;
MultiXactMember *members;
int nmembers;
int remain = 0;
nmembers = HEAP_LOCKED_UPGRADED(infomask) ? -1 :
GetMultiXactIdMembers(multi, &members, false,
HEAP_XMAX_IS_LOCKED_ONLY(infomask));//获取MultiXact的members信息
if (nmembers >= 0)
{
int i;
for (i = 0; i < nmembers; i++)//遍历每个member事务
{
TransactionId memxid = members[i].xid;
MultiXactStatus memstatus = members[i].status;
if (TransactionIdIsCurrentTransactionId(memxid))//跳过当前事务
{
remain++;
continue;
}
if (!DoLockModesConflict(LOCKMODE_from_mxstatus(memstatus),
LOCKMODE_from_mxstatus(status)))//是否与当前事务冲突,不冲突跳过
{
if (remaining && TransactionIdIsInProgress(memxid))
remain++;
continue;
}
if (nowait)//如果冲突且需要不需要等待
{
result = ConditionalXactLockTableWait(memxid);//判断事务是否结束,立即返回结果
if (!result)
break;
}
else
XactLockTableWait(memxid, rel, ctid, oper);//等待事务结束
}
pfree(members);
}
if (remaining)
*remaining = remain;
return result;
}
heap_lock_tuple
以共享模式或排他模式锁住一个元组
- 调用HeapTupleSatisfiesUpdate是否满足更新条件,只有结果为TM_BeingModified、TM_UPdated、TM_Deleted时才需要处理。
result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);//判断是否满足更新条件
if (result == TM_Invisible)
{
result = TM_Invisible;
goto out_locked;
}
else if (result == TM_BeingModified ||
result == TM_Updated ||
result == TM_Deleted)
{
-
因为可能会循环从上一步开始检查,所以有些需要第一次才需要执行的操作,操作流程如下:
- 如果是MultiXact,遍历每个member事务,如果存在比当前要申请的锁模式更高的锁,则不比再申请了,直接调到结束即可;如果没有就设置skip_tuple_lock为true,后面就不需要再申请更高级的锁了,因为可能会与其他已经申请了强锁但是在等待我们事务结束的事务产生死锁冲突,这里只用等待MultiXact事务结束即可
if (infomask & HEAP_XMAX_IS_MULTI)//是MultiXact { int i; int nmembers; MultiXactMember *members; nmembers = GetMultiXactIdMembers(xwait, &members, false, HEAP_XMAX_IS_LOCKED_ONLY(infomask));//获取MultiXact的成员 for (i = 0; i < nmembers; i++) { if (!TransactionIdIsCurrentTransactionId(members[i].xid))//跳过当前的事务ID continue; if (TUPLOCK_from_mxstatus(members[i].status) >= mode)//如果成员的权限大于当前要申请的,直接结束 { pfree(members); result = TM_Ok; goto out_unlocked; } else { skip_tuple_lock = true; } } if (members) pfree(members); }
- 如果xmax就是当前的事务,这里判断一下锁模式与对应的t_infomask标记是否正确,正确的话直接调到结束位置即可
else if (TransactionIdIsCurrentTransactionId(xwait))//如果xmax是当前事务,根据锁模式判断t_infomask标记 { switch (mode) { case LockTupleKeyShare: Assert(HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) || HEAP_XMAX_IS_SHR_LOCKED(infomask) || HEAP_XMAX_IS_EXCL_LOCKED(infomask)); result = TM_Ok; goto out_unlocked; case LockTupleShare: if (HEAP_XMAX_IS_SHR_LOCKED(infomask) || HEAP_XMAX_IS_EXCL_LOCKED(infomask)) { result = TM_Ok; goto out_unlocked; } break; case LockTupleNoKeyExclusive: if (HEAP_XMAX_IS_EXCL_LOCKED(infomask)) { result = TM_Ok; goto out_unlocked; } break; case LockTupleExclusive: if (HEAP_XMAX_IS_EXCL_LOCKED(infomask) && infomask2 & HEAP_KEYS_UPDATED) { result = TM_Ok; goto out_unlocked; } break; } }
-
我们不得不等待持锁事务结束,下面根据锁模式不同,进行不同的处理
- 申请的是LockTupleKeyShare锁
- 如果没有键值被更新,则与申请的锁不冲突,但是如果还是有更新操作,且入参fllow_updates为true(表示要锁定更新的版本链),还需要调用heap_lock_updated_tuple函数锁住更新后的元组。
if (!(infomask2 & HEAP_KEYS_UPDATED))//没有键值被更新,不冲突 { bool updated; updated = !HEAP_XMAX_IS_LOCKED_ONLY(infomask);//还有更新操作 if (follow_updates && updated)//要跟踪更新链并锁住后面的元组 { TM_Result res; res = heap_lock_updated_tuple(relation, tuple, &t_ctid, GetCurrentTransactionId(), mode);//锁住更新链后面的元组 if (res != TM_Ok) { result = res; LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } }
- 再次检查,如果元组事务中还有更新操作,且有键值更新,则回到第一步重新遍历
if (!HeapTupleHeaderIsOnlyLocked(tuple->t_data) && ((tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED) || !updated))//元组不仅仅被锁住,且有键值更新,则重新遍历 goto l3;
- 没有冲突,可以不用睡眠等待其他事务结束,require_sleep设置为false
require_sleep = false;
- 申请的是LockTupleShare锁
如果当前元组只是被锁住而没有排他锁,则也不需要睡眠等待
else if (mode == LockTupleShare)//申请的是SHARE 锁 { if (HEAP_XMAX_IS_LOCKED_ONLY(infomask) && !HEAP_XMAX_IS_EXCL_LOCKED(infomask))//只是被锁住且没有排他锁 { LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) || HEAP_XMAX_IS_EXCL_LOCKED(tuple->t_data->t_infomask))//还有更新操作或有排他锁,重新检查 goto l3; require_sleep = false; }
- 申请的是LockTupleNoKeyExclusive锁
- 如果是MultiXact,则需要判断MultiXact所有成员事务是否与当前事务申请的锁冲突,如果不冲突的话,也不需要睡眠
- 如果是单一的事务且时Key SHARE锁,也不需要睡眠
if (infomask & HEAP_XMAX_IS_MULTI)//是MultiXact { if (!DoesMultiXactIdConflict((MultiXactId) xwait, infomask, mode, NULL))//判断MultiXact是否与当前事务申请的锁冲突,不冲突的话 { LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data), xwait))//如果xmax更新过或者跟当前的不一致,则重新检查 goto l3; require_sleep = false; } } else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))//如果添加的有keyshare锁 { LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data), xwait))//如果xmax有变更过,重新检查 goto l3; require_sleep = false; }
- 申请的是LockTupleKeyShare锁
-
如果当前事务是该元组的唯一持锁拥有者,我们也可以避免睡眠
if (require_sleep && !(infomask & HEAP_XMAX_IS_MULTI) &&
TransactionIdIsCurrentTransactionId(xwait))
{
/* ... but if the xmax changed in the meantime, start over */
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
xwait))
goto l3;
Assert(HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask));
require_sleep = false;
}
- 如果元组已经被其他事务删除或更新且事务已经提交,而require_sleep又是true的话,直接调到出错处理地方
//如果元组已经被其他事务删除或更新且已经提交,这里还要求睡眠的话就直接失败
if (require_sleep && (result == TM_Updated || result == TM_Deleted))
{
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
goto failed;
}
- 如果需要睡眠等待,执行下面操作
- 调用heap_acquire_tuplock函数尝试申请元组级常规锁,如果申请失败,跳到申请失败位置
if (!skip_tuple_lock && !heap_acquire_tuplock(relation, tid, mode, wait_policy, &have_tuple_lock))//尝试申请元组级锁 { result = TM_WouldBlock; LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; }
- 如果是MultiXact,根据等待策略,执行对应的等待函数,等待MultiXact的事务结束
- 如果是LockWaitBlock,则调用MultiXactIdWait函数阻塞等待事务结束
- 如果是LockWaitSkip,则调用ConditionalMultiXactIdWait返回查询结果,不会等待
- 如果是LockWaitError,则调用ConditionalMultiXactIdWait返回查询结果,如果无法获取到锁,则报错退出。
if (infomask & HEAP_XMAX_IS_MULTI) { MultiXactStatus status = get_mxact_status_for_lock(mode, false); if (status >= MultiXactStatusNoKeyUpdate) elog(ERROR, "invalid lock mode in heap_lock_tuple"); //根据更待策略,去等待锁 switch (wait_policy) { case LockWaitBlock: MultiXactIdWait((MultiXactId) xwait, status, infomask, relation, &tuple->t_self, XLTW_Lock, NULL);//等待MultiXact包含的事务结束,阻塞等待 break; case LockWaitSkip: if (!ConditionalMultiXactIdWait((MultiXactId) xwait, status, infomask, relation, NULL))//等待MultiXact包含的事务结束,不阻塞等待,立即返回结果 { result = TM_WouldBlock; LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } break; case LockWaitError: if (!ConditionalMultiXactIdWait((MultiXactId) xwait, status, infomask, relation, NULL))//等待MultiXact包含的事务结束,不阻塞等待,立即返回结果,失败的直接报错退出 ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("could not obtain lock on row in relation \"%s\"", RelationGetRelationName(relation)))); break; }
- 单一事务的情况,就是等待锁释放,根据等待策略,执行对应的等待函数
- 如果是LockWaitBlock,则调用XactLockTableWait函数阻塞等待事务结束
- 如果是LockWaitSkip,则调用ConditionalXactLockTableWait返回查询结果,不会等待
- 如果是LockWaitError,则调用ConditionalXactLockTableWait返回查询结果,如果无法获取到锁,则报错退出。
else//等待锁释放 { /* wait for regular transaction to end, or die trying */ switch (wait_policy) { case LockWaitBlock: XactLockTableWait(xwait, relation, &tuple->t_self, XLTW_Lock);//阻塞等待xmax对应的事务结束 break; case LockWaitSkip: if (!ConditionalXactLockTableWait(xwait))//不阻塞等待xmax对应的事务结束 { result = TM_WouldBlock; /* recovery code expects to have buffer lock held */ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } break; case LockWaitError: if (!ConditionalXactLockTableWait(xwait))//不阻塞等待xmax对应的事务结束,失败就报错退出 ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("could not obtain lock on row in relation \"%s\"", RelationGetRelationName(relation)))); break; } }
- 如果事务有更新操作,且follow_updates为true,则需要调用heap_lock_updated_tuple函数,锁住更新后的元组
//如果 有更新,且需要跟踪并锁定更新的版本链 if (follow_updates && !HEAP_XMAX_IS_LOCKED_ONLY(infomask)) { TM_Result res; res = heap_lock_updated_tuple(relation, tuple, &t_ctid, GetCurrentTransactionId(), mode);//锁住更新的版本链 if (res != TM_Ok) { result = res; /* recovery code expects to have buffer lock held */ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } }
- 再次判断xmax是否有变更,如果有,需要返回第一步重新检查
- xmax事务已经结束,更新元组的标记位
if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data), xwait))//xmax有更新且不是自己 回去继续循环 goto l3; if (!(infomask & HEAP_XMAX_IS_MULTI))//更新xmax为当前的事务,更新标记位 { UpdateXmaxHintBits(tuple->t_data, *buffer, xwait); }
- 到这里,我们就已经确定拿到了排他锁了,然后根据元组的状态返回结果
- 如果xmax是0,或者元组只是被上锁,返回TM_OK
- 如果当前元组的ctid与t_self不一致,表名元组已经被更新过,返回TM_Updated
- 如果元组已经被删除,返回TM_deleted
if (!require_sleep ||
(tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) ||
HeapTupleHeaderIsOnlyLocked(tuple->t_data))
result = TM_Ok;
else if (!ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
result = TM_Updated;
else
result = TM_Deleted;
- failed的异常处理
- 更新元组信息
- 重新固定vm页
- 重新计算xmax值或者MultiXactID
- 更新元组头信息
- 缓冲区标记为脏
- 写WAL日志
if (result != TM_Ok)
{
tmfd->ctid = tuple->t_data->t_ctid;
tmfd->xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
if (result == TM_SelfModified)
tmfd->cmax = HeapTupleHeaderGetCmax(tuple->t_data);
else
tmfd->cmax = InvalidCommandId;
goto out_locked;
}
if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
{
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
visibilitymap_pin(relation, block, &vmbuffer);
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
goto l3;
}
xmax = HeapTupleHeaderGetRawXmax(tuple->t_data);
old_infomask = tuple->t_data->t_infomask;
MultiXactIdSetOldestMember();
compute_new_xmax_infomask(xmax, old_infomask, tuple->t_data->t_infomask2,
GetCurrentTransactionId(), mode, false,
&xid, &new_infomask, &new_infomask2);//重新计算xmax或MultiXact
START_CRIT_SECTION();
//更新元组头信息
tuple->t_data->t_infomask &= ~HEAP_XMAX_BITS;
tuple->t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
tuple->t_data->t_infomask |= new_infomask;
tuple->t_data->t_infomask2 |= new_infomask2;
if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask))
HeapTupleHeaderClearHotUpdated(tuple->t_data);
HeapTupleHeaderSetXmax(tuple->t_data, xid);
if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask))
tuple->t_data->t_ctid = *tid;
if (PageIsAllVisible(page) &&
visibilitymap_clear(relation, block, vmbuffer,
VISIBILITYMAP_ALL_FROZEN))
cleared_all_frozen = true;
MarkBufferDirty(*buffer);//缓冲区标记为脏
if (RelationNeedsWAL(relation))//写XLOG日志
{
xl_heap_lock xlrec;
XLogRecPtr recptr;
XLogBeginInsert();
XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD);
xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
xlrec.locking_xid = xid;
xlrec.infobits_set = compute_infobits(new_infomask,
tuple->t_data->t_infomask2);
xlrec.flags = cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
result = TM_Ok;
- 解锁缓冲区,如果有tuple lock的话,解锁
out_unlocked:
if (BufferIsValid(vmbuffer))
ReleaseBuffer(vmbuffer);
/*
* Don't update the visibility map here. Locking a tuple doesn't change
* visibility info.
*/
/*
* Now that we have successfully marked the tuple as locked, we can
* release the lmgr tuple lock, if we had it.
*/
if (have_tuple_lock)
UnlockTupleTuplock(relation, tid, mode);
heap_lock_updated_tuple
在更新后的元组上申请锁,如果元组还没有被更新或者挪到其他分区中,就停止,否则就调用heap_lock_updated_tuple_rec申请锁
heap_lock_updated_tuple_rec
遍历找出最新的元组版本然后上锁
static TM_Result
heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
LockTupleMode mode)
{
TM_Result result;
ItemPointerData tupid;
HeapTupleData mytup;
Buffer buf;
uint16 new_infomask,
new_infomask2,
old_infomask,
old_infomask2;
TransactionId xmax,
new_xmax;
TransactionId priorXmax = InvalidTransactionId;
bool cleared_all_frozen = false;
bool pinned_desired_page;
Buffer vmbuffer = InvalidBuffer;
BlockNumber block;
ItemPointerCopy(tid, &tupid);
for (;;)
{
new_infomask = 0;
new_xmax = InvalidTransactionId;
block = ItemPointerGetBlockNumber(&tupid);
ItemPointerCopy(&tupid, &(mytup.t_self));
if (!heap_fetch(rel, SnapshotAny, &mytup, &buf))//通过ctid找到对应的元组
{
result = TM_Ok;
goto out_unlocked;
}
l4:
CHECK_FOR_INTERRUPTS();
if (PageIsAllVisible(BufferGetPage(buf)))
{
visibilitymap_pin(rel, block, &vmbuffer);//钉住vm
pinned_desired_page = true;
}
else
pinned_desired_page = false;
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);//buffer上锁
if (!pinned_desired_page && PageIsAllVisible(BufferGetPage(buf)))
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
visibilitymap_pin(rel, block, &vmbuffer);//重新盯一下
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
}
if (TransactionIdIsValid(priorXmax) &&
!TransactionIdEquals(HeapTupleHeaderGetXmin(mytup.t_data),
priorXmax))//判断xmax是否为0,为0表示到达最新的元组位置,判断当前元组xmin是否等于上一版本的xmax
{
result = TM_Ok;//到达最新位置,返回
goto out_locked;
}
if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(mytup.t_data)))//该元组无效,返回
{
result = TM_Ok;
goto out_locked;
}
old_infomask = mytup.t_data->t_infomask;
old_infomask2 = mytup.t_data->t_infomask2;
xmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
/*
如果当前元组已经被更新或者被一些正在运行的事务锁住,我们需要判断那些事务是否与我们要持有的锁冲突
*/
if (!(old_infomask & HEAP_XMAX_INVALID))//已经被更新
{
TransactionId rawxmax;
bool needwait;
rawxmax = HeapTupleHeaderGetRawXmax(mytup.t_data);//获取xmax
if (old_infomask & HEAP_XMAX_IS_MULTI)//如果是MultiXact
{
int nmembers;
int i;
MultiXactMember *members;
Assert(!HEAP_LOCKED_UPGRADED(mytup.t_data->t_infomask));
nmembers = GetMultiXactIdMembers(rawxmax, &members, false,
HEAP_XMAX_IS_LOCKED_ONLY(old_infomask));
for (i = 0; i < nmembers; i++)//遍历每个member事务,检测是否与当前要持有的锁冲突
{
result = test_lockmode_for_conflict(members[i].status,
members[i].xid,
mode,
&mytup,
&needwait);//检查锁模式是否与member事务冲突
if (result == TM_SelfModified)//已经被当前事务修改过了,下一个
{
pfree(members);
goto next;
}
if (needwait)//需要等待持锁事务结束
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
XactLockTableWait(members[i].xid, rel,
&mytup.t_self,
XLTW_LockUpdated);
pfree(members);
goto l4;
}
if (result != TM_Ok)//不冲突
{
pfree(members);
goto out_locked;
}
}
if (members)
pfree(members);
}
else//单一事务情况
{
MultiXactStatus status;
if (HEAP_XMAX_IS_LOCKED_ONLY(old_infomask))//根据标记位判断锁模式
{
if (HEAP_XMAX_IS_KEYSHR_LOCKED(old_infomask))
status = MultiXactStatusForKeyShare;
else if (HEAP_XMAX_IS_SHR_LOCKED(old_infomask))
status = MultiXactStatusForShare;
else if (HEAP_XMAX_IS_EXCL_LOCKED(old_infomask))
{
if (old_infomask2 & HEAP_KEYS_UPDATED)
status = MultiXactStatusForUpdate;
else
status = MultiXactStatusForNoKeyUpdate;
}
else
{
elog(ERROR, "invalid lock status in tuple");
}
}
else
{
/* it's an update, but which kind? */
if (old_infomask2 & HEAP_KEYS_UPDATED)
status = MultiXactStatusUpdate;
else
status = MultiXactStatusNoKeyUpdate;
}
result = test_lockmode_for_conflict(status, rawxmax, mode,
&mytup, &needwait);//检测是否冲突
if (result == TM_SelfModified)//被当前事务修改过
goto next;
if (needwait)//需要等待事务结束
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
XactLockTableWait(rawxmax, rel, &mytup.t_self,
XLTW_LockUpdated);
goto l4;
}
if (result != TM_Ok)//Ok
{
goto out_locked;
}
}
}
/* compute the new Xmax and infomask values for the tuple ... */
compute_new_xmax_infomask(xmax, old_infomask, mytup.t_data->t_infomask2,
xid, mode, false,
&new_xmax, &new_infomask, &new_infomask2);//重新计算xmax
if (PageIsAllVisible(BufferGetPage(buf)) &&
visibilitymap_clear(rel, block, vmbuffer,
VISIBILITYMAP_ALL_FROZEN))
cleared_all_frozen = true;
START_CRIT_SECTION();
/* ... and set them 更新标记位*/
HeapTupleHeaderSetXmax(mytup.t_data, new_xmax);
mytup.t_data->t_infomask &= ~HEAP_XMAX_BITS;
mytup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
mytup.t_data->t_infomask |= new_infomask;
mytup.t_data->t_infomask2 |= new_infomask2;
MarkBufferDirty(buf);//buffer标记为脏
/* XLOG stuff */
if (RelationNeedsWAL(rel))//写WAL日志
{
xl_heap_lock_updated xlrec;
XLogRecPtr recptr;
Page page = BufferGetPage(buf);
XLogBeginInsert();
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
xlrec.offnum = ItemPointerGetOffsetNumber(&mytup.t_self);
xlrec.xmax = new_xmax;
xlrec.infobits_set = compute_infobits(new_infomask, new_infomask2);
xlrec.flags =
cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
XLogRegisterData((char *) &xlrec, SizeOfHeapLockUpdated);
recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
next:
/* if we find the end of update chain, we're done. */
if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID ||
HeapTupleHeaderIndicatesMovedPartitions(mytup.t_data) ||
ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) ||
HeapTupleHeaderIsOnlyLocked(mytup.t_data))
{
result = TM_Ok;
goto out_locked;
}
/* tail recursion */
priorXmax = HeapTupleHeaderGetUpdateXid(mytup.t_data);
ItemPointerCopy(&(mytup.t_data->t_ctid), &tupid);//切换到下一个版本元组(即ctid指向的下一个元组)
UnlockReleaseBuffer(buf);//缓冲区解压
}
result = TM_Ok;
out_locked:
UnlockReleaseBuffer(buf);
out_unlocked:
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);//释放锁
return result;
}
test_lockmode_for_conflict
检测给定的事务与给定的锁模式是否冲突
static TM_Result
test_lockmode_for_conflict(MultiXactStatus status, TransactionId xid,
LockTupleMode mode, HeapTuple tup,
bool *needwait)
{
MultiXactStatus wantedstatus;
*needwait = false;
wantedstatus = get_mxact_status_for_lock(mode, false);//获取锁模式对应的行锁模式
if (TransactionIdIsCurrentTransactionId(xid))//如果就是当前事务,直接返回被自己修改过即可
{
return TM_SelfModified;
}
else if (TransactionIdIsInProgress(xid))//如果事务正在运行中
{
if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
LOCKMODE_from_mxstatus(wantedstatus)))//检查是否与我们依赖的锁模式冲突
{
*needwait = true;
}
return TM_Ok;
}
else if (TransactionIdDidAbort(xid))//是否如果已经异常终止,不冲突
return TM_Ok;
else if (TransactionIdDidCommit(xid))//如果事务已经提交
{
if (!ISUPDATE_from_mxstatus(status))//不是元组更新的事务,不冲突
return TM_Ok;
if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
LOCKMODE_from_mxstatus(wantedstatus)))//判断是否与锁模式冲突
{
if (!ItemPointerEquals(&tup->t_self, &tup->t_data->t_ctid))//ctid不一致,已经被更新
return TM_Updated;
else//已经被删除
return TM_Deleted;
}
return TM_Ok;
}
return TM_Ok;
}
【参考】
- 《PostgreSQL数据库内核分析》
- 《Postgresql技术内幕-事务处理深度探索》
- 《PostgreSQL指南:内幕探索》
- pg14源码