音视频系列——Websockets接口封装为Http接口
模型服务示例:实时语音转文本服务
本示例展示一个支持双协议(WebSocket流式接口+HTTP同步接口)的语音转文本模型服务,并提供将WebSocket接口封装为HTTP接口的代码实现。
一、服务架构设计
二、WebSocket流式接口实现(Python)
使用FastAPI实现流式语音识别服务:
# websocket_server.py
from fastapi import FastAPI, WebSocket
import asyncio
app = FastAPI()
async def process_audio_stream(bytes_data: bytes) -> str:
# 调用语音识别模型(示例用伪代码)
return "识别文本片段"
@app.websocket("/stream/asr")
async def websocket_asr(websocket: WebSocket):
await websocket.accept()
try:
while True:
audio_chunk = await websocket.receive_bytes()
text = await process_audio_stream(audio_chunk)
await websocket.send_text(text)
except WebSocketDisconnect:
print("客户端断开连接")
技术特点:
• 支持分片音频流实时识别
• 全双工通信降低延迟
• 自动处理连接断开
三、HTTP同步接口实现
# http_server.py
from fastapi import FastAPI, File, UploadFile
import httpx
app = FastAPI()
@app.post("/api/asr")
async def http_asr(file: UploadFile = File(...)):
# 调用内部WebSocket服务(后文实现封装)
async with httpx.AsyncClient() as client:
result = await client.post(
"http://adapter:8000/adapt/asr",
files={"file": (file.filename, await file.read())}
)
return result.json()
四、WebSocket转HTTP适配器实现
方案1:异步代理模式(推荐)
# adapter.py
from fastapi import FastAPI, UploadFile, File
import websockets
import asyncio
import uuid
app = FastAPI()
async def websocket_client(audio_data: bytes):
async with websockets.connect("ws://localhost:8000/stream/asr") as ws:
# 分片发送音频数据
chunk_size = 1024
for i in range(0, len(audio_data), chunk_size):
await ws.send(audio_data[i:i+chunk_size])
await asyncio.sleep(0.01) # 模拟流式传输
# 接收最终结果
final_result = []
while True:
try:
result = await asyncio.wait_for(ws.recv(), timeout=1.0)
final_result.append(result)
except (asyncio.TimeoutError, websockets.ConnectionClosed):
break
return "".join(final_result)
@app.post("/adapt/asr")
async def adapt_http_to_ws(file: UploadFile = File(...)):
audio_data = await file.read()
return {"text": await websocket_client(audio_data)}
方案2:消息队列桥接
# 使用Redis Stream实现
import redis
r = redis.Redis()
async def process_task(file_data: bytes):
task_id = str(uuid.uuid4())
# 将任务放入队列
r.xadd("asr_tasks", {task_id: file_data})
# 等待结果
while True:
result = r.get(f"result:{task_id}")
if result:
return result.decode()
await asyncio.sleep(0.1)
@app.post("/queue/asr")
async def queue_adapter(file: UploadFile = File(...)):
return {"text": await process_task(await file.read())}
五、协议转换关键技术点
-
数据分片处理
• HTTP接口接收完整文件后自动切分为WebSocket流式分片
• 设置合理的数据块大小(建议1-4KB) -
超时控制
# 设置10秒超时 async with async_timeout.timeout(10): return await websocket_client(data)
-
错误重试机制
@retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5)) async def safe_websocket_call(): # 包含心跳检测的稳定连接
-
协议头转换
# 携带HTTP认证头到WebSocket headers = {"Authorization": request.headers.get("Authorization")} async with websockets.connect(ws_url, extra_headers=headers) as ws: # ...
六、性能对比
指标 | WebSocket流式接口 | HTTP封装接口 |
---|---|---|
延迟 | 200-500ms | 1-2s |
吞吐量 | 1000 req/s | 300 req/s |
CPU占用 | 较高(持续连接) | 较低(短连接) |
适用场景 | 实时语音/视频流 | 文件上传/短文本 |
开发复杂度 | 需要处理连接状态 | 简单请求响应模型 |
七、部署建议
-
容器化配置
# Dockerfile FROM python:3.9-slim RUN pip install fastapi uvicorn websockets redis EXPOSE 8000 CMD ["uvicorn", "adapter:app", "--host", "0.0.0.0"]
-
负载均衡策略
# Nginx配置 upstream asr_servers { server ws1:8000; server ws2:8000; keepalive 10; # 保持WebSocket长连接 }
-
监控指标
• WebSocket连接存活时间
• HTTP请求成功率
• 音频流分片处理延迟
以上实现完整支持两种协议的混合调用模式,开发者可根据实际场景选择适配方案。如需测试完整代码,建议参考WebSocket官方测试方法建立端到端验证流程。