当前位置: 首页 > article >正文

客户端发送http请求进行流量控制

客户端发送http请求进行流量控制

实现方式 1:使用 Semaphore (信号量) 控制流量

asyncio.Semaphore 是一种简单的流控方法,可以用来限制并发请求数量。

import asyncio
import aiohttp
import time

class HttpClientWithSemaphore:
    def __init__(self, max_concurrent_requests=5, request_period=10):
        self.max_concurrent_requests = max_concurrent_requests
        self.request_period = request_period
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)
        self.session = aiohttp.ClientSession()

    async def fetch(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Request failed: {e}")
                return None

    async def close(self):
        await self.session.close()

async def main_with_semaphore():
    client = HttpClientWithSemaphore(max_concurrent_requests=5)
    urls = [
        "http://example.com/api/1",
        "http://example.com/api/2",
        "http://example.com/api/3",
        "http://example.com/api/4",
        "http://example.com/api/5",
        "http://example.com/api/6",
    ]

    tasks = [client.fetch(url) for url in urls]
    responses = await asyncio.gather(*tasks)

    for response in responses:
        if response:
            print(response)

    await client.close()

if __name__ == "__main__":
    asyncio.run(main_with_semaphore())

优点

  • 简单易实现,使用内置的 asyncio.Semaphore 就能限制并发请求数量。
  • 易于维护,代码简单清晰。

缺点

  • 缺少精细的流控机制,例如每 10 秒内限制请求数量(只能控制总并发数量)。
  • 难以适应更加复杂的流控需求。

实现方式 2:使用滑动窗口 (Sliding Window) 算法

滑动窗口算法是一种可以精确控制在一定时间内的请求数量的机制。它能平滑地调整速率。

import asyncio
import aiohttp
from collections import deque
import time

class SlidingWindowRateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.timestamps = deque()

    async def acquire(self):
        current_time = time.monotonic()
        # 清理超出窗口时间的旧请求
        while self.timestamps and current_time - self.timestamps[0] > self.window_seconds:
            self.timestamps.popleft()

        if len(self.timestamps) < self.max_requests:
            self.timestamps.append(current_time)
            return True
        else:
            # 计算需要等待的时间
            sleep_time = self.window_seconds - (current_time - self.timestamps[0])
            await asyncio.sleep(sleep_time)
            return await self.acquire()

class HttpClientWithSlidingWindow:
    def __init__(self, max_requests_per_period=5, period=10):
        self.rate_limiter = SlidingWindowRateLimiter(max_requests_per_period, period)
        self.session = aiohttp.ClientSession()

    async def fetch(self, url):
        await self.rate_limiter.acquire()
        try:
            async with self.session.get(url) as response:
                return await response.text()
        except Exception as e:
            print(f"Request failed: {e}")
            return None

    async def close(self):
        await self.session.close()

async def main_with_sliding_window():
    client = HttpClientWithSlidingWindow(max_requests_per_period=5, period=10)
    urls = [
        "http://example.com/api/1",
        "http://example.com/api/2",
        "http://example.com/api/3",
        "http://example.com/api/4",
        "http://example.com/api/5",
        "http://example.com/api/6",
    ]

    tasks = [client.fetch(url) for url in urls]
    responses = await asyncio.gather(*tasks)

    for response in responses:
        if response:
            print(response)

    await client.close()

if __name__ == "__main__":
    asyncio.run(main_with_sliding_window())

优点

  • 更加精确地控制时间窗口内的请求数量。
  • 平滑控制请求速率,适用于需要稳定流量的情况。

缺点

  • 实现稍复杂,需要维护一个请求时间戳队列。
  • 在极端条件下,如果有大量请求积压,可能会造成延迟波动。

实现方式 3:使用 aiolimiter 第三方库

aiolimiter 是一个专门用于异步流控的 Python 库,支持令牌桶和滑动窗口算法。

安装 aiolimiter

pip install aiolimiter

代码示例

import asyncio
import aiohttp
from aiolimiter import AsyncLimiter

class HttpClientWithAiolimiter:
    def __init__(self, max_requests_per_period=5, period=10):
        # 初始化流控器,每10秒允许5个请求
        self.limiter = AsyncLimiter(max_requests_per_period, period)
        self.session = aiohttp.ClientSession()

    async def fetch(self, url):
        async with self.limiter:
            try:
                async with self.session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Request failed: {e}")
                return None

    async def close(self):
        await self.session.close()

async def main_with_aiolimiter():
    client = HttpClientWithAiolimiter(max_requests_per_period=5, period=10)
    urls = [
        "http://example.com/api/1",
        "http://example.com/api/2",
        "http://example.com/api/3",
        "http://example.com/api/4",
        "http://example.com/api/5",
        "http://example.com/api/6",
    ]

    tasks = [client.fetch(url) for url in urls]
    responses = await asyncio.gather(*tasks)

    for response in responses:
        if response:
            print(response)

    await client.close()

if __name__ == "__main__":
    asyncio.run(main_with_aiolimiter())

优点

  • 使用方便,aiolimiter 直接支持流控机制。
  • 代码简洁且配置灵活,可直接设置流控参数。
  • 第三方库已经过优化,适合快速开发。

缺点

  • 依赖于外部库,需要额外安装。
  • 灵活性相对有限,无法完全控制算法的细节。

比较总结

实现方式优点缺点适用场景
信号量控制 (Semaphore)简单易实现,易于维护控制粒度较粗,不适合复杂流控适合简单并发控制场景
滑动窗口 (Sliding Window)精确控制时间窗口内的请求数量,平滑控制请求速率实现稍复杂,可能出现延迟波动适合需要精确流控的场景
aiolimiter 第三方库使用方便,代码简洁,库优化良好依赖外部库,灵活性相对有限适合快速实现流控的项目

希望这些不同的实现方式和比较能够帮助你选择适合的 HTTP 客户端实现方案。如果你对某种实现方式有特别的需求或疑问,请随时告知!


http://www.kler.cn/a/394595.html

相关文章:

  • [DEBUG] 服务器 CORS 已经允许所有源,仍然有 304 的跨域问题
  • 算法学习第一弹——C++基础
  • 封装一个省市区的筛选组件
  • MFC工控项目实例二十九主对话框调用子对话框设定参数值
  • sol机器人pump机器人如何实现盈利的?什么是Pump 扫链机器人?
  • 代码随想录第二十一天| 669. 修剪二叉搜索树 108.将有序数组转换为二叉搜索树 538.把二叉搜索树转换为累加树
  • 如何绕过Captcha并使用OCR技术抓取数据
  • 如何轻松导出所有 WordPress URL 为纯文本格式
  • 红黑树
  • 【日常记录-Git】如何为post-checkout脚本传递参数
  • 【SQL】sql常用命令
  • uniapp在app模式下组件传值
  • 【go从零单排】Ticker
  • Python自动化操作JSON文件详解
  • 在Ubuntu 24.04 LTS上安装飞桨PaddleX
  • uniapp隐藏自带的tabBar
  • 【刷题19】队列+bfs专题
  • 生成自签名证书并配置 HTTPS 使用自签名证书
  • uni-app快速入门(四)--maninfest.json及pages.json配置
  • CSS新特性
  • Ai编程从零开始全栈开发一个后台管理系统之用户登录、权限控制、用户管理-前端部分(十二)
  • nacos配置中心入门
  • 【达梦数据库】参数优化脚本主要改什么
  • spark.default.parallelism 在什么时候起作用,与spark.sql.shuffle.partitions有什么异同点?
  • LaTeX中浮动体(图片、表格)的位置及上下间距设置
  • 使用命令强制给ESXI上的硬盘分区