Android实现Socket通信
问题:
我在Android端对接后端Socket服务实现消息模块的时候,使用WebSocketClient 库,手动构造了订阅和发送消息的 STOMP 帧,,Socket连接成功建立,但是当用户发送消息的时候,对方无法接受到。
然后我Web端使用了成熟的 STOMP 库(Stomp.over 和 SockJS),所有接口测试通过。
排查经历:
- 起初我以为是在安卓端手动构造了订阅和发送消息的 STOMP 帧的格式问题,后来经过抓包排查,没有问题
- 然后我以为是,由于Android虚拟机和我手机调试网络的原因,我在本地创建了两个虚拟机,运行项目发现还是消息发送成功,推送失败。
- 因为问题定位到推送消息的步骤失败,后端采用的是Spring WebSocket的方法
messagingTemplate.convertAndSendToUser()
,我开始研究这个方法的底层逻辑,并在网上查找资料,发现有的人因为Header构建的问题导致推送消息失效,我又开始了更改调试,没有解决。 - 因为我Web端是正常运行的,Android是失败的,所以我使用Wireshark开始抓包对比,到底是哪个步骤出现了问题。
失败案例抓包数据:
成功案例抓包数据:
Socket服务器:
Socket建立连接序列图
问题原因:
web端的代码使用了成熟的 STOMP 库(Stomp.over 和 SockJS),它在建立连接时会自动发送 CONNECT 帧,完成 STOMP 握手;而你 Android 端的代码则是直接用 WebSocketClient 建立连接,并手动构造了订阅和发送消息的 STOMP 帧,但没有看到发送 CONNECT 帧的过程。
- 缺少 CONNECT 握手
STOMP 协议要求客户端在发送其他 STOMP 帧(例如 SUBSCRIBE 或 SEND)之前,先发送 CONNECT 帧并等待服务器返回 CONNECTED 帧。如果没有发送 CONNECT 帧,服务器可能会忽略你后续发送的订阅或消息帧,从而只看到 ACK 包(底层 TCP 的 ACK 应答),而没有实际的消息推送。
解决方案:
- 发送 CONNECT 帧
在 WebSocket 连接成功(onOpen 回调)后,尽快构造并发送一个标准的 CONNECT 帧。例如,在 Android 端的 onOpen 方法中添加如下代码:
java
复制编辑
@Override
public void onOpen(ServerHandshake handshakedata) {
Log.d(TAG, "WebSocket连接成功");
// 发送 CONNECT 帧,指定支持的 STOMP 版本和心跳
StringBuilder connectFrame = new StringBuilder();
connectFrame.append("CONNECT\n");
connectFrame.append("accept-version:1.2\n");
connectFrame.append("heart-beat:10000,10000\n");
connectFrame.append("satoken:").append(token).append("\n");
connectFrame.append("\n"); // 空行分隔头部和正文
connectFrame.append("\0"); // STOMP帧结束符
client.send(connectFrame.toString());
connectionState.postValue(ConnectionState.CONNECTED);
reconnectAttempts = 0;
startHeartbeat();
// 在收到服务器的 CONNECTED 帧之后再订阅个人消息通道
// 你可以在 onMessage 中判断收到 CONNECTED 后再调用 subscribeToPersonalMessages()
// 这里为了测试可以先延时一会儿再订阅
new Handler(Looper.getMainLooper()).postDelayed(() -> subscribeToPersonalMessages(), 500);
showToast("WebSocket连接成功");
}
这样确保服务器能够正确识别你的 STOMP 客户端,处理后续的订阅和消息发送。
完整Android端代码
package com.xjl.mobilehotel.websocket;
import android.content.Context;
import android.content.Intent;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import android.widget.Toast;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;
import com.google.gson.Gson;
import com.xjl.mobilehotel.model.MessageChatBO;
import com.xjl.mobilehotel.model.MessageConversationBO;
import com.xjl.mobilehotel.model.MessageDTO;
import com.xjl.mobilehotel.model.config.MessageSessionTypeEnum;
import com.xjl.mobilehotel.model.config.MessageStatus;
import com.xjl.mobilehotel.model.config.MessageTypeEnum;
import com.xjl.mobilehotel.ui.auth.LoginActivity;
import com.xjl.mobilehotel.utils.SharedPreferencesManager;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.json.JSONObject;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
/**
* WebSocket管理类
*/
public class WebSocketManager {
private static final String TAG = "WebSocketManager";
// 服务器地址
private static final String SERVER_URL = "ws://192.168.167.39:8008";
private static final String WS_PATH = "/ws";
// 心跳端点
private static final String HEARTBEAT_ENDPOINT = "/socket/heartbeat";
// 消息端点
private static final String MESSAGE_ENDPOINT = "/socket/message/";
// 个人消息订阅端点
private static final String PERSONAL_MESSAGE_SUBSCRIPTION = "/user/%s/queue/message";
// 最大重连次数
private static final int MAX_RECONNECT_ATTEMPTS = 5;
// 心跳间隔(毫秒)
private static final long HEARTBEAT_INTERVAL = 10000;
private static WebSocketManager instance;
private WebSocketClient client;
private Context context;
private SharedPreferencesManager prefsManager;
private final CompositeDisposable disposables = new CompositeDisposable();
private Disposable heartbeatDisposable;
private int reconnectAttempts = 0;
private boolean intentionalClose = false;
private Handler mainHandler;
private String token;
private Gson gson;
private boolean isSubscribed = false;
// 连接状态
private final MutableLiveData<ConnectionState> connectionState = new MutableLiveData<>(ConnectionState.DISCONNECTED);
// 接收到的消息
private final MutableLiveData<MessageDTO> receivedMessage = new MutableLiveData<>();
private WebSocketManager() {
// 私有构造方法
mainHandler = new Handler(Looper.getMainLooper());
gson = new Gson();
}
/**
* 获取单例实例
*/
public static synchronized WebSocketManager getInstance() {
if (instance == null) {
instance = new WebSocketManager();
}
return instance;
}
/**
* 初始化
*/
public void init(Context context) {
this.context = context.getApplicationContext();
this.prefsManager = SharedPreferencesManager.getInstance(context);
}
/**
* 获取连接状态
*/
public LiveData<ConnectionState> getConnectionState() {
return connectionState;
}
/**
* 获取接收到的消息
*/
public LiveData<MessageDTO> getReceivedMessage() {
return receivedMessage;
}
/**
* 连接WebSocket
*/
public void connect() {
// 检查是否已登录
token = prefsManager.getToken();
if (token == null || token.isEmpty()) {
Log.e(TAG, "用户未登录,无法连接WebSocket");
showToast("用户未登录,无法连接WebSocket");
// 跳转到登录界面
Intent intent = new Intent(context, LoginActivity.class);
intent.addFlags(Intent.FLAG_ACTIVITY_NEW_TASK);
context.startActivity(intent);
return;
}
// 如果已经连接,先断开
disconnect();
try {
// 使用HTTP URL,SockJS会自动处理协议转换
String url = SERVER_URL + WS_PATH + "?satoken=" + token;
Log.d(TAG, "正在连接WebSocket: " + url);
showToast("正在连接WebSocket服务器...");
// 设置请求头
Map<String, String> headers = new HashMap<>();
headers.put("satoken", token);
// 创建WebSocket客户端
client = new WebSocketClient(new URI(url), new Draft_6455(), headers, 0) {
@Override
public void onOpen(ServerHandshake handshakedata) {
Log.d(TAG, "WebSocket连接成功");
connectionState.postValue(ConnectionState.CONNECTED);
reconnectAttempts = 0;
startHeartbeat();
// 连接成功后先发送 CONNECT 帧,完成 STOMP 握手
sendConnectFrame();
showToast("WebSocket连接成功, 正在发送CONNECT帧");
}
@Override
public void onMessage(String message) {
Log.d(TAG, "收到WebSocket消息: " + message);
// 打印消息的前100个字符
String logMessage = message.length() > 100 ? message.substring(0, 100) + "..." : message;
Log.d(TAG, "收到消息预览: " + logMessage);
// 处理消息
handleReceivedMessage(message);
}
@Override
public void onClose(int code, String reason, boolean remote) {
Log.d(TAG, "WebSocket连接关闭: code=" + code + ", reason=" + reason + ", remote=" + remote);
connectionState.postValue(ConnectionState.DISCONNECTED);
stopHeartbeat();
isSubscribed = false;
if (remote && !intentionalClose) {
showToast("WebSocket连接被服务器关闭: " + reason);
}
// 如果不是主动关闭,尝试重连
if (!intentionalClose && reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
reconnect();
}
}
@Override
public void onError(Exception ex) {
Log.e(TAG, "WebSocket连接错误", ex);
connectionState.postValue(ConnectionState.ERROR);
showToast("WebSocket连接失败: " + ex.getMessage());
// 尝试重连
if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
reconnect();
}
}
};
// 连接
connectionState.postValue(ConnectionState.CONNECTING);
client.connect();
} catch (URISyntaxException e) {
Log.e(TAG, "WebSocket URI语法错误", e);
connectionState.postValue(ConnectionState.ERROR);
showToast("WebSocket URI语法错误: " + e.getMessage());
}
}
/**
* 断开WebSocket连接
*/
public void disconnect() {
intentionalClose = true;
if (client != null && client.isOpen()) {
client.close();
showToast("WebSocket连接已断开");
}
stopHeartbeat();
connectionState.postValue(ConnectionState.DISCONNECTED);
isSubscribed = false;
}
/**
* 重连
*/
private void reconnect() {
reconnectAttempts++;
Log.d(TAG, "尝试重连WebSocket,第" + reconnectAttempts + "次");
showToast("正在尝试重新连接WebSocket,第" + reconnectAttempts + "次");
// 使用指数退避策略
long delay = (long) Math.pow(2, reconnectAttempts) * 1000;
Disposable disposable = Observable.timer(delay, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(aLong -> {
intentionalClose = false;
connect();
});
disposables.add(disposable);
}
/**
* 开始心跳
*/
private void startHeartbeat() {
stopHeartbeat();
heartbeatDisposable = Observable.interval(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(aLong -> {
if (client != null && client.isOpen()) {
// 构建STOMP格式的心跳消息
try {
StringBuilder stompFrame = new StringBuilder();
stompFrame.append("SEND\n");
stompFrame.append("destination:").append(HEARTBEAT_ENDPOINT).append("\n");
stompFrame.append("content-type:application/json;charset=UTF-8\n");
stompFrame.append("satoken:").append(token).append("\n");
stompFrame.append("\n"); // 空行分隔头部和正文
// 使用空 JSON 对象作为消息体
JSONObject heartbeatMessage = new JSONObject();
stompFrame.append(heartbeatMessage.toString());
stompFrame.append("\0"); // STOMP帧结束符
client.send(stompFrame.toString());
Log.d(TAG, "发送心跳到: " + HEARTBEAT_ENDPOINT);
} catch (Exception e) {
Log.e(TAG, "构建心跳消息失败", e);
}
}
});
disposables.add(heartbeatDisposable);
}
/**
* 停止心跳
*/
private void stopHeartbeat() {
if (heartbeatDisposable != null && !heartbeatDisposable.isDisposed()) {
heartbeatDisposable.dispose();
}
}
/**
* 发送 CONNECT 帧以完成 STOMP 握手
*/
private void sendConnectFrame() {
if (client != null && client.isOpen()) {
StringBuilder connectFrame = new StringBuilder();
connectFrame.append("CONNECT\n");
connectFrame.append("accept-version:1.2\n");
connectFrame.append("heart-beat:10000,10000\n");
connectFrame.append("satoken:").append(token).append("\n");
connectFrame.append("\n"); // 空行分隔头部和正文
connectFrame.append("\0"); // STOMP帧结束符
client.send(connectFrame.toString());
Log.d(TAG, "发送CONNECT帧: " + connectFrame.toString());
}
}
/**
* 订阅个人消息通道
*/
private void subscribeToPersonalMessages() {
if (client != null && client.isOpen()) {
try {
// 获取当前用户ID
String userId = prefsManager.getUserId();
if (userId == null || userId.isEmpty()) {
Log.e(TAG, "用户ID为空,无法订阅消息");
showToast("用户ID为空,无法订阅消息");
return;
}
// 构建订阅消息
StringBuilder subscribeMessage = new StringBuilder();
subscribeMessage.append("SUBSCRIBE\n");
subscribeMessage.append("id:sub-0\n");
subscribeMessage.append("destination:").append(String.format(PERSONAL_MESSAGE_SUBSCRIPTION, userId)).append("\n");
subscribeMessage.append("\n"); // 空行分隔头部和正文
subscribeMessage.append("\0"); // STOMP帧结束符
client.send(subscribeMessage.toString());
Log.d(TAG, "发送订阅消息到: " + String.format(PERSONAL_MESSAGE_SUBSCRIPTION, userId));
isSubscribed = true;
} catch (Exception e) {
Log.e(TAG, "发送订阅消息失败", e);
}
}
}
/**
* 处理接收到的消息(支持 CONNECTED、MESSAGE、ERROR 三种STOMP消息)
*/
private void handleReceivedMessage(String message) {
try {
Log.d(TAG, "开始处理接收到的消息: " + message);
if (message.startsWith("CONNECTED")) {
Log.d(TAG, "收到 CONNECTED 帧,STOMP连接成功");
// 连接成功后订阅个人消息通道
if (!isSubscribed) {
subscribeToPersonalMessages();
}
} else if (message.startsWith("MESSAGE")) {
Log.d(TAG, "收到STOMP MESSAGE消息");
int headerEnd = message.indexOf("\n\n");
if (headerEnd != -1) {
String body = message.substring(headerEnd + 2).replace("\0", "");
Log.d(TAG, "收到消息体: " + body);
try {
MessageDTO messageDTO = gson.fromJson(body, MessageDTO.class);
if (messageDTO != null && messageDTO.getMessage() != null) {
// 更新消息状态为已接收
messageDTO.getMessage().setStatus(MessageStatus.UNREAD);
Log.d(TAG, "解析消息成功: " + messageDTO);
// 在主线程上发送消息更新
mainHandler.post(() -> {
receivedMessage.setValue(messageDTO);
showToast("收到来自 " + messageDTO.getMessage().getSender() + " 的消息: " + messageDTO.getMessage().getContent());
});
} else {
Log.e(TAG, "解析消息失败或消息内容为空");
}
} catch (Exception e) {
Log.e(TAG, "解析JSON消息体失败", e);
e.printStackTrace();
}
} else {
Log.e(TAG, "无法找到消息体");
}
} else if (message.startsWith("ERROR")) {
Log.e(TAG, "收到STOMP ERROR消息: " + message);
int headerEnd = message.indexOf("\n\n");
if (headerEnd != -1) {
String errorBody = message.substring(headerEnd + 2).replace("\0", "");
Log.e(TAG, "STOMP错误消息体: " + errorBody);
showToast("WebSocket错误: " + errorBody);
}
} else {
Log.d(TAG, "收到其他类型的消息: " + message);
}
} catch (Exception e) {
Log.e(TAG, "处理接收消息失败", e);
e.printStackTrace();
}
}
/**
* 发送聊天消息
* @param content 消息内容
* @param receiver 接收者ID
* @param conversationId 会话ID
* @return 发送的消息对象
*/
public MessageDTO sendChatMessage(String content, String receiver, String conversationId) {
if (client == null || !client.isOpen()) {
Log.e(TAG, "WebSocket未连接,无法发送消息");
showToast("WebSocket未连接,无法发送消息");
return null;
}
if (token == null || token.isEmpty()) {
Log.e(TAG, "用户未登录,无法发送消息");
showToast("用户未登录,无法发送消息");
return null;
}
try {
// 获取当前用户ID
String userId = prefsManager.getUserId();
if (userId == null || userId.isEmpty()) {
Log.e(TAG, "用户ID为空,无法发送消息");
showToast("用户ID为空,无法发送消息");
return null;
}
// 获取当前时间
String currentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault()).format(new Date());
// 创建消息对象 - 消息ID由后端生成
MessageChatBO messageChatBO = MessageChatBO.builder()
.content(content)
.type(MessageTypeEnum.TEXT) // 只支持TEXT类型
.sender(userId)
.receiver(receiver)
.sendTime(currentTime)
.status(MessageStatus.SENDING)
.build();
// 创建会话参与者列表
List<String> participants = new ArrayList<>();
participants.add(userId);
participants.add(receiver);
// 创建会话对象 - 使用当前会话ID,不设置lastMessageId
MessageConversationBO conversationBO = MessageConversationBO.builder()
.id(conversationId)
.participantIds(participants)
.lastUpdateTime(currentTime)
.unreadCount(0)
.sessionType(MessageSessionTypeEnum.USER)
.build();
// 创建消息DTO
MessageDTO messageDTO = new MessageDTO(messageChatBO, conversationBO);
// 构建STOMP格式的消息
StringBuilder stompFrame = new StringBuilder();
stompFrame.append("SEND\n");
stompFrame.append("destination:").append(MESSAGE_ENDPOINT).append(receiver).append("\n");
stompFrame.append("content-type:application/json;charset=UTF-8\n");
stompFrame.append("satoken:").append(token).append("\n");
stompFrame.append("\n"); // 空行分隔头部和正文
// 将消息对象转换为JSON
String messageJson = gson.toJson(messageDTO);
stompFrame.append(messageJson);
stompFrame.append("\0"); // STOMP帧结束符
client.send(stompFrame.toString());
Log.d(TAG, "发送消息到: " + MESSAGE_ENDPOINT + receiver + ", 内容: " + messageJson);
return messageDTO;
} catch (Exception e) {
Log.e(TAG, "发送消息失败", e);
showToast("发送消息失败: " + e.getMessage());
return null;
}
}
/**
* 显示Toast提示
*/
private void showToast(final String message) {
mainHandler.post(() -> Toast.makeText(context, message, Toast.LENGTH_SHORT).show());
}
/**
* 释放资源
*/
public void release() {
disconnect();
disposables.clear();
instance = null;
}
/**
* 连接状态枚举
*/
public enum ConnectionState {
DISCONNECTED, // 未连接
CONNECTING, // 连接中
CONNECTED, // 已连接
ERROR // 错误
}
}