WebSocket --- ws模块源码解析(详解)
摘要
在这一篇文章中,写了如何在node端和web端,实现一个WebSocket通信。
WebSocket在node端和客户端的使用
而在node端里面,我们使用了ws模块来创建WebSocket和WebSocketServer,那ws模块是如何做到可以和客户端进行双向通信的呢?
426状态码
在HTTP中,426表示“Upgrade Required”,即客户端需要通过HTTP协议的升级版进行访问。这个状态码主要用在WebSockets协议中,表示客户端需要使用WebSockets协议来连接服务器。
什么意思呢?例如我们创建一个HTTP服务如果这么写:
const http = require('http')
const server = http.createServer((req, res) => {
const body = http.STATUS_CODES[426];
res.writeHead('426', {
'Content-Type': 'text/align',
'Content-Length': body.length
})
res.end(body)
})
server.listen(8080)
就是告诉客户端,如果你访问我这边的服务,那么你就要进行升级服务。也就是使用WebSocket对我进行访问!
那有一个问题,如果客户端使用了WebSocket访问,服务端要怎么进行响应呢?
还直接在createServer里面的回调中处理吗?
upgrade事件
在这里面,如果客户端通过WebSocket进行访问服务端,会触发服务端server的upgrade事件,也就是说会进下面的回调函数里。
server.on('upgrade',(req, socket, head) => {
// 固定格式
const key = req.headers['sec-websocket-key'];
const digest = createHash('sha1')
.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',)
.digest('base64');
const headers = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
`Sec-WebSocket-Accept: ${digest}`
];
socket.write(headers.concat('\r\n').join('\r\n'));
// 客户端发送的消息
socket.on('data', (data) => {
console.log(data.toString());
})
// 服务端向客户端发送消息
socket.write('你好')
})
这个回调中,通过socket来进行和客户端进行双向通信。
转码
但是只有上面的例子,似乎每次拿到的数据都是乱码。这是因为WebSocket之间的通信的报文,不能通过Buffer的toString直接转码。这里提供一下在网上找到的转码方法:
server.on('upgrade', (req, socket, head) => {
const key = req.headers['sec-websocket-key'];
const digest = createHash('sha1')
.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',)
.digest('base64');
const headers = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
`Sec-WebSocket-Accept: ${digest}`
];
socket.write(headers.concat('\r\n').join('\r\n'));
socket.on('data',(data) => {
console.log(decodeSocketFrame(data).payloadBuf.toString())
socket.write(encodeSocketFrame({
fin:1,
opcode:1,
payloadBuf:Buffer.from('你好')
}))
})
})
function decodeSocketFrame (bufData){
let bufIndex = 0
const byte1 = bufData.readUInt8(bufIndex++).toString(2)
const byte2 = bufData.readUInt8(bufIndex++).toString(2)
console.log(byte1);
console.log(byte2);
const frame = {
fin:parseInt(byte1.substring(0,1),2),
// RSV是保留字段,暂时不计算
opcode:parseInt(byte1.substring(4,8),2),
mask:parseInt(byte2.substring(0,1),2),
payloadLen:parseInt(byte2.substring(1,8),2),
}
// 如果frame.payloadLen为126或127说明这个长度不够了,要使用扩展长度了
// 如果frame.payloadLen为126,则使用Extended payload length同时为16/8字节数
// 如果frame.payloadLen为127,则使用Extended payload length同时为64/8字节数
// 注意payloadLen得长度单位是字节(bytes)而不是比特(bit)
if(frame.payloadLen==126) {
frame.payloadLen = bufData.readUIntBE(bufIndex,2);
bufIndex+=2;
} else if(frame.payloadLen==127) {
// 虽然是8字节,但是前四字节目前留空,因为int型是4字节不留空int会溢出
bufIndex+=4;
frame.payloadLen = bufData.readUIntBE(bufIndex,4);
bufIndex+=4;
}
if(frame.mask){
const payloadBufList = []
// maskingKey为4字节数据
frame.maskingKey=[bufData[bufIndex++],bufData[bufIndex++],bufData[bufIndex++],bufData[bufIndex++]];
for(let i=0;i<frame.payloadLen;i++) {
payloadBufList.push(bufData[bufIndex+i]^frame.maskingKey[i%4]);
}
frame.payloadBuf = Buffer.from(payloadBufList)
} else {
frame.payloadBuf = bufData.slice(bufIndex,bufIndex+frame.payloadLen)
}
return frame
}
function encodeSocketFrame (frame){
const frameBufList = [];
// 对fin位移七位则为10000000加opcode为10000001
const header = (frame.fin<<7)+frame.opcode;
frameBufList.push(header)
const bufBits = Buffer.byteLength(frame.payloadBuf);
let payloadLen = bufBits;
let extBuf;
if(bufBits>=126) {
//65536是2**16即两字节数字极限
if(bufBits>=65536) {
extBuf = Buffer.allocUnsafe(8);
buf.writeUInt32BE(bufBits, 4);
payloadLen = 127;
} else {
extBuf = Buffer.allocUnsafe(2);
buf.writeUInt16BE(bufBits, 0);
payloadLen = 126;
}
}
let payloadLenBinStr = payloadLen.toString(2);
while(payloadLenBinStr.length<8){payloadLenBinStr='0'+payloadLenBinStr;}
frameBufList.push(parseInt(payloadLenBinStr,2));
if(bufBits>=126) {
frameBufList.push(extBuf);
}
frameBufList.push(...frame.payloadBuf)
return Buffer.from(frameBufList)
}
WebSocketServer的实现
有了上面的基础,基本知道双向通信是怎么做到的了。就来看一下WebSocketServer的实现。
当我们使用的时候,我们是以这种方式:
const WebSocketServer = require('ws')
const wss = new WebSocketServer('8080');
wss.on('connection', (ws) => {
})
我们知道,connection是httpServer的回调,为什么在WebSocketServer中可以使用呢?
export default class WebSocketServer {
constructor(port) {
this._server = http.createServer((req, res) => {
const body = http.STATUS_CODES[426];
res.writeHead('426', {
'Content-Type': 'text/align',
'Content-Length': body.length
})
res.end(body)
})
this._server.listen(port);
const connectionEmit = this.emit.bind(this, 'connection');
const closeEmit = this.emit.bind(this, 'close');
// 其他事件,都是http能监听到的;
const map = {
connection: connectionEmit,
close: closeEmit
}
for(let emitName in map) {
this._server.on(emitName, map[emitName])
}
}
}
在WebSocketServer中,如果客户端触发了http的事件时,它便将其转发到WebSocket实例上面。
然后再处理自己的逻辑。