Unity网络通信(part8.客户端主动断连与心跳消息)
目录
前言
解决目前断开不及时的问题
1.客户端尝试使用Disconnect方法主动断开连接
2.自定义退出消息
主动断连解决方案
服务端
客户端
退出消息脚本
心跳消息
为什么需要心跳消息?
实现心跳消息
客户端
服务端
最终实现
前言
在Unity网络编程中,当客户端主动断开连接时,仅仅调用 Shutdown
和 Close
方法确实可能不足以让服务器端立即得知客户端已经断开。这是因为TCP协议本身的一些特性导致的。TCP协议是面向连接的、可靠的传输层协议,它不会立即通知对方连接已经断开,而是会等待一段时间(通常是TCP连接的超时时间)以确认是否有未发送完毕的数据或者是否有必要进行重传。
解决目前断开不及时的问题
1.客户端尝试使用Disconnect方法主动断开连接
Socket当中有一个专门在客户端使用的方法
Disconect方法
客户端调用该方法和服务器端断开连接
看是否是因为之前直接Close而没有调用Disconet造成服务器端无法及时获取状态
主要修改的逻辑:
客户端:
主动断开连接
服务端:
1.收发消息时判断socket是否已经断开
2.处理删除记录的socket的相关逻辑(会用到线程锁)
2.自定义退出消息
让服务器端收到该消息就知道是客户端想要主动断开
然后服务器端处理释放socket相关工作
主动断连解决方案
服务端
在之前的代码基础上进行修改。
using System.Net.Sockets;
namespace TcpServer
{
class ClientSocket
{
private static int CLIENT_BEGIN_ID = 1;
public int clientID;
public Socket socket;
public ClientSocket(Socket socket)
{
this.clientID = CLIENT_BEGIN_ID;
this.socket = socket;
++CLIENT_BEGIN_ID;
}
//是否是连接状态
public bool Connected => this.socket.Connected;
//封装方法
//关闭
public void Close()
{
if (socket != null)
{
socket.Shutdown(SocketShutdown.Both);
socket.Close();
socket = null;
}
}
//发送
public void Send(BaseMsg msg)
{
if (Connected)
{
try
{
socket.Send(msg.Writing());
}
catch (Exception e)
{
Console.WriteLine("发送消息出错:" + e.Message);
Program.socket.AddDelSocekt(this);
}
}
else
{
Program.socket.AddDelSocekt(this);
}
}
//接收
public void Receive()
{
if (!Connected)
{
Program.socket.AddDelSocekt(this);
return;
}
try
{
if (socket.Available > 0)
{
byte[] result = new byte[1024 * 5];//5KB
int recevieNum = socket.Receive(result);
//收到数据后 先读取4个字节 转为ID 才知道用哪一个类型去处理反序列化
int msgID = BitConverter.ToInt32(result, 0);
BaseData msg = null;
switch (msgID)
{
case 1001:
msg = new PlayerMsg();
msg.Reading(result, 4);
break;
case 1003:
msg = new QuitMsg();
break;
}
if (msg == null)
{
Console.WriteLine("无法解析消息类型");
return;
}
ThreadPool.QueueUserWorkItem(MsgHandle, msg);
}
}
catch (Exception e)
{
Console.WriteLine("接收消息出错:" + e.Message);
//解析消息出错 也认为 要把socket断开了
Program.socket.AddDelSocekt(this);
}
}
private void MsgHandle(object obj)
{
BaseMsg msg = obj as BaseMsg;
if (msg is PlayerMsg)
{
PlayerMsg playerMsg = msg as PlayerMsg;
Console.WriteLine(playerMsg.playerID);
Console.WriteLine(playerMsg.playerData.atk);
Console.WriteLine(playerMsg.playerData.lev);
Console.WriteLine(playerMsg.playerData.name);
}
else if(msg is QuitMsg)
{
//收到断开连接消息 把自己添加到待移除的列表当中
Program.socket.AddDelSocekt(this);
}
}
}
}
using System.Net;
using System.Net.Sockets;
namespace TcpServer
{
class ServerSocket
{
//服务端Socket
public Socket socket;
//客户端链接的所有Socket
public Dictionary<int, ClientSocket> clientDic = new Dictionary<int, ClientSocket>();
//有待移除的客户端socket 避免 在foreach时直接从字典中移除 出现问题
private List<ClientSocket> delList = new List<ClientSocket>();
private bool isClose;
//开启服务器端
public void Start(string ip,int port,int num)
{
isClose = false;
socket = new Socket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
IPEndPoint ipPoint = new IPEndPoint(IPAddress.Parse(ip),port);
socket.Bind(ipPoint);
socket.Listen(num);
ThreadPool.QueueUserWorkItem(Accpet);
ThreadPool.QueueUserWorkItem(Receive);
}
//关闭服务器端
public void Close()
{
foreach (ClientSocket client in clientDic.Values)
{
client.Close();
}
clientDic.Clear();
socket.Shutdown(SocketShutdown.Both);
socket.Close();
socket = null;
}
//接收客户端连入
private void Accpet(object obj)
{
while(!isClose)
{
try
{
//连入了一个客户端
Socket clientSockket = socket.Accept();
ClientSocket client = new ClientSocket(clientSockket);
lock (clientDic)
{
clientDic.Add(client.clientID, client);
}
}
catch (Exception e)
{
Console.WriteLine("客户端连入报错"+e.Message);
throw;
}
}
}
//接收客户端消息
private void Receive(object obj)
{
while(!isClose)
{
if(clientDic.Count>0)
{
lock (clientDic)
{
foreach (ClientSocket client in clientDic.Values)
{
client.Receive();
}
//判断有没有断开连接的 把其移除
CloseClientSocket();
delList.Clear();
}
}
}
}
public void Broadcast(BaseMsg msg)
{
lock(clientDic)
{
foreach (ClientSocket client in clientDic.Values)
{
client.Send(msg);
}
}
}
//添加待移除的socket内容
public void AddDelSocekt(ClientSocket socket)
{
if(!delList.Contains(socket))
{
delList.Add(socket);
}
}
public void CloseClientSocket()
{
//判断有没有断开连接的 把其移除
for (int i = 0; i < delList.Count; i++)
{
CloseClientSocket(delList[i]);
}
}
//关闭客户端连接 从字典中移除
public void CloseClientSocket(ClientSocket clientSocket)
{
lock(clientDic)
{
clientSocket.Close();
if (clientDic.ContainsKey(clientSocket.clientID))
{
clientDic.Remove(clientSocket.clientID);
Console.WriteLine("客户端{0}主动断开连接了",clientSocket.clientID);
}
}
}
}
}
namespace TcpServer
{
class Program
{
public static ServerSocket socket;
static void Main(string[] args)
{
socket =new ServerSocket();
socket.Start("127.0.0.1",8080,1024);
Console.WriteLine("服务开启成功");
while(true)
{
string input = Console.ReadLine();
if(input == "Quit")
{
socket.Close();
}
else if(input.Substring(0,2)=="B:")
{
if(input.Substring(2)=="1001")
{
PlayerMsg msg = new PlayerMsg();
msg.playerID = 91;
msg.playerData = new PlayerData();
msg.playerData.name = "服务器发来的消息";
msg.playerData.lev = 99;
msg.playerData.atk = 80;
socket.Broadcast(msg);
}
}
}
}
}
}
客户端
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using UnityEngine;
using UnityEngine.XR;
public class NetMgr : MonoBehaviour
{
private static NetMgr instance;
public static NetMgr Instance => instance;
//客户端Socket
private Socket socket;
//用于发送消息的队列 公共容器 主线程往里面放 发送线程往里面取
private Queue<BaseMsg> sendMsgQueue = new Queue<BaseMsg>();
//用于接收消息的队列 公共容器 子线程往里面放 主线程往里面取
private Queue<BaseMsg> receiveQueue = new Queue<BaseMsg>();
用于收消息的容器
//private byte[] receiveBytes = new byte[1024*1024];
返回收到的字节数
//private int receiveNum;
//用于处理分包时缓存的字节数
private byte[] cacheBytes = new byte[1024*1024];
private int cacheNum;
//是否连接
private bool isConnect=false;
private void Awake()
{
instance = this;
DontDestroyOnLoad(this.gameObject);
}
private void Update()
{
if(receiveQueue.Count>0)
{
BaseMsg msg = receiveQueue.Dequeue();
if(msg is PlayerMsg)
{
PlayerMsg playerMsg = (PlayerMsg)msg;
print(playerMsg.playerID);
print(playerMsg.playerData.name);
print(playerMsg.playerData.lev);
print(playerMsg.playerData.atk);
}
}
}
//连接服务端
public void Connect(string ip,int port)
{
//如果是连接状态 直接返回
if(isConnect)
{
return;
}
if(socket==null)
{
socket = new Socket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
}
//连接服务端
IPEndPoint iPPoint = new IPEndPoint(IPAddress.Parse(ip),port);
try
{
socket.Connect(iPPoint);
isConnect=true;
//开启发送线程
ThreadPool.QueueUserWorkItem(SendMsg);
//开启接收线程
ThreadPool.QueueUserWorkItem(ReceiveMsg);
}
catch(SocketException e)
{
if(e.ErrorCode == 10061)
{
print("服务器拒绝连接");
}
else
{
print("连接失败"+e.ErrorCode+e.Message);
}
}
}
//发送消息
public void Send(BaseMsg msg)
{
sendMsgQueue.Enqueue(msg);
}
private void SendMsg(object obj)
{
while(isConnect)
{
if(sendMsgQueue.Count>0)
{
socket.Send(sendMsgQueue.Dequeue().Writing());
}
}
}
//接收消息
public void ReceiveMsg(object obj)
{
while(isConnect)
{
if (socket.Available > 0)
{
//申明为临时变量,节约内存空间
byte[] receiveBytes = new byte[1024 * 1024];
int receiveNum = socket.Receive(receiveBytes);
HandleReceiveMsg(receiveBytes,receiveNum);
首先把收到字节数组的前4个字节 读取出来得到ID
//int msgID = BitConverter.ToInt32(receiveBytes, 0);
//BaseMsg baseMsg = null;
//switch (msgID)
//{
// case 1001:
// PlayerMsg msg = new PlayerMsg();
// msg.Reading(receiveBytes, 4);
// baseMsg = msg;
// break;
//}
如果消息为空 那证明是不知道类型的消息 没有解析
//if (baseMsg == null)
//{
// continue;
//}
收到消息 解析消息为字符串 并放入公共容器
//receiveQueue.Enqueue(baseMsg);
}
}
}
//处理接收消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes,int receiveNum)
{
int msgID=0;
int msgLength = 0;
int nowIndex = 0;
//收到消息时,应该看看之前有没有缓存的 如果有的话 直接拼接到后面
receiveBytes.CopyTo(cacheBytes,cacheNum);
cacheNum += receiveNum;
while(true)
{
//每次将长度设置为-1,是为了避免上一次解析的数据影响这一次的判断
msgLength = -1;
//处理解析一条消息
if(cacheNum-nowIndex >= 8)
{
//解析ID
msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
//解析长度
msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
}
if(cacheNum - nowIndex>=msgLength&&msgLength!=-1)
{
//解析消息体
BaseMsg baseMsg = null;
switch (msgID)
{
case 1001:
PlayerMsg msg = new PlayerMsg();
msg.Reading(cacheBytes, nowIndex);
baseMsg = msg;
break;
}
if (baseMsg != null)
{
receiveQueue.Enqueue(baseMsg);
}
nowIndex += msgLength;
if(nowIndex == cacheNum)
{
cacheNum = 0;
break;
}
}
else//保存消息体,等下一次收到消息时进行拼接
{
//receiveBytes.CopyTo(cacheBytes, 0);
//cacheNum = receiveNum;
//如果进行了id和长度的解析 但是 没有成功jie'xi'xiao'xi'ti
if(msgLength !=-1)
{
nowIndex -= 8;
}
//就是把剩余没有解析字节数组内容 移到前面来 用来缓存下次继续解析
Array.Copy(cacheBytes,nowIndex,cacheBytes,0,cacheNum-nowIndex);
cacheNum = cacheNum - nowIndex;
break;
}
}
}
//关闭连接
public void Close()
{
if(socket!=null)
{
print("客户端主动断开连接");
//主动发送一条断开链接的消息给服务端
QuitMsg msg = new QuitMsg();
socket.Send(msg.Writing());
socket.Shutdown(SocketShutdown.Both);
socket.Disconnect(false);
socket.Close();
socket = null;
isConnect = false;
}
}
private void OnDestroy()
{
Close();
}
}
退出消息脚本
此脚本应添加到服务端文件夹与客户端Unity文件中
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
public class QuitMsg : BaseMsg
{
public override int GetBytesNum()
{
return 8;
}
public override int GetID()
{
return 1003;
}
public override int Reading(byte[] bytes, int beginIndex = 0)
{
return 0;
}
public override byte[] Writing()
{
int index = 0;
byte[] bytes = new byte[GetBytesNum()];
WriteInt(bytes, GetID(), ref index);
WriteInt(bytes, 0, ref index);
return bytes;
}
}
心跳消息
在Unity网络通信中,心跳消息是长连接中客户端和服务端之间定期发送的一种特殊数据包。这种数据包的主要作用是通知对方自己还在线,以确保长连接的有效性。以下是对Unity网络通信中心跳消息的详细解析。
为什么需要心跳消息?
1.避免非正常关闭客户端时,服务器无法正常收到关闭连接消息
通过心跳消息我们可以自定义超时判断,如果超时没有收到客户端消息,证明客户端已经断开连接
2.避免客户端长期不发送消息,防火墙或者路由器会断开连接,我们可以通过心跳消息一直保持活跃状态
实现心跳消息
客户端
主要功能:定时发送消息
服务器
主要功能:不停检测上次收到某客户端消息的时间,如果超时则认为连接已经断开
客户端
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
public class HeartMsg : BaseMsg
{
public override int GetBytesNum()
{
return 8;
}
public override int GetID()
{
return 999;
}
public override int Reading(byte[] bytes, int beginIndex = 0)
{
return base.Reading(bytes, beginIndex);
}
public override byte[] Writing()
{
int index = 0;
byte[] bytes = new byte[GetBytesNum()];
WriteInt(bytes, GetID(),ref index);
WriteInt(bytes, 0,ref index);
return base.Writing();
}
}
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using UnityEngine;
using UnityEngine.XR;
public class NetMgr : MonoBehaviour
{
private static NetMgr instance;
public static NetMgr Instance => instance;
//客户端Socket
private Socket socket;
//用于发送消息的队列 公共容器 主线程往里面放 发送线程往里面取
private Queue<BaseMsg> sendMsgQueue = new Queue<BaseMsg>();
//用于接收消息的队列 公共容器 子线程往里面放 主线程往里面取
private Queue<BaseMsg> receiveQueue = new Queue<BaseMsg>();
用于收消息的容器
//private byte[] receiveBytes = new byte[1024*1024];
返回收到的字节数
//private int receiveNum;
//用于处理分包时缓存的字节数
private byte[] cacheBytes = new byte[1024*1024];
private int cacheNum;
//是否连接
private bool isConnect=false;
//发送心跳机制的间隔时间
private int SEND_HEART_MSG_TIME = 2;
private HeartMsg HeartMsg = new HeartMsg();
private void Awake()
{
instance = this;
DontDestroyOnLoad(this.gameObject);
//客户端循环定时给服务端发送心跳消息
InvokeRepeating("SendHeartMsg",0, SEND_HEART_MSG_TIME);
}
private void SendHeartMsg()
{
if(isConnect)
Send(HeartMsg);
}
private void Update()
{
if(receiveQueue.Count>0)
{
BaseMsg msg = receiveQueue.Dequeue();
if(msg is PlayerMsg)
{
PlayerMsg playerMsg = (PlayerMsg)msg;
print(playerMsg.playerID);
print(playerMsg.playerData.name);
print(playerMsg.playerData.lev);
print(playerMsg.playerData.atk);
}
}
}
//连接服务端
public void Connect(string ip,int port)
{
//如果是连接状态 直接返回
if(isConnect)
{
return;
}
if(socket==null)
{
socket = new Socket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
}
//连接服务端
IPEndPoint iPPoint = new IPEndPoint(IPAddress.Parse(ip),port);
try
{
socket.Connect(iPPoint);
isConnect=true;
//开启发送线程
ThreadPool.QueueUserWorkItem(SendMsg);
//开启接收线程
ThreadPool.QueueUserWorkItem(ReceiveMsg);
}
catch(SocketException e)
{
if(e.ErrorCode == 10061)
{
print("服务器拒绝连接");
}
else
{
print("连接失败"+e.ErrorCode+e.Message);
}
}
}
//发送消息
public void Send(BaseMsg msg)
{
sendMsgQueue.Enqueue(msg);
}
private void SendMsg(object obj)
{
while(isConnect)
{
if(sendMsgQueue.Count>0)
{
socket.Send(sendMsgQueue.Dequeue().Writing());
}
}
}
//接收消息
public void ReceiveMsg(object obj)
{
while(isConnect)
{
if (socket.Available > 0)
{
//申明为临时变量,节约内存空间
byte[] receiveBytes = new byte[1024 * 1024];
int receiveNum = socket.Receive(receiveBytes);
HandleReceiveMsg(receiveBytes,receiveNum);
首先把收到字节数组的前4个字节 读取出来得到ID
//int msgID = BitConverter.ToInt32(receiveBytes, 0);
//BaseMsg baseMsg = null;
//switch (msgID)
//{
// case 1001:
// PlayerMsg msg = new PlayerMsg();
// msg.Reading(receiveBytes, 4);
// baseMsg = msg;
// break;
//}
如果消息为空 那证明是不知道类型的消息 没有解析
//if (baseMsg == null)
//{
// continue;
//}
收到消息 解析消息为字符串 并放入公共容器
//receiveQueue.Enqueue(baseMsg);
}
}
}
//处理接收消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes,int receiveNum)
{
int msgID=0;
int msgLength = 0;
int nowIndex = 0;
//收到消息时,应该看看之前有没有缓存的 如果有的话 直接拼接到后面
receiveBytes.CopyTo(cacheBytes,cacheNum);
cacheNum += receiveNum;
while(true)
{
//每次将长度设置为-1,是为了避免上一次解析的数据影响这一次的判断
msgLength = -1;
//处理解析一条消息
if(cacheNum-nowIndex >= 8)
{
//解析ID
msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
//解析长度
msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
}
if(cacheNum - nowIndex>=msgLength&&msgLength!=-1)
{
//解析消息体
BaseMsg baseMsg = null;
switch (msgID)
{
case 1001:
PlayerMsg msg = new PlayerMsg();
msg.Reading(cacheBytes, nowIndex);
baseMsg = msg;
break;
}
if (baseMsg != null)
{
receiveQueue.Enqueue(baseMsg);
}
nowIndex += msgLength;
if(nowIndex == cacheNum)
{
cacheNum = 0;
break;
}
}
else//保存消息体,等下一次收到消息时进行拼接
{
//receiveBytes.CopyTo(cacheBytes, 0);
//cacheNum = receiveNum;
//如果进行了id和长度的解析 但是 没有成功jie'xi'xiao'xi'ti
if(msgLength !=-1)
{
nowIndex -= 8;
}
//就是把剩余没有解析字节数组内容 移到前面来 用来缓存下次继续解析
Array.Copy(cacheBytes,nowIndex,cacheBytes,0,cacheNum-nowIndex);
cacheNum = cacheNum - nowIndex;
break;
}
}
}
//关闭连接
public void Close()
{
if(socket!=null)
{
print("客户端主动断开连接");
//主动发送一条断开链接的消息给服务端
QuitMsg msg = new QuitMsg();
socket.Send(msg.Writing());
socket.Shutdown(SocketShutdown.Both);
socket.Disconnect(false);
socket.Close();
socket = null;
isConnect = false;
}
}
private void OnDestroy()
{
Close();
}
}
服务端
using System.Net.Sockets;
namespace TcpServer
{
class ClientSocket
{
private static int CLIENT_BEGIN_ID = 1;
public int clientID;
public Socket socket;
//是否是连接状态
public bool Connected => this.socket.Connected;
//上一次手打消息的时间
private long frontTime = -1;
//超时时间
private static int TIME_OUT_TIME = 10;
public ClientSocket(Socket socket)
{
this.clientID = CLIENT_BEGIN_ID;
this.socket = socket;
++CLIENT_BEGIN_ID;
ThreadPool.QueueUserWorkItem(CheckTimeOut);
}
//间隔一段时间 检测一次超时 如果超时 就会主动断开该客户端的连接
private void CheckTimeOut(object obj)
{
while(Connected)
{
if (frontTime != -1 &&
DateTime.Now.Ticks / TimeSpan.TicksPerSecond - frontTime >= TIME_OUT_TIME)
{
Program.socket.AddDelSocekt(this);
break;
}
Thread.Sleep(5000);
}
}
//封装方法
//关闭
public void Close()
{
if (socket != null)
{
socket.Shutdown(SocketShutdown.Both);
socket.Close();
socket = null;
}
}
//发送
public void Send(BaseMsg msg)
{
if (Connected)
{
try
{
socket.Send(msg.Writing());
}
catch (Exception e)
{
Console.WriteLine("发送消息出错:" + e.Message);
Program.socket.AddDelSocekt(this);
}
}
else
{
Program.socket.AddDelSocekt(this);
}
}
//接收
public void Receive()
{
if (!Connected)
{
Program.socket.AddDelSocekt(this);
return;
}
try
{
if (socket.Available > 0)
{
byte[] result = new byte[1024 * 5];//5KB
int recevieNum = socket.Receive(result);
//收到数据后 先读取4个字节 转为ID 才知道用哪一个类型去处理反序列化
int msgID = BitConverter.ToInt32(result, 0);
BaseData msg = null;
switch (msgID)
{
case 1001:
msg = new PlayerMsg();
msg.Reading(result, 4);
break;
case 1003:
msg = new QuitMsg();
break;
case 999:
msg = new HeartMsg();
break;
}
if (msg == null)
{
Console.WriteLine("无法解析消息类型");
return;
}
ThreadPool.QueueUserWorkItem(MsgHandle, msg);
}
}
catch (Exception e)
{
Console.WriteLine("接收消息出错:" + e.Message);
//解析消息出错 也认为 要把socket断开了
Program.socket.AddDelSocekt(this);
}
}
private void MsgHandle(object obj)
{
BaseMsg msg = obj as BaseMsg;
if (msg is PlayerMsg)
{
PlayerMsg playerMsg = msg as PlayerMsg;
Console.WriteLine(playerMsg.playerID);
Console.WriteLine(playerMsg.playerData.atk);
Console.WriteLine(playerMsg.playerData.lev);
Console.WriteLine(playerMsg.playerData.name);
}
else if(msg is QuitMsg)
{
//收到断开连接消息 把自己添加到待移除的列表当中
Program.socket.AddDelSocekt(this);
}
else if(msg is HeartMsg)
{
//收到心跳消息 记录收到心跳消息的时间
frontTime = DateTime.Now.Ticks / TimeSpan.TicksPerSecond;
}
}
}
}
最终实现
每两秒钟客户端发送心跳消息给服务端,当客户端断开连接10秒后,服务端认为客户端断连并主动移除客户端Socket。