当前位置: 首页 > article >正文

如何限制大量请求并发

前言:

1、主流浏览器在 HTTP/1.1 下对同一域名的最大并发请求数通常是 6~8 个。超过限制的请求会进入队列,等待空闲的连接。
2、可以利用Promise模拟任务队列,控制并发请求数量,以避免对服务器造成过大的压力。(先进先出)

代码

一个简单的请求队列调度器,用于控制并发请求的最大数量

import axios from 'axios'

export const handQueue = (
  reqs // 请求总数
) => {
  reqs = reqs || []
  
  const requestQueue = (concurrency) => {
    concurrency = concurrency || 6 // 最大并发数
    const queue = [] // 请求池
    let current = 0
    
    const dequeue = () => {
      while (current < concurrency && queue.length) {
        current++;
        const requestPromiseFactory = queue.shift() // 出列
        requestPromiseFactory()
          .then(() => { // 成功的请求逻辑
          })
          .catch(error => { // 失败
            console.log(error)
          })
          .finally(() => {
            current--
            dequeue()
          });
      }

    }

    return (requestPromiseFactory) => {
      queue.push(requestPromiseFactory) // 入队
      dequeue()
    }

  }

  const enqueue = requestQueue(6)

  for (let i = 0; i < reqs.length; i++) {

    enqueue(() => axios.get('/api/test' + i))
  }
}

功能拆解

  1. handQueue 函数:
    参数 reqs: 是一个数组,包含需要发送的请求。
    函数的主要目的是对这些请求进行队列管理,确保并发请求的数量不会超过设定的上限。

  2. requestQueue 函数:
    concurrency:最大并发数
    enqueue:用于将新的请求添加到队列并处理它们
    queue: 请求池,用于存储待处理的请求。
    current: 当前正在执行的请求数。

  3. dequeue 函数:

    1、从请求池中取出请求并发送。它在一个循环中运行,直到当前并发请求数current达到最大并发数concurrency或请求池queue为空。
    2、对于每个出队的请求,它首先增加current的值,然后调用请求函数requestPromiseFactory来发送请求。
    3、当请求完成(无论成功还是失败)后,它会减少current的值并再次调用dequeue,以便处理下一个请求。
    
    const dequeue = () => {  
      while (current < concurrency && queue.length) {  
        current++;  
        const requestPromiseFactory = queue.shift() // 出列  
        requestPromiseFactory()  
          .then(() => { // 成功的请求逻辑  
          })  
          .catch(error => { // 失败  
            console.log(error)  
          })  
          .finally(() => {  
            current--  
            dequeue()  
          });  
      }  
    }
    
    // 定义返回请求入队函数
    return (requestPromiseFactory) => {  
      queue.push(requestPromiseFactory) // 入队  
      dequeue()  
    }
    // 函数返回一个函数,这个函数接受一个参数requestPromiseFactory,表示一个返回Promise的请求工厂函数。
    // 这个返回的函数将请求工厂函数加入请求池queue,并调用dequeue来尝试发送新的请求,
    // 当然也可以自定义axios,利用Promise.all统一处理返回后的结果。
    

    使用一个 while 循环,当当前请求数 current 小于最大并发数 concurrency 并且队列 queue 中有待处理请求时:
    1、从队列中取出第一个请求工厂 requestPromiseFactory。
    2、调用 requestPromiseFactory() 开始请求。
    3、使用 then 和 catch 处理请求成功和失败的逻辑。
    4、在 finally 中,减少当前请求数 current,并递归调用 dequeue,以确保队列持续运行。

  4. 分发逻辑:
    for 循环遍历传入的请求数组 reqs。
    对于每个请求,通过 enqueue 将其加入队列。
    每个请求通过工厂函数 () => axios.get(‘/api/test’ + i) 包装,保证每次调用都返回新的请求 Promise。

优化

  1. 超时管理:
    如果某个请求卡住,会阻塞队列,可为请求设置超时,例如:

    const timeout = (promise, ms) =>
      Promise.race([
        promise,
        new Promise((_, reject) => setTimeout(() => reject('Timeout'), ms)),
      ]);
    
  2. 错误处理:
    重试或告警机制:

     const retry = (fn, retries = 3) =>
      fn().catch(err => (retries > 0 ? retry(fn, retries - 1) : Promise.reject(err)));
    
  3. 队列清空检测:
    在所有请求处理完成后,触发回调或事件通知:

    if (queue.length === 0 && current === 0) {
      console.log('All requests completed');
    }
    
  4. 优化后的代码

    import axios from "axios";
    /**
     * 处理并发请求队列
     * @param {Array<Function>} reqs 请求总数
     * @param {Object} options 配置选项
     * @param {number} options.concurrency 最大并发数
     * @param {number} options.timeout 请求超时时间 (ms)
     * @param {Function} options.onQueueEmpty 队列清空后的回调
     */
    export const handQueue = (reqs, options = {}) => {
      const { concurrency = 6, timeout = 10000, onQueueEmpty = () => {} } = options;
    
      // 确保请求队列为数组
      reqs = reqs || [];
      const queue = [];
      let current = 0;
    
      // 超时管理器
      const timeoutRequest = (requestPromise, timeout) =>
        new Promise((resolve, reject) => {
          const timer = setTimeout(() => reject(new Error("Request timeout")), timeout);
          requestPromise
            .then((res) => {
              clearTimeout(timer);
              resolve(res);
            })
            .catch((err) => {
              clearTimeout(timer);
              reject(err);
            });
        });
    
      // 出列处理
      const dequeue = async () => {
        while (current < concurrency && queue.length) {
          current++;
          const requestPromiseFactory = queue.shift(); // 出列
    
          try {
            await timeoutRequest(requestPromiseFactory(), timeout); // 包装超时处理
            console.log("Request success");
          } catch (error) {
            console.error("Request failed:", error.message); // 错误处理
          } finally {
            current--;
            dequeue();
    
            // 如果队列为空且没有正在处理的请求,调用队列清空回调
            if (queue.length === 0 && current === 0) {
              onQueueEmpty();
            }
          }
        }
      };
    
      // 入队函数
      const enqueue = (requestPromiseFactory) => {
        queue.push(requestPromiseFactory);
        dequeue();
      };
    
      // 将请求添加到队列
      for (let i = 0; i < reqs.length; i++) {
        enqueue(() => axios.get(`/api/test${i}`));
      }
    };
    
  5. 使用

    const reqs = Array.from({ length: 20 }, (_, i) => () => axios.get(`/api/test${i}`));
    handQueue(reqs, {
     concurrency: 4, // 最大并发数
     timeout: 5000,  // 超时时间 5 秒
     onQueueEmpty: () => {
       console.log("All requests have been processed!");
     },
    });
    

http://www.kler.cn/a/470496.html

相关文章:

  • 在环境冲突情况下调整优先级以解决ROS Catkin构建中缺少模块的问题【ubuntu20.04】
  • 解决 IntelliJ IDEA 中 Tomcat 日志乱码问题的详细指南
  • 详解GPT-信息抽取任务 (GPT-3 FAMILY LARGE LANGUAGE MODELS)
  • 力扣23.合并K个升序链表
  • Speech Recognition vs. Voice Recognition | 语音识别工作原理 | 模型训练 | 应用
  • 【Android项目学习】3. MVVMHabit
  • iOS - 引用计数(ARC)
  • 《Python游戏编程入门》注-第9章7
  • EasyExcel数据的导入导出
  • 数据结构与算法-顺序表
  • 使用扣子实现营销获客套电机器人-工作流+多维表格+飞书机器人
  • 基于springboot的论坛管理系统
  • 【C语言程序设计——选择结构程序设计】求一元二次方程的根(头歌实践教学平台习题)【合集】
  • 【C++数据结构——图】最小生成树(头歌实践教学平台习题) 【合集】
  • html内容过长,实现向上循环滑动效果
  • 设计模式 结构型 外观模式(Facade Pattern)与 常见技术框架应用 解析
  • TCP Listen 原语:端口失衡、对称性及协议演进
  • Linux 磁盘管理命令:mkinitrd :建立要载入ramdisk 的映象文件ssm:命令行集中存储管理工具
  • 利用API接口提升电商平台用户体验的实践
  • 【HarmonyOS】鸿蒙应用实现屏幕录制详解和源码
  • 【Linux】深入理解进程信号机制:信号的产生、捕获与阻塞
  • Kafka【应用 04】Java实现筛选查询Kafka符合条件的最新数据(保证数据最新+修改map对象key的方法+获取指定数量的记录)源码分享粘贴可用
  • 生信技能69 - 使用deepvariant进行对基因组指定区域Calling SNPs/Indels
  • 机器学习经典算法——线性回归
  • Spring Boot(4)使用 IDEA 搭建 Spring Boot+MyBatis 项目全流程实战
  • 【PPTist】批注、选择窗格