Maxwell的入门实战
1. Maxwell 的定义
Maxwell 是美国 Zendesk 开源,用Java 编写的 Mysql 实时抓取软件。
实时读取 mysql 的二进制日志 Binlog,并生成 JSON 格式的信息,做为 生产者 发送 给 Kafka、Kinesis、RabbitMQ、Redis、Goodle Cloud Pub/Sub、文件 或 其他平台的应用程序。
官网: Maxwell's Daemon
常用的就是,将 MySQL 的数据 同步到 hdfs 上
注意:也可以通过 sqoop 编写脚本的方式将 mysql的数据同步的hdfs上 (离线)
Maxwell 可以实时的将mysql 的数据 同步到 kafka 或者redis 上等,然后在通过其他组件同步到 hdfs上
2. Maxwell 的工作 原理
2.1. Mysql 的 主从 复制过程
- Master(主库) 将改变的记录,写到 二进制 日志 (binary log) 中
- Slave (从库)向 mysql Master 发动 dump 协议,将Master 的二进制日志 ( binary log events )拷贝到他的中继日志 (relay log)
- Salve 从库 读取并重做中继日志中的事件,将改变的数据同步到自己的数据库
2.2. Maxwell 的工作原理
就是将自己伪装成 Mysql 的一个 slave ,然后 以 Salve 的身份 假装从Mysql (Master)复制数据.
2.3. MySQL 的 binlog
2.3.1. 什么是 binlog
Mysql 的 二进制 日志记录了 所有的 DDL 和DML (除了数据查询语句)语句,以事件的形式记录,还包含语句执行所消耗的时间,MySql 的二进制 日志 是事务安全型 的.
使用场景:
- MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递 给 slaves 来达到 master-slave 数据一致的目的。
- 数据恢复了,通过使用 mysqlbinlog 工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除 了数据查询语句)语句事件。
2.3.2. 开启 binlog
找到 MySQL 配置文件的位置
➢ Linux: /etc/my.cnf 如果/etc 目录下没有,可以通过 locate my.cnf 查找位置
➢ Windows: \my.ini
➢ 在 mysql 的配置文件下,修改配置 在[mysqld] 区块,设置/添加 log-bin=mysql-bin
3. Maxwell 的使用
3.1. Maxwell 安装
3.1.1. 安装地址
- Maxwell 官网地址:http://maxwells-daemon.io/
- 文档查看地址:http://maxwells-daemon.io/quickstart/
3.1.2. 安装
上传 ---> 解压 ---> 重命名
注意使用的是:1.29.2
1.30以及以上不支持 jdk8
3.1.3. Mysql 的环境准备
sudo vim /etc/my.cnf
在mysqid 下面添加
server_id=1 #主从复制
log-bin=mysql-bin
binlog_format=row # 日志级别
binlog-do-db=test_maxwell # 指定 binlog 执行的数据库
重启mysql服务
systemctl restart mysqld
查看 Mysql 是否完成修改
mysql -uroot -p123456
show variables like '%binlog%';
查看下列属性
binlog_format | ROW 修改完成
然后可以进入/var/lib/mysql 目录,查看 MySQL 生成的 binlog 文件
cd /var/lib/mysql
ll
3.1.4. 初始化Maxwell 元数据库
- 在mysql中建立一个maxwell库用于存储Maxwell 元数据
mysql -uroot -p123456
CREATE DATABASE maxwell
- 设置 Mysql 用户 密码安全级别
查看密码等级:
SHOW VARIABLES LIKE 'validate_password%';
修改密码级别:
set global validate_password.length=4;
set global validate_password.policy=0;
;
- 分配一个账号可以操作该数据库
# 创建用户
create user 'maxwell'@'%' identified by '123456';
# 赋给权限
grant all privileges on maxwell.* to 'maxwell'@'%' with grant option;
flush privileges;
- 分配这个账号可以监控其他数据库的权限
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
flush privileges
3.1.5. Maxwell 进程启动
第一种启动方式
cd /opt/installs/maxwell/bin
./maxwell --user='user' --password='123456' --host='caiji' --producer=stdout
--user 连接 mysql 的用户
--password 连接 mysql 的用户的密码
--host mysql 安装的主机名
--producer 生产者模式(stdout:控制台 kafka:kafka 集群)
假如报错 : error: unhandled character set 'utf8mb3'
这个问题是因为MySQL从 5.5.3 开始,用 utf8mb4 编码来实现完整的 UTF-8,其中 mb4 表示 most bytes 4,最多占用4个字节。而原来的utf8则被utf8mb3则代替。
- 将MySQL降级,重新安装5.5.3以下的版本。
- 则是修改maxwell源码。在打包上传
启动成功的截图:
添加数据:监控到的数据
第二种启动方式: 修改配置文件,定制化启动 Maxwell 进程
cp config.properties.example config.properties
vim config.properties
bin/maxwell --config ./config.properties
4. 入门案例
4.1. 监控 Mysql 数据并在控制台打印
1.运行 maxwell 来监控 mysql 数据更新
bin/maxwell --user='user' --password='123456' --host='caiji' --producer=stdout
2.向 mysql 的 test_maxwell 库的 test 表插入一条数据,查看 maxwell 的控制台输出
insert into test values(1,'aaa');
窗口会显示
{
"database": "test_maxwell", --库名
"table": "test", --表名
"type": "insert", --数据更新类型
"ts": 1716212020, --操作时间
"xid": 20764, --操作 id
"commit": true, --提交成功
"data": {"id":1,"name":"aaa"} --数据
}
3.向 mysql 的 test_maxwell 库的 test 表同时插入 3 条数据,控制台出现了 3 条 json
日志,说明 maxwell 是以数据行为单位进行日志的采集的。
insert into test values(2,'bbb'),(3,'ccc'),(4,'ddd');
窗口显示:
{"database":"test_Maxwell","table":"test","type":"insert","ts":1716212175,"xid":21294,"xoffset":0,"data":{"id":2,"name":"bbb"}}
{"database":"test_Maxwell","table":"test","type":"insert","ts":1716212175,"xid":21294,"xoffset":1,"data":{"id":3,"name":"ccc"}}
{"database":"test_Maxwell","table":"test","type":"insert","ts":1716212175,"xid":21294,"commit":true,"data":{"id":4,"name":"ddd"}}
4.修改 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出
update test set name='abc' where id =1;
显示
{"database":"test_Maxwell",
"table":"test",
"type":"update",
"ts":1716212232,
"xid":21452,
"commit":true,
"data":{"id":1,"name":"abc"}, --新数据
"old":{"name":"aaa"} -- 旧数据
}
5.删除 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出
delete from test where id =1;
{"database":"test_Maxwell",
"table":"test",
"type":"delete",
"ts":1716212316,
"xid":21677,
"commit":true,
"data":{"id":1,"name":"abc"}
}
4.2. 监控 Mysql 数据输出到 kafka
1.启动 zookeeper 和 kafka
zkServer.sh start
bin/kafka-server-start.sh -daemon /opt/installs/kafka3/config/server.properties
2.启动 Maxwell 监控 binlog
bin/maxwell --config ./config.properties
3.打开 kafka 的控制台的消费者消费 maxwell 主题
使用 kafkaUI进行操作(新建,赋权限)
命令行:
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic maxwell
kafka-console-consumer.sh --bootstrapserver caiji:9092 --topic maxwell
4.向数据库中插入一条数据
insert into test values (5,'eee');
5.通过 kafka 消费者来查看到了数据,说明数据成功传入 kafka
{"database":"test_Maxwell",
"table":"test",
"type":"insert",
"ts":1716213374,
"xid":24478,
"commit":true,
"data":{"id":5,"name":"eee"}
}
开始监听:
kafka 上的图片:
4.3. 监控 Mysql 指定表数据输出控制台
1.运行 maxwell 来监控 mysql 指定表数据更新
bin/maxwell --user='user' --password='123456' --host='caiji' --filter 'exclude: *.*, include:test_maxwell.test' --producer=stdout
2.向 test_maxwell.test 表插入一条数据,查看 maxwell 的监控
insert into test_Maxwell.test values(7,'ggg');
{"database":"test_Maxwell",
"table":"test","type":"insert",
"ts":1716214156,
"xid":27726,
"commit":true,
"data":{"id":7,"name":"ggg"}
}
3.向 tset_maxwell.test2 表插入一条数据,查看 maxwell 的监控
insert into tset_Maxwell2.test values(7,'ggg');
没有数据
说明 include 参数生效,只能监控指定的 mysql 表的信息
注意:还可以设置 include:test_maxwell.*,通过此种方式来监控 mysql 某个库的所有
表,也就是说过滤整个库。