当前位置: 首页 > article >正文

[ NodeJs ] worker pool

代码

import { cpus } from "node:os";
import { Worker } from "node:worker_threads";

/**
 * 获取合理的 worker 数量
 * @param {number} taskCount
 */
export function getRationalWorkerCount(taskCount) {
  let cpuCoreCount = cpus().length - 1;
  if (taskCount >= cpuCoreCount) {
    // 任务较多,
    // 使用 cpu 核心数来当做数量,
    // 剩余的任务进行排队
    return cpuCoreCount;
  } else {
    // 任务较少,
    // 使用 任务数 当做 数量,
    // 避免启动多余的 worker
    return taskCount;
  }
}

/**
 * ref: https://blog.skk.moe/post/say-hello-to-nodejs-worker-thread/
 */
export class WorkerPool {
  /**
   * @param {string} workerPath js 文件的路径
   * @param {number} workerCount worker 的数量 (默认 cpu 核心数 - 1)
   * @example
   * import path, { dirname } from "node:path";
   * import { fileURLToPath } from "node:url";
   *
   * const __filename = fileURLToPath(import.meta.url);
   * const __dirname = dirname(__filename);
   *
   * let list = [ "data1", "data2", "data3" ]
   *
   * let workerCount = getRationalWorkerCount(list.length)
   *
   * // 注意路径,
   * // "./worker.js" 会定位到根目录,
   * // 所以这里需要传入绝对路径,才可以定位到 src/index.js 同级的 "src/worker.js"
   * let worker = new WorkerPool(path.join(__dirname, "worker.js"), workerCount);
   *
   * let await_list = [];
   * for (let item of ) {
   *   await_list.push(worker.run(item));
   * }
   * await Promise.all(await_list);
   * worker.destroyAllWorker();
   */
  constructor(workerPath, workerCount = getRationalWorkerCount(3)) {
    // 修正错误的 worker 数量
    if (workerCount < 1) {
      workerCount = 1;
    }

    /**
     * js 脚本路径
     * @private
     */
    this.workerPath = workerPath;
    /**
     * worker 数量
     * @private
     */
    this.workerCount = workerCount;

    /**
     * 任务队列
     * @private
     */
    this._queue = [];
    /**
     * worker 索引
     * @private
     */
    this._workersById = {};
    /**
     * worker 激活状态索引
     * @private
     */
    this._activeWorkersById = {};

    // 创建 worker
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(workerPath);

      this._workersById[i] = worker;
      // 将这些 worker 设置为未激活状态
      this._activeWorkersById[i] = false;
    }
  }

  /**
   * 获取一个空闲 worker 的 id
   * @private
   * @returns
   */
  _getInactiveWorkerId() {
    for (let i = 0; i < this.workerCount; i++) {
      if (!this._activeWorkersById[i]) {
        return i;
      }
    }
    return -1;
  }

  /**
   * 调用指定的 worker 运行一个任务
   * @private
   * @param {*} workerId
   * @param {*} taskObj
   */
  _runWorker(workerId, taskObj) {
    const worker = this._workersById[workerId];

    /** 当任务执行完毕后执行 */
    const doAfterTaskIsFinished = () => {
      // 去除所有的 Listener,不然一次次添加不同的 Listener 会 OOM 的
      worker.removeAllListeners("message");
      worker.removeAllListeners("error");
      // 将这个 worker 设为未激活状态
      this._activeWorkersById[workerId] = false;

      if (this._queue.length) {
        // 任务队列非空,使用该 worker 执行任务队列中第一个任务
        this._runWorker(workerId, this._queue.shift());
      }
    };

    // 将这个 worker 设置为激活状态
    this._activeWorkersById[workerId] = true;
    // 设置两个回调,用于 worker 的监听器
    const messageCallback = (result) => {
      taskObj.callback(null, result);
      doAfterTaskIsFinished();
    };
    const errorCallback = (error) => {
      taskObj.callback(error);
      doAfterTaskIsFinished();
    };

    // 为 worker 添加 'message' 和 'error' 两个 Listener
    worker.once("message", messageCallback);
    worker.once("error", errorCallback);
    // 将数据传给 worker 供其获取和执行
    worker.postMessage(taskObj.data);
  }

  /**
   * 运行一个任务, (没有空闲 worker,则存到队列中)
   * @public
   * @param {any} data 传递给 message 事件的数据
   * @example
   * // file: worker.js
   * // nodejs worker 接收数据
   * import { parentPort } from "worker_threads";
   * parentPort.on("message", (data) => {
   *   console.log(data);
   *   // 完成任务,给主线程发消息
   *   parentPort?.postMessage(true);
   * });
   * @returns
   */
  run(data) {
    return new Promise((resolve, reject) => {
      // 获取一个空闲的 worker
      const availableWorkerId = this._getInactiveWorkerId();

      const taskObj = {
        data: data,
        callback: (error, result) => {
          if (error) {
            reject(error);
          } else {
            resolve(result);
          }
        },
      };

      if (availableWorkerId === -1) {
        // 当前没有空闲的 worker,把任务丢进队列里,这样一旦有 worker 空闲时就会开始执行。
        this._queue.push(taskObj);
        return null;
      }

      // 有一个空闲的 worker ,用它执行任务
      this._runWorker(availableWorkerId, taskObj);
    });
  }

  /**
   * 清理所有 worker
   * @public
   * @param {boolean} force
   */
  destroyAllWorker(force = false) {
    for (let i = 0; i < this.workerCount; i++) {
      if (this._activeWorkersById[i] && !force) {
        // 通常情况下,不应该在还有 worker 在执行的时候就销毁它,
        // 这一定是什么地方出了问题,所以还是抛个 Error 比较好
        // 不过保留一个 force 参数,总有人用得到的
        throw new Error(`The worker ${i} is still runing!`);
      }

      // 销毁这个 Worker
      this._workersById[i].terminate();
    }
  }
}

http://www.kler.cn/a/584532.html

相关文章:

  • TCP网络协议
  • MySQL(社区版)安装过程
  • 上下文微调(Contextual Fine-Tuning, CFT)提高大型语言模型(LLMs)在特定领域的学习和推理能力
  • 高级java每日一道面试题-2025年2月22日-数据库篇[Redis篇]-Redis是什么?
  • LangGraph 构建的工作流调用数据库的时候怎么添加重试机制
  • 基于Spring Boot的牙科诊所管理系统的设计与实现(LW+源码+讲解)
  • 上下文学习思维链COTPrompt工程
  • ClickHouse SQL优化:从原理到实战的深度指南
  • 【Grok3 deepseek-R1】利用伪逆方法求解机械臂速度级的运动方案
  • Unity知识总结——算法
  • 源IP泄露后如何涅槃重生?高可用架构与自动化防御体系设计
  • 计算机网络之应用层(控制连接与数据连接)
  • TCP为什么可靠?
  • 当今前沿技术:改变生活的新趋势与应用
  • Python个人学习笔记(15):模块(time)
  • 加速布局前置仓,京东健康自营大药房也要做即时零售仓店?
  • 定义未来!广东天谱科技集团有限公司荣获“GAS科创奖-产品创新奖”!
  • Python自动化脚本:2分钟快速搭建MTProto代理服务(支持多端口负载均衡)
  • C# 事件使用详解
  • logback希望特定的error日志写入到特定文件