当前位置: 首页 > article >正文

WebSocket 服务端开发:Node.js 实战

在现代网页应用中,一个强大的 WebSocket 服务器就像是一个高效的调度中心,能够处理成千上万的并发连接。记得在一个直播平台项目中,我们通过优化 WebSocket 服务器架构,成功支持了 10 万用户同时在线。今天,我想和大家分享如何使用 Node.js 构建高性能的 WebSocket 服务器。

服务器架构设计

一个优秀的 WebSocket 服务器需要考虑以下几个关键点:

  1. 连接管理
  2. 消息处理
  3. 错误处理
  4. 性能优化
  5. 集群扩展

让我们从基础架构开始:

// server.js
const WebSocket = require('ws')
const Redis = require('ioredis')
const cluster = require('cluster')
const numCPUs = require('os').cpus().length

// Redis 客户端
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  retryStrategy: (times) => {
    const delay = Math.min(times * 50, 2000)
    return delay
  }
})

class WebSocketServer {
  constructor(options = {}) {
    this.port = options.port || 8080
    this.clients = new Map()
    this.channels = new Map()
    this.initialize()
  }

  initialize() {
    // 创建 WebSocket 服务器
    this.wss = new WebSocket.Server({
      port: this.port,
      // 自定义握手
      verifyClient: this.verifyClient.bind(this),
      // 客户端追踪
      clientTracking: true
    })

    // 设置服务器事件监听
    this.setupServerEvents()

    // 初始化 Redis 订阅
    this.setupRedis()

    console.log(`WebSocket Server is running on port ${this.port}`)
  }

  // 验证客户端连接
  verifyClient(info, callback) {
    const token = this.parseToken(info.req)
    if (!token) {
      callback(false, 401, 'Unauthorized')
      return
    }

    // 验证 token
    this.verifyToken(token)
      .then((user) => {
        info.req.user = user
        callback(true)
      })
      .catch((error) => {
        console.error('Auth error:', error)
        callback(false, 401, 'Invalid token')
      })
  }

  // 解析 token
  parseToken(req) {
    const auth = req.headers['authorization']
    if (!auth) return null

    const [type, token] = auth.split(' ')
    return type === 'Bearer' ? token : null
  }

  // 验证 token
  async verifyToken(token) {
    // 实现 token 验证逻辑
    // 返回用户信息
    return { id: 'user-1', name: 'Test User' }
  }

  // 设置服务器事件
  setupServerEvents() {
    // 处理新连接
    this.wss.on('connection', (ws, req) => {
      this.handleConnection(ws, req)
    })

    // 处理服务器错误
    this.wss.on('error', (error) => {
      console.error('Server error:', error)
    })

    // 优雅关闭
    process.on('SIGTERM', () => {
      this.shutdown()
    })
  }

  // 处理新连接
  handleConnection(ws, req) {
    const user = req.user
    const clientId = user.id

    // 存储客户端连接
    this.clients.set(clientId, {
      ws,
      user,
      channels: new Set(),
      lastPing: Date.now()
    })

    console.log(`Client connected: ${clientId}`)

    // 设置客户端事件监听
    this.setupClientEvents(ws, clientId)

    // 发送欢迎消息
    this.sendToClient(ws, {
      type: 'welcome',
      data: {
        clientId,
        timestamp: Date.now()
      }
    })
  }

  // 设置客户端事件
  setupClientEvents(ws, clientId) {
    // 处理消息
    ws.on('message', (message) => {
      this.handleMessage(clientId, message)
    })

    // 处理关闭
    ws.on('close', () => {
      this.handleClose(clientId)
    })

    // 处理错误
    ws.on('error', (error) => {
      this.handleError(clientId, error)
    })

    // 处理 ping
    ws.on('ping', () => {
      this.handlePing(clientId)
    })
  }

  // 处理消息
  handleMessage(clientId, message) {
    try {
      const data = JSON.parse(message)
      const client = this.clients.get(clientId)

      if (!client) {
        console.error(`Client not found: ${clientId}`)
        return
      }

      // 更新最后活动时间
      client.lastActivity = Date.now()

      // 处理不同类型的消息
      switch (data.type) {
        case 'subscribe':
          this.handleSubscribe(clientId, data.channel)
          break
        case 'unsubscribe':
          this.handleUnsubscribe(clientId, data.channel)
          break
        case 'publish':
          this.handlePublish(clientId, data.channel, data.data)
          break
        default:
          console.warn(`Unknown message type: ${data.type}`)
      }
    } catch (error) {
      console.error('Message handling error:', error)
      this.sendError(clientId, 'Invalid message format')
    }
  }

  // 处理订阅
  handleSubscribe(clientId, channel) {
    const client = this.clients.get(clientId)
    if (!client) return

    // 添加到频道
    client.channels.add(channel)

    // 更新频道订阅者
    if (!this.channels.has(channel)) {
      this.channels.set(channel, new Set())
    }
    this.channels.get(channel).add(clientId)

    // 订阅 Redis 频道
    redis.subscribe(channel)

    // 发送确认消息
    this.sendToClient(client.ws, {
      type: 'subscribed',
      channel
    })
  }

  // 处理取消订阅
  handleUnsubscribe(clientId, channel) {
    const client = this.clients.get(clientId)
    if (!client) return

    // 从频道移除
    client.channels.delete(channel)

    // 更新频道订阅者
    const subscribers = this.channels.get(channel)
    if (subscribers) {
      subscribers.delete(clientId)
      if (subscribers.size === 0) {
        this.channels.delete(channel)
        // 取消订阅 Redis 频道
        redis.unsubscribe(channel)
      }
    }

    // 发送确认消息
    this.sendToClient(client.ws, {
      type: 'unsubscribed',
      channel
    })
  }

  // 处理发布
  handlePublish(clientId, channel, data) {
    const client = this.clients.get(clientId)
    if (!client) return

    // 验证权限
    if (!this.canPublish(client, channel)) {
      this.sendError(clientId, 'No permission to publish')
      return
    }

    // 发布到 Redis
    redis.publish(channel, JSON.stringify({
      from: clientId,
      data,
      timestamp: Date.now()
    }))
  }

  // 处理 ping
  handlePing(clientId) {
    const client = this.clients.get(clientId)
    if (client) {
      client.lastPing = Date.now()
      client.ws.pong()
    }
  }

  // 处理关闭
  handleClose(clientId) {
    const client = this.clients.get(clientId)
    if (!client) return

    // 清理订阅
    client.channels.forEach((channel) => {
      this.handleUnsubscribe(clientId, channel)
    })

    // 移除客户端
    this.clients.delete(clientId)

    console.log(`Client disconnected: ${clientId}`)
  }

  // 处理错误
  handleError(clientId, error) {
    console.error(`Client error (${clientId}):`, error)
    this.sendError(clientId, 'Internal error occurred')
  }

  // 发送消息给客户端
  sendToClient(ws, data) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(data))
    }
  }

  // 发送错误消息
  sendError(clientId, message) {
    const client = this.clients.get(clientId)
    if (client) {
      this.sendToClient(client.ws, {
        type: 'error',
        message
      })
    }
  }

  // 设置 Redis
  setupRedis() {
    // 订阅消息处理
    redis.on('message', (channel, message) => {
      try {
        const data = JSON.parse(message)
        const subscribers = this.channels.get(channel)

        if (subscribers) {
          subscribers.forEach((clientId) => {
            const client = this.clients.get(clientId)
            if (client && client.ws.readyState === WebSocket.OPEN) {
              this.sendToClient(client.ws, {
                type: 'message',
                channel,
                data: data.data,
                from: data.from,
                timestamp: data.timestamp
              })
            }
          })
        }
      } catch (error) {
        console.error('Redis message handling error:', error)
      }
    })

    // Redis 错误处理
    redis.on('error', (error) => {
      console.error('Redis error:', error)
    })
  }

  // 清理无效连接
  cleanup() {
    const now = Date.now()
    const timeout = 60000 // 60 秒超时

    this.clients.forEach((client, clientId) => {
      if (now - client.lastPing > timeout) {
        console.log(`Cleaning up inactive client: ${clientId}`)
        client.ws.terminate()
        this.handleClose(clientId)
      }
    })
  }

  // 优雅关闭
  shutdown() {
    console.log('Shutting down WebSocket server...')

    // 关闭所有客户端连接
    this.clients.forEach((client) => {
      client.ws.close(1001, 'Server shutting down')
    })

    // 关闭 WebSocket 服务器
    this.wss.close(() => {
      console.log('WebSocket server closed')

      // 关闭 Redis 连接
      redis.quit(() => {
        console.log('Redis connection closed')
        process.exit(0)
      })
    })
  }
}

// 集群模式
if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`)

  // 启动工作进程
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork()
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died`)
    // 重启工作进程
    cluster.fork()
  })
} else {
  // 工作进程启动服务器
  const server = new WebSocketServer({
    port: process.env.PORT || 8080
  })

  // 定期清理
  setInterval(() => {
    server.cleanup()
  }, 30000)

  console.log(`Worker ${process.pid} started`)
}

连接管理

实现可靠的连接管理机制:

// connection-manager.js
class ConnectionManager {
  constructor() {
    this.connections = new Map()
    this.groups = new Map()
  }

  // 添加连接
  addConnection(id, connection) {
    this.connections.set(id, {
      connection,
      groups: new Set(),
      metadata: {},
      stats: {
        messagesReceived: 0,
        messagesSent: 0,
        bytesReceived: 0,
        bytesSent: 0,
        connectedAt: Date.now()
      }
    })
  }

  // 移除连接
  removeConnection(id) {
    const conn = this.connections.get(id)
    if (conn) {
      // 从所有组中移除
      conn.groups.forEach(group => {
        this.removeFromGroup(id, group)
      })
      this.connections.delete(id)
    }
  }

  // 添加到组
  addToGroup(connectionId, group) {
    const conn = this.connections.get(connectionId)
    if (!conn) return false

    if (!this.groups.has(group)) {
      this.groups.set(group, new Set())
    }

    this.groups.get(group).add(connectionId)
    conn.groups.add(group)
    return true
  }

  // 从组中移除
  removeFromGroup(connectionId, group) {
    const groupSet = this.groups.get(group)
    if (groupSet) {
      groupSet.delete(connectionId)
      if (groupSet.size === 0) {
        this.groups.delete(group)
      }
    }

    const conn = this.connections.get(connectionId)
    if (conn) {
      conn.groups.delete(group)
    }
  }

  // 发送消息到组
  broadcastToGroup(group, message, excludeId = null) {
    const groupSet = this.groups.get(group)
    if (!groupSet) return

    groupSet.forEach(id => {
      if (id !== excludeId) {
        this.sendTo(id, message)
      }
    })
  }

  // 发送消息到指定连接
  sendTo(id, message) {
    const conn = this.connections.get(id)
    if (conn && conn.connection.readyState === WebSocket.OPEN) {
      const data = JSON.stringify(message)
      conn.connection.send(data)

      // 更新统计
      conn.stats.messagesSent++
      conn.stats.bytesSent += data.length
    }
  }

  // 获取连接统计
  getStats(id) {
    const conn = this.connections.get(id)
    return conn ? conn.stats : null
  }

  // 获取组成员
  getGroupMembers(group) {
    return Array.from(this.groups.get(group) || [])
  }

  // 获取连接的组
  getConnectionGroups(id) {
    const conn = this.connections.get(id)
    return conn ? Array.from(conn.groups) : []
  }

  // 设置连接元数据
  setMetadata(id, key, value) {
    const conn = this.connections.get(id)
    if (conn) {
      conn.metadata[key] = value
    }
  }

  // 获取连接元数据
  getMetadata(id, key) {
    const conn = this.connections.get(id)
    return conn ? conn.metadata[key] : null
  }
}

消息队列集成

使用 Redis 实现消息队列:

// message-queue.js
const Redis = require('ioredis')

class MessageQueue {
  constructor(options = {}) {
    this.publisher = new Redis(options)
    this.subscriber = new Redis(options)
    this.handlers = new Map()

    this.subscriber.on('message', (channel, message) => {
      this.handleMessage(channel, message)
    })
  }

  // 发布消息
  async publish(channel, message) {
    try {
      const data = JSON.stringify({
        message,
        timestamp: Date.now()
      })
      await this.publisher.publish(channel, data)
      return true
    } catch (error) {
      console.error('Publish error:', error)
      return false
    }
  }

  // 订阅频道
  async subscribe(channel, handler) {
    try {
      await this.subscriber.subscribe(channel)
      this.handlers.set(channel, handler)
      return true
    } catch (error) {
      console.error('Subscribe error:', error)
      return false
    }
  }

  // 取消订阅
  async unsubscribe(channel) {
    try {
      await this.subscriber.unsubscribe(channel)
      this.handlers.delete(channel)
      return true
    } catch (error) {
      console.error('Unsubscribe error:', error)
      return false
    }
  }

  // 处理消息
  handleMessage(channel, message) {
    try {
      const handler = this.handlers.get(channel)
      if (handler) {
        const data = JSON.parse(message)
        handler(data.message, data.timestamp)
      }
    } catch (error) {
      console.error('Message handling error:', error)
    }
  }

  // 关闭连接
  async close() {
    await this.publisher.quit()
    await this.subscriber.quit()
  }
}

心跳检测

实现可靠的心跳检测机制:

// heartbeat.js
class HeartbeatManager {
  constructor(options = {}) {
    this.interval = options.interval || 30000
    this.timeout = options.timeout || 60000
    this.connections = new Map()
    this.start()
  }

  // 启动心跳检测
  start() {
    this.timer = setInterval(() => {
      this.check()
    }, this.interval)
  }

  // 停止心跳检测
  stop() {
    if (this.timer) {
      clearInterval(this.timer)
      this.timer = null
    }
  }

  // 添加连接
  add(id, connection) {
    this.connections.set(id, {
      connection,
      lastPing: Date.now(),
      pingCount: 0
    })

    // 设置 ping 处理
    connection.on('pong', () => {
      this.handlePong(id)
    })
  }

  // 移除连接
  remove(id) {
    this.connections.delete(id)
  }

  // 检查连接
  check() {
    const now = Date.now()

    this.connections.forEach((data, id) => {
      const { connection, lastPing } = data

      // 检查超时
      if (now - lastPing > this.timeout) {
        console.log(`Connection timeout: ${id}`)
        connection.terminate()
        this.remove(id)
        return
      }

      // 发送 ping
      if (connection.readyState === WebSocket.OPEN) {
        connection.ping()
        data.pingCount++
      }
    })
  }

  // 处理 pong 响应
  handlePong(id) {
    const data = this.connections.get(id)
    if (data) {
      data.lastPing = Date.now()
      data.pingCount = 0
    }
  }

  // 获取连接状态
  getStatus(id) {
    const data = this.connections.get(id)
    if (!data) return null

    return {
      lastPing: data.lastPing,
      pingCount: data.pingCount,
      isAlive: Date.now() - data.lastPing <= this.timeout
    }
  }
}

错误处理

实现全面的错误处理机制:

// error-handler.js
class ErrorHandler {
  constructor() {
    this.handlers = new Map()
  }

  // 注册错误处理器
  register(type, handler) {
    this.handlers.set(type, handler)
  }

  // 处理错误
  handle(error, context = {}) {
    const handler = this.handlers.get(error.type) || this.defaultHandler
    return handler(error, context)
  }

  // 默认错误处理器
  defaultHandler(error, context) {
    console.error('Unhandled error:', error)
    return {
      type: 'error',
      message: 'Internal server error',
      code: 500
    }
  }
}

// 错误类型
class WebSocketError extends Error {
  constructor(type, message, code = 500) {
    super(message)
    this.type = type
    this.code = code
  }
}

// 使用示例
const errorHandler = new ErrorHandler()

// 注册错误处理器
errorHandler.register('auth', (error, context) => {
  return {
    type: 'error',
    message: 'Authentication failed',
    code: 401
  }
})

errorHandler.register('validation', (error, context) => {
  return {
    type: 'error',
    message: 'Invalid message format',
    code: 400
  }
})

// 处理错误
try {
  throw new WebSocketError('auth', 'Invalid token')
} catch (error) {
  const response = errorHandler.handle(error, {
    clientId: 'user-1'
  })
  console.log(response)
}

性能监控

实现性能监控系统:

// monitor.js
class PerformanceMonitor {
  constructor() {
    this.metrics = new Map()
    this.startTime = Date.now()
  }

  // 记录指标
  record(name, value) {
    if (!this.metrics.has(name)) {
      this.metrics.set(name, {
        count: 0,
        total: 0,
        min: Infinity,
        max: -Infinity,
        values: []
      })
    }

    const metric = this.metrics.get(name)
    metric.count++
    metric.total += value
    metric.min = Math.min(metric.min, value)
    metric.max = Math.max(metric.max, value)
    metric.values.push(value)
  }

  // 获取指标统计
  getStats(name) {
    const metric = this.metrics.get(name)
    if (!metric) return null

    const avg = metric.total / metric.count
    const sorted = [...metric.values].sort((a, b) => a - b)
    const p95 = sorted[Math.floor(sorted.length * 0.95)]
    const p99 = sorted[Math.floor(sorted.length * 0.99)]

    return {
      count: metric.count,
      min: metric.min,
      max: metric.max,
      avg,
      p95,
      p99
    }
  }

  // 获取所有指标
  getAllStats() {
    const stats = {}
    this.metrics.forEach((value, key) => {
      stats[key] = this.getStats(key)
    })
    return stats
  }

  // 重置指标
  reset() {
    this.metrics.clear()
    this.startTime = Date.now()
  }

  // 获取运行时间
  getUptime() {
    return Date.now() - this.startTime
  }
}

// 使用示例
const monitor = new PerformanceMonitor()

// 记录消息处理时间
function processMessage(message) {
  const start = process.hrtime()

  // 处理消息...

  const [seconds, nanoseconds] = process.hrtime(start)
  const duration = seconds * 1000 + nanoseconds / 1000000
  monitor.record('messageProcessing', duration)
}

// 定期输出统计信息
setInterval(() => {
  console.log('Performance Stats:', monitor.getAllStats())
}, 60000)

写在最后

通过这篇文章,我们详细探讨了如何使用 Node.js 构建高性能的 WebSocket 服务器。从基础架构到性能优化,我们不仅关注了功能实现,更注重了实际应用中的各种挑战。

记住,一个优秀的 WebSocket 服务器需要在功能、性能和可靠性之间找到平衡。在实际开发中,我们要根据具体需求选择合适的实现方案,确保服务器能够稳定高效地运行。

如果觉得这篇文章对你有帮助,别忘了点个赞 👍


http://www.kler.cn/a/473233.html

相关文章:

  • MySql根据经纬度查询距离
  • 记录一次面试中被问到的问题 (HR面)
  • Flink源码解析之:Flink on k8s 客户端提交任务源码分析
  • React Native 项目 Error: EMFILE: too many open files, watch
  • STM32供电参考设计
  • 腾讯云AI代码助手编程挑战赛-图片转换工具
  • 备战春招—FPGA 2024年的面试题库
  • 网络传输层TCP协议
  • Java-编写的一个生产者-消费者模式
  • docker-compose部署下Fastapi中使用sqlalchemy和Alembic
  • CST软件如何设置分布式计算(Distributed Computing)的 TCP-IP子网
  • Redis 笔记(二)-Redis 安装及测试
  • (长期更新)《零基础入门 ArcGIS(ArcScene) 》实验七----城市三维建模与分析(超超超详细!!!)
  • 运行vue项目,显示“npm”无法识别为 cmdlet、函数、脚本文件或可操作程序的名称
  • 腾讯云AI代码助手-每日清单助手
  • Python----Python爬虫(selenium的使用,定位元素,层级定位)
  • 每日一题-两个链表的第一个公共结点
  • 阿里云人工智能平台图像视频特征提取
  • python注意事项:range遍历越索引现象、列表边遍历边修改出现的问题
  • 车载软件架构 --- 关于ARXML文件那点事
  • 论文导读 | 数据库系统中基于机器学习的基数估计方法
  • 使用python脚本爬取前端页面上的表格导出为Excel
  • 基于单片机的速度里程表设计(论文+源码)
  • 基于V2X的无人机与特种车辆战地智能通信:技术融合与实战应用
  • vue3的计算属性computed传参问题
  • Nginx:Stream模块