Golang Gin框架mqtt消费者
这里主要是展示一个非常简单的Gin框架下的mqtt消费者,在保持启动后持续轮训消费。简单实用是主旨。
viper全局调用yaml文件中的数据
redisClient 缓存客户端
doJYService 业务逻辑代码
package main
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/spf13/viper"
"jieyu-gin/mqtt-service/config"
"jieyu-gin/mqtt-service/doJYService"
"jieyu-gin/mqtt-service/internal/redisClient"
)
func main() {
router := gin.Default()
gin.SetMode(gin.ReleaseMode)
if err := config.InitConfig(); err != nil {
fmt.Printf("Failed to initialize config: %v\n", err)
return
}
redisClient.InitRedisClient()
doJYService.BatchProcessing()
go func() {
router.Run(viper.GetString("app.port"))
}()
}
func BatchProcessing() {} 方法中 for{}中处理数据。
package doJYService
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"jieyu-gin/mqtt-service/internal/database"
"jieyu-gin/mqtt-service/internal/log"
"jieyu-gin/mqtt-service/internal/redisClient"
"time"
)
var broker = "127.0.0.1" //你的broker ip 服务器消费地址
var port = 1883
var userName = "jieyu_mqtt"
var passwd = "jieyu_mqtt_1688"
var topic = "jieyu_online"
var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Clenit Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
func BatchProcessing() string {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_consumer")
opts.SetUsername(userName)
opts.SetPassword(passwd)
opts.SetKeepAlive(8 * time.Second)
opts.SetDefaultPublishHandler(messageRecHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client, false)
time.Sleep(30 * time.Second)
for {
time.Sleep(3 * time.Second)
fmt.Println("目前没有数据,等待消息进场。")
}
return ""
}
// 初始化数据库和日志
func InitMysqlAndLog(name string, id string) {
writer, err := log.New(id, name)
if err != nil {
panic(err)
}
database.Init(writer)
redisClient.Set("log-"+name+"-"+id, writer.Name())
}
func sub(client mqtt.Client, producer bool) {
token := client.Subscribe(topic, 1, nil)
token.Wait()
if producer {
fmt.Printf("Producer subscribed to topic %s", topic)
} else {
fmt.Printf("Consumer subscribed to topic %s", topic)
}
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
效果图如下
消息持续消费,具体处理业务的逻辑代码自己写。