当前位置: 首页 > article >正文

使用Flinkcdc 采集mysql数据

1.下载 Flink CDC 连接器

(1)登录官网下载
https://github.com/apache/flink-cdc/releases
(1)或者虚拟机在线下载
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar

(2)将下载的 JAR 文件放到 Flink 集群的 lib/ 目录 中,重启flink集群。

[root@node1 ~]# cd /export/server/flink
[root@node1 flink]# bin/start-cluster.sh

在这里插入图片描述

2.创建mysql表和数据

Drop database if exists test;
Create database test character set utf8;
Use test;
--建表语句:
-- 建表
-- 学生表
CREATE TABLE `Student`(
      `s_id` VARCHAR(20),
      `s_name` VARCHAR(20) NOT NULL DEFAULT '',
      `s_birth` VARCHAR(20) NOT NULL DEFAULT '',
      `s_sex` VARCHAR(10) NOT NULL DEFAULT '',
      PRIMARY KEY(`s_id`)
);
-- 成绩表
CREATE TABLE `Score`(
    `s_id` VARCHAR(20),
    `c_id` VARCHAR(20),
    `s_score` INT(3),
    PRIMARY KEY(`s_id`,`c_id`)
);
-- 插入学生表测试数据
insert into Student values('01' , '赵雷' , '1990-01-01' , '男');
insert into Student values('02' , '钱电' , '1990-12-21' , '男');
insert into Student values('03' , '孙风' , '1990-05-20' , '男');
insert into Student values('04' , '李云' , '1990-08-06' , '男');
insert into Student values('05' , '周梅' , '1991-12-01' , '女');
insert into Student values('06' , '吴兰' , '1992-03-01' , '女');
insert into Student values('07' , '郑竹' , '1989-07-01' , '女');
insert into Student values('08' , '王菊' , '1990-01-20' , '女');
-- 成绩表测试数据
insert into Score values('01' , '01' , 80);
insert into Score values('01' , '02' , 90);
insert into Score values('01' , '03' , 99);
insert into Score values('02' , '01' , 70);
insert into Score values('02' , '02' , 60);
insert into Score values('02' , '03' , 80);
insert into Score values('03' , '01' , 80);
insert into Score values('03' , '02' , 80);
insert into Score values('03' , '03' , 80);
insert into Score values('04' , '01' , 50);
insert into Score values('04' , '02' , 30);
insert into Score values('04' , '03' , 20);
insert into Score values('05' , '01' , 76);
insert into Score values('05' , '02' , 87);
insert into Score values('06' , '01' , 31);
insert into Score values('06' , '03' , 34);
insert into Score values('07' , '02' , 89);
insert into Score values('07' , '03' , 98);

3.使用Flink cdc 采集mysql

!!!注意:开启 binlog 日志功能,如果已开启忽略如下对mysql的配置操作,直接编辑flink sql即可

验证 MySQL 配置: Flink CDC 依赖 MySQL 的 binlog 功能进行数据采集。确保 binlog 已开启,执行以下命令检查:

[root@node1 ~]# mysql -uroot -p

在mysql里面执行,如果返回值为 OFF,需要开启 binlog 功能;
SHOW VARIABLES LIKE 'log_bin';

开启方式:

vi /etc/my.cnf
在[mysqld]下面增加如下代码:
server_id=1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

解释:
server_id=1 # MySQL 实例唯一标识符,必须是唯一的
log_bin = mysql-bin # 开启 binlog 功能,文件名为 mysql-bin
binlog_format = ROW # 设置 binlog 格式为 ROW,必需
expire_logs_days = 30 # binlog 日志保留天数,自动清理超过30天的日志

然后重启 MySQL 服务:

systemctl restart mysqld

====================================

启动flink sql客户端

sql-client.sh

在FlinkSQL-Client,执行创建表 mysql_cdc_to_test_Student

CREATE TABLE if not exists mysql_cdc_to_test_Student (
     s_id     STRING,
     s_name   STRING,
     s_birth  STRING,
     s_sex    STRING,
     PRIMARY KEY (`s_id`) NOT ENFORCED
) WITH (
    'connector'= 'mysql-cdc',
    'hostname'= '192.168.77.161',
    'port'= '3306',
    'username'= 'root',
    'password'='123456',
    'server-time-zone'= 'Asia/Shanghai',
    'debezium.snapshot.mode'='initial',
    'database-name'= 'test',
    'table-name'= 'Student'
);

#设置以表形式查看
SET sql-client.execution.result-mode = tableau;

select * from mysql_cdc_to_test_Student;

在这里插入图片描述


http://www.kler.cn/a/444571.html

相关文章:

  • 渗透测试-前端加密分析之RSA加密登录(密钥来源服务器)
  • 基于MATLAB的图像增强
  • maven-resources-production:ratel-fast: java.lang.IndexOutOfBoundsException
  • 开放词汇目标检测(Open-Vocabulary Object Detection, OVOD)综述
  • EGO Swarm翻译
  • 蓝桥杯刷题——day8
  • Swift 的动态性
  • package.json中版本管理的标识有哪些
  • 欢乐堡游乐园信息管理系统的设计与实现(Django Python MySQL)+文档
  • Express (nodejs) 相关
  • 手机无法连接电脑,如何解决(快速排除手机与电脑连接问题的方法)
  • 【2024版】超详细Python+Pycharm安装保姆级教程,Python环境配置和使用指南,看完这一篇就够了
  • 深度学习之目标检测篇——残差网络与FPN结合
  • 007 Qt_按钮类控件
  • docker如何学习与使用入门
  • springboot439校园健康驿站管理系统(论文+源码)_kaic
  • windows C#-查询表达式中使用隐式类型的局部变量和数组
  • Leetcode O(1) 时间插入、删除和获取随机元素
  • 深入理解 Kali Linux:基础命令与操作技巧
  • 【数据库】大二数据库复习范围 (快速版)帮助你快速复习数据库
  • openeuler24.09 系统无需配置 docker 源即可安装 docker 和 docker-composer
  • springboot437校园悬赏任务平台(论文+源码)_kaic
  • Linux函数栈帧
  • 掌握特征提取:机器学习中的 PCA、t-SNE 和 LDA模型
  • [unity3D] 利用 Button 组件实现鼠标悬停显示文字
  • git 不使用第三方软件解决冲突