当前位置: 首页 > article >正文

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。

一、下载github.com/segmentio/kafka-go包

go get github.com/segmentio/kafka-go

二、建立kafka连接

正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9092"    //host 具体看你们自己的配置如果是服务器上的 就是服务器iP:9092 本地就是localhost:9092
const topic = "my"
const partition = 0

/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaConn() (*kafka.Conn, error) {
	return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

三、kafka之发送消息(生产者)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {
	Name string `json:"name"`
	Pwd  string `json:"pwd"`
}


// writeByConn 基于Conn发送消息
func writeByConn() {

	// 连接至Kafka集群的Leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	people1 := People{"Tmo",
		"124"}
	people2 := People{"Mac",
		"124"}
	people3 := People{"Joker",
		"124"}
	// 发送消息
	str1, _ := json.Marshal(people1)
	str2, _ := json.Marshal(people2)
	str3, _ := json.Marshal(people3)
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte(str1)},
		kafka.Message{Value: []byte(str2)},
		kafka.Message{Value: []byte(str3)},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

四、kafka之接收消息(消费者)

// readByConn 连接至kafka后接收消息
func readByConn() {
	// 指定要连接的topic和partition

	// 连接至Kafka的leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	// 设置读取超时时间
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	// 读取一批消息,得到的batch是一系列消息的迭代器
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	// 遍历读取消息
	b := make([]byte, 10e3) // 10KB max per message
	for {
		p := People{}
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		err = json.Unmarshal(b[:n], &p)
		if err != nil {
			fmt.Println(string(b))
			fmt.Println(err, "**************")
			continue
		}
		fmt.Println(p)
	}
	// 关闭batch
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}
	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

完整代码

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {
	Name string `json:"name"`
	Pwd  string `json:"pwd"`
}

/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0

/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {
	return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

func main() {
	writeByConn()
	readByConn()
	
}

// writeByConn 基于Conn发送消息
func writeByConn() {

	// 连接至Kafka集群的Leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	people1 := People{"Tmo",
		"124"}
	people2 := People{"Mac",
		"124"}
	people3 := People{"Joker",
		"124"}
	// 发送消息
	str1, _ := json.Marshal(people1)
	str2, _ := json.Marshal(people2)
	str3, _ := json.Marshal(people3)
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte(str1)},
		kafka.Message{Value: []byte(str2)},
		kafka.Message{Value: []byte(str3)},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

// readByConn 连接至kafka后接收消息
func readByConn() {
	// 指定要连接的topic和partition

	// 连接至Kafka的leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	// 设置读取超时时间
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	// 读取一批消息,得到的batch是一系列消息的迭代器
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	// 遍历读取消息
	b := make([]byte, 10e3) // 10KB max per message
	for {
		p := People{}
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		err = json.Unmarshal(b[:n], &p)
		if err != nil {
			fmt.Println(string(b))
			fmt.Println(err, "**************")
			continue
		}
		fmt.Println(p)
	}
	// 关闭batch
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}
	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

五、kafka之消费者组实现消息确认(从一次消费消息的末尾开始接收消息)

只需要给读取消息的方法改变一下就可以了


func readByConn() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{host},
		GroupID:  "consumer-group-id",
		Topic:    topic,
		MaxBytes: 10e6, // 10MB
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

完整代码

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {
	Name string `json:"name"`
	Pwd  string `json:"pwd"`
}

/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0

/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {
	return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

func main() {
	writeByConn()
	readByConn()
}

// writeByConn 基于Conn发送消息
func writeByConn() {

	// 连接至Kafka集群的Leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	people1 := People{"Tmo",
		"124"}
	people2 := People{"Mac",
		"124"}
	people3 := People{"Joker",
		"124"}
	// 发送消息
	str1, _ := json.Marshal(people1)
	str2, _ := json.Marshal(people2)
	str3, _ := json.Marshal(people3)
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte(str1)},
		kafka.Message{Value: []byte(str2)},
		kafka.Message{Value: []byte(str3)},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}



func readByConn() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{host},
		GroupID:  "consumer-group-id",
		Topic:    topic,
		MaxBytes: 10e6, // 10MB
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}


http://www.kler.cn/a/389202.html

相关文章:

  • Linux设置Nginx开机启动
  • 基于promtail+loki+grafana搭建日志系统
  • 【Qt】在 Qt Creator 中使用图片资源方法(含素材网站推荐)
  • 给查询业务添加redis缓存和缓存更新策略
  • uniapp打包华为,提示请提供64位版本软件包后再提交审核
  • LLM之模型评估:情感评估/EQ评估/幻觉评估等
  • ReactPress:深入解析技术方案设计与源码
  • C++学习笔记----10、模块、头文件及各种主题(六)---- C风格可变长度参数列表
  • qt QRunnable 与 QThreadPool详解
  • 【CSS】居中样式
  • Nginx性能调优的具体策略方法
  • opcua认证测试1108 增加对三菱,西门子,modbus支持
  • PySindy学习
  • 无人机避障——(局部规划方法)DWA(动态窗口法)
  • C#开发流程
  • 1.1 算法基本概念与复杂度分析
  • qt QWebSocketServer详解
  • 第十三届交通运输研究(上海)论坛┆智能网联汽车技术现状与研究实践
  • SpringBoot项目编译报错 类文件具有错误的版本 61.0, 应为 52.0
  • ssh2-sftp-client一键将你的前端项目部署到远程服务器,你只需要专注写bug就好了
  • Rust:GUI 开源框架
  • 数据库范式、MySQL 架构、算法与树的深入解析
  • 实现rtos操作系统 【二】基本任务切换实现
  • 大模型,智能家居的春秋战国之交
  • goframe开发一个企业网站 验证码17
  • DotNet使用CsvHelper快速读取和写入CSV文件的操作方法