当前位置: 首页 > article >正文

Kafka-go语言一命速通

记录

命令(终端操作kafka)

# 验证kafka是否启动
ps -ef | grep kafka # ps -ef 命令用于显示所有正在运行的进程的详细信息
lsof -i :9092


# 启动kafka
brew services start zookeeper
brew services start kafka

# 创建topic
kafka-topics --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
解释:kafka-topics:用于管理主题。–create:创建一个新的主题。–topic test:主题的名称为 test–partitions 1:有 1 个分区(partition)。–replication-factor 1:主题的副本因子为 1。表示没有冗余,数据仅存储在一个节点上。–bootstrap-server localhost:9092:localhost:9092 表示 Kafka 服务器运行在本地主机的 9092 端口。

# 查看主题
kafka-topics --list --bootstrap-server localhost:9092

#订阅(消费者) 新建一个终端,输入
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

#发布(生产者) 新建一个终端,输入
kafka-console-producer --bootstrap-server localhost:9092 --topic test

# 删除Topic
kafka-topics --delete --topic test --bootstrap-server localhost:9092

代码操作Kafka

简单版本

生产者:

packagemain

import(
    "github.com/IBM/sarama"
    "log"
)

funcmain() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 5
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    iferr != nil {
       log.Fatalf("Failed to start producer: %s", err)
    }
    deferproducer.Close()

    msg := &sarama.ProducerMessage{
       Topic: "test_topic",
       Value: sarama.StringEncoder("Hello, Kafka!"),
    }

    partition, offset, err := producer.SendMessage(msg)
    iferr != nil {
       log.Fatalf("Failed to send message: %s", err)
    }

    log.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", "test_topic", partition, offset)
}

消费者

package main

import (
        "fmt"
        "github.com/IBM/sarama"
        "log"
)

func main() {
        config := sarama.NewConfig()
        config.Consumer.Return.Errors = true

        // 创建消费者
        consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
        if err != nil {
                log.Fatalf("Failed to start consumer: %s", err)
        }
        defer consumer.Close()

        // 订阅 Kafka 主题
        partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
        if err != nil {
                log.Fatalf("Failed to start partition consumer: %s", err)
        }
        defer partitionConsumer.Close()

        // 消费消息
        for msg := range partitionConsumer.Messages() {
                log.Printf("Consumed message: %s, from partition(%d), offset(%d)\n", string(msg.Value), msg.Partition, msg.Offset)
        }
}

多点配置版本

生产者

packagemain

import(
    "fmt"
    "github.com/IBM/sarama"
    "log"
    "time"
)

funcmain() {
    // 配置生产者config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认config.Producer.Retry.Max = 5                    // 最大重试次数config.Producer.Return.Successes = true          // 返回成功的消息config.Producer.Return.Errors = true             // 返回失败的消息config.Producer.Timeout = 10 * time.Second       // 设置生产者的超时时间config.Net.MaxOpenRequests = 5                   // 控制最大请求数config.Version = sarama.V2_8_0_0                 // 配置 Kafka 版本(可根据实际 Kafka 版本调整)// 创建生产者实例producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    iferr != nil {
       log.Fatalf("Failed to start producer: %s", err)
    }
    deferproducer.Close()

    // 循环发送消息fori := 1; ; i++ {
       // 构造消息msg := &sarama.ProducerMessage{
          Topic: "test_topic",                                                                       // 目标主题Value: sarama.StringEncoder(fmt.Sprintf("Message #%d: Hello, Kafka! www.zpf0000.com", i)), // 动态生成消息内容}

       // 发送消息partition, offset, err := producer.SendMessage(msg)
       iferr != nil {
          // 错误处理:打印错误并继续发送下一条消息log.Printf("Failed to send message: %s", err)
          continue}

       // 成功发送消息后记录日志log.Printf("Message #%d is stored in topic(%s)/partition(%d)/offset(%d)", i, "test_topic", partition, offset)

       // 模拟消息生产间隔(例如每秒发送一条消息)time.Sleep(1 * time.Second)
    }
}

消费者

packagemain

import(
    "github.com/IBM/sarama"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

funcmain() {
    // 配置消费者config := sarama.NewConfig()
    config.Consumer.Return.Errors = true                  // 启用错误返回config.Consumer.Offsets.Initial = sarama.OffsetNewest // 从最新消息开始消费config.Version = sarama.V2_8_0_0                      // 配置 Kafka 版本(可根据实际 Kafka 版本调整)// 创建 Kafka 消费者实例consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    iferr != nil {
       log.Fatalf("Failed to start consumer: %s", err)
    }
    deferconsumer.Close()

    // 订阅主题的分区partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
    iferr != nil {
       log.Fatalf("Failed to start partition consumer: %s", err)
    }
    deferpartitionConsumer.Close()

    // 用于捕获系统信号(例如 Ctrl+C),在接收到信号时优雅地退出sigChan := make(chanos.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // 用来跟踪消费者的状态,确保及时处理错误gofunc() {
       forerr := rangepartitionConsumer.Errors() {
          log.Printf("Error: %s", err.Error())
       }
    }()

    // 监听消息并处理log.Println("Consumer is ready, waiting for messages...")
    for{
       select{
       casemsg := <-partitionConsumer.Messages():
          // 打印收到的消息log.Printf("Received message: %s, from partition(%d), offset(%d)\n", string(msg.Value), msg.Partition, msg.Offset)

          // 处理消息(可以根据需求扩展处理逻辑)// 模拟消息处理时间time.Sleep(500 * time.Millisecond) // 例如处理消息需要 500 毫秒// 在这里,可以对消息进行确认或其他操作,例如处理完消息后将其存入数据库等case<-sigChan:
          // 捕获到退出信号,优雅退出log.Println("Received shutdown signal, exiting...")
          return}
    }
}

链接

https://cloud.tencent.com/developer/article/1547380 # 优质博客一篇

https://kafka1x.apachecn.org/documentation.html#producerapi # 官方中文文档


http://www.kler.cn/a/473572.html

相关文章:

  • 【HTML+CSS+JS+VUE】web前端教程-2-HTML5介绍和基础骨架
  • 如何配置Cursor的显示主题模式
  • /src/utils/request.ts:axios 请求封装,适用于需要统一处理请求和响应的场景
  • 【问题】配置 Conda 与 Pip 源
  • 年会抽奖Html
  • HCIA-Access V2.5_8_2_EPON基本架构和关键参数
  • 专家系统和混合专家系统区别;FedMix和FedMoE 区别联系
  • 【剑指Offer刷题系列】数据流中的中位数
  • 【HTML+CSS+JS+VUE】web前端教程-6-图片路径详解
  • vue el-table 数据变化后,高度渲染问题
  • 新模型设计:Attention-ResNet for CIFAR-10 分类
  • 漏洞挖掘 | Swagger UI 目录枚举小总结
  • 【pyqt】(八)ui文件使用
  • Ollama私有化部署大语言模型LLM(上)
  • Go语言的 的继承(Inheritance)基础知识
  • 甘肃省乡镇界arcgis格式shp数据乡镇名称和编码下载内容测评
  • c++ 两线交点计算程序(Program for Point of Intersection of Two Lines)
  • 初学stm32 --- DMA直接存储器
  • Python入门教程 —— 网络编程
  • 机器学习算法的分类
  • 8_TypeScript String --[深入浅出 TypeScript 测试]
  • Ubuntu网络连接问题(笔记本更换wifi后,虚拟机连不上网络)
  • 【漏洞分析】DDOS攻防分析
  • 【修改mysql支持远程访问】
  • openai swarm agent框架源码详解及应用案例实战
  • Python数据可视化-Pandas