Flink CDC Application Notes

SQL Server

1. The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.

I hit the following exception. After several attempts, the cause turned out to be that the database name's letter case must be consistent with the name as it exists in SQL Server.

Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
	at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]
	at io.debezium.schema.HistorizedDatabaseSchema.recover(HistorizedDatabaseSchema.java:38) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]
	at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext.validateAndLoadDatabaseHistory(SqlServerSourceFetchTaskContext.java:187) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]
	at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext.configure(SqlServerSourceFetchTaskContext.java:130) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]
	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]
	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]
	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]
	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.20.0.jar:1.20.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]

The source table in Flink SQL was declared as follows; the fix was to make 'database-name' match the database name in SQL Server exactly, including letter case:

  CREATE TABLE Member_Extend (
      ID INT,
      MemberID INT,
      PRIMARY KEY (ID) NOT ENFORCED
  ) WITH (
      'connector' = 'sqlserver-cdc',
      'hostname' = '192.168.1.3',
      'port' = '1433',
      'username' = 'test',
      'password' = 'test',
      'database-name' = 'CrmExtend',
      'table-name' = 'dbo.Member_Extend'
  );
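
To confirm the exact casing SQL Server stores for the database name, the system catalog can be queried first (a minimal check; run it against the same SQL Server instance):

  -- Run on SQL Server; copy the returned name verbatim, including its letter case,
  -- into the 'database-name' option of the Flink table above.
  SELECT name FROM sys.databases;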

Safely stopping and restarting a job

To restart or upgrade a running CDC job without losing progress: look up the job ID, stop the job with a savepoint, point the SQL client at that savepoint, and resubmit the original SQL.


show jobs
Flink SQL> show jobs;
+----------------------------------+----------+----------+-------------------------+
|                           job id | job name |   status |              start time |
+----------------------------------+----------+----------+-------------------------+
| ce5a0e938563cf52317c5b9055ad102f |  testjob |  RUNNING | 2024-11-15T03:38:47.919 |
+----------------------------------+----------+----------+-------------------------+
4 rows in set



Stop the job with a savepoint so that its state can be restored later:

SET state.checkpoints.dir='s3://flink/cdc-1.20/savepoints';
stop job 'ce5a0e938563cf52317c5b9055ad102f' with savepoint;

Flink SQL> stop job 'ce5a0e938563cf52317c5b9055ad102f' with savepoint;
+--------------------------------------------------------------+
|                                               savepoint path |
+--------------------------------------------------------------+
| s3://flink/cdc-1.20/savepoints/savepoint-ce5a0e-2935055bb307 |
+--------------------------------------------------------------+
1 row in set

Point the next submission at the savepoint that was just written, and allow state that the new job does not claim to be skipped:

SET execution.savepoint.path='s3://flink/cdc-1.20/savepoints/savepoint-ce5a0e-2935055bb307';
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';


Re-run the original SQL; the new job will start from the savepoint:

insert into flink_user select * from user;
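
After the job has been resubmitted, it may be worth clearing these session options so that later submissions in the same SQL client session do not also try to restore from this savepoint (a small sketch using the client's RESET command on the keys set above):

RESET 'execution.savepoint.path';
RESET 'execution.savepoint.ignore-unclaimed-state';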

