当前位置: 首页 > article >正文

maxwell 输出消息到 kafka

文章目录

  • 1、kafka-producer
  • 2、运行一个Docker容器,该容器内运行的是Zendesk的Maxwell工具,一个用于实时捕获MySQL数据库变更并将其发布到Kafka或其他消息系统的应用
  • 3、进入kafka容器内部
  • 4、tingshu_album 数据库中 新增数据
  • 5、tingshu_album 数据库中 更新数据
  • 6、tingshu_album 数据库中 删除数据
  • 7、总结

1、kafka-producer

kafka-producer:https://maxwells-daemon.io/config/#kafka-producer
在这里插入图片描述

2、运行一个Docker容器,该容器内运行的是Zendesk的Maxwell工具,一个用于实时捕获MySQL数据库变更并将其发布到Kafka或其他消息系统的应用

在这里插入图片描述

docker run -it --rm zendesk/maxwell bin/maxwell \
    --user=maxwell \
    --password=maxwell \
    --host=192.168.74.148 \
    --port=3306 \
    --producer=kafka \
    --kafka.bootstrap.servers=192.168.74.148:9092 \
    --kafka_topic=maxwell
[root@localhost ~]# docker ps
CONTAINER ID   IMAGE                    COMMAND                   CREATED        STATUS         PORTS                                                                                  NAMES
89bb2276fc3d   elasticsearch:8.8.2      "/bin/tini -- /usr/l…"   3 weeks ago    Up 5 days      0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp   elasticsearch
34891ac3e05a   wurstmeister/kafka       "start-kafka.sh"          4 weeks ago    Up 3 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                                              kafka
8c71efe9dca7   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   4 weeks ago    Up 5 days      22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp                  zookeeper
c14772057ab8   redis                    "docker-entrypoint.s…"   8 months ago   Up 5 days      0.0.0.0:6379->6379/tcp, :::6379->6379/tcp                                              spzx-redis
ab66508d9441   mysql:8                  "docker-entrypoint.s…"   8 months ago   Up 2 hours     0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                                   spzx-mysql
[root@localhost ~]# docker run -it --rm zendesk/maxwell bin/maxwell \
>     --user=maxwell \
>     --password=maxwell \
>     --host=192.168.74.148 \
>     --port=3306 \
>     --producer=kafka \
>     --kafka.bootstrap.servers=192.168.74.148:9092 \
>     --kafka_topic=maxwell
2024-09-19 11:00:53 INFO  Maxwell - Starting Maxwell. maxMemory: 1031798784 bufferMemoryUsage: 0.25
2024-09-19 11:00:54 INFO  ProducerConfig - ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [192.168.74.148:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = false
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2024-09-19 11:00:54 INFO  AppInfoParser - Kafka version: 2.7.0
2024-09-19 11:00:54 INFO  AppInfoParser - Kafka commitId: 448719dc99a19793
2024-09-19 11:00:54 INFO  AppInfoParser - Kafka startTimeMs: 1726743654433
2024-09-19 11:00:54 INFO  Maxwell - Maxwell v1.41.2 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[spzxbinlog.000003:7129], lastHeartbeat=0]
2024-09-19 11:00:54 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[spzxbinlog.000003:156], lastHeartbeat=0])
2024-09-19 11:00:55 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: spzxbinlog.000003:7129
2024-09-19 11:00:55 INFO  BinaryLogClient - Connected to 192.168.74.148:3306 at spzxbinlog.000003/7129 (sid:6379, cid:60)
2024-09-19 11:00:55 INFO  BinlogConnectorReplicator - Binlog connected.
2024-09-19 11:00:55 INFO  Metadata - [Producer clientId=producer-1] Cluster ID: 9o7eoaK4T1KnGZkMvElpkg

3、进入kafka容器内部

[root@localhost ~]# docker exec -it kafka /bin/bash 
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic maxwell

4、tingshu_album 数据库中 新增数据

在这里插入图片描述

[root@localhost ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic maxwell
{"database":"tingshu_album","table":"base_category1","type":"insert","ts":1726743970,"xid":10619,"commit":true,"data":{"id":17,"name":"kafka","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:06:10","is_deleted":0}}
{
    "database": "tingshu_album",
    "table": "base_category1",
    "type": "insert",
    "ts": 1726743970,
    "xid": 10619,
    "commit": true,
    "data": {
        "id": 17,
        "name": "kafka",
        "order_num": 0,
        "create_time": "2024-09-19 11:06:10",
        "update_time": "2024-09-19 11:06:10",
        "is_deleted": 0
    }
}

5、tingshu_album 数据库中 更新数据

在这里插入图片描述

[root@localhost ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic maxwell
{"database":"tingshu_album","table":"base_category1","type":"insert","ts":1726743970,"xid":10619,"commit":true,"data":{"id":17,"name":"kafka","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:06:10","is_deleted":0}}
{"database":"tingshu_album","table":"base_category1","type":"update","ts":1726744191,"xid":11128,"commit":true,"data":{"id":17,"name":"xxx","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:09:51","is_deleted":0},"old":{"name":"kafka","update_time":"2024-09-19 11:06:10"}}
{
    "database": "tingshu_album",
    "table": "base_category1",
    "type": "update",
    "ts": 1726744191,
    "xid": 11128,
    "commit": true,
    "data": {
        "id": 17,
        "name": "xxx",
        "order_num": 0,
        "create_time": "2024-09-19 11:06:10",
        "update_time": "2024-09-19 11:09:51",
        "is_deleted": 0
    },
    "old": {
        "name": "kafka",
        "update_time": "2024-09-19 11:06:10"
    }
}

6、tingshu_album 数据库中 删除数据

在这里插入图片描述

[root@localhost ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic maxwell
{"database":"tingshu_album","table":"base_category1","type":"insert","ts":1726743970,"xid":10619,"commit":true,"data":{"id":17,"name":"kafka","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:06:10","is_deleted":0}}
{"database":"tingshu_album","table":"base_category1","type":"update","ts":1726744191,"xid":11128,"commit":true,"data":{"id":17,"name":"xxx","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:09:51","is_deleted":0},"old":{"name":"kafka","update_time":"2024-09-19 11:06:10"}}
{"database":"tingshu_album","table":"base_category1","type":"delete","ts":1726744396,"xid":11623,"commit":true,"data":{"id":17,"name":"xxx","order_num":0,"create_time":"2024-09-19 11:06:10","update_time":"2024-09-19 11:09:51","is_deleted":0}}
{
    "database": "tingshu_album",
    "table": "base_category1",
    "type": "delete",
    "ts": 1726744396,
    "xid": 11623,
    "commit": true,
    "data": {
        "id": 17,
        "name": "xxx",
        "order_num": 0,
        "create_time": "2024-09-19 11:06:10",
        "update_time": "2024-09-19 11:09:51",
        "is_deleted": 0
    }
}

7、总结

在这里插入图片描述


http://www.kler.cn/news/316649.html

相关文章:

  • 核心复现—计及需求响应的区域综合能源系统双层优化调度策略
  • 南大通用数仓-GCDW-学习-03-用户管理
  • 工业级5口485中继器通讯光电隔离防雷RS232HUB分共享分割器RS485集线器
  • 基于MySQL的数据库课程设计详解
  • 笔记整理—内核!启动!—linux应用编程、网络编程部分(4)linux文件属性
  • ruoyi-vue若依前端是如何防止接口重复请求
  • 计算机前沿技术-人工智能算法-大语言模型-最新论文阅读-2024-09-19
  • 【Linux 20】线程控制
  • Facebook开发者篇 - API拉取广告投放数据对接流程
  • D. Minimize the Difference (Codeforces Round 973 Div. 2)
  • 【人工智能学习笔记】7_智能语音技术基础
  • 【自定义函数】讲解
  • 香港科技大学广州|金融科技学域博士招生宣讲会——武汉大学、华中科技大学
  • 【算法】遗传算法
  • go语言基础入门(一)
  • 安全带检测系统源码分享
  • ArcGIS Pro SDK (十六)公共设施网络 2 网络图
  • MySQL篇(高级字符串函数/正则表达式)(持续更新迭代)
  • Web端云剪辑解决方案,BS架构私有化部署,安全可控
  • 【ARM】A64指令介绍及内存屏障和寄存器
  • 借用连接1-怎么从目标数据源借用连接
  • 【题解-力扣189. 轮转数组(java实现O(1)空间要求)】
  • Python3爬虫教程-HTTP基本原理
  • 数据结构--单链表创建、增删改查功能以及与结构体合用
  • Java 入门指南:JVM(Java虚拟机)—— 双亲委派模型(Parent Delegation Model)
  • 2024短剧系统开发,付费短剧小程序app源码教程,分销功能讲解搭建上线
  • 【UI自动化】前言
  • 服务器相关问题
  • 机器学习及其应用领域【金融领域】
  • css 控制虚线刻度尺寸