JavaScript系列(62)--实时通信系统实现详解
JavaScript实时通信系统实现详解 🔄
今天,让我们深入探讨JavaScript的实时通信系统实现。实时通信是现代Web应用中不可或缺的一部分,它能够提供即时的数据交互和更好的用户体验。
WebSocket通信基础 🌟
💡 小知识:WebSocket是HTML5开始提供的一种在单个TCP连接上进行全双工通讯的协议。相比HTTP,它能够提供持久连接和双向通信能力。
// 1. WebSocket连接管理器
class WebSocketManager {
constructor(url, options = {}) {
this.url = url;
this.options = {
reconnectAttempts: 5,
reconnectDelay: 1000,
heartbeatInterval: 30000,
...options
};
this.connection = null;
this.reconnectCount = 0;
this.listeners = new Map();
this.heartbeatTimer = null;
}
// 建立连接
connect() {
try {
this.connection = new WebSocket(this.url);
this.setupEventListeners();
this.startHeartbeat();
} catch (error) {
this.handleConnectionError(error);
}
}
// 设置事件监听器
setupEventListeners() {
this.connection.onopen = () => {
this.reconnectCount = 0;
this.emit('connected');
};
this.connection.onclose = () => {
this.handleDisconnect();
};
this.connection.onerror = (error) => {
this.handleConnectionError(error);
};
this.connection.onmessage = (event) => {
this.handleMessage(event.data);
};
}
// 启动心跳检测
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.connection.readyState === WebSocket.OPEN) {
this.send('heartbeat', { timestamp: Date.now() });
}
}, this.options.heartbeatInterval);
}
// 处理断开连接
handleDisconnect() {
this.stopHeartbeat();
this.emit('disconnected');
if (this.reconnectCount < this.options.reconnectAttempts) {
setTimeout(() => {
this.reconnectCount++;
this.connect();
}, this.options.reconnectDelay * this.reconnectCount);
} else {
this.emit('maxReconnectAttemptsReached');
}
}
// 发送消息
send(type, data) {
if (this.connection.readyState !== WebSocket.OPEN) {
throw new Error('Connection is not open');
}
const message = JSON.stringify({ type, data });
this.connection.send(message);
}
// 停止心跳检测
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
}
// 2. 消息处理器
class MessageHandler {
constructor() {
this.handlers = new Map();
}
// 注册消息处理器
register(type, handler) {
if (!this.handlers.has(type)) {
this.handlers.set(type, new Set());
}
this.handlers.get(type).add(handler);
}
// 处理消息
async handle(message) {
const { type, data } = JSON.parse(message);
const handlers = this.handlers.get(type);
if (handlers) {
const promises = Array.from(handlers)
.map(handler => handler(data));
await Promise.all(promises);
}
}
}
// 3. 重连管理器
class ReconnectionManager {
constructor(options = {}) {
this.options = {
maxAttempts: 5,
baseDelay: 1000,
maxDelay: 30000,
...options
};
this.attempts = 0;
this.currentDelay = this.options.baseDelay;
}
// 计算下一次重连延迟
getNextDelay() {
const delay = Math.min(
this.currentDelay * Math.pow(2, this.attempts),
this.options.maxDelay
);
this.attempts++;
return delay;
}
// 重置重连状态
reset() {
this.attempts = 0;
this.currentDelay = this.options.baseDelay;
}
// 检查是否可以继续重连
canReconnect() {
return this.attempts < this.options.maxAttempts;
}
}
消息队列系统 📨
// 1. 消息队列
class MessageQueue {
constructor() {
this.queue = [];
this.processing = false;
this.maxRetries = 3;
}
// 添加消息
enqueue(message) {
this.queue.push({
message,
retries: 0,
timestamp: Date.now()
});
this.processQueue();
}
// 处理队列
async processQueue() {
if (this.processing || this.queue.length === 0) return;
this.processing = true;
while (this.queue.length > 0) {
const item = this.queue[0];
try {
await this.processMessage(item.message);
this.queue.shift();
} catch (error) {
if (item.retries < this.maxRetries) {
item.retries++;
// 移到队列末尾
this.queue.push(this.queue.shift());
} else {
// 放入死信队列
this.moveToDeadLetter(item);
this.queue.shift();
}
}
}
this.processing = false;
}
// 移动到死信队列
moveToDeadLetter(item) {
// 实现死信队列逻辑
}
}
// 2. 优先级队列
class PriorityMessageQueue {
constructor() {
this.queues = new Map();
this.priorities = ['high', 'medium', 'low'];
}
// 添加消息
enqueue(message, priority = 'medium') {
if (!this.queues.has(priority)) {
this.queues.set(priority, []);
}
this.queues.get(priority).push({
message,
timestamp: Date.now()
});
}
// 获取下一个消息
dequeue() {
for (const priority of this.priorities) {
const queue = this.queues.get(priority);
if (queue && queue.length > 0) {
return queue.shift();
}
}
return null;
}
}
// 3. 消息持久化管理器
class MessagePersistenceManager {
constructor() {
this.storage = new Map();
this.initStorage();
}
// 初始化存储
async initStorage() {
try {
const stored = localStorage.getItem('message_queue');
if (stored) {
const data = JSON.parse(stored);
this.storage = new Map(Object.entries(data));
}
} catch (error) {
console.error('Failed to initialize storage:', error);
}
}
// 保存消息
async persistMessage(id, message) {
this.storage.set(id, {
message,
timestamp: Date.now()
});
await this.saveToStorage();
}
// 保存到存储
async saveToStorage() {
try {
const data = Object.fromEntries(this.storage);
localStorage.setItem('message_queue',
JSON.stringify(data));
} catch (error) {
console.error('Failed to save to storage:', error);
}
}
}
实时数据同步 🔄
// 1. 实时数据同步器
class RealtimeDataSync {
constructor(options = {}) {
this.options = {
syncInterval: 1000,
batchSize: 100,
...options
};
this.changes = new Map();
this.syncTimer = null;
}
// 记录变更
recordChange(key, value) {
this.changes.set(key, {
value,
timestamp: Date.now()
});
this.scheduleSyncIfNeeded();
}
// 调度同步
scheduleSyncIfNeeded() {
if (!this.syncTimer && this.changes.size > 0) {
this.syncTimer = setTimeout(() => {
this.performSync();
}, this.options.syncInterval);
}
}
// 执行同步
async performSync() {
const batch = this.prepareSyncBatch();
if (batch.size > 0) {
try {
await this.sendChanges(batch);
this.clearSyncedChanges(batch);
} catch (error) {
this.handleSyncError(error);
}
}
this.syncTimer = null;
this.scheduleSyncIfNeeded();
}
// 准备同步批次
prepareSyncBatch() {
const batch = new Map();
let count = 0;
for (const [key, value] of this.changes) {
if (count >= this.options.batchSize) break;
batch.set(key, value);
count++;
}
return batch;
}
}
// 2. 冲突解决器
class ConflictResolver {
constructor() {
this.strategies = new Map();
this.setupDefaultStrategies();
}
// 设置默认策略
setupDefaultStrategies() {
this.strategies.set('lastWriteWins', (local, remote) => {
return local.timestamp > remote.timestamp ? local : remote;
});
this.strategies.set('merge', (local, remote) => {
return {
...local,
...remote,
timestamp: Math.max(local.timestamp, remote.timestamp)
};
});
}
// 解决冲突
resolve(local, remote, strategy = 'lastWriteWins') {
const resolver = this.strategies.get(strategy);
if (!resolver) {
throw new Error(`Unknown strategy: ${strategy}`);
}
return resolver(local, remote);
}
}
// 3. 版本控制管理器
class VersionManager {
constructor() {
this.versions = new Map();
this.history = new Map();
}
// 更新版本
updateVersion(key, value) {
const currentVersion = this.versions.get(key) || 0;
const newVersion = currentVersion + 1;
this.versions.set(key, newVersion);
this.recordHistory(key, value, newVersion);
return newVersion;
}
// 记录历史
recordHistory(key, value, version) {
if (!this.history.has(key)) {
this.history.set(key, new Map());
}
const keyHistory = this.history.get(key);
keyHistory.set(version, {
value,
timestamp: Date.now()
});
}
// 获取特定版本
getVersion(key, version) {
const keyHistory = this.history.get(key);
if (!keyHistory) return null;
return keyHistory.get(version);
}
}
性能优化策略 ⚡
// 1. 消息压缩器
class MessageCompressor {
constructor() {
this.compressionThreshold = 1024; // 1KB
}
// 压缩消息
async compress(message) {
if (typeof message !== 'string') {
message = JSON.stringify(message);
}
if (message.length < this.compressionThreshold) {
return message;
}
const msgBuffer = new TextEncoder().encode(message);
const compressed = await gzip(msgBuffer);
return compressed;
}
// 解压消息
async decompress(data) {
if (!(data instanceof Uint8Array)) {
return data;
}
const decompressed = await ungzip(data);
return new TextDecoder().decode(decompressed);
}
}
// 2. 批处理优化器
class BatchProcessor {
constructor(options = {}) {
this.options = {
maxBatchSize: 100,
maxWaitTime: 1000,
...options
};
this.batch = [];
this.timer = null;
}
// 添加项目到批处理
add(item) {
this.batch.push(item);
if (this.batch.length >= this.options.maxBatchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(),
this.options.maxWaitTime);
}
}
// 刷新批处理
async flush() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.batch.length === 0) return;
const items = [...this.batch];
this.batch = [];
await this.processBatch(items);
}
}
// 3. 连接池管理器
class ConnectionPool {
constructor(options = {}) {
this.options = {
maxConnections: 5,
idleTimeout: 30000,
...options
};
this.connections = new Set();
this.idle = new Set();
}
// 获取连接
async getConnection() {
let connection;
if (this.idle.size > 0) {
connection = this.idle.values().next().value;
this.idle.delete(connection);
} else if (this.connections.size < this.options.maxConnections) {
connection = await this.createConnection();
this.connections.add(connection);
} else {
throw new Error('Connection pool exhausted');
}
return connection;
}
// 释放连接
releaseConnection(connection) {
if (this.connections.has(connection)) {
this.idle.add(connection);
setTimeout(() => {
if (this.idle.has(connection)) {
this.closeConnection(connection);
}
}, this.options.idleTimeout);
}
}
}
安全性考虑 🔒
// 1. 消息加密器
class MessageEncryptor {
constructor() {
this.keyPair = null;
this.initializeKeyPair();
}
// 初始化密钥对
async initializeKeyPair() {
this.keyPair = await window.crypto.subtle.generateKey(
{
name: 'RSA-OAEP',
modulusLength: 2048,
publicExponent: new Uint8Array([1, 0, 1]),
hash: 'SHA-256'
},
true,
['encrypt', 'decrypt']
);
}
// 加密消息
async encrypt(message) {
const encoded = new TextEncoder().encode(
typeof message === 'string' ? message : JSON.stringify(message)
);
return window.crypto.subtle.encrypt(
{
name: 'RSA-OAEP'
},
this.keyPair.publicKey,
encoded
);
}
// 解密消息
async decrypt(encrypted) {
const decrypted = await window.crypto.subtle.decrypt(
{
name: 'RSA-OAEP'
},
this.keyPair.privateKey,
encrypted
);
return new TextDecoder().decode(decrypted);
}
}
// 2. 认证管理器
class AuthenticationManager {
constructor() {
this.tokens = new Map();
}
// 验证令牌
async validateToken(token) {
if (!token) return false;
const tokenInfo = this.tokens.get(token);
if (!tokenInfo) return false;
if (tokenInfo.expiresAt < Date.now()) {
this.tokens.delete(token);
return false;
}
return true;
}
// 生成新令牌
async generateToken(userId) {
const token = await this.createSecureToken();
this.tokens.set(token, {
userId,
expiresAt: Date.now() + 24 * 60 * 60 * 1000 // 24小时
});
return token;
}
}
// 3. 速率限制器
class RateLimiter {
constructor(options = {}) {
this.options = {
windowMs: 60000, // 1分钟
maxRequests: 100,
...options
};
this.requests = new Map();
}
// 检查请求是否允许
async checkLimit(clientId) {
this.removeOldRequests(clientId);
const requests = this.requests.get(clientId) || [];
if (requests.length >= this.options.maxRequests) {
return false;
}
requests.push(Date.now());
this.requests.set(clientId, requests);
return true;
}
// 移除过期请求记录
removeOldRequests(clientId) {
const now = Date.now();
const windowStart = now - this.options.windowMs;
const requests = this.requests.get(clientId) || [];
const validRequests = requests.filter(time => time > windowStart);
if (validRequests.length < requests.length) {
this.requests.set(clientId, validRequests);
}
}
}
最佳实践建议 💡
- 连接管理模式
// 1. 连接状态管理器
class ConnectionStateManager {
constructor() {
this.state = 'disconnected';
this.listeners = new Set();
}
// 更新状态
setState(newState) {
const oldState = this.state;
this.state = newState;
this.notifyListeners(oldState, newState);
}
// 添加状态监听器
addListener(listener) {
this.listeners.add(listener);
}
// 通知监听器
notifyListeners(oldState, newState) {
for (const listener of this.listeners) {
listener(oldState, newState);
}
}
}
// 2. 重试策略
class RetryStrategy {
constructor(options = {}) {
this.options = {
initialDelay: 1000,
maxDelay: 30000,
factor: 2,
maxAttempts: 5,
...options
};
}
// 计算延迟时间
getDelay(attempt) {
const delay = this.options.initialDelay *
Math.pow(this.options.factor, attempt);
return Math.min(delay, this.options.maxDelay);
}
// 检查是否应该重试
shouldRetry(attempt, error) {
if (attempt >= this.options.maxAttempts) {
return false;
}
// 根据错误类型决定是否重试
return this.isRetryableError(error);
}
}
// 3. 日志记录器
class CommunicationLogger {
constructor() {
this.logs = [];
this.maxLogs = 1000;
}
// 记录日志
log(type, data) {
const logEntry = {
type,
data,
timestamp: Date.now()
};
this.logs.push(logEntry);
if (this.logs.length > this.maxLogs) {
this.logs.shift();
}
this.persistLogs();
}
// 持久化日志
persistLogs() {
try {
localStorage.setItem('communication_logs',
JSON.stringify(this.logs));
} catch (error) {
console.error('Failed to persist logs:', error);
}
}
}
结语 📝
实时通信系统是现代Web应用中的重要组成部分。通过本文,我们学习了:
- WebSocket通信的基础实现
- 消息队列系统的设计
- 实时数据同步机制
- 性能优化策略
- 安全性考虑和最佳实践
💡 学习建议:在实现实时通信系统时,要特别注意连接的可靠性和消息的可靠传递。同时,要根据实际需求选择合适的同步策略,平衡实时性和系统负载。
如果你觉得这篇文章有帮助,欢迎点赞收藏,也期待在评论区看到你的想法和建议!👇
终身学习,共同成长。
咱们下一期见
💻