【物联网IoT - 10分钟,构建一个自己的MQTT Broker服务!】
常见的MQTT Broker
- Eclipse Mosquitto:轻量级 MQTT Broker,易于部署和使用。
- EMQX:高性能、可扩展的 MQTT Broker。
- HiveMQ:企业级 MQTT Broker,支持集群和高可用性。
- VerneMQ:高性能、可扩展的 MQTT Broker,支持集群。
需求
- 配置要求低:单机可部署
- 适配mqtt协议,客户端支持paho.mqtt
- 部署相对简单
- 部署环境支持
选哪个
选Eclipse Mosquitto
为什么
- 之前的客户端用了paho mqtt (也属于Eclipse下面的项目),和Eclipse Mosquitto 都属于Eclipse 基金会下的开源项目,完全适配。
- 部署简单,且支持多环境。
Eclipse Mosquitto 作为物联网IoT MQTT服务,对比其它MQTT Broker,类似于数据库中的SQLite。
单机可部署,轻量级,不支持分布式,简单快直接。
部署步骤
1.安装
sudo apt-get update
安装客户端和服务端
sudo apt-get install -y mosquitto mosquitto-clients
2. 启动
mosquitto -c /etc/mosquitto/mosquitto.conf
3. 测试
使用mosquitto命令行客户端 mosquitto_sub/pub 安装完后试下,订阅和发布
mosquitto_sub -h localhost -t test/topic
mosquitto_pub -h localhost -t test/topic -m "Hello, MQTT"
代码 测试,一个Go Paho MQTT Client示例
package main
import (
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
// 创建 MQTT 客户端选项
opts := mqtt.NewClientOptions().AddBroker("tcp://123.56.56.123:1883")
opts.SetClientID("go_mqtt_client")
// opts.SetUsername("username") // 添加用户名
// opts.SetPassword("xxx") // 添加密码
opts.SetDefaultPublishHandler(messagePubHandler)
// 创建 MQTT 客户端
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// 订阅主题
topic := "test/topic"
token := client.Subscribe(topic, 1, nil)
token.Wait()
fmt.Printf("Subscribed to topic: %s\n", topic)
// 发布消息
for i := 0; i < 5; i++ {
text := fmt.Sprintf("Hello, MQTT %d", i)
token = client.Publish(topic, 0, false, text)
token.Wait()
time.Sleep(1 * time.Second)
}
// 断开连接
client.Disconnect(250)
}
// 消息发布处理函数
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
4 安全相关
第一步:开启ssl加密传输,配置ssl,避免消息在网络中明文传输。
- 使用RSA算法生成,私钥,公钥,证书信息。
openssl genpkey -algorithm RSA -out mosquitto.key
openssl req -new -key mosquitto.key -out mosquitto.csr
openssl x509 -req -in mosquitto.csr -signkey mosquitto.key -out mosquitto.crt -days 365
- 在mosquitto中配置
listener 8883
cafile /path/to/ca.crt
certfile /path/to/mosquitto.crt
keyfile /path/to/mosquitto.key
- 重启服务后,再次连接,注意代码中加上ssl配置
// 更新连接串
opts := mqtt.NewClientOptions().AddBroker("ssl://123.56.56.123:8883")
...
// 配置TLS
tlsConfig := &tls.Config{
InsecureSkipVerify: true, // 跳过对服务端CA证书的验证
}
opts.SetTLSConfig(tlsConfig)
第二步:身份验证,增加密码
mosquitto_passwd -c /etc/mosquitto/passwd username
修改配置文件 ,路径 /etc/mosquitto/mosquitto.conf
allow_anonymous false
password_file /etc/mosquitto/passwd
对应代码修改
...
// 创建 MQTT 客户端选项
opts := mqtt.NewClientOptions().AddBroker("tcp://123.56.56.123:1883")
opts.SetClientID("go_mqtt_client")
opts.SetUsername("username") // 添加用户名
opts.SetPassword("xxx") // 添加密码
opts.SetDefaultPublishHandler(messagePubHandler)
...
补充问题
mosquitto对于传输大小是否有限制
- 默认限制 256MB ,可以设置
max_packet_size 268435455 # 设置最大消息大小为256MB
mosquitto里面的消息队列长度默认多长,如果长时间不消费,怎么处理死信
- 有过期时间, 比如1小时
message-expiry-interval 3600 # 消息过期时间为1小时