当前位置: 首页 > article >正文

Golang Gin框架mqtt消费者

这里主要是展示一个非常简单的Gin框架下的mqtt消费者,在保持启动后持续轮训消费。简单实用是主旨。

viper全局调用yaml文件中的数据

redisClient 缓存客户端
doJYService 业务逻辑代码

package main

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"github.com/spf13/viper"
	"jieyu-gin/mqtt-service/config"
	"jieyu-gin/mqtt-service/doJYService"
	"jieyu-gin/mqtt-service/internal/redisClient"
)

func main() {
	router := gin.Default()
	gin.SetMode(gin.ReleaseMode)

	if err := config.InitConfig(); err != nil {
		fmt.Printf("Failed to initialize config: %v\n", err)
		return
	}

	redisClient.InitRedisClient()

	doJYService.BatchProcessing()

	go func() {
		router.Run(viper.GetString("app.port"))
	}()
}

func BatchProcessing() {} 方法中 for{}中处理数据。

package doJYService

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"jieyu-gin/mqtt-service/internal/database"
	"jieyu-gin/mqtt-service/internal/log"
	"jieyu-gin/mqtt-service/internal/redisClient"
	"time"
)

var broker = "127.0.0.1" //你的broker ip 服务器消费地址
var port = 1883
var userName = "jieyu_mqtt"
var passwd = "jieyu_mqtt_1688"
var topic = "jieyu_online"

var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Clenit Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

func BatchProcessing() string {

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("go_mqtt_consumer")
	opts.SetUsername(userName)
	opts.SetPassword(passwd)
	opts.SetKeepAlive(8 * time.Second)

	opts.SetDefaultPublishHandler(messageRecHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	sub(client, false)
	time.Sleep(30 * time.Second)

	for {
		time.Sleep(3 * time.Second)
		fmt.Println("目前没有数据,等待消息进场。")
	}

	return ""
}

// 初始化数据库和日志
func InitMysqlAndLog(name string, id string) {

	writer, err := log.New(id, name)
	if err != nil {
		panic(err)
	}

	database.Init(writer)

	redisClient.Set("log-"+name+"-"+id, writer.Name())
}

func sub(client mqtt.Client, producer bool) {
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	if producer {
		fmt.Printf("Producer subscribed to topic %s", topic)
	} else {
		fmt.Printf("Consumer subscribed to topic %s", topic)
	}
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v", err)
}

效果图如下

消息持续消费,具体处理业务的逻辑代码自己写。


http://www.kler.cn/a/540070.html

相关文章:

  • Kokoro 开源文本转语音引擎上线!多语言支持,无需联网,浏览器内极速运行
  • openAI官方prompt技巧(一)
  • 【系统架构设计师】体系结构文档化
  • vscode无法ssh连接远程机器解决方案
  • Elasticsearch去分析目标服务器的日志,需要在目标服务器上面安装Elasticsearch 软件吗
  • MongoDB开发规范
  • 【分布式理论7】分布式调用之:服务间的(RPC)远程调用
  • ffmpeg 常用命令
  • modbus tcp,modbus,tcp几种通信方式的区别
  • 子集II(力扣90)
  • 【Linux网络编程】之守护进程
  • 2025年面试运维经验分享
  • Elasticsearch操作--笔记
  • 安宝特方案 | AR眼镜:远程医疗的“时空折叠者”,如何为生命争夺每一分钟?
  • AJAX项目——数据管理平台
  • java-初识List
  • 如何通过PHP接入DeepSeek的API
  • DevOps 所需的行为
  • 速通DeepSeek 安装部署文档
  • MYSQL关联关系查询
  • STM32+Proteus+DS18B20数码管仿真实验
  • w200基于spring boot的个人博客系统的设计与实现
  • Logo语言的学习路线
  • 一种基于Leaflet.Legend的图例动态更新方法
  • Spring Boot极速入门:从零搭建第一个Web应用
  • 科技赋能直播!DeepSeek大模型+智享AI直播第三代plus版本,未来直播将更加智能化!