当前位置: 首页 > article >正文

Debezium 独立版(Embedded)使用(不结合 Kafka)

参考 CDC变化数据捕获——Debezium-Embedded-CSDN博客

pom.xml

 <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <version.debezium>1.4.2.Final</version.debezium>
        <!-- 1.2.83 is the 1.x release that closes the autoType-bypass deserialization
             vulnerability present in earlier 1.2.x versions (including 1.2.75). -->
        <fastjson.version>1.2.83</fastjson.version>
    </properties>
    <dependencies>
        <!-- Alibaba fastjson: used to parse the JSON change events -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- Exclude Spring Boot's default logging starter -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Debezium engine API, embedded engine and MySQL connector (same version for all three) -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <!-- Lombok: provides the @Slf4j annotation used by Demo01 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>
Demo01.java

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPObject;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;

import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Demo01 {
    public static void main(String[] args) {
        Properties props = genProps(); // 0. 配置数据库,添加用户,赋予主从同步的权限  // 1. 生成配置
        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)  // 2. 构建 DebeziumEngine    // 使用 Json 格式
                .using(props)
                .notifying(record -> {
                    log.error("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
                    System.out.println(record);
                    log.error("record.key() = " + record.key());// record中会有操作的类型(增、删、改)和具体的数据  // key是主键
                    log.error("record.value() = " + record.value());
                    String value = record.value();
                    JSONObject jsonObject = JSON.parseObject(value);
                    Object act = jsonObject.get("op");
                    if(act.equals("u")){
                        log.error("更新操作");
                    }else if(act.equals("d")){
                        log.error("删除");
                    }else if(act.equals("c")){
                        log.error("新增");
                    }
                    log.error("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
                })
                .using((success, message, error) -> { // 强烈建议加上此部分的回调代码,方便查看错误信息
                    if (!success && error != null) {
                        log.error("----------error------");  // 报错回调
                        log.info(message);
                    }else {
                        log.info(message);
                    }
                }).build();
        ExecutorService executor = Executors.newSingleThreadExecutor();// 3. 正式运行
        executor.execute(engine);
    }
    // 配置
    private static Properties genProps() {
        Properties props = new Properties();
        // 在maven处引入其他数据库的连接器,例如debezium-connector-postgres,再修改此处的connector.class,即可使用其他数据库的CDC
        props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
        props.setProperty("database.server.name", "XXX"); // 可以任意修改
        props.setProperty("database.hostname", "localhost"); // IP
        props.setProperty("database.port","3306"); // 端口
        props.setProperty("database.user", "XXX"); // 用户
        props.setProperty("database.password", "XXX"); // 密码
        props.setProperty("database.serverTimezone", "UTC"); // 时区
        // 下面两个是数据库和表,注意只能选择一种:
        // 1. 使用database.whitelist,只设置数据库(会通知全库的CDC信息)
        // 2. 使用table.whitelist,设置库名和表名(会通知单个库的单个表的CDC信息)
//        props.setProperty("database.whitelist", "db_inventory_cdc");
        props.setProperty("table.whitelist", "md_mes_db.sys_student"); // 库.表名
        props.setProperty("name", "engine");
        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("value.converter.schemas.enable", "false");
        props.setProperty("include.schema.changes", "false");
        props.setProperty("tombstones.on.delete", "false");
        props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
        props.setProperty("database.history.store.only.monitored.tables.ddl", "true");
        props.setProperty("database.history.file.filename", "E://debezuim/history.dat");
        props.setProperty("database.history.instance.name", UUID.randomUUID().toString());
        props.setProperty("database.history.skip.unparseable.ddl", "true");
        props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());// 偏移量持久化配置
        props.setProperty("offset.storage.file.filename", "E://debezuim/offsets.dat");
        props.setProperty("offset.flush.interval.ms", "10");
        return props;
    }





}

application.properties

# Use the log4j.properties file on the classpath as the logging configuration.
logging.config=classpath:log4j.properties

log4j.properties

# Root logger: level and attached appenders.
# With the level at ERROR, only error-level messages reach the appenders.
#log4j.rootLogger=DEBUG, stdout, R
log4j.rootLogger=ERROR, stdout, R

# Console (stdout) appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# File appender (daily rolling), writes to logs/app.log
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=logs/app.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

效果:

2024-12-24 10:26:56 ERROR Demo01:24 - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
EmbeddedEngineChangeEvent [key={"id":977356}, value={"before":null,"after":{"id":977356,"factory_code":3,"jzx_num":"5","jzb_ci_number":null,"variety":"","specification":"","modality":"","furnace_num":null,"melt_num":null,"pack_num":"241223_6410009","jzxs":39,"classes":null,"jzks_time":1734942984000,"jzjs_time":1734943354000,"dbdr_time":null,"jzsw":null,"cwzl":386,"jzmw":1376,"remark":null,"line_num":null,"jzbxh":"275","plan_num":"","jzsw_time":null,"jzmw_time":"24/12/23 08:40:22","jhfbzl":"3727","jzsj":370,"jz_timeout":null,"fire_time":12,"results":null,"pt_weight":null,"pt_time":null,"state":1,"sjfbzl":370,"bntsl":214,"yyjjrl":0.0,"create_time":1734943353000},"source":{"version":"1.4.2.Final","connector":"mysql","name":"md_mes_db","ts_ms":1735007216000,"snapshot":"false","db":"md_mes_db","table":"tb_upload_pouringpack","server_id":1,"gtid":null,"file":"mysql-bin.000033","pos":101113167,"row":0,"thread":784,"query":null},"op":"c","ts_ms":1735007216512,"transaction":null}, sourceRecord=SourceRecord{sourcePartition={server=md_mes_db}, sourceOffset={ts_sec=1735007216, file=mysql-bin.000033, pos=101112937, row=1, server_id=1, event=2}} ConnectRecord{topic='md_mes_db.md_mes_db.tb_upload_pouringpack', kafkaPartition=null, key=Struct{id=977356}, keySchema=Schema{md_mes_db.md_mes_db.tb_upload_pouringpack.Key:STRUCT}, value=Struct{after=Struct{id=977356,factory_code=3,jzx_num=5,variety=,specification=,modality=,pack_num=241223_6410009,jzxs=39,jzks_time=1734942984000,jzjs_time=1734943354000,cwzl=386,jzmw=1376,jzbxh=275,plan_num=,jzmw_time=24/12/23 08:40:22,jhfbzl=3727,jzsj=370,fire_time=12,state=1,sjfbzl=370,bntsl=214,yyjjrl=0.0,create_time=1734943353000},source=Struct{version=1.4.2.Final,connector=mysql,name=md_mes_db,ts_ms=1735007216000,db=md_mes_db,table=tb_upload_pouringpack,server_id=1,file=mysql-bin.000033,pos=101113167,row=0,thread=784},op=c,ts_ms=1735007216512}, valueSchema=Schema{md_mes_db.md_mes_db.tb_upload_pouringpack.Envelope:STRUCT}, timestamp=null, 
headers=ConnectHeaders(headers=)}]
2024-12-24 10:26:56 ERROR Demo01:26 - record.key() = {"id":977356}
2024-12-24 10:26:56 ERROR Demo01:27 - record.value() = {"before":null,"after":{"id":977356,"factory_code":3,"jzx_num":"5","jzb_ci_number":null,"variety":"","specification":"","modality":"","furnace_num":null,"melt_num":null,"pack_num":"241223_6410009","jzxs":39,"classes":null,"jzks_time":1734942984000,"jzjs_time":1734943354000,"dbdr_time":null,"jzsw":null,"cwzl":386,"jzmw":1376,"remark":null,"line_num":null,"jzbxh":"275","plan_num":"","jzsw_time":null,"jzmw_time":"24/12/23 08:40:22","jhfbzl":"3727","jzsj":370,"jz_timeout":null,"fire_time":12,"results":null,"pt_weight":null,"pt_time":null,"state":1,"sjfbzl":370,"bntsl":214,"yyjjrl":0.0,"create_time":1734943353000},"source":{"version":"1.4.2.Final","connector":"mysql","name":"md_mes_db","ts_ms":1735007216000,"snapshot":"false","db":"md_mes_db","table":"tb_upload_pouringpack","server_id":1,"gtid":null,"file":"mysql-bin.000033","pos":101113167,"row":0,"thread":784,"query":null},"op":"c","ts_ms":1735007216512,"transaction":null}
2024-12-24 10:26:56 ERROR Demo01:36 - 新增
2024-12-24 10:26:56 ERROR Demo01:38 - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


http://www.kler.cn/a/453706.html

相关文章:

  • 计算机网络——期末复习(3)4-6章考试重点
  • 简单讲解关于微信小程序调整 miniprogram 后, tabbar 找不到图片的原因之一
  • qt QZipReader详解
  • 《QT 5.14.1 搭建 opencv 环境全攻略》
  • AI无人直播详解
  • 网络安全词云图与技术浅谈
  • NLP中的神经网络基础
  • CSS(二):美化网页元素
  • 前端:改变鼠标点击物体的颜色
  • CSS快速入门
  • flask后端开发(7):加载静态文件
  • v3+ts 批量引入组件
  • DDI-GPT:使用知识图谱增强的大模型对药物相互作用进行可解释的预测
  • PPO 可能出现 KL 爆炸等问题的详细分析(KL Explosions in PPO): 中英双语
  • 无问社区-无问AI模型
  • 从零开始掌握Spring MVC:深入解析@Controller与@RequestMapping注解的使用
  • iic通信底层讲解
  • Golang微服务-protobuf
  • Niushop开源商城(漏洞复现)
  • 基于人工智能时代政务智慧转型的实现前景初探
  • 实战分享:开发设计文档模版及编写要点
  • 【高等数学】空间解析几何
  • BFS【东北大学oj数据结构11-3】C++
  • 如何在 Ubuntu 22.04 上安装以及使用 MongoDB
  • 牛客网刷题 ——C语言初阶——BC112小乐乐求和
  • 详细讲解axios封装与api接口封装管理