当前位置: 首页 > article >正文

Android实现Socket通信

问题:

我在Android端对接后端Socket服务实现消息模块的时候,使用WebSocketClient 库,手动构造了订阅和发送消息的 STOMP 帧,,Socket连接成功建立,但是当用户发送消息的时候,对方无法接受到。

然后我Web端使用了成熟的 STOMP 库(Stomp.over 和 SockJS),所有接口测试通过。

排查经历:

  1. 起初我以为是在安卓端手动构造了订阅和发送消息的 STOMP 帧的格式问题,后来经过抓包排查,没有问题
  2. 然后我以为是,由于Android虚拟机和我手机调试网络的原因,我在本地创建了两个虚拟机,运行项目发现还是消息发送成功,推送失败。
  3. 因为问题定位到推送消息的步骤失败,后端采用的是Spring WebSocket的方法messagingTemplate.convertAndSendToUser(),我开始研究这个方法的底层逻辑,并在网上查找资料,发现有的人因为Header构建的问题导致推送消息失效,我又开始了更改调试,没有解决。
  4. 因为我Web端是正常运行的,Android是失败的,所以我使用Wireshark开始抓包对比,到底是哪个步骤出现了问题。

失败案例抓包数据:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

成功案例抓包数据:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Socket服务器:

Socket建立连接序列图

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

问题原因:

web端的代码使用了成熟的 STOMP 库(Stomp.over 和 SockJS),它在建立连接时会自动发送 CONNECT 帧,完成 STOMP 握手;而你 Android 端的代码则是直接用 WebSocketClient 建立连接,并手动构造了订阅和发送消息的 STOMP 帧,但没有看到发送 CONNECT 帧的过程。

  1. 缺少 CONNECT 握手
    STOMP 协议要求客户端在发送其他 STOMP 帧(例如 SUBSCRIBE 或 SEND)之前,先发送 CONNECT 帧并等待服务器返回 CONNECTED 帧。如果没有发送 CONNECT 帧,服务器可能会忽略你后续发送的订阅或消息帧,从而只看到 ACK 包(底层 TCP 的 ACK 应答),而没有实际的消息推送。

解决方案:

  1. 发送 CONNECT 帧
    在 WebSocket 连接成功(onOpen 回调)后,尽快构造并发送一个标准的 CONNECT 帧。例如,在 Android 端的 onOpen 方法中添加如下代码:
java


复制编辑
@Override
public void onOpen(ServerHandshake handshakedata) {
    Log.d(TAG, "WebSocket连接成功");
    // 发送 CONNECT 帧,指定支持的 STOMP 版本和心跳
    StringBuilder connectFrame = new StringBuilder();
    connectFrame.append("CONNECT\n");
    connectFrame.append("accept-version:1.2\n");
    connectFrame.append("heart-beat:10000,10000\n");
    connectFrame.append("satoken:").append(token).append("\n");
    connectFrame.append("\n"); // 空行分隔头部和正文
    connectFrame.append("\0"); // STOMP帧结束符
    client.send(connectFrame.toString());

    connectionState.postValue(ConnectionState.CONNECTED);
    reconnectAttempts = 0;
    startHeartbeat();

    // 在收到服务器的 CONNECTED 帧之后再订阅个人消息通道
    // 你可以在 onMessage 中判断收到 CONNECTED 后再调用 subscribeToPersonalMessages()
    // 这里为了测试可以先延时一会儿再订阅
    new Handler(Looper.getMainLooper()).postDelayed(() -> subscribeToPersonalMessages(), 500);
    showToast("WebSocket连接成功");
}

这样确保服务器能够正确识别你的 STOMP 客户端,处理后续的订阅和消息发送。

完整Android端代码

package com.xjl.mobilehotel.websocket;

import android.content.Context;
import android.content.Intent;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import android.widget.Toast;

import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;

import com.google.gson.Gson;
import com.xjl.mobilehotel.model.MessageChatBO;
import com.xjl.mobilehotel.model.MessageConversationBO;
import com.xjl.mobilehotel.model.MessageDTO;
import com.xjl.mobilehotel.model.config.MessageSessionTypeEnum;
import com.xjl.mobilehotel.model.config.MessageStatus;
import com.xjl.mobilehotel.model.config.MessageTypeEnum;
import com.xjl.mobilehotel.ui.auth.LoginActivity;
import com.xjl.mobilehotel.utils.SharedPreferencesManager;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.json.JSONObject;

import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

/**
 * WebSocket管理类
 */
public class WebSocketManager {
    private static final String TAG = "WebSocketManager";

    // 服务器地址
    private static final String SERVER_URL = "ws://192.168.167.39:8008";
    private static final String WS_PATH = "/ws";

    // 心跳端点
    private static final String HEARTBEAT_ENDPOINT = "/socket/heartbeat";

    // 消息端点
    private static final String MESSAGE_ENDPOINT = "/socket/message/";

    // 个人消息订阅端点
    private static final String PERSONAL_MESSAGE_SUBSCRIPTION = "/user/%s/queue/message";

    // 最大重连次数
    private static final int MAX_RECONNECT_ATTEMPTS = 5;

    // 心跳间隔(毫秒)
    private static final long HEARTBEAT_INTERVAL = 10000;

    private static WebSocketManager instance;
    private WebSocketClient client;
    private Context context;
    private SharedPreferencesManager prefsManager;
    private final CompositeDisposable disposables = new CompositeDisposable();
    private Disposable heartbeatDisposable;
    private int reconnectAttempts = 0;
    private boolean intentionalClose = false;
    private Handler mainHandler;
    private String token;
    private Gson gson;
    private boolean isSubscribed = false;

    // 连接状态
    private final MutableLiveData<ConnectionState> connectionState = new MutableLiveData<>(ConnectionState.DISCONNECTED);

    // 接收到的消息
    private final MutableLiveData<MessageDTO> receivedMessage = new MutableLiveData<>();

    private WebSocketManager() {
        // 私有构造方法
        mainHandler = new Handler(Looper.getMainLooper());
        gson = new Gson();
    }

    /**
     * 获取单例实例
     */
    public static synchronized WebSocketManager getInstance() {
        if (instance == null) {
            instance = new WebSocketManager();
        }
        return instance;
    }

    /**
     * 初始化
     */
    public void init(Context context) {
        this.context = context.getApplicationContext();
        this.prefsManager = SharedPreferencesManager.getInstance(context);
    }

    /**
     * 获取连接状态
     */
    public LiveData<ConnectionState> getConnectionState() {
        return connectionState;
    }

    /**
     * 获取接收到的消息
     */
    public LiveData<MessageDTO> getReceivedMessage() {
        return receivedMessage;
    }

    /**
     * 连接WebSocket
     */
    public void connect() {
        // 检查是否已登录
        token = prefsManager.getToken();
        if (token == null || token.isEmpty()) {
            Log.e(TAG, "用户未登录,无法连接WebSocket");
            showToast("用户未登录,无法连接WebSocket");
            // 跳转到登录界面
            Intent intent = new Intent(context, LoginActivity.class);
            intent.addFlags(Intent.FLAG_ACTIVITY_NEW_TASK);
            context.startActivity(intent);
            return;
        }

        // 如果已经连接,先断开
        disconnect();

        try {
            // 使用HTTP URL,SockJS会自动处理协议转换
            String url = SERVER_URL + WS_PATH + "?satoken=" + token;
            Log.d(TAG, "正在连接WebSocket: " + url);
            showToast("正在连接WebSocket服务器...");

            // 设置请求头
            Map<String, String> headers = new HashMap<>();
            headers.put("satoken", token);

            // 创建WebSocket客户端
            client = new WebSocketClient(new URI(url), new Draft_6455(), headers, 0) {
                @Override
                public void onOpen(ServerHandshake handshakedata) {
                    Log.d(TAG, "WebSocket连接成功");
                    connectionState.postValue(ConnectionState.CONNECTED);
                    reconnectAttempts = 0;
                    startHeartbeat();

                    // 连接成功后先发送 CONNECT 帧,完成 STOMP 握手
                    sendConnectFrame();
                    showToast("WebSocket连接成功, 正在发送CONNECT帧");
                }

                @Override
                public void onMessage(String message) {
                    Log.d(TAG, "收到WebSocket消息: " + message);
                    // 打印消息的前100个字符
                    String logMessage = message.length() > 100 ? message.substring(0, 100) + "..." : message;
                    Log.d(TAG, "收到消息预览: " + logMessage);
                    // 处理消息
                    handleReceivedMessage(message);
                }

                @Override
                public void onClose(int code, String reason, boolean remote) {
                    Log.d(TAG, "WebSocket连接关闭: code=" + code + ", reason=" + reason + ", remote=" + remote);
                    connectionState.postValue(ConnectionState.DISCONNECTED);
                    stopHeartbeat();
                    isSubscribed = false;

                    if (remote && !intentionalClose) {
                        showToast("WebSocket连接被服务器关闭: " + reason);
                    }

                    // 如果不是主动关闭,尝试重连
                    if (!intentionalClose && reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
                        reconnect();
                    }
                }

                @Override
                public void onError(Exception ex) {
                    Log.e(TAG, "WebSocket连接错误", ex);
                    connectionState.postValue(ConnectionState.ERROR);
                    showToast("WebSocket连接失败: " + ex.getMessage());

                    // 尝试重连
                    if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
                        reconnect();
                    }
                }
            };

            // 连接
            connectionState.postValue(ConnectionState.CONNECTING);
            client.connect();

        } catch (URISyntaxException e) {
            Log.e(TAG, "WebSocket URI语法错误", e);
            connectionState.postValue(ConnectionState.ERROR);
            showToast("WebSocket URI语法错误: " + e.getMessage());
        }
    }

    /**
     * 断开WebSocket连接
     */
    public void disconnect() {
        intentionalClose = true;

        if (client != null && client.isOpen()) {
            client.close();
            showToast("WebSocket连接已断开");
        }

        stopHeartbeat();
        connectionState.postValue(ConnectionState.DISCONNECTED);
        isSubscribed = false;
    }

    /**
     * 重连
     */
    private void reconnect() {
        reconnectAttempts++;
        Log.d(TAG, "尝试重连WebSocket,第" + reconnectAttempts + "次");
        showToast("正在尝试重新连接WebSocket,第" + reconnectAttempts + "次");

        // 使用指数退避策略
        long delay = (long) Math.pow(2, reconnectAttempts) * 1000;

        Disposable disposable = Observable.timer(delay, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(aLong -> {
                    intentionalClose = false;
                    connect();
                });

        disposables.add(disposable);
    }

    /**
     * 开始心跳
     */
    private void startHeartbeat() {
        stopHeartbeat();

        heartbeatDisposable = Observable.interval(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(aLong -> {
                    if (client != null && client.isOpen()) {
                        // 构建STOMP格式的心跳消息
                        try {
                            StringBuilder stompFrame = new StringBuilder();
                            stompFrame.append("SEND\n");
                            stompFrame.append("destination:").append(HEARTBEAT_ENDPOINT).append("\n");
                            stompFrame.append("content-type:application/json;charset=UTF-8\n");
                            stompFrame.append("satoken:").append(token).append("\n");
                            stompFrame.append("\n"); // 空行分隔头部和正文

                            // 使用空 JSON 对象作为消息体
                            JSONObject heartbeatMessage = new JSONObject();
                            stompFrame.append(heartbeatMessage.toString());
                            stompFrame.append("\0"); // STOMP帧结束符

                            client.send(stompFrame.toString());
                            Log.d(TAG, "发送心跳到: " + HEARTBEAT_ENDPOINT);
                        } catch (Exception e) {
                            Log.e(TAG, "构建心跳消息失败", e);
                        }
                    }
                });

        disposables.add(heartbeatDisposable);
    }

    /**
     * 停止心跳
     */
    private void stopHeartbeat() {
        if (heartbeatDisposable != null && !heartbeatDisposable.isDisposed()) {
            heartbeatDisposable.dispose();
        }
    }

    /**
     * 发送 CONNECT 帧以完成 STOMP 握手
     */
    private void sendConnectFrame() {
        if (client != null && client.isOpen()) {
            StringBuilder connectFrame = new StringBuilder();
            connectFrame.append("CONNECT\n");
            connectFrame.append("accept-version:1.2\n");
            connectFrame.append("heart-beat:10000,10000\n");
            connectFrame.append("satoken:").append(token).append("\n");
            connectFrame.append("\n"); // 空行分隔头部和正文
            connectFrame.append("\0"); // STOMP帧结束符

            client.send(connectFrame.toString());
            Log.d(TAG, "发送CONNECT帧: " + connectFrame.toString());
        }
    }

    /**
     * 订阅个人消息通道
     */
    private void subscribeToPersonalMessages() {
        if (client != null && client.isOpen()) {
            try {
                // 获取当前用户ID
                String userId = prefsManager.getUserId();
                if (userId == null || userId.isEmpty()) {
                    Log.e(TAG, "用户ID为空,无法订阅消息");
                    showToast("用户ID为空,无法订阅消息");
                    return;
                }

                // 构建订阅消息
                StringBuilder subscribeMessage = new StringBuilder();
                subscribeMessage.append("SUBSCRIBE\n");
                subscribeMessage.append("id:sub-0\n");
                subscribeMessage.append("destination:").append(String.format(PERSONAL_MESSAGE_SUBSCRIPTION, userId)).append("\n");
                subscribeMessage.append("\n"); // 空行分隔头部和正文
                subscribeMessage.append("\0"); // STOMP帧结束符

                client.send(subscribeMessage.toString());
                Log.d(TAG, "发送订阅消息到: " + String.format(PERSONAL_MESSAGE_SUBSCRIPTION, userId));
                isSubscribed = true;
            } catch (Exception e) {
                Log.e(TAG, "发送订阅消息失败", e);
            }
        }
    }

    /**
     * 处理接收到的消息(支持 CONNECTED、MESSAGE、ERROR 三种STOMP消息)
     */
    private void handleReceivedMessage(String message) {
        try {
            Log.d(TAG, "开始处理接收到的消息: " + message);
            if (message.startsWith("CONNECTED")) {
                Log.d(TAG, "收到 CONNECTED 帧,STOMP连接成功");
                // 连接成功后订阅个人消息通道
                if (!isSubscribed) {
                    subscribeToPersonalMessages();
                }
            } else if (message.startsWith("MESSAGE")) {
                Log.d(TAG, "收到STOMP MESSAGE消息");
                int headerEnd = message.indexOf("\n\n");
                if (headerEnd != -1) {
                    String body = message.substring(headerEnd + 2).replace("\0", "");
                    Log.d(TAG, "收到消息体: " + body);
                    try {
                        MessageDTO messageDTO = gson.fromJson(body, MessageDTO.class);
                        if (messageDTO != null && messageDTO.getMessage() != null) {
                            // 更新消息状态为已接收
                            messageDTO.getMessage().setStatus(MessageStatus.UNREAD);
                            Log.d(TAG, "解析消息成功: " + messageDTO);
                            
                            // 在主线程上发送消息更新
                            mainHandler.post(() -> {
                                receivedMessage.setValue(messageDTO);
                                showToast("收到来自 " + messageDTO.getMessage().getSender() + " 的消息: " + messageDTO.getMessage().getContent());
                            });
                        } else {
                            Log.e(TAG, "解析消息失败或消息内容为空");
                        }
                    } catch (Exception e) {
                        Log.e(TAG, "解析JSON消息体失败", e);
                        e.printStackTrace();
                    }
                } else {
                    Log.e(TAG, "无法找到消息体");
                }
            } else if (message.startsWith("ERROR")) {
                Log.e(TAG, "收到STOMP ERROR消息: " + message);
                int headerEnd = message.indexOf("\n\n");
                if (headerEnd != -1) {
                    String errorBody = message.substring(headerEnd + 2).replace("\0", "");
                    Log.e(TAG, "STOMP错误消息体: " + errorBody);
                    showToast("WebSocket错误: " + errorBody);
                }
            } else {
                Log.d(TAG, "收到其他类型的消息: " + message);
            }
        } catch (Exception e) {
            Log.e(TAG, "处理接收消息失败", e);
            e.printStackTrace();
        }
    }

    /**
     * 发送聊天消息
     * @param content 消息内容
     * @param receiver 接收者ID
     * @param conversationId 会话ID
     * @return 发送的消息对象
     */
    public MessageDTO sendChatMessage(String content, String receiver, String conversationId) {
        if (client == null || !client.isOpen()) {
            Log.e(TAG, "WebSocket未连接,无法发送消息");
            showToast("WebSocket未连接,无法发送消息");
            return null;
        }

        if (token == null || token.isEmpty()) {
            Log.e(TAG, "用户未登录,无法发送消息");
            showToast("用户未登录,无法发送消息");
            return null;
        }

        try {
            // 获取当前用户ID
            String userId = prefsManager.getUserId();
            if (userId == null || userId.isEmpty()) {
                Log.e(TAG, "用户ID为空,无法发送消息");
                showToast("用户ID为空,无法发送消息");
                return null;
            }

            // 获取当前时间
            String currentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault()).format(new Date());

            // 创建消息对象 - 消息ID由后端生成
            MessageChatBO messageChatBO = MessageChatBO.builder()
                    .content(content)
                    .type(MessageTypeEnum.TEXT) // 只支持TEXT类型
                    .sender(userId)
                    .receiver(receiver)
                    .sendTime(currentTime)
                    .status(MessageStatus.SENDING)
                    .build();

            // 创建会话参与者列表
            List<String> participants = new ArrayList<>();
            participants.add(userId);
            participants.add(receiver);

            // 创建会话对象 - 使用当前会话ID,不设置lastMessageId
            MessageConversationBO conversationBO = MessageConversationBO.builder()
                    .id(conversationId)
                    .participantIds(participants)
                    .lastUpdateTime(currentTime)
                    .unreadCount(0)
                    .sessionType(MessageSessionTypeEnum.USER)
                    .build();

            // 创建消息DTO
            MessageDTO messageDTO = new MessageDTO(messageChatBO, conversationBO);

            // 构建STOMP格式的消息
            StringBuilder stompFrame = new StringBuilder();
            stompFrame.append("SEND\n");
            stompFrame.append("destination:").append(MESSAGE_ENDPOINT).append(receiver).append("\n");
            stompFrame.append("content-type:application/json;charset=UTF-8\n");
            stompFrame.append("satoken:").append(token).append("\n");
            stompFrame.append("\n"); // 空行分隔头部和正文

            // 将消息对象转换为JSON
            String messageJson = gson.toJson(messageDTO);
            stompFrame.append(messageJson);
            stompFrame.append("\0"); // STOMP帧结束符

            client.send(stompFrame.toString());
            Log.d(TAG, "发送消息到: " + MESSAGE_ENDPOINT + receiver + ", 内容: " + messageJson);

            return messageDTO;
        } catch (Exception e) {
            Log.e(TAG, "发送消息失败", e);
            showToast("发送消息失败: " + e.getMessage());
            return null;
        }
    }

    /**
     * 显示Toast提示
     */
    private void showToast(final String message) {
        mainHandler.post(() -> Toast.makeText(context, message, Toast.LENGTH_SHORT).show());
    }

    /**
     * 释放资源
     */
    public void release() {
        disconnect();
        disposables.clear();
        instance = null;
    }

    /**
     * 连接状态枚举
     */
    public enum ConnectionState {
        DISCONNECTED,  // 未连接
        CONNECTING,    // 连接中
        CONNECTED,     // 已连接
        ERROR          // 错误
    }
}


http://www.kler.cn/a/581325.html

相关文章:

  • Chrome 扩展开发 API实战:Bookmarks(二)
  • Python高级之操作Mysql
  • 华为OD机试 - 平均像素值-贪心算法(Java 2024 E卷 100分)
  • 【区块链+ 医疗健康】基于区块链和AI 技术的儿童近视防控大数据平台 | FISCO BCOS 应用案例
  • iTextSharp-PDF批量导出
  • 3.3.2 用仿真图实现点灯效果
  • 软考高级信息系统项目管理师笔记-第22章组织通用治理
  • nginx的使用
  • Ubuntu22.04修改root用户并安装cuda
  • 网络安全之命令
  • 发展史 | 深度学习 / 云计算
  • Vue.js探秘:从基础到高级教程
  • Spring Boot笔记(上)
  • Leetcode 刷题笔记1 动态规划part10
  • Spark性能优化深度剖析:十大实战策略与案例解析
  • Smart contract -- 自毁合约
  • 【MySQL基础-2】使用 Docker 搭建 MySQL:配置文件详解与实战案例
  • FerretDB 2.0:开源 MongoDB 替代品的安装与使用指南
  • 笔记:代码随想录算法训练营day41:LeetCode121. 买卖股票的最佳时机、122.买卖股票的最佳时机II、123.买卖股票的最佳时机III
  • SpringBoot基础Kafka示例