当前位置: 首页 > article >正文

Maxwell的入门实战

1. Maxwell 的定义

Maxwell 是美国 Zendesk 开源,用Java 编写的 Mysql 实时抓取软件。

实时读取 mysql 的二进制日志 Binlog,并生成 JSON 格式的信息,做为 生产者 发送 给 Kafka、Kinesis、RabbitMQ、Redis、Goodle Cloud Pub/Sub、文件 或 其他平台的应用程序。

官网: Maxwell's Daemon

常用的就是,将 MySQL 的数据 同步到 hdfs 上

注意:也可以通过 sqoop 编写脚本的方式将 mysql的数据同步的hdfs上 (离线)

Maxwell 可以实时的将mysql 的数据 同步到 kafka 或者redis 上等,然后在通过其他组件同步到 hdfs上

2. Maxwell 的工作 原理

2.1. Mysql 的 主从 复制过程

  1. Master(主库) 将改变的记录,写到 二进制 日志 (binary log) 中
  2. Slave (从库)向 mysql Master 发动 dump 协议,将Master 的二进制日志 ( binary log events )拷贝到他的中继日志 (relay log)
  3. Salve 从库 读取并重做中继日志中的事件,将改变的数据同步到自己的数据库

2.2. Maxwell 的工作原理

就是将自己伪装成 Mysql 的一个 slave ,然后 以 Salve 的身份 假装从Mysql (Master)复制数据.

2.3. MySQL 的 binlog

2.3.1. 什么是 binlog

Mysql 的 二进制 日志记录了 所有的 DDL 和DML (除了数据查询语句)语句,以事件的形式记录,还包含语句执行所消耗的时间,MySql 的二进制 日志 是事务安全型 的.

使用场景:

  1. MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递 给 slaves 来达到 master-slave 数据一致的目的。
  2. 数据恢复了,通过使用 mysqlbinlog 工具来使恢复数据。

二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除 了数据查询语句)语句事件。

2.3.2. 开启 binlog

找到 MySQL 配置文件的位置

➢ Linux: /etc/my.cnf 如果/etc 目录下没有,可以通过 locate my.cnf 查找位置

➢ Windows: \my.ini

➢ 在 mysql 的配置文件下,修改配置 在[mysqld] 区块,设置/添加 log-bin=mysql-bin

3. Maxwell 的使用

3.1. Maxwell 安装

3.1.1. 安装地址

  1. Maxwell 官网地址:http://maxwells-daemon.io/
  2. 文档查看地址:http://maxwells-daemon.io/quickstart/

3.1.2. 安装

上传 ---> 解压 ---> 重命名

注意使用的是:1.29.2

1.30以及以上不支持 jdk8

3.1.3. Mysql 的环境准备

sudo vim /etc/my.cnf
在mysqid 下面添加
server_id=1  #主从复制
log-bin=mysql-bin	
binlog_format=row	# 日志级别
binlog-do-db=test_maxwell  # 指定 binlog 执行的数据库

重启mysql服务
systemctl restart mysqld

查看 Mysql 是否完成修改
mysql -uroot -p123456
show variables like '%binlog%';

查看下列属性
binlog_format   | ROW  修改完成


然后可以进入/var/lib/mysql 目录,查看 MySQL 生成的 binlog 文件
cd /var/lib/mysql
ll

3.1.4. 初始化Maxwell 元数据库

  1. 在mysql中建立一个maxwell库用于存储Maxwell 元数据
mysql -uroot -p123456
CREATE DATABASE maxwell
  1. 设置 Mysql 用户 密码安全级别
查看密码等级:
SHOW VARIABLES LIKE 'validate_password%';
修改密码级别:
set global validate_password.length=4;
set global validate_password.policy=0;
;
  1. 分配一个账号可以操作该数据库
# 创建用户
create user 'maxwell'@'%' identified by '123456';
# 赋给权限
grant all privileges on maxwell.* to 'maxwell'@'%' with grant option;
flush privileges;
  1. 分配这个账号可以监控其他数据库的权限
 GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
 flush privileges

3.1.5. Maxwell 进程启动

第一种启动方式

cd /opt/installs/maxwell/bin
./maxwell --user='user' --password='123456' --host='caiji' --producer=stdout
--user 	连接 mysql 的用户 
--password 	连接 mysql 的用户的密码
--host  	mysql 安装的主机名 
--producer 	生产者模式(stdout:控制台  kafka:kafka 集群)  

假如报错 : error: unhandled character set 'utf8mb3'

这个问题是因为MySQL从 5.5.3 开始,用 utf8mb4 编码来实现完整的 UTF-8,其中 mb4 表示 most bytes 4,最多占用4个字节。而原来的utf8则被utf8mb3则代替。

  • 将MySQL降级,重新安装5.5.3以下的版本。
  • 则是修改maxwell源码。在打包上传

启动成功的截图:

添加数据:监控到的数据

第二种启动方式: 修改配置文件,定制化启动 Maxwell 进程

cp config.properties.example config.properties  
vim config.properties  
bin/maxwell --config ./config.properties

4. 入门案例

4.1. 监控 Mysql 数据并在控制台打印

1.运行 maxwell 来监控 mysql 数据更新
bin/maxwell --user='user' --password='123456' --host='caiji' --producer=stdout

2.向 mysql 的 test_maxwell 库的 test 表插入一条数据,查看 maxwell 的控制台输出
insert into test values(1,'aaa');
窗口会显示
{
 "database": "test_maxwell", --库名
 "table": "test", --表名
 "type": "insert", --数据更新类型
 "ts": 1716212020, --操作时间
 "xid": 20764, --操作 id
 "commit": true, --提交成功
 "data": {"id":1,"name":"aaa"} --数据
}

3.向 mysql 的 test_maxwell 库的 test 表同时插入 3 条数据,控制台出现了 3 条 json
日志,说明 maxwell 是以数据行为单位进行日志的采集的。
insert into test values(2,'bbb'),(3,'ccc'),(4,'ddd');
窗口显示:
{"database":"test_Maxwell","table":"test","type":"insert","ts":1716212175,"xid":21294,"xoffset":0,"data":{"id":2,"name":"bbb"}}
{"database":"test_Maxwell","table":"test","type":"insert","ts":1716212175,"xid":21294,"xoffset":1,"data":{"id":3,"name":"ccc"}}
{"database":"test_Maxwell","table":"test","type":"insert","ts":1716212175,"xid":21294,"commit":true,"data":{"id":4,"name":"ddd"}}

4.修改 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出
update test set name='abc' where id =1;
显示
{"database":"test_Maxwell",
 "table":"test",
 "type":"update",
 "ts":1716212232,
 "xid":21452,
 "commit":true,
 "data":{"id":1,"name":"abc"}, --新数据
 "old":{"name":"aaa"}		-- 旧数据
}		

5.删除 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出
 delete from test where id =1;
{"database":"test_Maxwell",
 "table":"test",
 "type":"delete",
 "ts":1716212316,
 "xid":21677,
 "commit":true,
 "data":{"id":1,"name":"abc"}
}

4.2. 监控 Mysql 数据输出到 kafka

1.启动 zookeeper 和 kafka
zkServer.sh start
bin/kafka-server-start.sh -daemon /opt/installs/kafka3/config/server.properties

2.启动 Maxwell 监控 binlog
bin/maxwell --config ./config.properties

3.打开 kafka 的控制台的消费者消费 maxwell 主题
使用 kafkaUI进行操作(新建,赋权限)

命令行:
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic maxwell
kafka-console-consumer.sh --bootstrapserver caiji:9092 --topic maxwell

4.向数据库中插入一条数据
insert into test values (5,'eee');

5.通过 kafka 消费者来查看到了数据,说明数据成功传入 kafka
{"database":"test_Maxwell",
 "table":"test",
 "type":"insert",
 "ts":1716213374,
 "xid":24478,
 "commit":true,
 "data":{"id":5,"name":"eee"}
}

开始监听:

kafka 上的图片:

4.3. 监控 Mysql 指定表数据输出控制台

1.运行 maxwell 来监控 mysql 指定表数据更新
bin/maxwell --user='user' --password='123456' --host='caiji' --filter 'exclude: *.*, include:test_maxwell.test' --producer=stdout
2.向 test_maxwell.test 表插入一条数据,查看 maxwell 的监控
insert into test_Maxwell.test values(7,'ggg');
{"database":"test_Maxwell",
 "table":"test","type":"insert",
 "ts":1716214156,
 "xid":27726,
 "commit":true,
 "data":{"id":7,"name":"ggg"}
}
3.向 tset_maxwell.test2 表插入一条数据,查看 maxwell 的监控
insert into tset_Maxwell2.test values(7,'ggg');
没有数据
说明 include 参数生效,只能监控指定的 mysql 表的信息

注意:还可以设置 include:test_maxwell.*,通过此种方式来监控 mysql 某个库的所有
表,也就是说过滤整个库。

http://www.kler.cn/a/420790.html

相关文章:

  • Harnessing Large Language Models for Training-free Video Anomaly Detection
  • 学习threejs,使用VideoTexture实现视频Video更新纹理
  • Python 面向对象编程详解
  • ProtoBuf快速上手(C++)
  • Kamailio SIP服务器的配置与运行
  • 【AI系统】AI 编译器基本架构
  • java基础概念46-数据结构1
  • Rust循环引用与多线程并发
  • 浏览器中输入一个URL后,按下回车后发生了什么
  • Truffle和Remix简介
  • 基础原型链污染
  • K8S资源之secret资源
  • 基于Java Springboot传统戏曲推广微信小程序
  • 泷羽sec-burp功能介绍(1) 学习笔记
  • Python爬取机车网车型数据并存入Mysql数据库
  • 如何让控件始终处于父容器的居中位置(父容器可任意改变大小)
  • vue项目如何设置字体样式font-family,font-family在项目中不生效
  • linux perf安装问题解决
  • Python线程使用
  • linux arm下获取屏幕事件(rk3588)
  • 大模型开发和微调工具Llama-Factory-->训练方法(SFT, RLHF, DPO, KTO)
  • Android 编译和使用libheif
  • playwright 学习复仇记-2 Selector选择器定位元素
  • vmware虚拟机移植
  • 多线程 03 实现方式
  • 三维开发中blender建模后如何完美兼容到threejs