客户端发送http请求进行流量控制
客户端发送http请求进行流量控制
实现方式 1:使用 Semaphore (信号量) 控制流量
asyncio.Semaphore
是一种简单的流控方法,可以用来限制并发请求数量。
import asyncio
import aiohttp
import time
class HttpClientWithSemaphore:
def __init__(self, max_concurrent_requests=5, request_period=10):
self.max_concurrent_requests = max_concurrent_requests
self.request_period = request_period
self.semaphore = asyncio.Semaphore(max_concurrent_requests)
self.session = aiohttp.ClientSession()
async def fetch(self, url):
async with self.semaphore:
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Request failed: {e}")
return None
async def close(self):
await self.session.close()
async def main_with_semaphore():
client = HttpClientWithSemaphore(max_concurrent_requests=5)
urls = [
"http://example.com/api/1",
"http://example.com/api/2",
"http://example.com/api/3",
"http://example.com/api/4",
"http://example.com/api/5",
"http://example.com/api/6",
]
tasks = [client.fetch(url) for url in urls]
responses = await asyncio.gather(*tasks)
for response in responses:
if response:
print(response)
await client.close()
if __name__ == "__main__":
asyncio.run(main_with_semaphore())
优点:
- 简单易实现,使用内置的
asyncio.Semaphore
就能限制并发请求数量。 - 易于维护,代码简单清晰。
缺点:
- 缺少精细的流控机制,例如每 10 秒内限制请求数量(只能控制总并发数量)。
- 难以适应更加复杂的流控需求。
实现方式 2:使用滑动窗口 (Sliding Window) 算法
滑动窗口算法是一种可以精确控制在一定时间内的请求数量的机制。它能平滑地调整速率。
import asyncio
import aiohttp
from collections import deque
import time
class SlidingWindowRateLimiter:
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.timestamps = deque()
async def acquire(self):
current_time = time.monotonic()
# 清理超出窗口时间的旧请求
while self.timestamps and current_time - self.timestamps[0] > self.window_seconds:
self.timestamps.popleft()
if len(self.timestamps) < self.max_requests:
self.timestamps.append(current_time)
return True
else:
# 计算需要等待的时间
sleep_time = self.window_seconds - (current_time - self.timestamps[0])
await asyncio.sleep(sleep_time)
return await self.acquire()
class HttpClientWithSlidingWindow:
def __init__(self, max_requests_per_period=5, period=10):
self.rate_limiter = SlidingWindowRateLimiter(max_requests_per_period, period)
self.session = aiohttp.ClientSession()
async def fetch(self, url):
await self.rate_limiter.acquire()
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Request failed: {e}")
return None
async def close(self):
await self.session.close()
async def main_with_sliding_window():
client = HttpClientWithSlidingWindow(max_requests_per_period=5, period=10)
urls = [
"http://example.com/api/1",
"http://example.com/api/2",
"http://example.com/api/3",
"http://example.com/api/4",
"http://example.com/api/5",
"http://example.com/api/6",
]
tasks = [client.fetch(url) for url in urls]
responses = await asyncio.gather(*tasks)
for response in responses:
if response:
print(response)
await client.close()
if __name__ == "__main__":
asyncio.run(main_with_sliding_window())
优点:
- 更加精确地控制时间窗口内的请求数量。
- 平滑控制请求速率,适用于需要稳定流量的情况。
缺点:
- 实现稍复杂,需要维护一个请求时间戳队列。
- 在极端条件下,如果有大量请求积压,可能会造成延迟波动。
实现方式 3:使用 aiolimiter
第三方库
aiolimiter
是一个专门用于异步流控的 Python 库,支持令牌桶和滑动窗口算法。
安装 aiolimiter
:
pip install aiolimiter
代码示例:
import asyncio
import aiohttp
from aiolimiter import AsyncLimiter
class HttpClientWithAiolimiter:
def __init__(self, max_requests_per_period=5, period=10):
# 初始化流控器,每10秒允许5个请求
self.limiter = AsyncLimiter(max_requests_per_period, period)
self.session = aiohttp.ClientSession()
async def fetch(self, url):
async with self.limiter:
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Request failed: {e}")
return None
async def close(self):
await self.session.close()
async def main_with_aiolimiter():
client = HttpClientWithAiolimiter(max_requests_per_period=5, period=10)
urls = [
"http://example.com/api/1",
"http://example.com/api/2",
"http://example.com/api/3",
"http://example.com/api/4",
"http://example.com/api/5",
"http://example.com/api/6",
]
tasks = [client.fetch(url) for url in urls]
responses = await asyncio.gather(*tasks)
for response in responses:
if response:
print(response)
await client.close()
if __name__ == "__main__":
asyncio.run(main_with_aiolimiter())
优点:
- 使用方便,
aiolimiter
直接支持流控机制。 - 代码简洁且配置灵活,可直接设置流控参数。
- 第三方库已经过优化,适合快速开发。
缺点:
- 依赖于外部库,需要额外安装。
- 灵活性相对有限,无法完全控制算法的细节。
比较总结
实现方式 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
信号量控制 (Semaphore ) | 简单易实现,易于维护 | 控制粒度较粗,不适合复杂流控 | 适合简单并发控制场景 |
滑动窗口 (Sliding Window) | 精确控制时间窗口内的请求数量,平滑控制请求速率 | 实现稍复杂,可能出现延迟波动 | 适合需要精确流控的场景 |
aiolimiter 第三方库 | 使用方便,代码简洁,库优化良好 | 依赖外部库,灵活性相对有限 | 适合快速实现流控的项目 |
希望这些不同的实现方式和比较能够帮助你选择适合的 HTTP 客户端实现方案。如果你对某种实现方式有特别的需求或疑问,请随时告知!