maxwell 输出消息到 kafka
文章目录
- 1、kafka-producer
- 2、运行一个Docker容器,该容器内运行的是Zendesk的Maxwell工具,一个用于实时捕获MySQL数据库变更并将其发布到Kafka或其他消息系统的应用
- 3、进入kafka容器内部
- 4、tingshu_album 数据库中 新增数据
- 5、tingshu_album 数据库中 更新数据
- 6、tingshu_album 数据库中 删除数据
- 7、总结
1、kafka-producer
kafka-producer:https://maxwells-daemon.io/config/#kafka-producer
2、运行一个Docker容器,该容器内运行的是Zendesk的Maxwell工具,一个用于实时捕获MySQL数据库变更并将其发布到Kafka或其他消息系统的应用
docker run -it --rm zendesk/maxwell bin/maxwell \
--user=maxwell \
--password=maxwell \
--host=192.168.74.148 \
--port=3306 \
--producer=kafka \
--kafka.bootstrap.servers=192.168.74.148:9092 \
--kafka_topic=maxwell
[root@localhost ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
89bb2276fc3d elasticsearch:8.8.2 "/bin/tini -- /usr/l…" 3 weeks ago Up 5 days 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp elasticsearch
34891ac3e05a wurstmeister/kafka "start-kafka.sh" 4 weeks ago Up 3 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8c71efe9dca7 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 4 weeks ago Up 5 days 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
c14772057ab8 redis "docker-entrypoint.s…" 8 months ago Up 5 days 0.0.0.0:6379->6379/tcp, :::6379->6379/tcp spzx-redis
ab66508d9441 mysql:8 "docker-entrypoint.s…" 8 months ago Up 2 hours 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp spzx-mysql
[root@localhost ~]# docker run -it --rm zendesk/maxwell bin/maxwell \
> --user=maxwell \
> --password=maxwell \
> --host=192.168.74.148 \
> --port=3306 \
> --producer=kafka \
> --kafka.bootstrap.servers=192.168.74.148:9092 \
> --kafka_topic=maxwell
2024-09-19 11:00:53 INFO Maxwell - Starting Maxwell. maxMemory: 1031798784 bufferMemoryUsage: 0.25
2024-09-19 11:00:54 INFO ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.74.148:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2024-09-19 11:00:54 INFO AppInfoParser - Kafka version: 2.7.0
2024-09-19 11:00:54 INFO AppInfoParser - Kafka commitId: 448719dc99a19793
2024-09-19 11:00:54 INFO AppInfoParser - Kafka startTimeMs: 1726743654433
2024-09-19 11:00:54 INFO Maxwell - Maxwell v1.41.2 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[spzxbinlog.000003:7129], lastHeartbeat=0]
2024-09-19 11:00:54 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[spzxbinlog.000003:156], lastHeartbeat=0])
2024-09-19 11:00:55 INFO BinlogConnectorReplicator - Setting initial binlog pos to: spzxbinlog.000003:7129
2024-09-19 11:00:55 INFO BinaryLogClient - Connected to 192.168.74.148:3306 at spzxbinlog.000003/7129 (sid:6379, cid:60)
2024-09-19 11:00:55 INFO BinlogConnectorReplicator - Binlog connected.
2024-09-19 11:00:55 INFO Metadata - [Producer clientId=producer-1] Cluster ID: 9o7eoaK4T1KnGZkMvElpkg
3、进入kafka容器内部
[root@localhost ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic maxwell
4、tingshu_album 数据库中 新增数据
[root@localhost ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic maxwell
{"database":"tingshu_album","table":"base_category1","type":"insert","ts":1726743970,"xid":10619,"commit":true,"data":{"id":17,"name":"kafka","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:06:10","is_deleted":0}}
{
"database": "tingshu_album",
"table": "base_category1",
"type": "insert",
"ts": 1726743970,
"xid": 10619,
"commit": true,
"data": {
"id": 17,
"name": "kafka",
"order_num": 0,
"create_time": "2024-09-19 11:06:10",
"update_time": "2024-09-19 11:06:10",
"is_deleted": 0
}
}
5、tingshu_album 数据库中 更新数据
[root@localhost ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic maxwell
{"database":"tingshu_album","table":"base_category1","type":"insert","ts":1726743970,"xid":10619,"commit":true,"data":{"id":17,"name":"kafka","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:06:10","is_deleted":0}}
{"database":"tingshu_album","table":"base_category1","type":"update","ts":1726744191,"xid":11128,"commit":true,"data":{"id":17,"name":"xxx","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:09:51","is_deleted":0},"old":{"name":"kafka","update_time":"2024-09-19 11:06:10"}}
{
"database": "tingshu_album",
"table": "base_category1",
"type": "update",
"ts": 1726744191,
"xid": 11128,
"commit": true,
"data": {
"id": 17,
"name": "xxx",
"order_num": 0,
"create_time": "2024-09-19 11:06:10",
"update_time": "2024-09-19 11:09:51",
"is_deleted": 0
},
"old": {
"name": "kafka",
"update_time": "2024-09-19 11:06:10"
}
}
6、tingshu_album 数据库中 删除数据
[root@localhost ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic maxwell
{"database":"tingshu_album","table":"base_category1","type":"insert","ts":1726743970,"xid":10619,"commit":true,"data":{"id":17,"name":"kafka","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:06:10","is_deleted":0}}
{"database":"tingshu_album","table":"base_category1","type":"update","ts":1726744191,"xid":11128,"commit":true,"data":{"id":17,"name":"xxx","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:09:51","is_deleted":0},"old":{"name":"kafka","update_time":"2024-09-19 11:06:10"}}
{"database":"tingshu_album","table":"base_category1","type":"delete","ts":1726744396,"xid":11623,"commit":true,"data":{"id":17,"name":"xxx","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:09:51","is_deleted":0}}
{
"database": "tingshu_album",
"table": "base_category1",
"type": "delete",
"ts": 1726744396,
"xid": 11623,
"commit": true,
"data": {
"id": 17,
"name": "xxx",
"order_num": 0,
"create_time": "2024-09-19 11:06:10",
"update_time": "2024-09-19 11:09:51",
"is_deleted": 0
}
}