当前位置: 首页 > article >正文

sql server cdc重启监控新加表字段

表新加字段后,CDC需要重启(即对该表禁用再重新启用CDC)才能监控新加的字段。但如果你有一个定时任务在扫描CDC的_CT表,重启会在以下两个场景导致数据丢失。

  1. CDC重启时捕获不到changelog
  2. 重启前未消费完的CDC数据会被清空

通过以下脚本让cdc监控新字段可以避免数据丢失

下面脚本的DECLARE @TableName NVARCHAR(200) = 'your_table_name'这一行,将your_table_name替换成对应的表名,然后执行即可。

核心原理是为该表新开启一个临时的CDC捕获实例,再用临时实例生成的存储过程定义替换原捕获实例的存储过程,因此不需要重启CDC,数据也不会丢失。

/*
Purpose: make an existing CDC capture instance start tracking columns that were
added to its source table, without disabling and re-enabling the original
capture instance.

Steps:
1. Enable a temporary capture instance on the source table.
2. Add the new columns to the original change (_CT) table.
3. Register the new columns in cdc.captured_columns.
4. ALTER the original instance's CDC procedures, using the definitions that
   were generated for the temporary capture instance:
     cdc.sp_batchinsert_<objectid>
     cdc.sp_insdel_<objectid>
     cdc.sp_upd_<objectid>
5. Drop the temporary capture instance.

Usage: set @TableName below to the source table name and run the batch in the
CDC-enabled database.
*/
SET NOCOUNT ON;
DECLARE @Tibble             NVARCHAR(30) = '$tmp$'; /* name prefix of the temporary capture instance */
DECLARE @ObjectID           INT;                    /* object_id of the source table */
DECLARE @MaxColumnID        INT;
DECLARE @ErrorMessage       NVARCHAR(MAX);
DECLARE @SchemaName         NVARCHAR(128);
DECLARE @ObjectName         NVARCHAR(128);
DECLARE @CaptureInstance    NVARCHAR(128);
DECLARE @TmpCaptureInstance NVARCHAR(128);
DECLARE @ColumnName         NVARCHAR(128);
DECLARE @ColumnID           INT;
DECLARE @ColumnDataType     NVARCHAR(128);          /* full type declaration, e.g. nvarchar(50) */
DECLARE @ColumnTypeName     NVARCHAR(128);          /* bare type name, e.g. nvarchar */
DECLARE @SQL                NVARCHAR(MAX);
DECLARE @ColumnOrdinal      INT;
DECLARE @IsComputed         BIT;
DECLARE @OriginalCDCObjectID INT;                   /* object_id of the original change table */
DECLARE @NewCDCObjectID     INT;                    /* object_id of the temporary change table */

DECLARE @TableName NVARCHAR(200) = 'your_table_name';  -- source table to process

/* Only proceed when the current database has CDC change tables. */
IF EXISTS(SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID('cdc.change_tables'))
BEGIN
    /* Cursor over temporary capture instances of the target table.
       It is opened twice: first to remove leftovers of a previous run,
       and again at the end to remove the instance created by this run. */
    DECLARE TmpCDCObjects CURSOR LOCAL FAST_FORWARD FOR
    SELECT OBJECT_SCHEMA_NAME(source_object_id), OBJECT_NAME(source_object_id), capture_instance
    FROM cdc.change_tables
    WHERE capture_instance LIKE @Tibble + '%'
      AND OBJECT_NAME(source_object_id) = @TableName;

    OPEN TmpCDCObjects;
    FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        /* Disable the leftover temporary capture instance (drops its change table). */
        EXEC sys.sp_cdc_disable_table @source_schema = @SchemaName, @source_name = @ObjectName, @capture_instance = @CaptureInstance;
        FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    END
    CLOSE TmpCDCObjects;

    /* Abort when the target table already has more than one capture instance.
       The check is limited to @TableName because that is the only table this
       script modifies. */
    IF EXISTS(
        SELECT source_object_id, COUNT(*)
        FROM cdc.change_tables
        WHERE OBJECT_NAME(source_object_id) = @TableName
        GROUP BY source_object_id
        HAVING COUNT(*) > 1
    )
    BEGIN
        DEALLOCATE TmpCDCObjects;
        SELECT @ErrorMessage = 'Unable to update CDC as there are objects with more than one capture instance in use. You must update CDC manually';
        THROW 50000, @ErrorMessage, 1;
    END;

    /* Loop over the change tables of @TableName whose captured columns lag
       behind the source table (highest captured column_id is lower than the
       highest column_id of the source table). */
    DECLARE CDCObjects CURSOR LOCAL FAST_FORWARD FOR
        SELECT ct.object_id, ct.source_object_id, MAX(cc.column_id) AS MaxCapturedColumnID
        FROM cdc.captured_columns AS cc
        INNER JOIN cdc.change_tables AS ct ON cc.object_id = ct.object_id
        WHERE OBJECT_NAME(ct.source_object_id) = @TableName
        GROUP BY ct.source_object_id, ct.object_id
        HAVING MAX(cc.column_id) < (SELECT MAX(c.column_id) FROM sys.columns AS c WHERE c.object_id = ct.source_object_id);

    OPEN CDCObjects;
    FETCH NEXT FROM CDCObjects INTO @OriginalCDCObjectID, @ObjectID, @MaxColumnID;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        SELECT
            @SchemaName = OBJECT_SCHEMA_NAME(@ObjectID),
            @ObjectName = OBJECT_NAME(@ObjectID),
            /* Name of the original capture instance, looked up by its change table. */
            @CaptureInstance = (SELECT capture_instance FROM cdc.change_tables WHERE object_id = @OriginalCDCObjectID);
        SELECT @TmpCaptureInstance = @Tibble + '_' + @CaptureInstance;

        /* Create the temporary capture instance; CDC generates procedures for it
           that already know about the new columns. */
        EXEC sys.sp_cdc_enable_table @source_schema = @SchemaName, @source_name = @ObjectName, @capture_instance = @TmpCaptureInstance, @role_name = 'cdc_Admin';

        /* Resolve the temporary change table by its capture instance name.
           SET with a subquery yields NULL when nothing matches, so a stale value
           from a previous iteration can never be reused. */
        SET @NewCDCObjectID = (
            SELECT [object_id]
            FROM cdc.change_tables
            WHERE source_object_id = @ObjectID
              AND capture_instance = @TmpCaptureInstance
        );

        /* Stop before touching the original change table if the temporary
           instance is missing: without it the procedures cannot be regenerated
           and the change table would no longer match them. */
        IF @NewCDCObjectID IS NULL
        BEGIN
            SELECT @ErrorMessage = 'Temporary capture instance was not created; the original capture instance has not been modified.';
            THROW 50000, @ErrorMessage, 1;
        END;

        /* Highest ordinal registered for the original change table; new columns
           are appended after it. */
        SELECT @ColumnOrdinal = MAX(column_ordinal)
        FROM cdc.captured_columns
        WHERE [object_id] = @OriginalCDCObjectID;

        /* Source-table columns that the original change table does not have yet,
           together with a full type declaration for ALTER TABLE. */
        DECLARE cColumns CURSOR LOCAL FAST_FORWARD FOR
            WITH LastColumn AS (
                /* Highest column_id already captured by the original change table. */
                SELECT ct.source_object_id, MAX(cc.column_id) AS MaxColumnID
                FROM cdc.captured_columns AS cc
                INNER JOIN cdc.change_tables AS ct ON cc.object_id = ct.object_id
                WHERE cc.object_id = @OriginalCDCObjectID
                GROUP BY ct.source_object_id
            )
            SELECT
                c.name,
                c.column_id,
                CASE
                    /* Byte-length types: max_length is the declared length (-1 = MAX). */
                    WHEN t.name IN ('varchar', 'char', 'binary', 'varbinary')
                        THEN t.name + '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length AS NVARCHAR(20)) END + ')'
                    /* Unicode types: max_length is in bytes, two per character. */
                    WHEN t.name IN ('nchar', 'nvarchar')
                        THEN t.name + '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length / 2 AS NVARCHAR(20)) END + ')'
                    WHEN t.name IN ('decimal', 'numeric')
                        THEN t.name + '(' + CAST(c.precision AS NVARCHAR(20)) + ', ' + CAST(c.scale AS NVARCHAR(20)) + ')'
                    /* Fractional-seconds types are declared by scale, not by storage size. */
                    WHEN t.name IN ('datetime2', 'datetimeoffset', 'time')
                        THEN t.name + '(' + CAST(c.scale AS NVARCHAR(20)) + ')'
                    /* float is declared by mantissa bits (precision); using max_length (8)
                       would produce float(8), which SQL Server stores as real. */
                    WHEN t.name = 'float'
                        THEN t.name + '(' + CAST(c.precision AS NVARCHAR(20)) + ')'
                    ELSE t.name
                END AS DataType,
                t.name AS TypeName,
                c.is_computed
            FROM sys.columns AS c
            INNER JOIN LastColumn AS lc ON c.object_id = lc.source_object_id
            INNER JOIN sys.types AS t ON c.user_type_id = t.user_type_id
            WHERE c.column_id > lc.MaxColumnID;

        OPEN cColumns;
        FETCH NEXT FROM cColumns INTO @ColumnName, @ColumnID, @ColumnDataType, @ColumnTypeName, @IsComputed;
        WHILE (@@FETCH_STATUS = 0)
        BEGIN
            SELECT @ColumnOrdinal += 1; /* next free ordinal in the change table */

            /* Add the column to the original change table; QUOTENAME keeps names
               containing ']' or other special characters valid. */
            SELECT @SQL = 'ALTER TABLE ' + QUOTENAME(OBJECT_SCHEMA_NAME(@OriginalCDCObjectID)) + '.' + QUOTENAME(OBJECT_NAME(@OriginalCDCObjectID))
                        + ' ADD ' + QUOTENAME(@ColumnName) + ' ' + @ColumnDataType + ' NULL';
            PRINT @SQL;
            EXEC(@SQL);

            /* Register the column in the CDC metadata table cdc.captured_columns. */
            INSERT INTO cdc.captured_columns([object_id], column_name, column_id, column_type, column_ordinal, is_computed)
            VALUES(@OriginalCDCObjectID, @ColumnName, @ColumnID, @ColumnTypeName, @ColumnOrdinal, @IsComputed);

            FETCH NEXT FROM cColumns INTO @ColumnName, @ColumnID, @ColumnDataType, @ColumnTypeName, @IsComputed;
        END;
        /* Release the cursor so it can be declared again for the next change table. */
        CLOSE cColumns;
        DEALLOCATE cColumns;

        /* For each generated procedure: take the temporary instance's definition,
           turn the leading CREATE into ALTER, and retarget it at the original
           change table by swapping the object id and the capture instance name. */

        /* batch insert procedure */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_batchinsert_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC(@SQL);

        /* insert/delete procedure */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_insdel_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC(@SQL);

        /* update procedure */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_upd_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC(@SQL);

        /* next change table */
        FETCH NEXT FROM CDCObjects INTO @OriginalCDCObjectID, @ObjectID, @MaxColumnID;
    END;
    CLOSE CDCObjects;
    DEALLOCATE CDCObjects;

    /* Drop the temporary capture instance(s) created above. Re-opening the
       cursor re-runs its query, so it now returns the instances of this run. */
    OPEN TmpCDCObjects;
    FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN;
        /* Disable exactly the instance that was fetched from the cursor. */
        EXEC sys.sp_cdc_disable_table @source_schema = @SchemaName, @source_name = @ObjectName, @capture_instance = @CaptureInstance;
        FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    END;
    CLOSE TmpCDCObjects;
    DEALLOCATE TmpCDCObjects;
END;
SET NOCOUNT OFF;


http://www.kler.cn/a/470106.html

相关文章:

  • 企业国外传输大文件到国内该怎么做?
  • vue 什么时候使用v-if 什么时候使用v-show
  • gesp(C++四级)(6)洛谷:B3870:[GESP202309 四级] 变长编码
  • Bash语言的函数实现
  • Timer、Ticker使用及其注意事项
  • 感知器的那些事
  • asp.net core mvc的 ViewBag , ViewData , Module ,TempData
  • JS数组转字符串(3种方法)
  • 字母异位分组力扣--49
  • 福建省乡镇界面数据arcgis格式shp乡镇名称和编码无偏移坐标内容测评
  • 什么是Spring Boot?深度解析其核心概念与优势
  • 从MySQL迁移到PostgreSQL的完整指南
  • Golang设计模式目录
  • CSS语言的软件开发工具
  • Easysearch Java SDK 2.0.x 使用指南(三)
  • 【微服务】5、服务保护 Sentinel
  • AWS Auto Scaling基础知识
  • 【AI日记】25.01.06
  • Ruby语言的语法
  • SVN简单使用教程
  • linux 查找redis 的配置文件 (`redis.conf`)
  • Kubernetes Gateway API-4-TCPRoute和GRPCRoute
  • 上网行为审计是什么?有什么功能?企业为什么需要上网行为审计?
  • 印象笔记08——便签功能
  • CSS学习记录24
  • Java中使用JFreeChart生成甘特图