18-pg内核之日志管理器(六)checkpoint
概念
数据库中除了实际存储的数据之外,还存在许多事务相关的日志,如WAL日志,CLOG日志。MultiXact日志等,每次包含DML操作的事务都会产生这些日志,随着时间的推移,如果不进行清理,日志会一直增大下去,最终可能会占满磁盘,而且会导致数据库性能的下降。为了避免这种情况的发生,这就需要我们定期的清理这些日志并保证数据已经完全落盘,数据库提供了checkpoint功能来实现。
Checkpoint就是在数据库运行过程中,选择一个时间点,将时间点之前的数据全部刷入磁盘,然后将这个时间点之前的相关的日志全部删除。 当进行故障恢复时,就从该时间点开始进行恢复。
Checkpoint触发条件
- 数据库启停
- 故障恢复完成
- 手动执行CheckPoint命令
- 超时时间达到(checkpoint_timeout)
- 日志数量达到阈值
结构
Flag
因为checkpoint的触发条件有多种,不同条件下触发的checkpoint,执行的操作也不一样,所以可以通过checkpoint时传入的flag标志位来区分,以便于在处理时能够针对不同的场景进行处理。
#define CHECKPOINT_IS_SHUTDOWN 0x0001 /* 关机时的Checkpoint */
#define CHECKPOINT_END_OF_RECOVERY 0x0002 /* 恢复时的checkpoint */
#define CHECKPOINT_IMMEDIATE 0x0004 /* 立刻执行,不能有延迟 */
#define CHECKPOINT_FORCE 0x0008 /* 即使不活跃也立马执行 */
#define CHECKPOINT_FLUSH_ALL 0x0010 /* 将所有的页都刷盘 */
/* These are important to RequestCheckpoint */
#define CHECKPOINT_WAIT 0x0020 /* 进程需要等待checkpoint完成 */
#define CHECKPOINT_REQUESTED 0x0040 /* 已经生成的有checkpoint requested */
#define CHECKPOINT_CAUSE_XLOG 0x0080 /* XLOG日志引起的checkpoint */
#define CHECKPOINT_CAUSE_TIME 0x0100 /* 超时引起的checkpoint */
CheckpointerShmem
用于检查点进程(checkpointer)与后端进程(backends)间通信的共享内存区域
ckpt计数器允许后端进程监视它们发出的检查点请求的完成情况。其工作原理如下:
- 在检查点开始时,检查点进程在持有ckpt_lck的情况下读取(并清除)请求标志,并递增ckpt_started。
- 在检查点完成后,检查点进程将ckpt_done设置为等于ckpt_started。
- 在检查点失败时,检查点进程递增ckpt_failed并将ckpt_done设置为等于ckpt_started。
后端进程遵循以下算法:
1. 在持有ckpt_lck的情况下,记录ckpt_failed和ckpt_started的当前值,并设置请求标志。
2. 发送信号以请求检查点。
3. 等待直到ckpt_started发生变化。现在你知道自从开始这个算法以来,一个检查点已经开始(尽管不能确定是你的信号直接引发的),并且它正在使用你设置的标志。
4. 记录ckpt_started的新值。
5. 等待直到ckpt_done大于等于之前保存的ckpt_started值(这里使用模运算处理计数器可能的回绕情况)。现在你知道一个检查点已经启动并完成,但不确定其是否成功。
6. 如果ckpt_failed与最初保存的值不同,假设请求失败;否则,请求肯定成功。
ckpt_flags保存自上一次检查点启动以来所有请求后端发送的检查点请求标志的逻辑或。这些标志被设计为可以通过逻辑或操作正确合并多个请求。
num_backend_writes用于统计用户后端进程执行的缓冲区写入次数。这个计数器应足够宽,以防止在一个处理周期内溢出。num_backend_fsync则计数那些由于检查点进程未能吸收其请求而必须自己执行fsync操作的写入子集。
requests数组保存由后端进程发送且尚未被检查点进程吸收的fsync请求。
与检查点相关的字段不同,num_backend_writes、num_backend_fsync以及requests字段受CheckpointerCommLock保护。
typedef struct
{
pid_t checkpointer_pid; /* checkpointer的进程PID */
slock_t ckpt_lck; /* 自旋锁,保护ckpt_* 变量 */
int ckpt_started; /* checkpoint 启动时更新该值 */
int ckpt_done; /* checkpoint 完成时更新该值 */
int ckpt_failed; /* checkpoint 失败时更新该值 */
int ckpt_flags; /* checkpoint 的标志位 */
ConditionVariable start_cv; /* checkpointer启动的信号量 */
ConditionVariable done_cv; /* checkpointer完成的信号量 */
uint32 num_backend_writes; /* 用户进程Buffer写的计数 */
uint32 num_backend_fsync; /* 用户进程fsync的计数 */
int num_requests; /* current # of requests */
int max_requests; /* allocated array size */
CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
} CheckpointerShmemStruct;
static CheckpointerShmemStruct *CheckpointerShmem;
CheckPoint
Checkpoint的XLOG日志记录结构,pg_control始终保存最新的副本。checkpoint也要写入XLOG日志记录的,checkpoint结构体保存要写入XLOG的数据,其结构如下:
typedef struct CheckPoint
{
XLogRecPtr redo; /* redo的位置 */
TimeLineID ThisTimeLineID; /* 当前的时间线 */
TimeLineID PrevTimeLineID; /* 上一个时间线 */
bool fullPageWrites; /* 是否进行FPW */
FullTransactionId nextXid; /* 写一个可用的事务ID */
Oid nextOid; /* 写一个可用的OID */
MultiXactId nextMulti; /* 写一个可用的MultiXactID*/
MultiXactOffset nextMultiOffset; /* 写一个可用的MultiOffset */
TransactionId oldestXid; /* 数据库层次的最小冻结ID datfrozenxid */
Oid oldestXidDB; /* 最小冻结ID存在的那个数据库的OID*/
MultiXactId oldestMulti; /* 集群层次的最小的MultiXactID datminmxid */
Oid oldestMultiDB; /* 最小的MultiXactID 所在的数据库的OID */
pg_time_t time; /* time stamp of checkpoint */
TransactionId oldestCommitTsXid; /* 最老的提交事务的事务ID */
TransactionId newestCommitTsXid; /* 最新提交的事务ID*/
TransactionId oldestActiveXid;//流复制时使用
} CheckPoint;
CheckpointStatsData
Checkpoint的统计数据,记录了Checkpoint相关统计数据,在log_checkpoint开关打开后,日志中会记录Checkpoint相关的信息,其统计数据就是从该结构体获取。
typedef struct CheckpointStatsData
{
TimestampTz ckpt_start_t; /* checkpoint 启动时间 */
TimestampTz ckpt_write_t; /* 刷Buffer的时间*/
TimestampTz ckpt_sync_t; /* fsync的时间 */
TimestampTz ckpt_sync_end_t; /* fsync的结束时间 */
TimestampTz ckpt_end_t; /* checkpointer的结束时间 */
int ckpt_bufs_written; /* # 已经写入的Buffer数量*/
int ckpt_segs_added; /* # 新增加XLOG的segment数量 */
int ckpt_segs_removed; /* # 删除的XLOG的segment数量 */
int ckpt_segs_recycled; /* # 被循环使用的XLOG的segment数量 */
int ckpt_sync_rels; /* # 同步的表的数量 */
uint64 ckpt_longest_sync; /* 一张表的最长sync的时间 */
uint64 ckpt_agg_sync_time; /* 所有单独的sync的时间的和*/
} CheckpointStatsData;
主要流程函数
checkpoint流程
RequestCheckpoint
请求一个后台进程进行checkpoint。这个进程一般是指单独的后台进程,例如某个会话执行checkpoint命令时,最总就会调用该函数请求checkpoint。
- 判断是否是独立的后台进程,是的话,调用CreateCheckPoint函数创建checkpoint点,标志为添加的是CHECKPOINT_IMMEDIATE标志,会立即执行checkpoint操作,完成后返回。
if (!IsPostmasterEnvironment)//独立的后台进程可以直接自己搞
{
CreateCheckPoint(flags | CHECKPOINT_IMMEDIATE);//创建checkpoint点
smgrcloseall();//关闭所有的smgr对象
return;
}
- 如果不是独立的进程,就需要先获取CheckpointerShmem->ckpt_failed和CheckpointerShmem->ckpt_started的值,然后CheckpointerShmem->ckpt_flags上设置CHECKPOINT_REQUESTED标志,CheckpointerShmem全局变量是其他进程与checkpointer进程通信的桥梁,CheckpointerShmem->ckpt_flags设置了标志位后,checkpointer进程会检测到标志位被设置,然后就会触发checkpoint操作。
SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
old_failed = CheckpointerShmem->ckpt_failed;
old_started = CheckpointerShmem->ckpt_started;
CheckpointerShmem->ckpt_flags |= (flags | CHECKPOINT_REQUESTED);//更新标记,checkpointer进程会检测到,然后就会执行checkpoint
SpinLockRelease(&CheckpointerShmem->ckpt_lck);
- 如果checkpointer进程还没有启动,会等待60秒,期间会检测CheckpointerShmem->checkpointer_pid来判断进程是否已经启动,或者给checkpointer发信号通知
- 如果请求的flags还包含CHECKPOINT_WAIT标志,那么就会等待checkpoint启动和完成
- 等待信号量CheckpointerShmem->start_cv被唤醒,唤醒后再次获取CheckpointerShmem->ckpt_started的值并与上面获取的值进行比较,如果不相等,则说明新的checkpoint已经启动,否则就继续睡眠等待被唤醒。
/* Wait for a new checkpoint to start. */ ConditionVariablePrepareToSleep(&CheckpointerShmem->start_cv); //等待新的checkpointer启动 for (;;) { SpinLockAcquire(&CheckpointerShmem->ckpt_lck); new_started = CheckpointerShmem->ckpt_started; SpinLockRelease(&CheckpointerShmem->ckpt_lck); if (new_started != old_started) break; ConditionVariableSleep(&CheckpointerShmem->start_cv, WAIT_EVENT_CHECKPOINT_START); } ConditionVariableCancelSleep();
- 等待信号量CheckpointerShmem->done_cv被唤醒,唤醒后再次获取CheckpointerShmem->done_cv的值并与上面获取的值进行比较,如果不相等,则说明新的checkpoint已经完成,否则就继续睡眠等待被唤醒。
/* * We are waiting for ckpt_done >= new_started, in a modulo sense. */ ConditionVariablePrepareToSleep(&CheckpointerShmem->done_cv);//等待checkpointer结束 for (;;) { int new_done; SpinLockAcquire(&CheckpointerShmem->ckpt_lck); new_done = CheckpointerShmem->ckpt_done; new_failed = CheckpointerShmem->ckpt_failed; SpinLockRelease(&CheckpointerShmem->ckpt_lck); if (new_done - new_started >= 0) break; ConditionVariableSleep(&CheckpointerShmem->done_cv, WAIT_EVENT_CHECKPOINT_DONE); } ConditionVariableCancelSleep();
CreateCheckPoint
执行检查点操作——无论是数据库关闭期间还是在线运行时
flags
是以下位或组合:
CHECKPOINT_IS_SHUTDOWN
:表示此检查点是为了数据库关闭而执行的。CHECKPOINT_END_OF_RECOVERY
:表示此检查点是为了WAL恢复结束而执行的。CHECKPOINT_IMMEDIATE
:尽快完成检查点操作,忽略checkpoint_completion_target
参数的设置。CHECKPOINT_FORCE
:即使自上次检查点以来没有发生XLOG活动,也强制执行检查点
(CHECKPOINT_IS_SHUTDOWN
或CHECKPOINT_END_OF_RECOVERY
隐含此标志)。CHECKPOINT_FLUSH_ALL
:同时刷新未记录表的缓冲区。- 注意:
flags
中还包含其他位,主要用于日志记录目的。特别是要注意,此例程是同步执行的,并不关注CHECKPOINT_WAIT
。 - 如果非关闭(
!shutdown
)状态,则我们正在写入一个在线检查点。这是一种特殊操作,涉及到一段持续时间但逻辑上仅在一个LSN
(Log Sequence Number,日志序列号)发生的WAL(Write-Ahead Log)记录。WAL记录的逻辑位置(重做指针)
与物理位置相同或更早。当我们重放WAL时,通过检查点的物理位置定位到它,然后读取重做指针并从更早的逻辑位置开始重放。
请注意,在逻辑位置我们并未向WAL写入任何内容,所以那个位置可能属于任何其他类型的WAL记录。所有这些机制允许我们在执行检查点的同时继续工作。因此,此处动作的时机控制至关重要,需要注意此函数在繁忙系统上执行可能需要几分钟时间。
主要执行流程
- checkpoint操作需要写WAL日志,准备checkpoint结构并更新相关成员数据
- checkpoint最关键的就是要计算出redo的位置,会先阻塞住所有的WAL日志写入,然后读取当前的Insert->CurrBytePos,转换后为本次checkpoint的redo位置。同时会更新XLogCtl->RedoRecPtr,RedoRecPtr,XLogCtl->Insert.RedoRecPtr 为该值。
- 调用CheckPointGuts函数将共享内存中的所有脏数据刷入磁盘中。
- checkpoint数据写入到XLOG日志中,如果是关机或者故障恢复,则这条XLOG数据需要时最后一条XLOG日志,否则就出错
- 更新pg_control中相关变量。
- 执行清理工作,比如删除redo点前的WAL段文件,防止磁盘满载。
void
CreateCheckPoint(int flags)
{
if (flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY))
shutdown = true;
else
shutdown = false;
InitXLogInsert();//初始化XLOG日志写入,checkpointer操作需要写WAL日志
MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));//初始化checkpointer统计数据
CheckpointStats.ckpt_start_t = GetCurrentTimestamp();//更新时间戳
SyncPreCheckpoint();//执行检查点的准备工作
START_CRIT_SECTION();//启动临界区
if (shutdown)//如果是关机触发的checkpoint,更新pg_contrl中state
{
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->state = DB_SHUTDOWNING;//更新状态
ControlFile->time = (pg_time_t) time(NULL);
UpdateControlFile();
LWLockRelease(ControlFileLock);
}
//1. 初始化checkpoint
MemSet(&checkPoint, 0, sizeof(checkPoint));
checkPoint.time = (pg_time_t) time(NULL);//更新checkpoint的时间
if (!shutdown && XLogStandbyInfoActive())
checkPoint.oldestActiveXid = GetOldestActiveTransactionId();
else
checkPoint.oldestActiveXid = InvalidTransactionId;
last_important_lsn = GetLastImportantRecPtr();//获取最近的写入位置
//2. 确定redo位置
/*
这里会阻塞写入进程,为了确定redo的位置
*/
WALInsertLockAcquireExclusive();//先持有WalInsertLocks锁,排他模式
curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
checkPoint.ThisTimeLineID = ThisTimeLineID;//更新时间线
if (flags & CHECKPOINT_END_OF_RECOVERY)
checkPoint.PrevTimeLineID = XLogCtl->PrevTimeLineID;
else
checkPoint.PrevTimeLineID = ThisTimeLineID;
checkPoint.fullPageWrites = Insert->fullPageWrites;//是否存在全页写
freespace = INSERT_FREESPACE(curInsert);//计算当前页的剩余空间
if (freespace == 0)
{
if (XLogSegmentOffset(curInsert, wal_segment_size) == 0)//添加页头
curInsert += SizeOfXLogLongPHD;
else
curInsert += SizeOfXLogShortPHD;
}
checkPoint.redo = curInsert;//更新redo点
RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
/*
已经确定了redo位置,可以方式insert的锁,让其他进程继续插入了
*/
WALInsertLockRelease();
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->RedoRecPtr = checkPoint.redo;//更新redo位置
SpinLockRelease(&XLogCtl->info_lck);
LWLockAcquire(XidGenLock, LW_SHARED);
checkPoint.nextXid = ShmemVariableCache->nextXid;//获取事务相关信息并写入到checkpoint中
checkPoint.oldestXid = ShmemVariableCache->oldestXid;
checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
LWLockRelease(XidGenLock);
LWLockAcquire(CommitTsLock, LW_SHARED);
checkPoint.oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;//获取事务提交时间戳信息并写入到checkpoint中
checkPoint.newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
LWLockRelease(CommitTsLock);
LWLockAcquire(OidGenLock, LW_SHARED);
checkPoint.nextOid = ShmemVariableCache->nextOid;//获取OID信息并写入到checkpoint中
if (!shutdown)
checkPoint.nextOid += ShmemVariableCache->oidCount;
LWLockRelease(OidGenLock);
MultiXactGetCheckptMulti(shutdown,
&checkPoint.nextMulti,
&checkPoint.nextMultiOffset,
&checkPoint.oldestMulti,
&checkPoint.oldestMultiDB);//获取MultiXact相关信息并写入到checkpoint中
END_CRIT_SECTION();
//3. 将共享内存中的脏数据刷入磁盘
CheckPointGuts(checkPoint.redo, flags);//将共享内存中的所有数据刷入到磁盘中
START_CRIT_SECTION();
/*
XLOG写入
*/
XLogBeginInsert();//开始插入
XLogRegisterData((char *) (&checkPoint), sizeof(checkPoint));//注册数据
recptr = XLogInsert(RM_XLOG_ID,
shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
XLOG_CHECKPOINT_ONLINE);//插入数据
XLogFlush(recptr);//刷入磁盘
PriorRedoPtr = ControlFile->checkPointCopy.redo;//记住上一个检查点的指针
//更新pg_control
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (shutdown)
ControlFile->state = DB_SHUTDOWNED;
ControlFile->checkPoint = ProcLastRecPtr;
ControlFile->checkPointCopy = checkPoint;
ControlFile->time = (pg_time_t) time(NULL);
ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
ControlFile->minRecoveryPointTLI = 0;
SpinLockAcquire(&XLogCtl->ulsn_lck);
ControlFile->unloggedLSN = XLogCtl->unloggedLSN;
SpinLockRelease(&XLogCtl->ulsn_lck);
UpdateControlFile();//更新pg_control文件
LWLockRelease(ControlFileLock);
/* Update shared-memory copy of checkpoint XID/epoch */
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->ckptFullXid = checkPoint.nextXid;//更新checkpoint时的位置
SpinLockRelease(&XLogCtl->info_lck);
END_CRIT_SECTION();
SyncPostCheckpoint();//通知smgr进行checkpoint后的清理
if (PriorRedoPtr != InvalidXLogRecPtr)
UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);//更新两个检查点之间的距离,段文件回收时会用到
//4. 清理XLOG段文件
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
KeepLogSeg(recptr, &_logSegNo);
if (InvalidateObsoleteReplicationSlots(_logSegNo))
{
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
KeepLogSeg(recptr, &_logSegNo);
}
_logSegNo--;
RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);//删除旧的XLOG文件
if (!shutdown)
PreallocXlogFiles(recptr);//如果需要,创建更多的日志段
if (!RecoveryInProgress())
TruncateSUBTRANS(GetOldestTransactionIdConsideredRunning());//清除所有的SUBTRANS数据
LogCheckpointEnd(false);//checkpoint结束,记录到日志中
}
CheckPointGuts
将所有共享内存中的数据刷入到磁盘中
static void
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
CheckPointRelationMap();//将表关系映射数据刷入磁盘
CheckPointReplicationSlots();//将复制槽相关的数据刷入磁盘
CheckPointSnapBuild();//删除串行化事务级别下创建的快照
CheckPointLogicalRewriteHeap();//逻辑重写清理
CheckPointReplicationOrigin();//执行针对每个复制源在其重放的远程LSN方面的进度检查点
CheckPointCLOG();//将CLOG数据刷入磁盘
CheckPointCommitTs();//将CommitTS数据刷入磁盘
CheckPointSUBTRANS();//将SUBTRANS数据刷入磁盘
CheckPointMultiXact();//将MultiXact数据刷入磁盘
CheckPointPredicate();//将所有脏的SLRU缓冲区数据刷入磁盘
CheckPointBuffers(flags);//将缓冲池中的所有脏块刷入磁盘
ProcessSyncRequests();//处理fsync
CheckPointTwoPhase(checkPointRedo);//将两阶段提交相关数据刷入磁盘
}
RemoveOldXlogFiles
要删除旧的XLOG日志,首先要计算出要保留的最小的段ID,小于该段ID的文件都可以删除。由于文件的删除和创建比较耗时,我们还采用了重用机制,从要删除的旧的XLOG日志文件中,选择部分重命名成将来要用的段文件,以供后面使用,这样文件就不用新创建了,能提高WAL日志的性能。
其主要实现流程如下:
-
计算要保留的最小段号
根据redo的LSN计算要保留的最小段号,基于wal_keep_size或其它复制槽的需要,还会与复制槽中要保留的LSN对比,取一个最小值,就是要保留的最小LSN。 -
计算要回收的段号
调用XLOGfileslop函数计算出要回收的段号,它会根据在这两个阈值(最小和最大WAL尺寸)之间,我们的目标是回收适量的日志段,以确保能够维持运行至下一个检查点的预计结束位置。
为了估算下一个检查点大概会在何时完成,我们做出一个假设:系统在两次检查点之间会以恒定的速度消耗CheckPointDistanceEstimate字节数的WAL(预写日志)。基于这一假设,我们可以预测从当前点到预期的下一个检查点完成时,系统将产生的WAL数据量,并据此来决定应该保留多少日志段,以确保在不超出最大WAL尺寸的同时,也能为下一次检查点的到来预留足够的日志信息,保障数据的完整性和系统的恢复能力。 -
删除或重用旧的段文件
只会处理XLOG日志文件,其他文件直接跳过,通过判断文件名格式过滤。
因为比较先后顺序是通过比较文件名大小实现的,而不同时间线的文件暂不处理,判断前需要先将文件名中的时间线过滤掉。
删除或回收文件之前,如果打开了归档模式,还需要先保证日志文件已归档,最后再删除文件。
XLByteToSeg(endptr, endlogSegNo, wal_segment_size);
recycleSegNo = XLOGfileslop(lastredoptr); //待回收的段号
XLogFileName(lastoff, 0, segno, wal_segment_size); //待删除的段文件
xldir = AllocateDir(XLOGDIR);//获取WAL日志所在目录
while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)//遍历目录下的每个文件
{
if (!IsXLogFileName(xlde->d_name) &&
!IsPartialXLogFileName(xlde->d_name))//不是XLOG文件的直接略过,即文件名不符合[timeline][logicalID][segNo]
continue;
if (strcmp(xlde->d_name + 8, lastoff + 8) <= 0)//过滤掉文件名的前八位,即时间线
{
if (XLogArchiveCheckDone(xlde->d_name))//归档模式下,需要先保证归档后才能删
{
UpdateLastRemovedPtr(xlde->d_name);//先在共享内存中更新最小段号
RemoveXlogFile(xlde->d_name, recycleSegNo, &endlogSegNo);//删除段文件
}
}
}
FreeDir(xldir);
XLOGfileslop
计算待回收段号
minSegNo = lastredoptr / wal_segment_size +
ConvertToXSegs(min_wal_size_mb, wal_segment_size) - 1;//计算最小保留WAL日志的终点段号
maxSegNo = lastredoptr / wal_segment_size +
ConvertToXSegs(max_wal_size_mb, wal_segment_size) - 1;//计算最大保留WAL日志的终点段号
distance = (1.0 + CheckPointCompletionTarget) * CheckPointDistanceEstimate;//计算距离
/* add 10% for good measure. */
distance *= 1.10;
recycleSegNo = (XLogSegNo) ceil(((double) lastredoptr + distance) /
wal_segment_size);//计算出回收的段号
if (recycleSegNo < minSegNo)//如果比最小保留的还小,更新为最小的
recycleSegNo = minSegNo;
if (recycleSegNo > maxSegNo)//如果比最大保留的还大,更新为最大的
recycleSegNo = maxSegNo;
UpdateCheckPointDistanceEstimate
计算两次checkpoint之间的估算值,用于计算回收段文件。
为了估算两次检查点之间消耗的日志段数量,我们维护一个移动平均值,该平均值基于前几次检查点周期内生成的WAL(预写日志)量。然而,如果负载呈现突发性,即存在平静期和繁忙期,我们就需要考虑到峰值负载的情况。因此,我们不采用简单的移动平均法,而是让平均值在前一周期使用的WAL量少于估计值时缓慢下降,而一旦实际使用量超过估计值,则立即上调平均值。
当检查点由max_wal_size触发时,这一估算值理论上应趋近于CheckpointSegments乘以wal_segment_size的值。
注意:这个估算过程并不区分检查点触发的原因。无论是通过CHECKPOINT命令手动触发的检查点,还是比如启动基础备份等操作引起的检查点,都被同等对待。如果这类非定期检查点不频繁发生,缓慢下降的调整机制基本可以将其影响淡化。如果这类检查点发生得很频繁,那么将它们纳入计算也是合理的;比如,如果你每五分钟手动触发一次检查点,且从不让定时检查点自动发生,那么基于这五分钟的间隔来预先分配日志空间,比起依据checkpoint_timeout设置的值来得更有意义。
PrevCheckPointDistance = nbytes;//本次checkpoint相对于上次的checkpoint的增量
if (CheckPointDistanceEstimate < nbytes)//如果增量大于上次的估算值,则更新估算值为本次的值
CheckPointDistanceEstimate = nbytes;
else//如果估算值大于增量,则更新估算值为估算值*0.9+0.1 * 增量
CheckPointDistanceEstimate =
(0.90 * CheckPointDistanceEstimate + 0.10 * (double) nbytes);
XLogCheckpointNeeded
计算XLOG日志数量是否超过阈值
static bool
XLogCheckpointNeeded(XLogSegNo new_segno)
{
XLogSegNo old_segno;
XLByteToSeg(RedoRecPtr, old_segno, wal_segment_size);//获取上一次的redo位置
/* CheckPointSegments= ConvertToXSegs(max_wal_size_mb, wal_segment_size) /
(1.0 + CheckPointCompletionTarget) = 1024/(16M/1M)/(1+0.9)=64/1.9=33.68
*/
if (new_segno >= old_segno + (uint64) (CheckPointSegments - 1))//计算是否超过阈值
return true;
return false;
}
checkpointer流程
pg的checkpoint操作有多种触发条件,如开关机,故障恢复等,这些在数据库运行过程中触发的频率很低,而共享缓冲区需要定期刷咋和XLOG日志又需要定期清理,所有有一个进程checkpointer专门维护,其他进程需要进行checkpoint或者超时达到时,就给checkpointer进程发送请求,checkpointer进程收到请求后进行checkpoint操作,将共享缓冲区中的脏数据库刷入磁盘,并清理旧的XLOG文件。
进程的loop中,判断是否需要进程checkpoint的条件是:
- CheckpointerShmem)->ckpt_flags不为空,设置有checkpoint的请求标志
- 距离上次checkpoint的时间间隔超过了超时时间
会先处理sync请求
主要流程
- 注册信号处理函数
pqsignal(SIGHUP, SignalHandlerForConfigReload);//重新加载配置参数
pqsignal(SIGINT, ReqCheckpointHandler); /* 收到SIGINT信号后调用回调函数ReqCheckpointHandler唤醒进程 */
pqsignal(SIGTERM, SIG_IGN); /* 忽略SIGTERM信号 */
pqsignal(SIGALRM, SIG_IGN);//忽略SIGALALRM信号
pqsignal(SIGPIPE, SIG_IGN);//忽略SIGPIPE信号
pqsignal(SIGUSR1, procsignal_sigusr1_handler);//处理SIGUSR1信号
pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);//postmaster发信号给checkpointer关闭
pqsignal(SIGCHLD, SIG_DFL);//重置一些由postmaster进程接受但当前进程不处理的信号。
- 如果遇到异常,需要进程异常处理
- 更新共享内存相关的GUC参数:UpdateSharedMemoryConfig()
- 进程循环
- 处理收到的请求和中断:
AbsorbSyncRequests();//处理收到的同步请求 HandleCheckpointerInterrupts();//处理中断
- 如果检测到CheckpointerShmem)->ckpt_flags上有checkpoint请求,设置需要标志位do_checkpoint = true;
- 如果距离上次checkpoint的时间间隔大于CheckpointTimeout,则设置标志位do_checkpoint = true;
- 如果do_checkpoint = true,执行checkpoint操作
- 获取请求的checkpoint类型,以决定执行什么类型的checkpoint
- 更新CheckpointerShmem->ckpt_started
- 唤醒等待CheckpointerShmem->start_cv信号量的进程
- 执行CreateCheckPoint函数创建检查点
- 执行完checkpoint,更新CheckpointerShmem->ckpt_done
- 唤醒等待CheckpointerShmem->done_cv信号量的进程
- 如果期间 CheckpointerShmem)->ckpt_flags又被设置,重新再进行一次checkpoint操作
- 如果耗费时间超过了CheckPointTimeout,重新再进行一次checkpoint
- 本次checkpoint结束,进程睡眠,等待再次被唤醒。
for (;;)
{
bool do_checkpoint = false;
int flags = 0;
pg_time_t now;
int elapsed_secs;
int cur_timeout;
ResetLatch(MyLatch);//清理已经添加的wakeup进程
AbsorbSyncRequests();//处理收到的同步请求
HandleCheckpointerInterrupts();//处理中断
/*
检测到信号量上有checkpoint请求
*/
if (((volatile CheckpointerShmemStruct *) CheckpointerShmem)->ckpt_flags)
{
do_checkpoint = true;
BgWriterStats.m_requested_checkpoints++;
}
now = (pg_time_t) time(NULL);
elapsed_secs = now - last_checkpoint_time;//计算距离上次checkpoint的时间间隔
if (elapsed_secs >= CheckPointTimeout)//如果超过了超时时间,就需要进行checkpoint
{
if (!do_checkpoint)
BgWriterStats.m_timed_checkpoints++;
do_checkpoint = true;
flags |= CHECKPOINT_CAUSE_TIME;
}
if (do_checkpoint)//执行checkpoint
{
bool ckpt_performed = false;
bool do_restartpoint;
do_restartpoint = RecoveryInProgress();//是否需要执行检查点重启操作
SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
flags |= CheckpointerShmem->ckpt_flags;//获取请求的checkpoint类型,以决定需要执行什么类型的checkpoint
CheckpointerShmem->ckpt_flags = 0;
CheckpointerShmem->ckpt_started++;
SpinLockRelease(&CheckpointerShmem->ckpt_lck);
ConditionVariableBroadcast(&CheckpointerShmem->start_cv);//唤醒CheckpointerShmem->start_cv信号量
if (flags & CHECKPOINT_END_OF_RECOVERY)//不再需要重启
do_restartpoint = false;
if (!do_restartpoint &&
(flags & CHECKPOINT_CAUSE_XLOG) &&
elapsed_secs < CheckPointWarning)//防止checkpoint过于频繁,这里将发出警告,时间间隔小于超时时间时
xxxx.
ckpt_active = true;
if (do_restartpoint)
ckpt_start_recptr = GetXLogReplayRecPtr(NULL);
else
ckpt_start_recptr = GetInsertRecPtr();
ckpt_start_time = now;
ckpt_cached_elapsed = 0;
if (!do_restartpoint)
{
CreateCheckPoint(flags);//创建检查点
ckpt_performed = true;
}
else
ckpt_performed = CreateRestartPoint(flags);//创建restart检查点
smgrcloseall();
SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;//checkpoint结束,更新计数
SpinLockRelease(&CheckpointerShmem->ckpt_lck);
ConditionVariableBroadcast(&CheckpointerShmem->done_cv);//唤醒完成信号量
if (ckpt_performed)
{
last_checkpoint_time = now;//更新上一次checkpoint时间
}
else
{
last_checkpoint_time = now - CheckPointTimeout + 15;//执行失败的话,15秒后重试
}
ckpt_active = false;
HandleCheckpointerInterrupts();
}
/* Check for archive_timeout and switch xlog files if necessary. */
CheckArchiveTimeout();
pgstat_send_bgwriter();
/* Send WAL statistics to the stats collector. */
pgstat_send_wal(true);
if (((volatile CheckpointerShmemStruct *) CheckpointerShmem)->ckpt_flags)
continue;
now = (pg_time_t) time(NULL);
elapsed_secs = now - last_checkpoint_time;
if (elapsed_secs >= CheckPointTimeout)
continue; /* no sleep for us ... */
cur_timeout = CheckPointTimeout - elapsed_secs;
if (XLogArchiveTimeout > 0 && !RecoveryInProgress())
{
elapsed_secs = now - last_xlog_switch_time;
if (elapsed_secs >= XLogArchiveTimeout)
continue; /* no sleep for us ... */
cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);
}
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
cur_timeout * 1000L /* convert to ms */ ,
WAIT_EVENT_CHECKPOINTER_MAIN);
}