当前位置: 首页 > article >正文

高效Android MQTT封装工具:简化物联网开发,提升性能与稳定性

在Android开发中,封装MQTT工具可以帮助简化与MQTT服务器的通信。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,常用于物联网(IoT)设备之间的通信。

以下是一个简单的MQTT封装工具示例,使用Eclipse Paho库来实现MQTT客户端功能。

  1. 添加依赖
    首先,在build.gradle文件中添加Paho MQTT库的依赖:
dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
  1. 权限配置
    在AndroidManifest.xml中添加必要的权限:
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
  1. 服务配置
    在AndroidManifest.xml中注册Paho的MqttService:
<service android:name="org.eclipse.paho.android.service.MqttService" />

4.MQTT封装工具类支持:
自定义客户端ID:支持用户自定义客户端ID。

遗言消息:支持设置遗言消息,用于客户端异常断开时通知其他客户端。

线程池管理:使用线程池处理MQTT操作,避免阻塞主线程。

灵活的回调机制:支持为每个操作单独设置回调。

连接重试机制:增加连接失败后的重试机制,提升连接稳定性。

日志级别:支持动态调整日志级别,便于调试和生产环境切换。
支持多服务器连接:允许同时连接多个MQTT服务器。

消息缓存策略:增加消息缓存的最大数量和时间限制,避免内存泄漏。

更灵活的重试机制:支持自定义重试次数和重试间隔。

连接状态管理:增加更详细的连接状态(如连接中、已连接、断开中等)。

支持QoS级别配置:允许动态配置发布和订阅的QoS级别。

MQTT工具类

import android.content.Context;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MqttManager {

    private static final String TAG = "MqttManager";

    private static MqttManager instance;
    private Map<String, MqttAndroidClient> mqttClients = new HashMap<>(); // 支持多服务器连接
    private Map<String, MqttConnectOptions> mqttConnectOptionsMap = new HashMap<>();
    private Map<String, Boolean> connectionStatusMap = new HashMap<>(); // 连接状态
    private Map<String, List<MqttMessage>> messageQueueMap = new HashMap<>(); // 消息队列
    private Map<String, List<String>> subscribedTopicsMap = new HashMap<>(); // 已订阅的主题
    private ExecutorService executorService = Executors.newCachedThreadPool(); // 线程池

    private ConnectionStateListener connectionStateListener;

    private static final int DEFAULT_QOS = 1;
    private static final boolean DEFAULT_RETAINED = false;
    private static final int MAX_MESSAGE_QUEUE_SIZE = 100; // 消息队列最大数量
    private static final long MESSAGE_QUEUE_TIMEOUT = 60000; // 消息队列超时时间(毫秒)
    private static final int MAX_RETRY_COUNT = 3; // 最大重试次数
    private static final int RETRY_INTERVAL = 5000; // 重试间隔(毫秒)

    // 单例模式
    public static synchronized MqttManager getInstance() {
        if (instance == null) {
            instance = new MqttManager();
        }
        return instance;
    }

    private MqttManager() {}

    /**
     * 初始化MQTT客户端
     */
    public void initClient(Context context, String serverUri, String clientId) {
        if (mqttClients.containsKey(serverUri)) {
            Log.w(TAG, "Client already initialized for server: " + serverUri);
            return;
        }

        MqttAndroidClient client = new MqttAndroidClient(context, serverUri, clientId);
        mqttClients.put(serverUri, client);
        mqttConnectOptionsMap.put(serverUri, new MqttConnectOptions());
        connectionStatusMap.put(serverUri, false);
        messageQueueMap.put(serverUri, new ArrayList<>());
        subscribedTopicsMap.put(serverUri, new ArrayList<>());

        // 设置回调
        client.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                connectionStatusMap.put(serverURI, true);
                Log.d(TAG, "MQTT connected: " + serverURI);
                if (connectionStateListener != null) {
                    connectionStateListener.onConnected(serverURI, reconnect);
                }
                // 连接成功后发送缓存的消息
                processMessageQueue(serverURI);
                // 重新订阅主题
                resubscribeTopics(serverURI);
            }

            @Override
            public void connectionLost(Throwable cause) {
                for (Map.Entry<String, MqttAndroidClient> entry : mqttClients.entrySet()) {
                    if (entry.getValue().equals(client)) {
                        String serverUri = entry.getKey();
                        connectionStatusMap.put(serverUri, false);
                        Log.e(TAG, "MQTT connection lost: " + serverUri, cause);
                        if (connectionStateListener != null) {
                            connectionStateListener.onDisconnected(serverUri, cause);
                        }
                        // 尝试重新连接
                        attemptReconnect(serverUri, 0);
                        break;
                    }
                }
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Log.d(TAG, "Message arrived: " + new String(message.getPayload()));
                if (connectionStateListener != null) {
                    connectionStateListener.onMessageReceived(topic, message);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                Log.d(TAG, "Message delivered");
            }
        });
    }

    /**
     * 设置连接参数
     */
    public void setConnectionOptions(String serverUri, String username, String password) {
        MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);
        if (options != null) {
            options.setUserName(username);
            options.setPassword(password.toCharArray());
        }
    }

    /**
     * 设置遗言消息
     */
    public void setWillMessage(String serverUri, String topic, String message, int qos, boolean retained) {
        MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);
        if (options != null) {
            options.setWill(topic, message.getBytes(), qos, retained);
        }
    }

    /**
     * 设置SSL/TLS加密连接
     */
    public void setSSL(String serverUri, boolean useSSL) {
        MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);
        if (options != null && useSSL) {
            options.setSocketFactory(new DefaultSSLSocketFactory());
        }
    }

    /**
     * 连接到MQTT服务器
     */
    public void connect(String serverUri) {
        if (connectionStatusMap.get(serverUri)) {
            Log.w(TAG, "Already connected to MQTT server: " + serverUri);
            return;
        }

        MqttAndroidClient client = mqttClients.get(serverUri);
        MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);

        if (client == null || options == null) {
            Log.e(TAG, "Client or options not initialized for server: " + serverUri);
            return;
        }

        executorService.execute(() -> {
            try {
                client.connect(options, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        Log.d(TAG, "Connected to MQTT server: " + serverUri);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        Log.e(TAG, "Failed to connect to MQTT server: " + serverUri, exception);
                        attemptReconnect(serverUri, 0);
                    }
                });
            } catch (MqttException e) {
                Log.e(TAG, "MQTT connection error: " + serverUri, e);
                attemptReconnect(serverUri, 0);
            }
        });
    }

    /**
     * 断开连接
     */
    public void disconnect(String serverUri) {
        if (!connectionStatusMap.get(serverUri)) {
            Log.w(TAG, "Already disconnected from MQTT server: " + serverUri);
            return;
        }

        MqttAndroidClient client = mqttClients.get(serverUri);
        if (client == null) {
            Log.e(TAG, "Client not initialized for server: " + serverUri);
            return;
        }

        executorService.execute(() -> {
            try {
                client.disconnect(null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        connectionStatusMap.put(serverUri, false);
                        Log.d(TAG, "Disconnected from MQTT server: " + serverUri);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        Log.e(TAG, "Failed to disconnect from MQTT server: " + serverUri, exception);
                    }
                });
            } catch (MqttException e) {
                Log.e(TAG, "MQTT disconnection error: " + serverUri, e);
            }
        });
    }

    /**
     * 订阅主题
     */
    public void subscribe(String serverUri, String topic, int qos, IMqttActionListener listener) {
        if (!connectionStatusMap.get(serverUri)) {
            Log.w(TAG, "Not connected, subscription will be attempted after connection: " + serverUri);
            subscribedTopicsMap.get(serverUri).add(topic); // 缓存主题
            return;
        }

        MqttAndroidClient client = mqttClients.get(serverUri);
        if (client == null) {
            Log.e(TAG, "Client not initialized for server: " + serverUri);
            return;
        }

        executorService.execute(() -> {
            try {
                client.subscribe(topic, qos, null, listener);
            } catch (MqttException e) {
                Log.e(TAG, "MQTT subscription error: " + serverUri, e);
            }
        });
    }

    /**
     * 发布消息
     */
    public void publish(String serverUri, String topic, String message, int qos, boolean retained, IMqttActionListener listener) {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);

        if (!connectionStatusMap.get(serverUri)) {
            Log.w(TAG, "Not connected, message will be queued: " + serverUri);
            List<MqttMessage> queue = messageQueueMap.get(serverUri);
            if (queue.size() < MAX_MESSAGE_QUEUE_SIZE) {
                queue.add(mqttMessage); // 缓存消息
            } else {
                Log.w(TAG, "Message queue is full, discarding message: " + serverUri);
            }
            return;
        }

        MqttAndroidClient client = mqttClients.get(serverUri);
        if (client == null) {
            Log.e(TAG, "Client not initialized for server: " + serverUri);
            return;
        }

        executorService.execute(() -> {
            try {
                client.publish(topic, mqttMessage, null, listener);
            } catch (MqttException e) {
                Log.e(TAG, "MQTT publish error: " + serverUri, e);
            }
        });
    }

    /**
     * 处理消息队列
     */
    private void processMessageQueue(String serverUri) {
        List<MqttMessage> queue = messageQueueMap.get(serverUri);
        for (MqttMessage message : queue) {
            publish(serverUri, "default/topic", new String(message.getPayload()), message.getQos(), message.isRetained(), null);
        }
        queue.clear();
    }

    /**
     * 重新订阅主题
     */
    private void resubscribeTopics(String serverUri) {
        List<String> topics = subscribedTopicsMap.get(serverUri);
        for (String topic : topics) {
            subscribe(serverUri, topic, DEFAULT_QOS, null);
        }
    }

    /**
     * 尝试重新连接
     */
    private void attemptReconnect(String serverUri, int retryCount) {
        if (retryCount >= MAX_RETRY_COUNT) {
            Log.w(TAG, "Max retry count reached for server: " + serverUri);
            return;
        }

        executorService.execute(() -> {
            try {
                Thread.sleep(RETRY_INTERVAL);
                connect(serverUri);
            } catch (InterruptedException e) {
                Log.e(TAG, "Reconnect attempt interrupted: " + serverUri, e);
            }
        });
    }

    /**
     * 设置连接状态监听器
     */
    public void setConnectionStateListener(ConnectionStateListener listener) {
        this.connectionStateListener = listener;
    }

    /**
     * 连接状态监听器接口
     */
    public interface ConnectionStateListener {
        void onConnected(String serverUri, boolean isReconnect);
        void onDisconnected(String serverUri, Throwable cause);
        void onMessageReceived(String topic, MqttMessage message);
    }
}

以下是一个使用 MqttManager 的完整示例。这个示例展示了如何初始化MQTT客户端、连接服务器、订阅主题、发布消息以及处理连接状态和消息回调。

import android.os.Bundle;
import androidx.appcompat.app.AppCompatActivity;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "MainActivity";
    private static final String SERVER_URI = "tcp://your.mqtt.broker:1883"; // MQTT服务器地址
    private static final String CLIENT_ID = "android_client_" + System.currentTimeMillis(); // 客户端ID
    private static final String TOPIC = "your/topic"; // 订阅的主题
    private static final String WILL_TOPIC = "your/lwt/topic"; // 遗言主题
    private static final String WILL_MESSAGE = "Client disconnected"; // 遗言消息

    private MqttManager mqttManager;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // 初始化MQTT管理器
        mqttManager = MqttManager.getInstance();
        mqttManager.initClient(this, SERVER_URI, CLIENT_ID);

        // 设置连接参数
        mqttManager.setConnectionOptions(SERVER_URI, "username", "password");

        // 设置遗言消息
        mqttManager.setWillMessage(SERVER_URI, WILL_TOPIC, WILL_MESSAGE, 1, true);

        // 设置连接状态监听器
        mqttManager.setConnectionStateListener(new MqttManager.ConnectionStateListener() {
            @Override
            public void onConnected(String serverUri, boolean isReconnect) {
                Log.d(TAG, "Connected to MQTT server: " + serverUri + ", isReconnect: " + isReconnect);

                // 订阅主题
                mqttManager.subscribe(SERVER_URI, TOPIC, 1, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        Log.d(TAG, "Subscribed to topic: " + TOPIC);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        Log.e(TAG, "Failed to subscribe to topic: " + TOPIC, exception);
                    }
                });
            }

            @Override
            public void onDisconnected(String serverUri, Throwable cause) {
                Log.e(TAG, "Disconnected from MQTT server: " + serverUri, cause);
            }

            @Override
            public void onMessageReceived(String topic, MqttMessage message) {
                Log.d(TAG, "Received message from topic: " + topic + ", payload: " + new String(message.getPayload()));
            }
        });

        // 连接MQTT服务器
        mqttManager.connect(SERVER_URI);

        // 发布消息
        publishMessage("Hello, MQTT!");
    }

    /**
     * 发布消息
     */
    private void publishMessage(String message) {
        mqttManager.publish(SERVER_URI, TOPIC, message, 1, false, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                Log.d(TAG, "Message published to topic: " + TOPIC);
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                Log.e(TAG, "Failed to publish message to topic: " + TOPIC, exception);
            }
        });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // 断开连接
        mqttManager.disconnect(SERVER_URI);
    }
}

示例说明
初始化MQTT客户端:

使用 MqttManager.getInstance() 获取单例实例。

调用 initClient() 初始化MQTT客户端,传入服务器地址和客户端ID。

设置连接参数:

使用 setConnectionOptions() 设置用户名和密码。

设置遗言消息:

使用 setWillMessage() 设置遗言消息,当客户端异常断开时,服务器会发布该消息。

设置连接状态监听器:

实现 ConnectionStateListener 接口,监听连接状态变化和消息到达事件。

连接MQTT服务器:

调用 connect() 连接到MQTT服务器。

订阅主题:

在 onConnected() 回调中订阅主题。

发布消息:

使用 publish() 方法发布消息到指定主题。

断开连接:

在 onDestroy() 中调用 disconnect() 断开连接。

注意事项
服务器地址和主题:

替换 SERVER_URI 和 TOPIC 为实际的MQTT服务器地址和主题。

用户名和密码:

如果MQTT服务器需要认证,请设置正确的用户名和密码。

遗言消息:

遗言消息用于在客户端异常断开时通知其他客户端,确保设置合适的主题和消息内容。

线程安全:

MQTT操作在后台线程中执行,避免阻塞主线程。


http://www.kler.cn/a/579061.html

相关文章:

  • 基于Linux环境部署和使用ElasticSearch搜索引擎
  • C# 开发工具Visual Studio下载和安装
  • ubuntu20.04已安装 11.6版本 cuda,现需要通过源码编译方式安装使用 cuda 加速的 ffmpeg 步骤
  • 【MySQL】(4) 表的操作
  • Spring Boot 内置工具类,功能齐全!!
  • MoonSharp 文档一
  • 2025年最新可用!Docker/DockerHub 国内镜像源/加速列表
  • 【蓝桥杯每日一题】3.8
  • Vue.js框架设计核心要素解析
  • 【GPT入门】第8课 大语言模型的自洽性
  • 安固软件指南:确保外发文件安全的全面策略
  • 腾讯云大模型知识引擎LKE+DeepSeek结合工作流升级智能客服
  • ESP8266TCP单连接透传
  • RabbitMQ从入门到实战-2
  • 计算机视觉cv2入门之图像的读取,显示,与保存
  • STM32-USART串口数据包
  • 大白话JavaScript闭包实现原理与在实际开发中的应用场景
  • 六十天前端强化训练之第八天到第十四天——综合案例:用户管理系统
  • PHP:格式化JSON为PHP语法格式
  • 【贪心算法】将数组和减半的最小操作数