Node.js系列(1)--架构设计指南
Node.js架构设计指南 🏗️
引言
Node.js作为一个高性能的JavaScript运行时环境,其架构设计对于构建可扩展的服务端应用至关重要。本文将深入探讨Node.js的架构设计原则、最佳实践和实现方案。
架构概述
Node.js架构主要包括以下方面:
- 事件驱动:基于事件循环的异步非阻塞架构
- 模块系统:CommonJS和ES模块系统
- 进程模型:单线程主进程与工作线程
- 流处理:基于Stream的数据处理
- 错误处理:异步错误处理与进程异常管理
架构实现
应用架构管理器
// 应用架构管理器
class ApplicationArchitecture {
private static instance: ApplicationArchitecture;
private config: ArchitectureConfig;
private modules: Map<string, ModuleWrapper>;
private workers: Map<string, Worker>;
private streams: Map<string, StreamHandler>;
private constructor() {
this.modules = new Map();
this.workers = new Map();
this.streams = new Map();
this.config = {
maxWorkers: os.cpus().length,
modulePrefix: 'app',
streamBufferSize: 1024 * 1024
};
}
// 获取单例实例
static getInstance(): ApplicationArchitecture {
if (!ApplicationArchitecture.instance) {
ApplicationArchitecture.instance = new ApplicationArchitecture();
}
return ApplicationArchitecture.instance;
}
// 初始化应用架构
init(config: ArchitectureConfig): void {
this.config = { ...this.config, ...config };
// 初始化模块系统
this.initializeModuleSystem();
// 初始化工作线程
this.initializeWorkers();
// 初始化流处理
this.initializeStreams();
// 设置错误处理
this.setupErrorHandling();
}
// 初始化模块系统
private initializeModuleSystem(): void {
// 设置模块加载器
require.extensions['.ts'] = (module: any, filename: string) => {
const content = fs.readFileSync(filename, 'utf8');
module._compile(content, filename);
};
// 注册内置模块
this.registerCoreModules();
}
// 注册核心模块
private registerCoreModules(): void {
const coreModules = [
'logger',
'config',
'database',
'cache',
'queue'
];
coreModules.forEach(name => {
const module = require(`./core/${name}`);
this.registerModule(name, module);
});
}
// 注册模块
registerModule(name: string, moduleImpl: any): void {
const wrapper = new ModuleWrapper(name, moduleImpl);
this.modules.set(name, wrapper);
}
// 获取模块
getModule(name: string): any {
const wrapper = this.modules.get(name);
if (!wrapper) {
throw new Error(`Module ${name} not found`);
}
return wrapper.getInstance();
}
// 初始化工作线程
private initializeWorkers(): void {
for (let i = 0; i < this.config.maxWorkers; i++) {
const worker = new Worker('./worker.js');
this.workers.set(`worker-${i}`, worker);
// 设置工作线程消息处理
worker.on('message', this.handleWorkerMessage.bind(this));
worker.on('error', this.handleWorkerError.bind(this));
worker.on('exit', this.handleWorkerExit.bind(this));
}
}
// 处理工作线程消息
private handleWorkerMessage(message: any): void {
console.log('Worker message:', message);
}
// 处理工作线程错误
private handleWorkerError(error: Error): void {
console.error('Worker error:', error);
}
// 处理工作线程退出
private handleWorkerExit(code: number): void {
console.log('Worker exit with code:', code);
}
// 初始化流处理
private initializeStreams(): void {
// 创建标准流处理器
this.createStreamHandler('stdout', process.stdout);
this.createStreamHandler('stderr', process.stderr);
// 创建自定义流处理器
this.createStreamHandler('file', fs.createWriteStream('app.log'));
this.createStreamHandler('network', new net.Socket());
}
// 创建流处理器
private createStreamHandler(
name: string,
stream: NodeJS.WritableStream
): void {
const handler = new StreamHandler(stream, {
highWaterMark: this.config.streamBufferSize
});
this.streams.set(name, handler);
}
// 获取流处理器
getStream(name: string): StreamHandler {
const handler = this.streams.get(name);
if (!handler) {
throw new Error(`Stream ${name} not found`);
}
return handler;
}
// 设置错误处理
private setupErrorHandling(): void {
// 处理未捕获的异常
process.on('uncaughtException', this.handleUncaughtException.bind(this));
// 处理未处理的Promise拒绝
process.on('unhandledRejection', this.handleUnhandledRejection.bind(this));
// 处理进程信号
process.on('SIGTERM', this.handleProcessTermination.bind(this));
process.on('SIGINT', this.handleProcessTermination.bind(this));
}
// 处理未捕获的异常
private handleUncaughtException(error: Error): void {
console.error('Uncaught exception:', error);
this.gracefulShutdown();
}
// 处理未处理的Promise拒绝
private handleUnhandledRejection(
reason: any,
promise: Promise<any>
): void {
console.error('Unhandled rejection:', reason);
}
// 处理进程终止
private handleProcessTermination(): void {
console.log('Process termination requested');
this.gracefulShutdown();
}
// 优雅关闭
async gracefulShutdown(): Promise<void> {
try {
// 停止接受新的请求
console.log('Stopping new requests...');
// 等待现有请求完成
console.log('Waiting for existing requests...');
await this.waitForRequests();
// 关闭工作线程
console.log('Closing workers...');
await this.closeWorkers();
// 关闭流
console.log('Closing streams...');
await this.closeStreams();
// 关闭模块
console.log('Closing modules...');
await this.closeModules();
console.log('Graceful shutdown completed');
process.exit(0);
} catch (error) {
console.error('Error during shutdown:', error);
process.exit(1);
}
}
// 等待请求完成
private async waitForRequests(): Promise<void> {
// 实现等待逻辑
await new Promise(resolve => setTimeout(resolve, 5000));
}
// 关闭工作线程
private async closeWorkers(): Promise<void> {
const promises = Array.from(this.workers.values()).map(worker => {
return new Promise<void>((resolve, reject) => {
worker.terminate()
.then(() => resolve())
.catch(reject);
});
});
await Promise.all(promises);
}
// 关闭流
private async closeStreams(): Promise<void> {
const promises = Array.from(this.streams.values()).map(handler => {
return handler.close();
});
await Promise.all(promises);
}
// 关闭模块
private async closeModules(): Promise<void> {
const promises = Array.from(this.modules.values()).map(wrapper => {
return wrapper.destroy();
});
await Promise.all(promises);
}
}
// 模块包装器
class ModuleWrapper {
private instance: any;
constructor(
private name: string,
private moduleImpl: any
) {
this.instance = new moduleImpl();
}
getInstance(): any {
return this.instance;
}
async destroy(): Promise<void> {
if (typeof this.instance.destroy === 'function') {
await this.instance.destroy();
}
}
}
// 流处理器
class StreamHandler {
private buffer: any[] = [];
private isProcessing: boolean = false;
constructor(
private stream: NodeJS.WritableStream,
private options: StreamOptions
) {}
// 写入数据
write(data: any): void {
this.buffer.push(data);
if (this.buffer.length >= this.options.highWaterMark) {
this.flush();
}
if (!this.isProcessing) {
this.process();
}
}
// 处理缓冲数据
private async process(): Promise<void> {
this.isProcessing = true;
while (this.buffer.length > 0) {
if (!this.stream.writableCorked) {
await this.flush();
}
await new Promise(resolve => setTimeout(resolve, 100));
}
this.isProcessing = false;
}
// 刷新缓冲区
private async flush(): Promise<void> {
if (this.buffer.length === 0) {
return;
}
const chunk = this.buffer.splice(0, this.options.highWaterMark);
return new Promise((resolve, reject) => {
this.stream.write(chunk.join(''), error => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
// 关闭流
async close(): Promise<void> {
// 刷新剩余数据
await this.flush();
// 关闭流
if (this.stream.end) {
await new Promise<void>((resolve, reject) => {
this.stream.end(error => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
}
}
// 接口定义
interface ArchitectureConfig {
maxWorkers: number;
modulePrefix: string;
streamBufferSize: number;
}
interface StreamOptions {
highWaterMark: number;
}
// 使用示例
const architecture = ApplicationArchitecture.getInstance();
// 初始化应用架构
architecture.init({
maxWorkers: 4,
modulePrefix: 'myapp',
streamBufferSize: 1024 * 1024
});
// 注册自定义模块
class CustomModule {
async init(): Promise<void> {
console.log('Custom module initialized');
}
async destroy(): Promise<void> {
console.log('Custom module destroyed');
}
}
architecture.registerModule('custom', CustomModule);
// 获取模块
const customModule = architecture.getModule('custom');
await customModule.init();
// 使用流处理器
const logStream = architecture.getStream('file');
logStream.write('Application started\n');
// 优雅关闭
process.on('SIGTERM', () => {
architecture.gracefulShutdown();
});
事件驱动实现
// 事件管理器
class EventManager {
private handlers: Map<string, Set<EventHandler>>;
private maxListeners: number;
constructor(maxListeners: number = 10) {
this.handlers = new Map();
this.maxListeners = maxListeners;
}
// 添加事件监听器
on(event: string, handler: EventHandler): void {
if (!this.handlers.has(event)) {
this.handlers.set(event, new Set());
}
const handlers = this.handlers.get(event)!;
if (handlers.size >= this.maxListeners) {
console.warn(
`Warning: Event ${event} has exceeded maximum listeners`
);
}
handlers.add(handler);
}
// 移除事件监听器
off(event: string, handler: EventHandler): void {
const handlers = this.handlers.get(event);
if (handlers) {
handlers.delete(handler);
}
}
// 触发事件
emit(event: string, ...args: any[]): void {
const handlers = this.handlers.get(event);
if (handlers) {
handlers.forEach(handler => {
try {
handler(...args);
} catch (error) {
console.error(
`Error in event handler for ${event}:`,
error
);
}
});
}
}
// 一次性事件监听器
once(event: string, handler: EventHandler): void {
const wrapper = (...args: any[]) => {
this.off(event, wrapper);
handler(...args);
};
this.on(event, wrapper);
}
// 移除所有监听器
removeAllListeners(event?: string): void {
if (event) {
this.handlers.delete(event);
} else {
this.handlers.clear();
}
}
// 获取监听器数量
listenerCount(event: string): number {
const handlers = this.handlers.get(event);
return handlers ? handlers.size : 0;
}
}
type EventHandler = (...args: any[]) => void;
// 使用示例
const events = new EventManager();
// 添加事件监听器
events.on('data', data => {
console.log('Received data:', data);
});
// 触发事件
events.emit('data', { id: 1, name: 'test' });
// 一次性事件
events.once('init', () => {
console.log('Initialization complete');
});
events.emit('init'); // 输出消息
events.emit('init'); // 无输出
最佳实践与建议
-
模块化设计
- 单一职责原则
- 高内聚低耦合
- 依赖注入
- 接口抽象
-
异步处理
- Promise链式调用
- async/await使用
- 错误处理
- 并发控制
-
资源管理
- 内存管理
- 连接池
- 缓存策略
- 垃圾回收
-
可靠性保障
- 错误恢复
- 优雅关闭
- 监控告警
- 日志记录
总结
Node.js架构设计需要考虑以下方面:
- 事件驱动模型的高效利用
- 模块化设计的合理实现
- 异步编程的最佳实践
- 资源管理的优化策略
- 可靠性和可维护性保障
通过合理的架构设计,可以构建高性能、可扩展的Node.js应用。
学习资源
- Node.js官方文档
- 架构设计模式
- 异步编程指南
- 性能优化实践
- 最佳实践案例
如果你觉得这篇文章有帮助,欢迎点赞收藏,也期待在评论区看到你的想法和建议!👇
终身学习,共同成长。
咱们下一期见
💻