Android WebSocket工具类:重连、心跳、消息队列一站式解决方案
- 依赖库
使用 OkHttp 的WebSocket支持。
在 build.gradle 中添加依赖:
implementation 'com.squareup.okhttp3:okhttp:4.9.3'
- WebSocket工具类实现
import okhttp3.*;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class WebSocketManager {
private static final String TAG = "WebSocketManager";
private OkHttpClient client;
private WebSocket webSocket;
private String wsUrl;
private AtomicInteger reconnectCount = new AtomicInteger(0);
private boolean isConnecting = false;
private boolean isConnected = false;
private Handler mainHandler = new Handler(Looper.getMainLooper());
private Runnable heartbeatRunnable;
private ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>(); // 线程安全的消息队列
private ExecutorService executorService = Executors.newSingleThreadExecutor(); // 线程池
private int maxReconnectCount = 5; // 最大重连次数
private long reconnectInterval = 5000; // 重连间隔时间(毫秒)
private long heartbeatInterval = 30000; // 心跳间隔时间(毫秒)
private long heartbeatTimeout = 60000; // 心跳超时时间(毫秒)
private WebSocketCallback callback;
private WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
Log.d(TAG, "WebSocket connected");
isConnecting = false;
isConnected = true;
reconnectCount.set(0); // 重置重连次数
startHeartbeat(); // 开始心跳
sendQueuedMessages(); // 发送缓存的消息
if (callback != null) {
mainHandler.post(() -> callback.onConnected());
}
}
@Override
public void onMessage(WebSocket webSocket, String text) {
Log.d(TAG, "Received message: " + text);
if (callback != null) {
mainHandler.post(() -> callback.onMessage(text));
}
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
Log.d(TAG, "WebSocket closed: " + reason);
isConnecting = false;
isConnected = false;
stopHeartbeat(); // 停止心跳
if (callback != null) {
mainHandler.post(() -> callback.onDisconnected());
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
Log.e(TAG, "WebSocket failed: " + t.getMessage());
isConnecting = false;
isConnected = false;
stopHeartbeat(); // 停止心跳
if (callback != null) {
mainHandler.post(() -> callback.onError(t));
}
reconnect(); // 尝试重连
}
};
// 私有构造函数,使用Builder模式创建实例
private WebSocketManager(Builder builder) {
this.wsUrl = builder.wsUrl;
this.maxReconnectCount = builder.maxReconnectCount;
this.reconnectInterval = builder.reconnectInterval;
this.heartbeatInterval = builder.heartbeatInterval;
this.heartbeatTimeout = builder.heartbeatTimeout;
client = new OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.writeTimeout(10, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.build();
}
// 连接WebSocket
public void connect() {
if (!isConnecting && !isConnected) {
isConnecting = true;
executorService.submit(() -> {
Request request = new Request.Builder().url(wsUrl).build();
webSocket = client.newWebSocket(request, webSocketListener);
});
}
}
// 断开连接
public void disconnect() {
if (webSocket != null) {
executorService.submit(() -> webSocket.close(1000, "Normal closure"));
}
releaseResources();
}
// 发送消息
public void sendMessage(String message) {
if (webSocket != null && isConnected) {
executorService.submit(() -> {
boolean sent = webSocket.send(message);
if (!sent) {
// 发送失败,将消息加入队列
messageQueue.offer(message);
}
});
} else {
// 网络未连接时,将消息加入队列
messageQueue.offer(message);
}
}
// 重连机制
private void reconnect() {
if (reconnectCount.get() < maxReconnectCount) {
reconnectCount.incrementAndGet();
mainHandler.postDelayed(() -> {
Log.d(TAG, "Reconnecting... Attempt: " + reconnectCount.get());
connect();
}, reconnectInterval);
} else {
Log.e(TAG, "Max reconnection attempts reached");
}
}
// 发送缓存的消息
private void sendQueuedMessages() {
executorService.submit(() -> {
while (!messageQueue.isEmpty()) {
String message = messageQueue.poll();
if (message != null) {
boolean sent = webSocket.send(message);
if (!sent) {
// 发送失败,将消息重新加入队列
messageQueue.offer(message);
break;
}
}
}
});
}
// 开始心跳
private void startHeartbeat() {
heartbeatRunnable = new Runnable() {
@Override
public void run() {
if (isConnected) {
webSocket.send("Heartbeat"); // 发送心跳消息
mainHandler.postDelayed(this, heartbeatInterval);
}
}
};
mainHandler.post(heartbeatRunnable);
}
// 停止心跳
private void stopHeartbeat() {
mainHandler.removeCallbacks(heartbeatRunnable);
}
// 释放资源
private void releaseResources() {
executorService.shutdown();
mainHandler.removeCallbacksAndMessages(null);
}
// 设置回调
public void setWebSocketCallback(WebSocketCallback callback) {
this.callback = callback;
}
// Builder模式
public static class Builder {
private String wsUrl;
private int maxReconnectCount = 5;
private long reconnectInterval = 5000;
private long heartbeatInterval = 30000;
private long heartbeatTimeout = 60000;
public Builder(String wsUrl) {
this.wsUrl = wsUrl;
}
public Builder setMaxReconnectCount(int maxReconnectCount) {
this.maxReconnectCount = maxReconnectCount;
return this;
}
public Builder setReconnectInterval(long reconnectInterval) {
this.reconnectInterval = reconnectInterval;
return this;
}
public Builder setHeartbeatInterval(long heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
return this;
}
public Builder setHeartbeatTimeout(long heartbeatTimeout) {
this.heartbeatTimeout = heartbeatTimeout;
return this;
}
public WebSocketManager build() {
return new WebSocketManager(this);
}
}
// 回调接口
public interface WebSocketCallback {
void onMessage(String message);
void onConnected();
void onDisconnected();
void onError(Throwable t);
}
}
- 功能说明
3.1 连接状态管理
增加 isConnecting 状态,区分连接中和已连接状态。
3.2 消息重发机制
在发送失败时,将消息重新加入队列,等待重连后发送。
3.3 动态心跳机制
支持动态调整心跳间隔和超时时间。
3.4 资源释放
在断开连接时,释放线程池和Handler资源,避免内存泄漏。
3.5 日志分级
通过 Log.d 和 Log.e 区分调试日志和错误日志。
- 使用示例
4.1 初始化并连接WebSocket
WebSocketManager webSocketManager = new WebSocketManager.Builder("ws://your-websocket-url")
.setMaxReconnectCount(10)
.setReconnectInterval(3000)
.setHeartbeatInterval(60000)
.setHeartbeatTimeout(120000)
.build();
webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() {
@Override
public void onMessage(String message) {
Log.d(TAG, "Received: " + message);
}
@Override
public void onConnected() {
Log.d(TAG, "Connected");
}
@Override
public void onDisconnected() {
Log.d(TAG, "Disconnected");
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "Error: " + t.getMessage());
}
});
webSocketManager.connect();
4.2 发送消息
webSocketManager.sendMessage("Hello, WebSocket!");
4.3 断开连接
webSocketManager.disconnect();
4.4 处理生命周期
@Override
protected void onDestroy() {
super.onDestroy();
webSocketManager.disconnect();
}
在Activity中使用WebSocket工具类非常简单。我们需要确保WebSocket的生命周期与Activity的生命周期绑定,避免内存泄漏和资源浪费。以下是完整的示例代码,展示如何在Activity中初始化、使用和释放WebSocket工具类。
在Activity中使用WebSocket工具类
- Activity代码示例
import android.os.Bundle;
import android.util.Log;
import androidx.annotation.Nullable;
import androidx.appcompat.app.AppCompatActivity;
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
private WebSocketManager webSocketManager;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 初始化WebSocketManager
webSocketManager = new WebSocketManager.Builder("ws://your-websocket-url")
.setMaxReconnectCount(10)
.setReconnectInterval(3000)
.setHeartbeatInterval(60000)
.setHeartbeatTimeout(120000)
.build();
// 设置WebSocket回调
webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() {
@Override
public void onMessage(String message) {
Log.d(TAG, "Received: " + message);
// 在这里处理接收到的消息
}
@Override
public void onConnected() {
Log.d(TAG, "Connected");
// 在这里处理连接成功事件
}
@Override
public void onDisconnected() {
Log.d(TAG, "Disconnected");
// 在这里处理连接断开事件
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "Error: " + t.getMessage());
// 在这里处理错误事件
}
});
// 连接WebSocket
webSocketManager.connect();
// 发送消息
webSocketManager.sendMessage("Hello, WebSocket!");
}
@Override
protected void onDestroy() {
super.onDestroy();
// 断开WebSocket连接并释放资源
if (webSocketManager != null) {
webSocketManager.disconnect();
}
}
}
- 代码说明
2.1 初始化WebSocketManager
在 onCreate 方法中,使用 Builder 模式初始化 WebSocketManager,并设置最大重连次数、重连间隔、心跳间隔和超时时间。
webSocketManager = new WebSocketManager.Builder("ws://your-websocket-url")
.setMaxReconnectCount(10)
.setReconnectInterval(3000)
.setHeartbeatInterval(60000)
.setHeartbeatTimeout(120000)
.build();
2.2 设置回调
通过 setWebSocketCallback 方法设置回调接口,处理WebSocket的连接、消息、断开和错误事件。
webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() {
@Override
public void onMessage(String message) {
Log.d(TAG, "Received: " + message);
// 在这里处理接收到的消息
}
@Override
public void onConnected() {
Log.d(TAG, "Connected");
// 在这里处理连接成功事件
}
@Override
public void onDisconnected() {
Log.d(TAG, "Disconnected");
// 在这里处理连接断开事件
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "Error: " + t.getMessage());
// 在这里处理错误事件
}
});
2.3 连接WebSocket
在 onCreate 方法中调用 connect() 方法,启动WebSocket连接。
webSocketManager.connect();
2.4 发送消息
通过 sendMessage 方法发送消息。
webSocketManager.sendMessage("Hello, WebSocket!");
2.5 释放资源
在 onDestroy 方法中调用 disconnect() 方法,断开WebSocket连接并释放资源。
@Override
protected void onDestroy() {
super.onDestroy();
if (webSocketManager != null) {
webSocketManager.disconnect();
}
}
- 注意事项
3.1 生命周期绑定
确保WebSocket的生命周期与Activity的生命周期绑定,避免内存泄漏。
在 onDestroy 中释放资源。
3.2 线程安全
WebSocket的回调方法(如 onMessage)运行在后台线程,如果需要更新UI,请使用 Handler 或 runOnUiThread。
3.3 网络权限
在 AndroidManifest.xml 中添加网络权限:
<!-- 网络权限-->
<uses-permission android:name="android.permission.INTERNET"/>
<!-- wifi权限-->
<uses-permission android:name="android.permission.ACCESS_WIFI_STATE"/>
<uses-permission android:name="android.permission.CHANGE_WIFI_STATE"/>
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>
<uses-permission android:name="android.permission.CHANGE_NETWORK_STATE"/>
3.4 SSL/TLS支持
如果WebSocket服务器使用 wss 协议(即基于SSL/TLS的安全连接),OkHttp 会自动处理SSL/TLS握手,无需额外配置。
- 扩展功能
4.1 动态调整心跳间隔
可以根据服务器响应动态调整心跳间隔。例如,在收到服务器的心跳响应后,延长心跳间隔。
@Override
public void onMessage(String message) {
if ("HeartbeatResponse".equals(message)) {
// 动态调整心跳间隔
webSocketManager.setHeartbeatInterval(120000);
}
}
4.2 消息重发机制
在发送失败时,将消息加入队列,等待重连后重新发送。
webSocketManager.sendMessage("Hello, WebSocket!");
4.3 日志分级
通过 Log.d 和 Log.e 区分调试日志和错误日志,方便调试和生产环境使用。
- 总结
在Activity中使用优化后的WebSocket工具类非常简单。通过生命周期绑定、回调接口和资源释放机制,可以确保WebSocket的高效运行和资源管理。以下是完整的流程:
初始化:在 onCreate 中初始化 WebSocketManager。
设置回调:处理连接、消息、断开和错误事件。
连接WebSocket:调用 connect() 方法。
发送消息:通过 sendMessage 方法发送消息。
释放资源:在 onDestroy 中调用 disconnect() 方法。