WebSocket 服务端开发:Node.js 实战
在现代网页应用中,一个强大的 WebSocket 服务器就像是一个高效的调度中心,能够处理成千上万的并发连接。记得在一个直播平台项目中,我们通过优化 WebSocket 服务器架构,成功支持了 10 万用户同时在线。今天,我想和大家分享如何使用 Node.js 构建高性能的 WebSocket 服务器。
服务器架构设计
一个优秀的 WebSocket 服务器需要考虑以下几个关键点:
- 连接管理
- 消息处理
- 错误处理
- 性能优化
- 集群扩展
让我们从基础架构开始:
// server.js
const WebSocket = require('ws')
const Redis = require('ioredis')
const cluster = require('cluster')
const numCPUs = require('os').cpus().length
// Shared Redis client for this process
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  // Retry delay grows by 50 per attempt and is capped at 2000
  retryStrategy: (times) => Math.min(times * 50, 2000)
})
/**
 * WebSocket pub/sub server that relays channel messages through Redis.
 *
 * Clients authenticate with a Bearer token during the handshake and then send
 * JSON messages whose `type` is `subscribe`, `unsubscribe` or `publish`.
 * Published payloads are sent to Redis and delivered to local subscribers when
 * they arrive back on the dedicated subscriber connection.
 *
 * One connection is kept per client id: a newer connection with the same id
 * replaces (and closes) the older one.
 */
class WebSocketServer {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8080] Port to listen on.
   */
  constructor(options = {}) {
    this.port = options.port || 8080
    // clientId -> { ws, user, channels, lastPing, lastActivity }
    this.clients = new Map()
    // channel name -> Set of subscribed clientIds
    this.channels = new Map()
    this.initialize()
  }

  // Create the underlying ws server and wire up server and Redis listeners
  initialize() {
    this.wss = new WebSocket.Server({
      port: this.port,
      // Custom handshake: reject connections without a valid token
      verifyClient: this.verifyClient.bind(this),
      clientTracking: true
    })
    this.setupServerEvents()
    this.setupRedis()
    console.log(`WebSocket Server is running on port ${this.port}`)
  }

  /**
   * Handshake hook: accepts the connection only when the request carries a
   * token that verifyToken() resolves for. The resolved user is stored on
   * `info.req.user` so handleConnection() can read it.
   */
  verifyClient(info, callback) {
    const token = this.parseToken(info.req)
    if (!token) {
      callback(false, 401, 'Unauthorized')
      return
    }
    this.verifyToken(token)
      .then((user) => {
        info.req.user = user
        callback(true)
      })
      .catch((error) => {
        console.error('Auth error:', error)
        callback(false, 401, 'Invalid token')
      })
  }

  /**
   * Extract the token from an `Authorization: Bearer <token>` header.
   * @returns {string|null} The token, or null when the header is missing or
   *   uses another scheme.
   */
  parseToken(req) {
    const auth = req.headers['authorization']
    if (!auth) return null
    const [type, token] = auth.split(' ')
    return type === 'Bearer' ? token : null
  }

  // Placeholder token verification: resolves with a fixed test user
  async verifyToken(token) {
    return { id: 'user-1', name: 'Test User' }
  }

  // Register server-level listeners and the SIGTERM graceful-shutdown hook
  setupServerEvents() {
    this.wss.on('connection', (ws, req) => {
      this.handleConnection(ws, req)
    })
    this.wss.on('error', (error) => {
      console.error('Server error:', error)
    })
    process.on('SIGTERM', () => {
      this.shutdown()
    })
  }

  /**
   * Register a freshly accepted socket under its user id and greet it.
   * If the same id is already connected, the older connection is cleaned up
   * and closed first so its later events cannot corrupt the new entry.
   */
  handleConnection(ws, req) {
    const user = req.user
    const clientId = user.id

    const previous = this.clients.get(clientId)
    if (previous) {
      this.handleClose(clientId)
      // 4000-4999 is the application-defined close code range
      previous.ws.close(4000, 'Replaced by a newer connection')
    }

    const now = Date.now()
    this.clients.set(clientId, {
      ws,
      user,
      channels: new Set(),
      lastPing: now,
      lastActivity: now
    })
    console.log(`Client connected: ${clientId}`)

    this.setupClientEvents(ws, clientId)
    this.sendToClient(ws, {
      type: 'welcome',
      data: {
        clientId,
        timestamp: Date.now()
      }
    })
  }

  // True when `ws` is the socket currently registered for `clientId`
  isCurrentSocket(clientId, ws) {
    const client = this.clients.get(clientId)
    return Boolean(client) && client.ws === ws
  }

  /**
   * Attach per-socket listeners. Every listener is ignored once the socket is
   * no longer the registered one for `clientId` (closed, cleaned up or
   * replaced), so a stale socket cannot evict or impersonate its successor.
   */
  setupClientEvents(ws, clientId) {
    const whenCurrent = (handler) => (...args) => {
      if (this.isCurrentSocket(clientId, ws)) handler(...args)
    }
    ws.on('message', whenCurrent((message) => this.handleMessage(clientId, message)))
    ws.on('close', whenCurrent(() => this.handleClose(clientId)))
    ws.on('error', whenCurrent((error) => this.handleError(clientId, error)))
    ws.on('ping', whenCurrent(() => this.handlePing(clientId)))
  }

  /**
   * Parse one incoming JSON message and dispatch on its `type`.
   * Any parsing or handling failure is reported to the client as
   * 'Invalid message format'.
   */
  handleMessage(clientId, message) {
    try {
      const data = JSON.parse(message)
      const client = this.clients.get(clientId)
      if (!client) {
        console.error(`Client not found: ${clientId}`)
        return
      }
      // Any message counts as proof of life for cleanup()
      client.lastActivity = Date.now()

      switch (data.type) {
        case 'subscribe':
          this.handleSubscribe(clientId, data.channel)
          break
        case 'unsubscribe':
          this.handleUnsubscribe(clientId, data.channel)
          break
        case 'publish':
          this.handlePublish(clientId, data.channel, data.data)
          break
        default:
          console.warn(`Unknown message type: ${data.type}`)
      }
    } catch (error) {
      console.error('Message handling error:', error)
      this.sendError(clientId, 'Invalid message format')
    }
  }

  // Subscribe a client to a channel and confirm with a 'subscribed' message
  handleSubscribe(clientId, channel) {
    const client = this.clients.get(clientId)
    if (!client) return
    if (typeof channel !== 'string' || channel === '') {
      this.sendError(clientId, 'Invalid channel')
      return
    }

    client.channels.add(channel)
    if (!this.channels.has(channel)) {
      this.channels.set(channel, new Set())
    }
    this.channels.get(channel).add(clientId)

    this.subscriber.subscribe(channel).catch((error) => {
      console.error('Redis subscribe error:', error)
    })

    this.sendToClient(client.ws, {
      type: 'subscribed',
      channel
    })
  }

  /**
   * Remove a client from a channel and confirm with an 'unsubscribed' message.
   * The Redis subscription is dropped when the last local subscriber leaves.
   */
  handleUnsubscribe(clientId, channel) {
    const client = this.clients.get(clientId)
    if (!client) return

    client.channels.delete(channel)
    const subscribers = this.channels.get(channel)
    if (subscribers) {
      subscribers.delete(clientId)
      if (subscribers.size === 0) {
        this.channels.delete(channel)
        this.subscriber.unsubscribe(channel).catch((error) => {
          console.error('Redis unsubscribe error:', error)
        })
      }
    }

    this.sendToClient(client.ws, {
      type: 'unsubscribed',
      channel
    })
  }

  /**
   * Default publish policy: a client may publish only to channels it is
   * subscribed to. Override to implement other permission rules.
   * @returns {boolean}
   */
  canPublish(client, channel) {
    return client.channels.has(channel)
  }

  // Publish a client's payload to a channel through Redis, if permitted
  handlePublish(clientId, channel, data) {
    const client = this.clients.get(clientId)
    if (!client) return

    if (!this.canPublish(client, channel)) {
      this.sendError(clientId, 'No permission to publish')
      return
    }

    // Publishing uses the shared `redis` client; the subscriber connection
    // cannot issue regular commands while it is in subscriber mode.
    redis.publish(channel, JSON.stringify({
      from: clientId,
      data,
      timestamp: Date.now()
    })).catch((error) => {
      console.error('Redis publish error:', error)
    })
  }

  // Record the ping time and answer with a pong
  handlePing(clientId) {
    const client = this.clients.get(clientId)
    if (client) {
      client.lastPing = Date.now()
      client.ws.pong()
    }
  }

  // Drop all of a client's subscriptions and forget the client
  handleClose(clientId) {
    const client = this.clients.get(clientId)
    if (!client) return

    client.channels.forEach((channel) => {
      this.handleUnsubscribe(clientId, channel)
    })
    this.clients.delete(clientId)
    console.log(`Client disconnected: ${clientId}`)
  }

  // Log a socket error and notify the client
  handleError(clientId, error) {
    console.error(`Client error (${clientId}):`, error)
    this.sendError(clientId, 'Internal error occurred')
  }

  // Send a JSON-serialised payload if the socket is open; otherwise do nothing
  sendToClient(ws, data) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(data))
    }
  }

  // Send an `{ type: 'error', message }` payload to a registered client
  sendError(clientId, message) {
    const client = this.clients.get(clientId)
    if (client) {
      this.sendToClient(client.ws, {
        type: 'error',
        message
      })
    }
  }

  /**
   * Open a dedicated subscriber connection (a duplicate of the shared `redis`
   * client) and forward every channel message to the local subscribers.
   */
  setupRedis() {
    this.subscriber = redis.duplicate()

    this.subscriber.on('message', (channel, message) => {
      try {
        const data = JSON.parse(message)
        const subscribers = this.channels.get(channel)
        if (!subscribers) return
        subscribers.forEach((clientId) => {
          const client = this.clients.get(clientId)
          if (client) {
            this.sendToClient(client.ws, {
              type: 'message',
              channel,
              data: data.data,
              from: data.from,
              timestamp: data.timestamp
            })
          }
        })
      } catch (error) {
        console.error('Redis message handling error:', error)
      }
    })

    this.subscriber.on('error', (error) => {
      console.error('Redis error:', error)
    })
    redis.on('error', (error) => {
      console.error('Redis error:', error)
    })
  }

  /**
   * Terminate clients that have shown no sign of life for 60 seconds.
   * Both pings and regular messages count, so clients that never send ping
   * frames are not dropped while they are actively talking.
   */
  cleanup() {
    const now = Date.now()
    const timeout = 60000
    this.clients.forEach((client, clientId) => {
      const lastSeen = Math.max(client.lastPing, client.lastActivity || 0)
      if (now - lastSeen > timeout) {
        console.log(`Cleaning up inactive client: ${clientId}`)
        client.ws.terminate()
        this.handleClose(clientId)
      }
    })
  }

  // Graceful shutdown: close clients, the server, both Redis connections, then exit
  shutdown() {
    console.log('Shutting down WebSocket server...')
    this.clients.forEach((client) => {
      client.ws.close(1001, 'Server shutting down')
    })
    this.wss.close(() => {
      console.log('WebSocket server closed')
      const connections = this.subscriber ? [this.subscriber, redis] : [redis]
      Promise.all(connections.map((connection) => connection.quit()))
        .then(() => {
          console.log('Redis connection closed')
        })
        .catch((error) => {
          console.error('Redis shutdown error:', error)
        })
        .then(() => {
          process.exit(0)
        })
    })
  }
}
// Cluster mode: the primary process forks numCPUs workers and each worker
// runs its own WebSocketServer.
// `cluster.isPrimary` supersedes the deprecated `cluster.isMaster`; fall back
// to the old flag on runtimes that do not have the new one.
const isPrimaryProcess = typeof cluster.isPrimary === 'boolean'
  ? cluster.isPrimary
  : cluster.isMaster

if (isPrimaryProcess) {
  console.log(`Master ${process.pid} is running`)

  // Start the worker processes
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork()
  }

  cluster.on('exit', (worker) => {
    console.log(`Worker ${worker.process.pid} died`)
    // Replace crashed workers, but not ones stopped deliberately through
    // worker.kill() / worker.disconnect()
    if (!worker.exitedAfterDisconnect) {
      cluster.fork()
    }
  })
} else {
  // Worker process: start the server
  const server = new WebSocketServer({
    port: process.env.PORT || 8080
  })

  // Sweep inactive clients every 30 seconds
  setInterval(() => {
    server.cleanup()
  }, 30000)

  console.log(`Worker ${process.pid} started`)
}
连接管理
实现可靠的连接管理机制:
// connection-manager.js
/**
 * Tracks open connections, their group membership, free-form metadata and
 * per-connection traffic statistics.
 */
class ConnectionManager {
  constructor() {
    // id -> { connection, groups, metadata, stats }
    this.connections = new Map()
    // group name -> Set of connection ids
    this.groups = new Map()
  }

  /**
   * Register a connection under `id` with empty groups, metadata and stats.
   * If `id` is already registered, the old entry is removed first so its
   * group memberships do not leak onto the new connection.
   */
  addConnection(id, connection) {
    if (this.connections.has(id)) {
      this.removeConnection(id)
    }
    this.connections.set(id, {
      connection,
      groups: new Set(),
      metadata: {},
      stats: {
        messagesReceived: 0,
        messagesSent: 0,
        bytesReceived: 0,
        bytesSent: 0,
        connectedAt: Date.now()
      }
    })
  }

  // Remove a connection and take it out of every group it belongs to
  removeConnection(id) {
    const conn = this.connections.get(id)
    if (!conn) return
    conn.groups.forEach((group) => {
      this.removeFromGroup(id, group)
    })
    this.connections.delete(id)
  }

  /**
   * Add a registered connection to a group, creating the group on demand.
   * @returns {boolean} false when the connection id is unknown.
   */
  addToGroup(connectionId, group) {
    const conn = this.connections.get(connectionId)
    if (!conn) return false
    if (!this.groups.has(group)) {
      this.groups.set(group, new Set())
    }
    this.groups.get(group).add(connectionId)
    conn.groups.add(group)
    return true
  }

  // Remove a connection from a group; empty groups are deleted
  removeFromGroup(connectionId, group) {
    const groupSet = this.groups.get(group)
    if (groupSet) {
      groupSet.delete(connectionId)
      if (groupSet.size === 0) {
        this.groups.delete(group)
      }
    }
    const conn = this.connections.get(connectionId)
    if (conn) {
      conn.groups.delete(group)
    }
  }

  // Send a message to every member of a group except `excludeId`
  broadcastToGroup(group, message, excludeId = null) {
    const groupSet = this.groups.get(group)
    if (!groupSet) return
    groupSet.forEach((id) => {
      if (id !== excludeId) {
        this.sendTo(id, message)
      }
    })
  }

  /**
   * Send a JSON-serialised message to one connection if it is open, and
   * update its sent counters.
   */
  sendTo(id, message) {
    const conn = this.connections.get(id)
    if (conn && conn.connection.readyState === WebSocket.OPEN) {
      const data = JSON.stringify(message)
      conn.connection.send(data)
      conn.stats.messagesSent++
      // String length counts UTF-16 code units, not bytes; measure the
      // UTF-8 encoded size so non-ASCII payloads are accounted correctly.
      conn.stats.bytesSent += Buffer.byteLength(data)
    }
  }

  // Stats object for a connection, or null when the id is unknown
  getStats(id) {
    const conn = this.connections.get(id)
    return conn ? conn.stats : null
  }

  // Array of connection ids in a group (empty when the group does not exist)
  getGroupMembers(group) {
    return Array.from(this.groups.get(group) || [])
  }

  // Array of group names a connection belongs to (empty when the id is unknown)
  getConnectionGroups(id) {
    const conn = this.connections.get(id)
    return conn ? Array.from(conn.groups) : []
  }

  // Store a metadata value on a connection; no-op when the id is unknown
  setMetadata(id, key, value) {
    const conn = this.connections.get(id)
    if (conn) {
      conn.metadata[key] = value
    }
  }

  // Read a metadata value; null when the connection id is unknown
  getMetadata(id, key) {
    const conn = this.connections.get(id)
    return conn ? conn.metadata[key] : null
  }
}
消息队列集成
使用 Redis 的发布/订阅(Pub/Sub)机制实现跨进程的消息分发(注意:Pub/Sub 不会持久化消息,订阅者离线期间发布的消息会丢失):
// message-queue.js
const Redis = require('ioredis')
/**
 * Thin pub/sub wrapper around two Redis connections: one for publishing and
 * one for subscribing. At most one handler is kept per channel.
 */
class MessageQueue {
  /**
   * @param {object} [options] Passed unchanged to both `new Redis(...)` calls.
   */
  constructor(options = {}) {
    this.publisher = new Redis(options)
    this.subscriber = new Redis(options)
    // channel name -> handler(message, timestamp)
    this.handlers = new Map()
    this.subscriber.on('message', (channel, message) => {
      this.handleMessage(channel, message)
    })
  }

  /**
   * Publish `message` wrapped as `{ message, timestamp }`.
   * @returns {Promise<boolean>} true on success, false when publishing failed.
   */
  async publish(channel, message) {
    try {
      const data = JSON.stringify({
        message,
        timestamp: Date.now()
      })
      await this.publisher.publish(channel, data)
      return true
    } catch (error) {
      console.error('Publish error:', error)
      return false
    }
  }

  /**
   * Subscribe to a channel and route its messages to `handler`.
   * The handler is registered before the subscription is requested so a
   * message delivered right after the subscription takes effect is not lost;
   * the previous state is restored if subscribing fails.
   * @returns {Promise<boolean>} true on success, false on failure.
   */
  async subscribe(channel, handler) {
    const hadPrevious = this.handlers.has(channel)
    const previous = this.handlers.get(channel)
    this.handlers.set(channel, handler)
    try {
      await this.subscriber.subscribe(channel)
      return true
    } catch (error) {
      if (hadPrevious) {
        this.handlers.set(channel, previous)
      } else {
        this.handlers.delete(channel)
      }
      console.error('Subscribe error:', error)
      return false
    }
  }

  /**
   * Unsubscribe from a channel and drop its handler.
   * @returns {Promise<boolean>} true on success, false on failure.
   */
  async unsubscribe(channel) {
    try {
      await this.subscriber.unsubscribe(channel)
      this.handlers.delete(channel)
      return true
    } catch (error) {
      console.error('Unsubscribe error:', error)
      return false
    }
  }

  /**
   * Decode a raw channel message and invoke the channel's handler with
   * `(message, timestamp)`. Errors are logged, including rejections from
   * handlers that return a promise.
   */
  handleMessage(channel, message) {
    try {
      const handler = this.handlers.get(channel)
      if (!handler) return
      const data = JSON.parse(message)
      const result = handler(data.message, data.timestamp)
      if (result && typeof result.catch === 'function') {
        result.catch((error) => {
          console.error('Message handling error:', error)
        })
      }
    } catch (error) {
      console.error('Message handling error:', error)
    }
  }

  /**
   * Quit both Redis connections. Both are always asked to quit, even when one
   * of them fails; the first failure is rethrown afterwards.
   */
  async close() {
    const outcomes = await Promise.all(
      [this.publisher, this.subscriber].map((client) =>
        client.quit().then(
          () => ({ failed: false }),
          (error) => ({ failed: true, error })
        )
      )
    )
    const failure = outcomes.find((outcome) => outcome.failed)
    if (failure) {
      throw failure.error
    }
  }
}
心跳检测
实现可靠的心跳检测机制:
// heartbeat.js
/**
 * Periodically pings tracked connections and terminates those that have not
 * answered with a pong within the timeout.
 */
class HeartbeatManager {
  /**
   * @param {object} [options]
   * @param {number} [options.interval=30000] Milliseconds between checks.
   * @param {number} [options.timeout=60000] Milliseconds without a pong before
   *   a connection is terminated.
   */
  constructor(options = {}) {
    this.interval = options.interval || 30000
    this.timeout = options.timeout || 60000
    // id -> { connection, lastPing, pingCount, onPong }
    this.connections = new Map()
    this.timer = null
    this.start()
  }

  // Start the periodic check; a no-op when it is already running so a second
  // call cannot orphan the first interval
  start() {
    if (this.timer) return
    this.timer = setInterval(() => {
      this.check()
    }, this.interval)
  }

  // Stop the periodic check
  stop() {
    if (this.timer) {
      clearInterval(this.timer)
      this.timer = null
    }
  }

  /**
   * Start tracking a connection. Any connection previously tracked under the
   * same id is released first.
   */
  add(id, connection) {
    this.remove(id)
    const onPong = () => {
      this.handlePong(id)
    }
    this.connections.set(id, {
      connection,
      lastPing: Date.now(),
      pingCount: 0,
      // Kept so remove() can detach exactly this listener
      onPong
    })
    connection.on('pong', onPong)
  }

  // Stop tracking a connection and detach its pong listener
  remove(id) {
    const data = this.connections.get(id)
    if (!data) return
    data.connection.removeListener('pong', data.onPong)
    this.connections.delete(id)
  }

  // Terminate timed-out connections and ping the open ones
  check() {
    const now = Date.now()
    this.connections.forEach((data, id) => {
      const { connection, lastPing } = data
      if (now - lastPing > this.timeout) {
        console.log(`Connection timeout: ${id}`)
        connection.terminate()
        this.remove(id)
        return
      }
      if (connection.readyState === WebSocket.OPEN) {
        connection.ping()
        data.pingCount++
      }
    })
  }

  // A pong refreshes the liveness timestamp and clears the outstanding ping count
  handlePong(id) {
    const data = this.connections.get(id)
    if (data) {
      data.lastPing = Date.now()
      data.pingCount = 0
    }
  }

  /**
   * @returns {{lastPing: number, pingCount: number, isAlive: boolean}|null}
   *   Liveness snapshot, or null when the id is not tracked.
   */
  getStatus(id) {
    const data = this.connections.get(id)
    if (!data) return null
    return {
      lastPing: data.lastPing,
      pingCount: data.pingCount,
      isAlive: Date.now() - data.lastPing <= this.timeout
    }
  }
}
错误处理
实现全面的错误处理机制:
// error-handler.js
/**
 * Registry that maps an error's `type` to a handler producing the response
 * for it, with a catch-all default.
 */
class ErrorHandler {
  constructor() {
    // error type -> handler(error, context)
    this.handlers = new Map()
  }

  // Register (or replace) the handler for an error type
  register(type, handler) {
    this.handlers.set(type, handler)
  }

  /**
   * Produce a response for `error` using the handler registered for
   * `error.type`, or the default handler when there is none.
   * Values without a usable `type` (including null and undefined, which can
   * reach a catch block) also go to the default handler instead of throwing.
   * @param {*} error The caught value.
   * @param {object} [context] Extra information passed through to the handler.
   * @returns {*} Whatever the selected handler returns.
   */
  handle(error, context = {}) {
    const registered = error == null ? undefined : this.handlers.get(error.type)
    if (typeof registered === 'function') {
      return registered(error, context)
    }
    // Called as a method so overriding subclasses can rely on `this`
    return this.defaultHandler(error, context)
  }

  // Fallback: log the error and answer with a generic 500 response
  defaultHandler(error, context) {
    console.error('Unhandled error:', error)
    return {
      type: 'error',
      message: 'Internal server error',
      code: 500
    }
  }
}
/**
 * Error carrying a `type` (the key used to look up an error handler) and a
 * numeric `code`.
 */
class WebSocketError extends Error {
  /**
   * @param {string} type Error category, e.g. 'auth' or 'validation'.
   * @param {string} message Human-readable description.
   * @param {number} [code=500] Numeric code attached to the error.
   */
  constructor(type, message, code = 500) {
    super(message)
    // Without this, logs and stack traces show the generic "Error"
    this.name = 'WebSocketError'
    this.type = type
    this.code = code
  }
}
// Usage example
const errorHandler = new ErrorHandler()

// Map each error type to the response reported for it
errorHandler.register('auth', () => ({
  type: 'error',
  message: 'Authentication failed',
  code: 401
}))

errorHandler.register('validation', () => ({
  type: 'error',
  message: 'Invalid message format',
  code: 400
}))

// Route a thrown error through the handler and print the resulting response
try {
  throw new WebSocketError('auth', 'Invalid token')
} catch (error) {
  const context = { clientId: 'user-1' }
  console.log(errorHandler.handle(error, context))
}
性能监控
实现性能监控系统:
// monitor.js
/**
 * Collects numeric samples per metric name and reports count, min, max,
 * average and p95/p99.
 *
 * count/min/max/avg cover every recorded sample. Percentiles are computed
 * over the most recent `maxSamples` samples only, so memory use stays bounded
 * in a long-running process.
 */
class PerformanceMonitor {
  /**
   * @param {object} [options]
   * @param {number} [options.maxSamples=10000] How many recent samples to keep
   *   per metric for percentile calculation (positive integer).
   */
  constructor(options = {}) {
    this.metrics = new Map()
    this.startTime = Date.now()
    const requested = options.maxSamples
    this.maxSamples = Number.isInteger(requested) && requested > 0 ? requested : 10000
  }

  // Record one sample for the named metric
  record(name, value) {
    if (!this.metrics.has(name)) {
      this.metrics.set(name, {
        count: 0,
        total: 0,
        min: Infinity,
        max: -Infinity,
        values: [],
        // Next slot to overwrite once `values` is full
        cursor: 0
      })
    }
    const metric = this.metrics.get(name)
    metric.count++
    metric.total += value
    metric.min = Math.min(metric.min, value)
    metric.max = Math.max(metric.max, value)

    if (metric.values.length < this.maxSamples) {
      metric.values.push(value)
    } else {
      // Overwrite the oldest retained sample
      metric.values[metric.cursor] = value
      metric.cursor = (metric.cursor + 1) % this.maxSamples
    }
  }

  /**
   * @returns {{count: number, min: number, max: number, avg: number,
   *   p95: number, p99: number}|null} Statistics for the metric, or null when
   *   nothing has been recorded under that name.
   */
  getStats(name) {
    const metric = this.metrics.get(name)
    if (!metric) return null

    const sorted = [...metric.values].sort((a, b) => a - b)
    // Nearest-rank percentile: the smallest sample with at least `percent`%
    // of the samples at or below it. Integer arithmetic avoids rounding
    // surprises such as 0.07 * 100 !== 7.
    const percentile = (percent) => {
      const rank = Math.ceil((percent * sorted.length) / 100)
      return sorted[Math.max(rank, 1) - 1]
    }

    return {
      count: metric.count,
      min: metric.min,
      max: metric.max,
      avg: metric.total / metric.count,
      p95: percentile(95),
      p99: percentile(99)
    }
  }

  // Statistics for every metric, keyed by metric name
  getAllStats() {
    const stats = {}
    this.metrics.forEach((metric, name) => {
      stats[name] = this.getStats(name)
    })
    return stats
  }

  // Forget all metrics and restart the uptime clock
  reset() {
    this.metrics.clear()
    this.startTime = Date.now()
  }

  // Milliseconds since construction or the last reset()
  getUptime() {
    return Date.now() - this.startTime
  }
}
// Usage example
const monitor = new PerformanceMonitor()

// Time the handling of one message and record it, in milliseconds,
// under the 'messageProcessing' metric
function processMessage(message) {
  const startedAt = process.hrtime()
  // process the message...
  const elapsed = process.hrtime(startedAt)
  // elapsed is [seconds, nanoseconds]
  const durationMs = elapsed[0] * 1000 + elapsed[1] / 1000000
  monitor.record('messageProcessing', durationMs)
}

// Print a snapshot of all metrics every 60 seconds
setInterval(() => {
  const snapshot = monitor.getAllStats()
  console.log('Performance Stats:', snapshot)
}, 60000)
写在最后
通过这篇文章,我们详细探讨了如何使用 Node.js 构建高性能的 WebSocket 服务器。从基础架构、连接管理、心跳检测到错误处理与性能监控,我们不仅关注了功能实现,更注重了实际应用中的各种挑战。
记住,一个优秀的 WebSocket 服务器需要在功能、性能和可靠性之间找到平衡。在实际开发中,我们要根据具体需求选择合适的实现方案,确保服务器能够稳定高效地运行。
如果觉得这篇文章对你有帮助,别忘了点个赞 👍