websocket实现
由于安卓资源管理器展示的路径不尽相同,各种软件保存文件的位置也不一定一样.对于普通用户上传文件时,查找文件可能是一个麻烦的事情.后来想到了一个办法,使用pc端进行辅助上传.
文章目录
- 实现思路
- 1.0 实现
- 定义web与客户端通信数据类型和数据格式
- web端websocket实现
- web端对客户端数据的管理
- pc端实现
- OkHttp3建立websocket连接
- 2.0版本
- spring-boot放到nginx后面
- spring-boot 放到gateway后面
- spring-boot 放到nginx gateway后面
- ws升级为wss
- 其他
- springboot打包
实现思路
- pc端与服务器建立websocket连接;
- 服务器将sessionId传递到pc端;
- pc端生成二维码;
- 手机端扫描二维码,读取pc端sessionId;
- 手机端与服务器建立websocket连接;
- 手机端将fileId(后面再解释)、pc端sessionId、token等参数传递给服务器;
- 服务器更新pc端session 对应的fileId;
- 服务器将fileId、token等发送到pc端;
- pc使用token、fileId等请求文件列表并进行展示;
- 手机端、pc端进行文件修改后,向服务器发送给更新信号,服务器将更新信号转发到对端。
1.0 实现
定义web与客户端通信数据类型和数据格式
- 定义web与客户端通信数据类型
public class MsgType {
public static final int UPDATE = 0; //提示客户端数据发生更新
public static final int REQ = 1; //发送/接受fileId等字段
public static final int SELF = 3; //建立连接后,web端发送client其sessionId
public static final int ERR_SESSION = 4; //提示session不存在或已close
public static final int HEART_BEAT = 100; //心跳包
}
- 定义web与客户端通信数据格式
@Data
public class MsgData {
private int type; //对应 MsgType
private String sessionId; //SELF 对应自身sessionId; REQ 对应pc端sessionId;
private String fileId; //建立连接后,向pc端发送fileId等字段
web端websocket实现
创建spring-boot项目,添加web\websocket相关依赖
使用maven引入websocket依赖;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置websocket和访问路径的映射关系
@Configuration //配置websocket和访问路径的映射关系
@EnableWebSocket // 全局开启WebSocket支持
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new WebSocketServer(), "/websocket").setAllowedOrigins("*");
}
}
web端对客户端数据的管理
- 定义web管理session的数据结构
@Data
public class SessionData {
private int sessionType; // 1 master(app). 0 pc
private String fileId; //pc会话ID
private WebSocketSession session;
private String sessionId;
//虽然可以通过session.getId()获取到sessionId,但session关闭后,读取就会报错
web端对session的管理逻辑
-
新创建的连接添加到链表上,web向客户端发送SELF,告知其对应的sessionId;
-
断开连接时,如果是pc端session直接从链表中删除,如果是app端session,将其他相同fileId的session全部关闭并从链表删除;
-
接收到新消息后,根据消息类型进行分类处理:
- 心跳包,则直接返回;
- REQ app发送的fileId\pc端sessionId等字段,修改sessions上app连接和pc端SessionData内的fileId字段;
并将fileId等字段发送给pc端; - UPDATE 给所有相同fileId的session发送更新信号;
注意: sessions遍历\删除\添加必须添加synchronized,否则ConcurrentModificationException
package com.example.im.ws;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* @ClassName WebSocketServer
* @Description 处理websocket 连接
* @Author guchuanhang
* @date 2025/1/25 14:01
* @Version 1.0
**/
@Slf4j
public class WebSocketServer extends TextWebSocketHandler {
private final Object syncObject = new Object();
private final List<SessionData> sessions =
new ArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("New connection established: " + session.getId());
SessionData sessionData = new SessionData(session);
synchronized (syncObject) {
sessions.add(sessionData);
}
MsgData msgData = new MsgData();
msgData.setType(MsgType.SELF);
msgData.setSessionId(session.getId());
session.sendMessage(new TextMessage(new Gson().toJson(msgData)));
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message)
throws Exception {
String payload = message.getPayload();
log.info("handleTextMessage: " + session.getId());
log.info("Received message: " + payload);
final MsgData msgData = new Gson().fromJson(payload, MsgData.class);
//master 发来的需求.
switch (msgData.getType()) {
case MsgType.HEART_BEAT: {
//heart beat
break;
}
case MsgType.REQ: {
//set master
{
SessionData sessionData = null;
synchronized (syncObject) {
final Optional<SessionData> any = sessions.stream().
filter(s -> s.getSessionId()
.equals(session.getId())).findAny();
if (any.isPresent()) {
sessionData = any.get();
}
}
if (null != sessionData) {
//set master.
sessionData.setSessionType(ClientType.MASTER);
sessionData.setFileId(msgData.getFileId());
}
}
//set slave
{
SessionData sessionData = null;
synchronized (syncObject) {
final Optional<SessionData> any = sessions.stream().
filter(s -> s.getSessionId().equals(msgData.getSessionId())).findAny();
if (any.isPresent()) {
sessionData = any.get();
}
}
if (null != sessionData) {
sessionData.setSessionType(ClientType.SALVER);
sessionData.setFileId(msgData.getFileId());
MsgData msgData1 = new MsgData();
msgData1.setType(MsgType.REQ);
msgData1.setFileId(msgData.getFileId());
sessionData.getSession().sendMessage(new TextMessage(new Gson().toJson(msgData1)));
} else {
//pc session error.
MsgData msgData1 = new MsgData();
msgData1.setType(MsgType.ERR_SESSION);
session.sendMessage(new TextMessage(new Gson().toJson(msgData1)));
}
}
break;
}
case MsgType.UPDATE: {
//slf
SessionData sessionData = null;
synchronized (syncObject) {
final Optional<SessionData> any = sessions.stream().
filter(s -> s.getSessionId().equals(session.getId())).findAny();
if (any.isPresent()) {
sessionData = any.get();
}
}
if (null != sessionData) {
final String fileId = sessionData.getFileId();
List<SessionData> collect;
synchronized (syncObject) {
collect =
sessions.stream().filter(s -> (null != s.getFileId() && s.getFileId().
equals(fileId)) || (null == s.getSession() || !s.getSession().isOpen())).collect(Collectors.toList());
}
if (collect.isEmpty()) {
return;
}
List<SessionData> errList = new ArrayList<>();
for (SessionData s : collect) {
if (null == s.getSession() || !s.getSession().isOpen()) {
errList.add(s);
continue;
}
//不需要给自己发送了
if (s.getSessionId().equals(session.getId())) {
continue;
}
MsgData msgData1 = new MsgData();
msgData1.setType(MsgType.UPDATE);
try {
s.getSession().sendMessage(new TextMessage(new Gson().toJson(msgData1)));
} catch (Exception e) {
e.printStackTrace();
errList.add(s);
}
}
synchronized (syncObject) {
sessions.removeAll(errList);
}
}
break;
}
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
log.info("Connection closed: " + session.getId() + ", Status: " + status);
SessionData sessionData = null;
synchronized (syncObject) {
Optional<SessionData> any = sessions.stream().
filter(s -> s.getSessionId().equals(session.getId())).findAny();
if (any.isPresent()) {
sessionData = any.get();
}
}
if (null == sessionData) {
return;
}
final String fileId = sessionData.getFileId();
//slave just ignore and delete.
if (ClientType.SALVER == sessionData.getSessionType()) {
sessions.remove(sessionData);
return;
}
if (ClientType.MASTER == sessionData.getSessionType()) {
List<SessionData> collect;
synchronized (syncObject) {
collect =
sessions.stream().filter(s ->
(null != s.getFileId()
&& s.getFileId().equals(fileId)) ||
(null == s.getSession() || !s.getSession().isOpen())).collect(Collectors.toList());
}
if (collect.isEmpty()) {
return;
}
for (SessionData s : collect) {
final WebSocketSession session1 = s.getSession();
if (null == session1 || !session1.isOpen()) {
continue;
}
session1.close();
}
synchronized (syncObject) {
sessions.removeAll(collect);
}
}
}
}
pc端实现
- 页面创建时创建websocket,销毁时关闭websocket
- 根据和服务器约定的消息格式 在websocket回调函数onmessage接受数据类型进行二维码生成\文件列表查询等操作
- 添加心跳机制,让websocket更健壮
fileId是一个key,通过fileId可以查询最新的数据. pc端接受到刷新信号后,请求获取最新数据; pc端更新数据后,发送数据已更新信号.
<template>
<div v-if="fileId">
<div>{{ fileId }}</div>
<el-button @click="updateData" type="primary">更新数据</el-button>
<div>发送给服务端更新信号时间: {{ sndUpdateSignalTime }}</div>
<div>收到服务端更新信号时间: {{ rcvUpdateSignalTime }}</div>
<div>心跳最新时间: {{ heartBeatSignalTime }}</div>
<div>服务器返回最新内容: {{ serverContent }}</div>
</div>
<div v-else-if="sessionId">
<div>sessionId:{{ sessionId }}</div>
<img width="200px" height="200px" :src="qrCode" alt="QR Code"/>
</div>
</template>
<script>
import QRCode from "qrcode";
export default {
name: "HelloWorld",
data() {
return {
wsuri: "ws://192.168.0.110:7890/websocket",
ws: null,
sessionId: '',
qrCode: null,
fileId: '',
rcvUpdateSignalTime: '',
sndUpdateSignalTime: '',
heartBeatSignalTime: '',
serverContent: '',
heartbeatInterval: null,
heartbeatIntervalTime: 3000, // 心跳间隔时间,单位为毫秒
}
},
created() {
//页面打开时,初始化WebSocket连接
this.initWebSocket()
},
beforeDestroy() {
// 页面销毁时,关闭WebSocket连接
this.stopHeartbeat()
this.fileId = ''
try {
this.ws.close()
} catch (e) {
}
this.ws = null;
this.sessionId = ''
},
methods: {
// pc端更新附件数据后,向服务器端发送更新信号
updateData() {
console.error('snd update signal')
this.ws.send(JSON.stringify({
type: 0
}))
//格式化为 yyyy-MM-dd HH:mm:ss
this.sndUpdateSignalTime = new Date().toLocaleTimeString()
this.resetHeartbeat();
},
async generateQRCode() {
try {
this.qrCode = await QRCode.toDataURL(this.sessionId);
} catch (error) {
console.error('生成二维码时出错:', error);
}
},
// 周期性发送心跳包
startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({
type: 100
}))
this.heartBeatSignalTime = new Date().toLocaleTimeString()
console.error('snd heartbeat signal')
} else {
this.stopHeartbeat();
}
}, this.heartbeatIntervalTime);
},
//在发送或接受数据后,重置下一次发送心跳包的时间
resetHeartbeat() {
clearInterval(this.heartbeatInterval);
this.startHeartbeat();
},
// 停止发送心跳包
stopHeartbeat() {
clearInterval(this.heartbeatInterval);
},
initWebSocket() {
let that = this;
this.ws = new WebSocket(this.wsuri);
this.ws.onopen = () => {
this.startHeartbeat();
};
// 接收后端消息
this.ws.onmessage = function (event) {
console.error('RECV:' + event.data)
that.serverContent = event.data;
let parse = JSON.parse(event.data);
that.resetHeartbeat();
switch (parse.type) {
case 0: {
console.error('update')
that.rcvUpdateSignalTime = new Date().toLocaleTimeString()
//TODO. 请求最新数据
break;
}
case 1: { //fileId list. 接受数据,进行路径跳转
console.error('REQ:' + event.data)
that.fileId = parse.fileId;
//记录并请求最新数据
break;
}
case 3: {
that.sessionId = parse.sessionId;
that.generateQRCode();
break;
}
}
};
// 关闭连接时调用
this.ws.onclose = function (event) {
alert('连接已关闭');
that.stopHeartbeat()
// 强制刷新页面(created 会调用)
location.reload(true)
};
}
}
}
</script>
<style scoped>
</style>
OkHttp3建立websocket连接
- 使用okhttp3建立websocket连接,监听onMessage根据消息类型进行不同的处理;
- 使用handler 管理心跳包
扫码后, 如果已经建立连接了
package com.example.im.ws;
import android.content.Intent;
import android.os.Bundle;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.text.TextUtils;
import android.widget.EditText;
import android.widget.TextView;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.appcompat.app.AppCompatActivity;
import com.example.im.R;
import com.google.gson.Gson;
import com.google.zxing.integration.android.IntentIntegrator;
import com.google.zxing.integration.android.IntentResult;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
public class HelloActivity extends AppCompatActivity {
public static final int MSG_HEART = 0x123;
public static final int MSG_INTERVAL = 3000;
private WebSocket webSocket;
public static final String URL = "ws://192.168.0.110:7890/websocket";
private TextView msgView;
private List<String> sessionIds = new ArrayList<>();
Handler mHandler = new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
if (MSG_HEART == msg.what) {
MsgData msgData = new MsgData();
msgData.setType(MsgType.HEART_BEAT);
webSocket.send(new Gson().toJson(msgData));
msgView.append(getNowDate() + ":发送消息 heart beat\n");
mHandler.sendMessageDelayed(mHandler.obtainMessage(MSG_HEART), MSG_INTERVAL);
}
}
};
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
msgView = findViewById(R.id.tv_msg);
findViewById(R.id.btn_scan).setOnClickListener(v -> {
scanQRCode();
});
findViewById(R.id.btn_update).setOnClickListener(v -> {
MsgData msgData = new MsgData();
msgData.setType(MsgType.UPDATE);
webSocket.send(new Gson().toJson(msgData));
});
}
@Override
protected void onDestroy() {
mHandler.removeCallbacksAndMessages(null);
super.onDestroy();
}
private void scanQRCode() {
IntentIntegrator integrator = new IntentIntegrator(this);
integrator.setDesiredBarcodeFormats(IntentIntegrator.QR_CODE);
integrator.setPrompt("提示");
integrator.setCameraId(0); // 使用后置摄像头
integrator.setBeepEnabled(false);
integrator.setBarcodeImageEnabled(true);
integrator.initiateScan();
}
@Override
protected void onActivityResult(int requestCode, int resultCode, Intent data) {
IntentResult result = IntentIntegrator.parseActivityResult(requestCode, resultCode, data);
if (result != null && !TextUtils.isEmpty(result.getContents())) {
String sessionId = result.getContents();
if (sessionIds.contains(sessionId)) {
return;
}
sessionIds.add(sessionId);
//start
if (null == webSocket) {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(URL).build();
webSocket = client.newWebSocket(request, new MyWebSocketListener());
} else {
//这样可以实现扫多个pc端. 同时与多个pc端通信
MsgData msgData = new MsgData();
msgData.setSessionId(sessionId);
msgData.setType(MsgType.REQ);
msgData.setFileId("123");
webSocket.send(new Gson().toJson(msgData));
}
}
super.onActivityResult(requestCode, resultCode, data);
}
private String getNowDate() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",
Locale.getDefault());
return simpleDateFormat.format(new java.util.Date());
}
private class MyWebSocketListener extends WebSocketListener {
@Override
public void onOpen(WebSocket webSocket, okhttp3.Response response) {
// 连接成功
msgView.append(getNowDate() + ":连接成功\n");
MsgData msgData = new MsgData();
msgData.setSessionId(sessionIds.get(sessionIds.size() - 1));
msgData.setType(MsgType.REQ);
msgData.setFileId("123");
webSocket.send(new Gson().toJson(msgData));
mHandler.sendMessageDelayed(mHandler.obtainMessage(MSG_HEART), MSG_INTERVAL);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
msgView.append(getNowDate() + ":接受消息" + text + "\n");
mHandler.removeMessages(MSG_HEART);
mHandler.sendMessageDelayed(mHandler.obtainMessage(MSG_HEART), MSG_INTERVAL);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
// 连接失败
msgView.append(getNowDate() + ":失败" + t.getMessage() + "\n");
}
}
}
2.0版本
上面的实现确实简单.下面结合实际的系统架构进行适配一下.
spring-boot放到nginx后面
nginx常用来进行负载均衡\防火墙\反向代理等等,这种情况比较常见.
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 7777;
server_name localhost;
location / {
proxy_pass http://127.0.0.1:7890;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
}
}
设置Upgrade\Connection请求头,将访问地址修改为nginx的地址,即可实现nginx代理到spring-boot.
spring-boot 放到gateway后面
也就是所谓的spring-cloud 微服务架构.
gateway添加ws协议的路由
# IM
- id: im
uri: ws://localhost:7890
predicates:
- Path=/im/**
filters:
- StripPrefix=1
访问gateway代理之后的地址,即可实现nginx代理到spring-boot.
spring-boot 放到nginx gateway后面
将前面两者进行结合, nginx保证可以代理到gateway, gateway再路由到spring-boot.
ws升级为wss
网上的做法是, 给gateway\spring-boot都配置证书.
简单才能高效,既然gateway有防火墙验证证书等功能,应用不需要管理才对. nginx要屏蔽这种差异.
配置nginx 直接将wss的请求重写为ws.
nginx重写协议
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server
{
listen 443 ssl http2;
server_name
#SSL-START SSL相关配置,请勿删除或修改下一行带注释的404规则
ssl on;
ssl_certificate
ssl_certificate_key
add_header Strict-Transport-Security "max-age=31536000";
error_page 497 https://$host$request_uri;
location /im/ {
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_pass http://127.0.0.1:18080/im/;
rewrite ^(.*)wss://(.*)$ $1ws://$2 permanent;
}
}
这样 wss://域名/im/websocket就可以进行访问了.
其他
源码下载地址: https://gitee.com/guchuanhang/imapplication.git
springboot打包
- 注释掉skip
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.example.im.ImApplication</mainClass>
<!-- 注释掉,否则不能打包-->
<!-- <skip>true</skip>-->
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
- springboot日志
spring-boot 默认支持logback
@Slf4j
public class WebSocketServer extends TextWebSocketHandler {
log.info("New connection established: " + session.getId());
- bootstrap.yml
bootstrap.yml 是 spring-cloud 配置文件.
application.yml applicaition.properties 是 spring-boot 的配置文件.
- wss测试工具 wscat
npm install -g wscat # 安装方式
wscat -c wss://www.baidu.com/im/websocket