sql server cdc重启监控新加表字段
表新加字段后,CDC 需要重启才能监控新加的字段。但如果你有一个定时任务在扫描 CDC 的 _CT 表,重启会在以下两种情况下导致数据丢失:
- CDC重启时捕获不到changelog
- 重启前未消费完的CDC数据会被清空
通过以下脚本让cdc监控新字段可以避免数据丢失
下面脚本的DECLARE @TableName NVARCHAR(200) = 'your_table_name'这一行,将your_table_name替换成对应的表名,然后执行即可。
核心原理:为该表新开启一个临时的 CDC 捕获实例,用它生成的存储过程替换原捕获实例的存储过程,最后再删掉临时实例;整个过程不需要重启 CDC,数据也不会丢失。
/*
Add newly created source-table columns to an existing CDC capture instance
without disabling and re-enabling it.

Steps performed for the table named in @TableName:
  1. Enable a temporary capture instance on the source table.
  2. Add the new columns to the original change (_CT) table.
  3. Register the new columns in cdc.captured_columns.
  4. Rewrite the original instance's CDC procedures from the temporary
     instance's definitions:
       cdc.sp_batchinsert_<objectid>
       cdc.sp_insdel_<objectid>
       cdc.sp_upd_<objectid>
  5. Disable (drop) the temporary capture instance.

The script aborts with THROW when any table already has more than one
capture instance, or when the temporary capture instance cannot be found
after it was enabled.
*/
SET NOCOUNT ON;

DECLARE @TmpPrefix NVARCHAR(30) = '$tmp$'; /* name prefix of temporary capture instances */
DECLARE @ObjectID INT;                     /* object_id of the source table */
DECLARE @MaxColumnID INT;                  /* highest source column_id already captured */
DECLARE @ErrorMessage NVARCHAR(MAX);
DECLARE @SchemaName NVARCHAR(128);
DECLARE @ObjectName NVARCHAR(128);
DECLARE @CaptureInstance NVARCHAR(128);
DECLARE @TmpCaptureInstance NVARCHAR(128);
DECLARE @ColumnName NVARCHAR(128);
DECLARE @ColumnID INT;
DECLARE @ColumnDataType NVARCHAR(128);     /* type with length/precision, used in ALTER TABLE */
DECLARE @ColumnTypeName NVARCHAR(128);     /* bare type name, stored in cdc.captured_columns */
DECLARE @SQL NVARCHAR(MAX);
DECLARE @ColumnOrdinal INT;
DECLARE @IsComputed BIT;
DECLARE @OriginalCDCObjectID INT;          /* object_id of the original change table */
DECLARE @NewCDCObjectID INT;               /* object_id of the temporary change table */
DECLARE @TableName NVARCHAR(200) = 'your_table_name'; -- table to process

/* Only do anything when CDC metadata exists in the current database. */
IF EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID('cdc.change_tables'))
BEGIN
    /* Temporary capture instances of the target table. The cursor is opened twice:
       first to remove leftovers of an earlier run, then for the final cleanup. */
    DECLARE TmpCDCObjects CURSOR LOCAL FAST_FORWARD FOR
        SELECT OBJECT_SCHEMA_NAME(source_object_id), OBJECT_NAME(source_object_id), capture_instance
        FROM cdc.change_tables
        WHERE capture_instance LIKE @TmpPrefix + '%'
          AND OBJECT_NAME(source_object_id) = @TableName;

    OPEN TmpCDCObjects;
    FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        /* Disable the leftover temporary capture instance. */
        EXEC sys.sp_cdc_disable_table
            @source_schema = @SchemaName,
            @source_name = @ObjectName,
            @capture_instance = @CaptureInstance;
        FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    END;
    CLOSE TmpCDCObjects;

    /* Abort when any table has more than one capture instance in use. */
    IF EXISTS (SELECT source_object_id, COUNT(*) FROM cdc.change_tables GROUP BY source_object_id HAVING COUNT(*) > 1)
    BEGIN
        DEALLOCATE TmpCDCObjects;
        SELECT @ErrorMessage = 'Unable to update CDC as there are objects with more than one capture instance in use. You must update CDC manually';
        THROW 50000, @ErrorMessage, 1;
    END;

    /* Change tables of the target table whose highest captured column_id is
       lower than the highest column_id of the source table, i.e. columns were added. */
    DECLARE CDCObjects CURSOR LOCAL FAST_FORWARD FOR
        SELECT ct.object_id, ct.source_object_id, MAX(cc.column_id) AS MaxCapturedColumnID
        FROM cdc.captured_columns AS cc
        INNER JOIN cdc.change_tables AS ct ON cc.object_id = ct.object_id
        WHERE OBJECT_NAME(ct.source_object_id) = @TableName
        GROUP BY ct.source_object_id, ct.object_id
        HAVING MAX(cc.column_id) < (SELECT MAX(c.column_id) FROM sys.columns AS c WHERE c.object_id = ct.source_object_id);

    OPEN CDCObjects;
    FETCH NEXT FROM CDCObjects INTO @OriginalCDCObjectID, @ObjectID, @MaxColumnID;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        SELECT
            @SchemaName = OBJECT_SCHEMA_NAME(@ObjectID),
            @ObjectName = OBJECT_NAME(@ObjectID),
            @CaptureInstance = (SELECT capture_instance FROM cdc.change_tables WHERE [object_id] = @OriginalCDCObjectID);
        SELECT @TmpCaptureInstance = @TmpPrefix + '_' + @CaptureInstance;

        /* Step 1: enable a temporary capture instance under a temporary name. */
        EXEC sys.sp_cdc_enable_table
            @source_schema = @SchemaName,
            @source_name = @ObjectName,
            @capture_instance = @TmpCaptureInstance,
            @role_name = 'cdc_Admin';

        /* Locate the temporary change table before touching anything else.
           The variable is reset first because a SELECT assignment that matches
           no row would otherwise keep the value of a previous iteration. */
        SET @NewCDCObjectID = NULL;
        SELECT @NewCDCObjectID = [object_id]
        FROM cdc.change_tables
        WHERE source_object_id = @ObjectID
          AND [object_id] <> @OriginalCDCObjectID;

        IF @NewCDCObjectID IS NULL
        BEGIN
            CLOSE CDCObjects;
            DEALLOCATE CDCObjects;
            DEALLOCATE TmpCDCObjects;
            SELECT @ErrorMessage = 'Unable to update CDC: the temporary capture instance was not created. No change was made to the original capture instance';
            THROW 50001, @ErrorMessage, 1;
        END;

        /* Step 2 + 3: source columns not yet captured, in column_id order so that
           column_ordinal is assigned deterministically. */
        DECLARE cColumns CURSOR LOCAL FAST_FORWARD FOR
            SELECT
                c.name,
                c.column_id,
                CASE
                    /* max_length is in bytes; -1 means MAX */
                    WHEN t.name IN ('varchar', 'char', 'binary', 'varbinary')
                        THEN t.name + '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length AS NVARCHAR(20)) END + ')'
                    /* two bytes per character */
                    WHEN t.name IN ('nchar', 'nvarchar')
                        THEN t.name + '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length / 2 AS NVARCHAR(20)) END + ')'
                    WHEN t.name IN ('decimal', 'numeric')
                        THEN t.name + '(' + CAST(c.precision AS NVARCHAR(20)) + ', ' + CAST(c.scale AS NVARCHAR(20)) + ')'
                    /* fractional-seconds precision is stored in scale, not max_length */
                    WHEN t.name IN ('datetime2', 'time', 'datetimeoffset')
                        THEN t.name + '(' + CAST(c.scale AS NVARCHAR(20)) + ')'
                    /* float(n) takes the mantissa bits (precision), not the byte length */
                    WHEN t.name = 'float'
                        THEN t.name + '(' + CAST(c.precision AS NVARCHAR(20)) + ')'
                    ELSE t.name
                END AS DataType,
                t.name AS TypeName,
                c.is_computed
            FROM sys.columns AS c
            INNER JOIN sys.types AS t ON c.user_type_id = t.user_type_id
            WHERE c.object_id = @ObjectID
              AND c.column_id > @MaxColumnID
            ORDER BY c.column_id;

        /* Highest ordinal currently registered for the original change table. */
        SELECT @ColumnOrdinal = MAX(column_ordinal)
        FROM cdc.captured_columns
        WHERE [object_id] = @OriginalCDCObjectID;

        OPEN cColumns;
        FETCH NEXT FROM cColumns INTO @ColumnName, @ColumnID, @ColumnDataType, @ColumnTypeName, @IsComputed;
        WHILE (@@FETCH_STATUS = 0)
        BEGIN
            SELECT @ColumnOrdinal += 1; /* next free ordinal */

            /* Add the column to the original change table as nullable. */
            SELECT @SQL = 'ALTER TABLE ' + QUOTENAME(OBJECT_SCHEMA_NAME(@OriginalCDCObjectID)) + '.' + QUOTENAME(OBJECT_NAME(@OriginalCDCObjectID))
                        + ' ADD ' + QUOTENAME(@ColumnName) + ' ' + @ColumnDataType + ' NULL';
            PRINT @SQL;
            EXEC(@SQL);

            /* Register the column in the CDC metadata table cdc.captured_columns. */
            INSERT INTO cdc.captured_columns ([object_id], column_name, column_id, column_type, column_ordinal, is_computed)
            VALUES (@OriginalCDCObjectID, @ColumnName, @ColumnID, @ColumnTypeName, @ColumnOrdinal, @IsComputed);

            FETCH NEXT FROM cColumns INTO @ColumnName, @ColumnID, @ColumnDataType, @ColumnTypeName, @IsComputed;
        END;
        /* Release the cursor so it can be declared again on the next outer iteration. */
        CLOSE cColumns;
        DEALLOCATE cColumns;

        /* Step 4: take each procedure definition of the temporary instance, turn the
           leading 6 characters (expected to be CREATE) into ALTER, and substitute the
           original object id and capture instance name so it alters the original procedure. */

        /* batch insert proc */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_batchinsert_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC(@SQL);

        /* insert/delete proc */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_insdel_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC(@SQL);

        /* update proc */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_upd_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC(@SQL);

        /* Next change table. */
        FETCH NEXT FROM CDCObjects INTO @OriginalCDCObjectID, @ObjectID, @MaxColumnID;
    END;
    CLOSE CDCObjects;
    DEALLOCATE CDCObjects;

    /* Step 5: re-run the temporary-instance query and disable every instance it returns. */
    OPEN TmpCDCObjects;
    FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        /* Use the instance name fetched from the cursor, not the last name built in the main loop. */
        EXEC sys.sp_cdc_disable_table
            @source_schema = @SchemaName,
            @source_name = @ObjectName,
            @capture_instance = @CaptureInstance;
        FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    END;
    CLOSE TmpCDCObjects;
    DEALLOCATE TmpCDCObjects;
END;

SET NOCOUNT OFF;