当前位置: 首页 > article >正文

Linux 安装 Mosquitto 及 SpringBoot 整合

Linux(centos)下 Mosquitto MQTT 代理的安装与配置

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议,广泛应用于物联网(IoT)领域。Mosquitto 是一个开源的 MQTT 代理,它支持 MQTT 协议 3.1 和 3.1.1,适用于各种设备和平台。

在工业上使用 MQTT 协议来进行物联网数据传输,主要看中了以下优点:

  • 低协议开销:它的每消息标题可以短至 2 个字节,容错性好
  • 物联网的网络环境往往比较恶劣,MQTT能从断开故障中恢复,并且不需要额外的编码
  • 低功耗:MQTT专门为了低功耗的目标而设计
  • 最多能接受百万级别的客户端

安装 Mosquitto

安装

选择需要安装的版本,我这里安装的是 2.0.0 版。

官网下载:https://mosquitto.org/files/source/

# 下载
wget --no-check-certificate https://mosquitto.org/files/source/mosquitto-2.0.0.tar.gz

# 安装
tar -zxvf mosquitto-2.0.0.tar.gz

cd mosquitto-2.0.0

make

make install

如果 make 时报如下错误

In file included from mosquitto_ctrl.c:29:
mosquitto_ctrl.h:21:10: fatal error: cJSON.h: No such file or directory
21 | #include <cJSON.h>
|          ^~~~~~~~~
compilation terminated.
make[2]: *** [Makefile:49: mosquitto_ctrl.o] Error 1
make[2]: Leaving directory '/usr/local/mosquitto-2.0.0/apps/mosquitto_ctrl'
make[1]: *** [Makefile:9: all] Error 2
make[1]: Leaving directory '/usr/local/mosquitto-2.0.0/apps'
make: *** [Makefile:64: mosquitto] Error 2

出现 cJSON 找不到,需要安装 cJSON(cJSON-1.7.18.tar.gz):

下载地址:https://github.com/DaveGamble/cJSON/releases

# 安装

tar -zxvf cJSON-1.7.18.tar.gz

cd cJSON-1.7.18

make

make install

cJSON 安装成功后再进行 make 安装即可成功。

相关配置

Mosquitto 的配置文件通常位于:/etc/mosquitto/mosquitto.conf 根据需要修改配置文件中的设置,可以更改监听地址和端口、设置持久化选项、配置用户认证等。

创建配置文件

cp /etc/mosquitto/mosquitto.conf.example /etc/mosquitto/mosquitto.conf

监听端口

一般使用mqtt默认监听端口 1883, 如果需要修改监听端口,需修改配置文件设置自己的端口即可(我这里使用默认的)。

配置用户认证

MQTT 默认无需用户认证,配置 MQTT 的用户名和密码通常涉及编辑 MQTT 代理的配置文件。

Mosquitto 使用 pwfile 来存储用户名和密码,可以使用 mosquitto_passwd 命令来创建和管理这个文件。

# 创建密码文件,用户名称为 mosquittoadmin

mosquitto_passwd -c /etc/mosquitto/pwfile.conf mosquittoadmin

系统会提示输入密码并再次确认,-c 选项表示创建一个新的密码文件。如果文件已经存在,则使用 -b 选项追加用户。

编辑 Mosquitto 配置文件 /etc/mosquitto/mosquitto.conf,添加或修改以下行以启用密码文件认证:

allow_anonymous false

password_file /etc/mosquitto/pwfile

创建 mosquitto 用户

配置文件中默认使用 user mosquitto,需要创建 mosquitto 用户:

groupadd mosquitto

useradd -g mosquitto mosquitto
 
chown -R mosquitto:mosquitto /etc/mosquitto/

设置外网访问

# 设置外网访问
listener 1883 0.0.0.0

启动、查看、关闭程序

运行程序

mosquitto -c /etc/mosquitto/mosquitto.conf -d

查看

ps -aux | grep mosquitto

关闭程序

kill -9 $(pidof mosquitto)

SpringBoot 整合 Mosquitto

添加依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

配置 MQTT 服务器的连接信息

server:
  port: 8001

mqtt:
  url: tcp://127.0.0.1:1883
  username: mosquittoadmin
  password: mosquittoadmin@123
  client-id: test-client

MQTT 配置类

package com.demo.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @path:com.demo.config.MqttClientConfig.java
 * @className:MqttClientConfig.java
 * @description: MQTT配置
 * @dateTime:2025/1/5 10:51 
 * @editNote:
 */
@Configuration
public class MqttClientConfig {

    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(url, clientId);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        client.connect(options);
        return client;
    }

}

发布、订阅消息

package com.demo.service;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @path:com.demo.service.MqttService.java
 * @className:MqttService.java
 * @description: 发布、订阅消息
 * @dateTime:2025/1/5 10:51 
 * @editNote:
 */
@Slf4j
@Service
public class MqttService {

    @Autowired
    private MqttClient mqttClient;

    /**
     * @MonthName: publish
     * @Description: 发布消息
     * @Date: 2025/1/5 11:26
     * @Param: [topic, message]
     * @return: void
     **/
    public void publish(String topic, String message) throws MqttException {
        log.info("===发布消息====topic:{},消息内容:{}", topic, message);
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(2);
        mqttClient.publish(topic, mqttMessage);
    }

    /**
     * @MonthName: subscribe
     * @Description: 订阅消息
     * @Date: 2025/1/5 11:26
     * @Param: [topic]
     * @return: void
     **/
    public void subscribe(String topic) throws MqttException {
        mqttClient.subscribe(topic, (t, msg) -> {
            String message = new String(msg.getPayload());
            log.info("===订阅消息====topic:{},消息内容:{}", topic, message);
        });
    }

}

** MqttController 接口**

package com.demo.controller;

import com.demo.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttService mqttService;

    @GetMapping("/publish")
    public String publish(@RequestParam String topic, @RequestParam String message) {
        try {
            mqttService.publish(topic, message);
            log.info("发布消息,topic:{},消息内容:{}", topic, message);
            return "success";
        } catch (MqttException e) {
            log.error("{}", e);
            return "error";
        }
    }

    @GetMapping("/subscribe")
    public String subscribe(@RequestParam String topic) {
        try {
            mqttService.subscribe(topic);
            log.info("订阅消息,topic:{}", topic);
            return "success";
        } catch (MqttException e) {
            log.error("{}", e);
            return "error";
        }
    }

}

初始化订阅消息

服务初始化启动完成后,自动订阅 Topic

package com.demo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * @path:com.demo.service.SubscribeRunner.java
 * @className:SubscribeRunner.java
 * @description: 初始化订阅消息
 * @dateTime:2025/1/5 15:25 
 * @editNote:
 */
@Slf4j
@Component
@Order(10)
public class SubscribeRunner implements ApplicationRunner {

    @Autowired
    MqttService mqttService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("================初始化订阅消息==================");
        mqttService.subscribe("mytopic");
    }
}

启动服务测试

启动服务

Connected to the target VM, address: '127.0.0.1:52517', transport: 'socket'

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.9)

2025-01-05 21:28:00.464  INFO 8888 --- [           main] com.demo.DemoBootApplication               : Starting DemoBootApplication using Java 1.8.0_251 on DESKTOP-BBRENPM with PID 8888 (E:\workspace\demo-boot\target\classes started by Administrator in E:\workspace\demo-boot)
2025-01-05 21:28:00.466  INFO 8888 --- [           main] com.demo.DemoBootApplication               : No active profile set, falling back to 1 default profile: "default"
2025-01-05 21:28:00.949  INFO 8888 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8001 (http)

自动订阅消息,可以通过请求接口手动订阅:http://localhost:8001/mqtt/subscribe?topic=mytopic

2025-01-05 21:28:01.666  INFO 8888 --- [           main] com.demo.DemoBootApplication               : Started DemoBootApplication in 1.427 seconds (JVM running for 1.932)
2025-01-05 21:28:01.667  INFO 8888 --- [           main] com.demo.service.SubscribeRunner          : ================初始化订阅消息==================
2025-01-05 21:28:16.428  INFO 8888 --- [nio-8001-exec-1] o.a.c.c.C.[.[localhost]        : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-01-05 21:28:16.428  INFO 8888 --- [nio-8001-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2025-01-05 21:28:16.428  INFO 8888 --- [nio-8001-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms

发布消息,浏览器访问接口:

http://localhost:8001/mqtt/publish?message=测试消息

2025-01-05 21:29:34.603  INFO 8888 --- [nio-8001-exec-8] com.demo.controller.MqttController        : 发布消息,topic:mytopic,消息内容:测试消息

2025-01-05 21:29:34.613  INFO 8888 --- [ll: test-client] com.demo.service.MqttService              : ===订阅消息====topic:mytopic,消息内容:测试消息

http://www.kler.cn/a/472334.html

相关文章:

  • C#语言的网络编程
  • KCP解读:C#库类图
  • 掌握正则表达式:从入门到精通的实战指南
  • 攻防世界 wtf.sh-150
  • 二十三种设计模式-原型模式
  • 大模型LLM-Prompt-CRISPE
  • 用JAVA编写一个简单的小游戏
  • pdf在页面中预览的方法
  • 数据标注「orc」
  • 路由组件与一般组件的区别
  • UDP_TCP
  • 应急指挥与调度子模块示例
  • 解密Navicat密码(Java)
  • 基于Centos 7系统的安全加固方案
  • 理解PDF文档的力量:使用LLM与RAG的本地应用
  • 大语言模型提示技巧(五)-推断
  • 引领实时数据分析新时代:阿里云实时数仓 Hologres
  • Netron可视化深度学习的模型框架,大大降低了大模型的学习门槛
  • 使用ElasticSearch查询
  • Redis的内存预分配策略
  • Ungoogled Chromium127 编译指南 MacOS篇(六)- 获取源代码
  • 用Rust构建高性能WebAssembly模块:性能调优与实际案例
  • 【网络安全技术与应用】(选修)实验4 网络扫描
  • android 启动页倒计时页面编写
  • 【Qt】QtConcurrent
  • 【UE5 C++课程系列笔记】21——弱指针的简单使用