当前位置: 首页 > article >正文

FlinkCDC 实现 MySQL 数据变更实时同步

文章目录

  • 1、基本介绍
  • 2、代码实战
    • 2.1、数据源准备
    • 2.2、代码实战
    • 2.3、数据格式

1、基本介绍

Flink CDC 是基于 Apache Flink 的一组源连接器(Source Connector),用于实时捕获和处理数据库中的数据变更。它可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更,并将其转换为流式数据。FlinkCDC 同步数据有两种方式:

  1. Flink SQL / Table API
  2. Flink DataStream API(本文使用该方式)
    在这里插入图片描述
    对比其他开源 CDC 方案后可以发现,Flink CDC 在绝大多数场景下都是最好的选择,别再傻傻地只关注 Canal 了,对比如下图所示:
    在这里插入图片描述

2、代码实战

2.1、数据源准备

本文使用的是 MySQL 8.0,并已创建好数据库(库名为 quick_chat),演示用的表结构如下:

-- Demo chat-message table whose row changes are captured by the Flink CDC job.
-- The table default charset is utf8mb4 (not utf8mb3): utf8mb3 stores at most 3 bytes per
-- character and rejects 4-byte UTF-8 characters such as emoji, which chat `content` and
-- `extra_info` are likely to contain. Columns with an explicit CHARACTER SET keep theirs.
-- NOTE(review): `tinyint(1)` columns may be read back as BOOLEAN by some JDBC drivers
-- (e.g. MySQL Connector/J with tinyInt1isBit=true); `msg_type` holds values 1..6, so
-- confirm how downstream readers map it (plain `tinyint` avoids the ambiguity).
CREATE TABLE `quick_chat_msg` (
  `id` bigint NOT NULL COMMENT '主键id',
  `from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',
  `to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',
  `relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',
  `content` varchar(500) DEFAULT NULL COMMENT '消息内容',
  `msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',
  `extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

需要保证 MySQL 已开启 Binlog 且格式为 ROW(MySQL 8.0 默认开启 Binlog,格式默认就是 ROW),可通过 SHOW VARIABLES LIKE 'binlog_format'; 查看:
在这里插入图片描述
最后,要把数据库时区配置好:Flink CDC 启动时会校验 MySQL 服务器时区与作业配置的时区是否一致,不一致时会报错,或导致时间类型字段出现偏差。命令如下:

-- Set the global time zone to UTC+8 and persist it so it survives a server restart (MySQL 8.0).
SET PERSIST time_zone = '+8:00';
-- Apply the same offset to the current session as well.
SET time_zone = '+8:00';
-- Check the result: lists the time-zone related server variables.
SHOW VARIABLES LIKE '%time_zone%';

在这里插入图片描述

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies>
    <!-- Flink connector base classes (Source/Sink connector framework) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink CDC MySQL connector, shaded "SQL" jar that bundles the connector and its dependencies -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Flink CDC MySQL connector for the DataStream API (MySqlSource).
         Kept on the same version as the shaded jar above: both provide com.ververica.cdc.* classes,
         and mixing 2.2.0 with 2.3.0 on one classpath can load an inconsistent mix of versions. -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink client (job submission / local execution) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink Web UI on port 8081 (not enabled by default) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink Table API & SQL runtime, for programs that read/write batch and streaming tables backed by external systems -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

第二步,开发 Sink 类,用于接收 CDC 源输出的 MySQL 变更数据(本例仅打印到控制台):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Minimal Flink sink that writes every record it receives to standard output.
 *
 * <p>In this example the records are the JSON change events produced by the CDC source.
 */
public class MySinkHandler extends RichSinkFunction<String> {

    /** Opening is a no-op: this sink holds no per-instance resources. */
    @Override
    public void open(Configuration parameters) throws Exception {
        // nothing to initialise
    }

    /**
     * Prints a single record on its own line.
     *
     * @param value   the record to print
     * @param context sink context (not used)
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println(value);
    }

    /** Closing is a no-op: nothing was acquired in {@link #open}. */
    @Override
    public void close() throws Exception {
        // nothing to release
    }
}

最后,配置好 Flink CDC 作业,使其随 Spring 项目启动而运行:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * Spring-managed bootstrapper that starts a Flink CDC job capturing row changes of
 * {@code quick_chat.quick_chat_msg} and forwarding them, as JSON strings, to {@link MySinkHandler}.
 *
 * <p>The job is submitted with {@code executeAsync()} so that {@link #init()} returns and the
 * Spring context can finish starting. With {@code execute()}, the call blocks until the job ends;
 * an unbounded CDC job never ends, so it would hang application startup.
 */
@Component
public class MySqlSourceExample {

    /** Handle to the submitted job; used to cancel it when the Spring context shuts down. */
    private JobClient jobClient;

    /**
     * Builds the MySQL CDC source, wires it to the sink and submits the job without blocking.
     *
     * @throws Exception if the job cannot be submitted
     */
    @PostConstruct
    public void init() throws Exception {
        // Configure the monitored data source.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("8.141.28.132")
                .port(3306)
                // Databases to capture; several may be listed.
                .databaseList("quick_chat")
                // Tables to capture as "db.table"; several may be listed.
                .tableList("quick_chat.quick_chat_msg")
                // NOTE(review): credentials are hard-coded for the demo; load them from configuration
                // and use a dedicated, least-privileged account instead of root in real deployments.
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Also emit schema-change (DDL) events.
                .includeSchemaChanges(true)
                .build();

        // Expose the Flink Web UI of the embedded local cluster on port 8081.
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // Checkpoint every 5000 ms. Checkpoints are about fault tolerance: after an unexpected failure
        // the job restarts from an earlier checkpoint without compromising the correctness of its logic.
        env.enableCheckpointing(5000);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(new MySinkHandler());

        // executeAsync() submits the job and returns immediately; execute() would block this
        // @PostConstruct callback until the never-ending streaming job terminates.
        jobClient = env.executeAsync();
    }

    /** Requests cancellation of the running CDC job when the Spring context is closed. */
    @PreDestroy
    public void shutdown() {
        if (jobClient != null) {
            // Fire-and-forget: the returned future is not awaited so context shutdown is not delayed.
            jobClient.cancel();
        }
    }
}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
在这里插入图片描述

2.3、数据格式

完成上述配置并启动项目后,我对表数据依次进行了新增、修改、删除操作,控制台输出的变更数据如下。op 字段表示操作类型:c=新增,u=修改,d=删除。另外,DATETIME 类型的 create_time 以毫秒时间戳输出,且未做时区换算,因此与 ts_ms 相差约 8 小时。

# 新增
{
    "before": null,
    "after": {
        "id": 3,
        "from_id": "dog",
        "to_id": "cat",
        "relation_id": "dog:cat",
        "content": "你好啊",
        "msg_type": 1,
        "extra_info": null,
        "create_time": 1729164075000,
        "deleted": 0
    },
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1729135279000,
        "snapshot": "false",
        "db": "quick_chat",
        "sequence": null,
        "table": "quick_chat_msg",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000002",
        "pos": 2452,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "c",
    "ts_ms": 1729135278633,
    "transaction": null
}
# 修改
{
    "before": {
        "id": 3,
        "from_id": "dog",
        "to_id": "cat",
        "relation_id": "dog:cat",
        "content": "你好啊",
        "msg_type": 1,
        "extra_info": null,
        "create_time": 1729164075000,
        "deleted": 0
    },
    "after": {
        "id": 3,
        "from_id": "dog",
        "to_id": "cat",
        "relation_id": "dog:cat",
        "content": "你好啊,小猫咪",
        "msg_type": 1,
        "extra_info": null,
        "create_time": 1729164075000,
        "deleted": 0
    },
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1729135289000,
        "snapshot": "false",
        "db": "quick_chat",
        "sequence": null,
        "table": "quick_chat_msg",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000002",
        "pos": 2825,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "u",
    "ts_ms": 1729135288473,
    "transaction": null
}
# 删除
{
    "before": {
        "id": 3,
        "from_id": "dog",
        "to_id": "cat",
        "relation_id": "dog:cat",
        "content": "你好啊,小猫咪",
        "msg_type": 1,
        "extra_info": null,
        "create_time": 1729164075000,
        "deleted": 0
    },
    "after": null,
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1729135301000,
        "snapshot": "false",
        "db": "quick_chat",
        "sequence": null,
        "table": "quick_chat_msg",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000002",
        "pos": 3247,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "d",
    "ts_ms": 1729135300692,
    "transaction": null
}

http://www.kler.cn/news/353606.html

相关文章:

  • jvm垃圾收集器简介
  • Ability内页面的跳转和数据传递(router和want显/隐跳转)
  • 美客多产品没流量?不要只看广告!
  • mysql高级sql语句 二
  • Python中的help()函数:追踪错误并提供解决方案
  • JDK、JRE、JVM相关知识点
  • 626,换座位
  • 简述Linux和RTOS
  • DAPLINK 之 RTT 输出日志
  • Java洗车保养不出门上门服务一键享上门洗车保养维修系统小程序源码
  • YOLO模型在不同光照条件下的检测性能如何优化?
  • 红日安全vulnstack (二)
  • 存储设备专栏 2.5 -- linux 下块设备信息查看命令 lsblk 详细介绍】
  • 【UML】一个UML学习的还不错的几个帖子
  • 电脑版剪映使用本地字体
  • TCP三握四挥
  • 详解Oracle审计(一)
  • 【C++进阶】哈希表的介绍及实现
  • window下使用命令行启动llamafactory报错AttributeError: can‘t set attribute
  • DNS隧道流量分析