Flink cdc debug调试动态变更表结构
文章目录
- 前言
- 调试流程
- 1. 拉取代码本地打包
- 2. 配置启动参数
- 3. 日志配置
- 4. 启动验证
- 5. 断点验证
- 问题
- 1. Cannot find factory with identifier "mysql" in the classpath.
- 2.JsonFactory异常
- 3. NoSuchMethodError异常
- 其他
- 结尾
前言
接着上一篇Flink cdc3.0动态变更表结构——源码解析,cdc debug部分官方没有特别说明,尝试踩了一些坑, 这里记录下。
调试流程
1. 拉取代码本地打包
通过 github 拉取3.0.0
以上版本,本地maven打包 mvn clean package -DskipTests
2. 配置启动参数
搜索启动类CliFrontend
,修改运行配置。
需要配置环境变量FLINK_HOME
, pipeline connector依赖包
,flink-dist 包
,以及指定配置文件,--use-mini-cluster true
代表使用 local 集群。
mysql-to-doris.yaml
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 1
3. 日志配置
如果想debug时控制台有日志输出,需要修改主pom.xml中org.apache.logging.log4j
相关的scope
。然后再flink-cdc-cli
模块添加对应的log4j2.xml
日志配置。我这里为了测试效果,将日志级别调成info
。
4. 启动验证
在测试前需要doris创建好对应的数据库,启动验证库表能正常同步,这次我们拿app_db. products
表进行测试。目前表结构和数据能正常同步,接下来我们准备一条添加字段的sql在mysql执行
5. 断点验证
我们在SchemaOperator
类的processElement
方法中提前加上断点,在mysql中执行alter table products add COLUMN name VARCHAR(64);
,可以看到断点处捕获到了对应的SchemaChangeEvent
事件。
从这里可以看到SchemaChangeEvent
的生成及之后的处理,和上一篇分析的流程也是相同的。完整执行日志:
doris上也能查到添加的字段,不过是VARCHAR(256)
,这里又是一个bug。
问题
1. Cannot find factory with identifier “mysql” in the classpath.
没有添加flink-cdc-pipeline-connector-mysql-3.0-SNAPSHOT.jar
引起的异常
2.JsonFactory异常
下面这个异常网上很多解决方案是添加com.fasterxml.jackson.core
相关包,而这里实际可能是classpath没有添加flink-dist-1.18.0.jar
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/shaded/jackson2/com/fasterxml/jackson/core/JsonFactory
at com.ververica.cdc.cli.utils.FlinkEnvironmentUtils.loadFlinkConfiguration(FlinkEnvironmentUtils.java:33)
at com.ververica.cdc.cli.CliFrontend.createExecutor(CliFrontend.java:89)
at com.ververica.cdc.cli.CliFrontend.main(CliFrontend.java:62)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
3. NoSuchMethodError异常
如果classpath引入的pipeline版本不匹配,可能会找出NoSuchMethodError异常, 比如我最开始使用的官网的链接:
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar
Caused by: java.lang.NoSuchMethodError: com.ververica.cdc.common.event.AddColumnEvent$ColumnWithPosition.<init>(Lcom/ververica/cdc/common/schema/Column;Lcom/ververica/cdc/common/event/AddColumnEvent$ColumnPosition;Lcom/ververica/cdc/common/schema/Column;)V
at com.ververica.cdc.connectors.mysql.source.parser.CustomAlterTableParserListener.lambda$exitAlterByAddColumn$0(CustomAlterTableParserListener.java:120)
at io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.runIfNotNull(MySqlAntlrDdlParser.java:358)
at com.ververica.cdc.connectors.mysql.source.parser.CustomAlterTableParserListener.exitAlterByAddColumn(CustomAlterTableParserListener.java:98)
at io.debezium.ddl.parser.mysql.generated.MySqlParser$AlterByAddColumnContext.exitRule(MySqlParser.java:15459)
at io.debezium.antlr.ProxyParseTreeListenerUtil.delegateExitRule(ProxyParseTreeListenerUtil.java:64)
at com.ververica.cdc.connectors.mysql.source.parser.CustomMySqlAntlrDdlParserListener.exitEveryRule(CustomMySqlAntlrDdlParserListener.java:122)
at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:48)
其他
调试环境准备的过程中,会遇到各种小问题。这里也很难罗列完,基本是根据日志来判断处理,既然有案例可以正常调试,相信大家也可以的。
结尾
参考文献:https://docs.google.com/document/d/1L6cJiqYkAsZ_nDa3MgRwV3SKQuw5OrMbqGC4YgzgKR4/edit#heading=h.aybxdd96r62i