当前位置: 首页 > article >正文

Flink CDC的安装配置

Flink CDC 是基于 Apache Flink 构建的,用于捕获数据库表中数据的变更情况,包括数据的插入、更新和删除操作,并能将这些变更数据以流的形式实时传输到其他系统中进行处理和分析。传统的数据同步工具往往是定时任务,存在时间延迟。而 Flink CDC 能实时捕获数据库的变更数据,一旦有数据变化,立即将其同步到目标系统,让数据在各个系统之间保持高度一致,几乎不存在延迟。CDC含义:变更数据捕获(Change Data Capture,简称 CDC)

1、开启MySQL binlog功能

# 1、修改mysql配置文件
vim /etc/my.cnf

# 2、增加以下配置
# 在配置文件中增加二配置
# 需要将配置放在[mysqld]后面
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1


# 3、重启mysql服务
systemctl restart mysqld

# 查看mysql binlog文件
cd /var/lib/mysql
mysql-bin.000001

# 改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
mysql -uroot -p123456
show variables like 'log_bin';

# 创建表
create table user(
	id varchar(255),
	name varchar(255),
	age INT,
	PRIMARY KEY(id)
);
insert into user
values('002','李四',24);

2、部署Flink CDC

# 1、上传jar到flink lib目录下
flink-sql-connector-mysql-cdc-2.2.1.jar

# 2、重启flink集群
yarn application -list
yarn application -kill application_1699579932721_2024
yarn-session.sh -d

3、使用flink cdc

-- 创建flink cdc表,
-- cdc表实时从mysql读取数据的表 -- 无界流
CREATE TABLE user_cdc (
	id STRING,
	name STRING,
	age INT,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'cdh',
     'port' = '3306',
     'username' = 'root',
     'password' = 'lisi1234',
     'database-name' = 'bigdata',
     'table-name' = 'user'
);

CREATE TABLE user_hdfs (
	id STRING,
	name STRING,
	age INT,
    PRIMARY KEY(id) NOT ENFORCED
)WITH (
    'connector' = 'filesystem',           -- 必选:指定连接器类型
    'path' = 'hdfs://cdh:8020/user/hdfs/user',  -- 必选:指定路径
    'format' = 'canal-json',                     -- 必选:文件系统连接器指定 format
    'source.monitor-interval' ='5000',
    'sink.rolling-policy.file-size' ='128MB',
    'sink.rolling-policy.rollover-interval' ='30 min',
    'sink.rolling-policy.check-interval' ='1 min'
);
-- 将变更数据使用canal json格式保存到hdfs中
-- 1、延迟会比较大
-- 2、如果修改了延迟时间,会产生小文件

-- 小文件导致的问题
-- 1、namenode压力大
-- 2、后续的批处理任务task太多,任务执行慢

insert into user_hdfs
select * from user_cdc;


http://www.kler.cn/a/405618.html

相关文章:

  • ElasticSearch学习篇17_《检索技术核心20讲》最邻近检索-局部敏感哈希、乘积量化PQ思路
  • Android 14.0 kenel中修改rom系统内部存储的大小
  • MongoDB 更新集合名
  • fastadmin实现站内通知功能
  • C:mbedtls库实现https双向认证连接示例_七侠镇莫尛貝大侠20241122
  • nginx 配置lua执行shell脚本
  • 招聘和面试
  • MySQL性能分析工具的使用
  • 用python简单集成一个分词工具
  • 基于 DRNN 神经网络整定的 PID 解耦控制
  • Python 使用 Selenuim进行自动化点击入门,谷歌驱动,以百度为例
  • 数据驱动与并行策略:用 JUnit 5 让软件测试更高效
  • 前端面试题大汇总:React 篇
  • 2025杭州国际智能网联新能源汽车展览会
  • Linux 磁盘分区、格式化和挂载
  • DRNN 神经网络的Jacobian 信息辨识
  • Python-flet实现个人视频播放器
  • 太速科技-512-基于ZU19EG的4路100G 8路40G的光纤汇流计算卡
  • 动态规划 详解
  • 基于 springboot +vue 的实践性教学系统
  • 和为 K 的子数组(java)
  • shell循环
  • MinGW 与 MSVC 的区别与联系及相关特性分析
  • 小兔鲜项目总结——项目亮点
  • 神经网络问题之二:梯度爆炸(Gradient Explosion)
  • 双指针算法详解:原理、应用场景及代码示例