Debezium 独立版使用(不结合 Kafka)
参考 CDC变化数据捕获——Debezium-Embedded-CSDN博客
pom.xml
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<version.debezium>1.4.2.Final</version.debezium>
<fastjson.version>1.2.75</fastjson.version>
</properties>
<dependencies>
<!-- Alibaba fastjson: JSON parser used to read the change-event payload -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- No <version> is declared here; presumably it is managed by a Spring Boot parent/BOM that is not shown in this snippet (TODO confirm) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- Exclude Spring Boot's default logging starter -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Debezium engine API, embedded engine and MySQL connector, all pinned to ${version.debezium} -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
<!-- Lombok (provides the @Slf4j annotation used by Demo01); version presumably managed by the parent POM (TODO confirm) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
Demo01.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPObject;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class Demo01 {
public static void main(String[] args) {
Properties props = genProps(); // 0. 配置数据库,添加用户,赋予主从同步的权限 // 1. 生成配置
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class) // 2. 构建 DebeziumEngine // 使用 Json 格式
.using(props)
.notifying(record -> {
log.error("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
System.out.println(record);
log.error("record.key() = " + record.key());// record中会有操作的类型(增、删、改)和具体的数据 // key是主键
log.error("record.value() = " + record.value());
String value = record.value();
JSONObject jsonObject = JSON.parseObject(value);
Object act = jsonObject.get("op");
if(act.equals("u")){
log.error("更新操作");
}else if(act.equals("d")){
log.error("删除");
}else if(act.equals("c")){
log.error("新增");
}
log.error("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
})
.using((success, message, error) -> { // 强烈建议加上此部分的回调代码,方便查看错误信息
if (!success && error != null) {
log.error("----------error------"); // 报错回调
log.info(message);
}else {
log.info(message);
}
}).build();
ExecutorService executor = Executors.newSingleThreadExecutor();// 3. 正式运行
executor.execute(engine);
}
// 配置
private static Properties genProps() {
Properties props = new Properties();
// 在maven处引入其他数据库的连接器,例如debezium-connector-postgres,再修改此处的connector.class,即可使用其他数据库的CDC
props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
props.setProperty("database.server.name", "XXX"); // 可以任意修改
props.setProperty("database.hostname", "localhost"); // IP
props.setProperty("database.port","3306"); // 端口
props.setProperty("database.user", "XXX"); // 用户
props.setProperty("database.password", "XXX"); // 密码
props.setProperty("database.serverTimezone", "UTC"); // 时区
// 下面两个是数据库和表,注意只能选择一种:
// 1. 使用database.whitelist,只设置数据库(会通知全库的CDC信息)
// 2. 使用table.whitelist,设置库名和表名(会通知单个库的单个表的CDC信息)
// props.setProperty("database.whitelist", "db_inventory_cdc");
props.setProperty("table.whitelist", "md_mes_db.sys_student"); // 库.表名
props.setProperty("name", "engine");
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");
props.setProperty("include.schema.changes", "false");
props.setProperty("tombstones.on.delete", "false");
props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
props.setProperty("database.history.store.only.monitored.tables.ddl", "true");
props.setProperty("database.history.file.filename", "E://debezuim/history.dat");
props.setProperty("database.history.instance.name", UUID.randomUUID().toString());
props.setProperty("database.history.skip.unparseable.ddl", "true");
props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());// 偏移量持久化配置
props.setProperty("offset.storage.file.filename", "E://debezuim/offsets.dat");
props.setProperty("offset.flush.interval.ms", "10");
return props;
}
}
application.properties
logging.config=classpath:log4j.properties
log4j.properties
# Root logger level and appenders (the DEBUG variant is kept commented out for troubleshooting)
#log4j.rootLogger=DEBUG, stdout, R
log4j.rootLogger=ERROR, stdout, R
# Console (standard output) appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# File appender: writes to logs/app.log
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=logs/app.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
效果:
2024-12-24 10:26:56 ERROR Demo01:24 - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
EmbeddedEngineChangeEvent [key={"id":977356}, value={"before":null,"after":{"id":977356,"factory_code":3,"jzx_num":"5","jzb_ci_number":null,"variety":"","specification":"","modality":"","furnace_num":null,"melt_num":null,"pack_num":"241223_6410009","jzxs":39,"classes":null,"jzks_time":1734942984000,"jzjs_time":1734943354000,"dbdr_time":null,"jzsw":null,"cwzl":386,"jzmw":1376,"remark":null,"line_num":null,"jzbxh":"275","plan_num":"","jzsw_time":null,"jzmw_time":"24/12/23 08:40:22","jhfbzl":"3727","jzsj":370,"jz_timeout":null,"fire_time":12,"results":null,"pt_weight":null,"pt_time":null,"state":1,"sjfbzl":370,"bntsl":214,"yyjjrl":0.0,"create_time":1734943353000},"source":{"version":"1.4.2.Final","connector":"mysql","name":"md_mes_db","ts_ms":1735007216000,"snapshot":"false","db":"md_mes_db","table":"tb_upload_pouringpack","server_id":1,"gtid":null,"file":"mysql-bin.000033","pos":101113167,"row":0,"thread":784,"query":null},"op":"c","ts_ms":1735007216512,"transaction":null}, sourceRecord=SourceRecord{sourcePartition={server=md_mes_db}, sourceOffset={ts_sec=1735007216, file=mysql-bin.000033, pos=101112937, row=1, server_id=1, event=2}} ConnectRecord{topic='md_mes_db.md_mes_db.tb_upload_pouringpack', kafkaPartition=null, key=Struct{id=977356}, keySchema=Schema{md_mes_db.md_mes_db.tb_upload_pouringpack.Key:STRUCT}, value=Struct{after=Struct{id=977356,factory_code=3,jzx_num=5,variety=,specification=,modality=,pack_num=241223_6410009,jzxs=39,jzks_time=1734942984000,jzjs_time=1734943354000,cwzl=386,jzmw=1376,jzbxh=275,plan_num=,jzmw_time=24/12/23 08:40:22,jhfbzl=3727,jzsj=370,fire_time=12,state=1,sjfbzl=370,bntsl=214,yyjjrl=0.0,create_time=1734943353000},source=Struct{version=1.4.2.Final,connector=mysql,name=md_mes_db,ts_ms=1735007216000,db=md_mes_db,table=tb_upload_pouringpack,server_id=1,file=mysql-bin.000033,pos=101113167,row=0,thread=784},op=c,ts_ms=1735007216512}, valueSchema=Schema{md_mes_db.md_mes_db.tb_upload_pouringpack.Envelope:STRUCT}, timestamp=null, 
headers=ConnectHeaders(headers=)}]
2024-12-24 10:26:56 ERROR Demo01:26 - record.key() = {"id":977356}
2024-12-24 10:26:56 ERROR Demo01:27 - record.value() = {"before":null,"after":{"id":977356,"factory_code":3,"jzx_num":"5","jzb_ci_number":null,"variety":"","specification":"","modality":"","furnace_num":null,"melt_num":null,"pack_num":"241223_6410009","jzxs":39,"classes":null,"jzks_time":1734942984000,"jzjs_time":1734943354000,"dbdr_time":null,"jzsw":null,"cwzl":386,"jzmw":1376,"remark":null,"line_num":null,"jzbxh":"275","plan_num":"","jzsw_time":null,"jzmw_time":"24/12/23 08:40:22","jhfbzl":"3727","jzsj":370,"jz_timeout":null,"fire_time":12,"results":null,"pt_weight":null,"pt_time":null,"state":1,"sjfbzl":370,"bntsl":214,"yyjjrl":0.0,"create_time":1734943353000},"source":{"version":"1.4.2.Final","connector":"mysql","name":"md_mes_db","ts_ms":1735007216000,"snapshot":"false","db":"md_mes_db","table":"tb_upload_pouringpack","server_id":1,"gtid":null,"file":"mysql-bin.000033","pos":101113167,"row":0,"thread":784,"query":null},"op":"c","ts_ms":1735007216512,"transaction":null}
2024-12-24 10:26:56 ERROR Demo01:36 - 新增
2024-12-24 10:26:56 ERROR Demo01:38 - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~