Testing and Monitoring AI Agents: Practical Lessons in Keeping Them Stable
In previous posts we walked through the core modules of an AI Agent. Today I want to talk about how to keep an AI Agent stable. Honestly, this is a topic I care a lot about, because in production, stability often matters more than features.
Starting with a Production Incident
I still remember being woken up by an emergency call late one night last year:
Ops: Dude, the AI assistant has gone crazy!
Me: What's going on?
Ops: It's calling the API like mad and has already tripped the cost alert...
Me: ... (got online immediately to investigate)
The post-mortem showed that one user's unusual input had pushed the AI Agent into a self-conversation loop. That incident made me realize we needed a solid testing and monitoring system to catch and prevent this kind of problem early.
Designing the Test System
Let's start with the design of the test system:
from typing import List, Dict, Any, Optional
from enum import Enum
from datetime import datetime
from pydantic import BaseModel
import asyncio
import pytest
class TestCase(BaseModel):
id: str
name: str
description: str
inputs: List[Dict[str, Any]]
expected_outputs: List[Dict[str, Any]]
    setup: Optional[Dict[str, Any]] = None
    cleanup: Optional[Dict[str, Any]] = None
class TestResult(BaseModel):
case_id: str
success: bool
actual_outputs: List[Dict[str, Any]]
error: Optional[str]
execution_time: float
timestamp: datetime
class TestSuite:
def __init__(
self,
agent,
cases: List[TestCase]
):
self.agent = agent
self.cases = cases
self.results: List[TestResult] = []
async def run(self) -> List[TestResult]:
for case in self.cases:
            # 1. Set up the test environment
if case.setup:
await self._setup_case(case)
try:
                # 2. Run the test
start_time = datetime.now()
outputs = []
for input_data in case.inputs:
output = await self.agent.process(
input_data
)
outputs.append(output)
                # 3. Verify the outputs
success = self._verify_outputs(
outputs,
case.expected_outputs
)
                # 4. Record the result
self.results.append(TestResult(
case_id=case.id,
success=success,
actual_outputs=outputs,
error=None,
execution_time=(
datetime.now() - start_time
).total_seconds(),
timestamp=datetime.now()
))
except Exception as e:
                # Record the failure
self.results.append(TestResult(
case_id=case.id,
success=False,
actual_outputs=[],
error=str(e),
execution_time=0,
timestamp=datetime.now()
))
finally:
                # 5. Clean up the test environment
if case.cleanup:
await self._cleanup_case(case)
return self.results
def _verify_outputs(
self,
actual: List[Dict[str, Any]],
expected: List[Dict[str, Any]]
) -> bool:
if len(actual) != len(expected):
return False
for a, e in zip(actual, expected):
if not self._match_output(a, e):
return False
return True
def _match_output(
self,
actual: Dict[str, Any],
expected: Dict[str, Any]
) -> bool:
        # Supports regex patterns and tolerant comparison
for key, value in expected.items():
if key not in actual:
return False
if isinstance(value, str) and value.startswith("regex:"):
import re
pattern = value[6:]
if not re.match(pattern, str(actual[key])):
return False
elif actual[key] != value:
return False
return True
Unit Test Examples
@pytest.mark.asyncio
async def test_agent_basic_conversation():
    # 1. Prepare the test data
    case = TestCase(
        id="test_001",
        name="Basic conversation test",
        description="Tests basic question answering",
        inputs=[
            {"message": "Hello"},
            {"message": "What's the weather like today?"}
        ],
        expected_outputs=[
            {
                "type": "text",
                "content": "regex:^Hello.*"
            },
            {
                "type": "text",
                "content": "regex:.*weather.*"
            }
        ]
    )
    # 2. Create the test suite
suite = TestSuite(
agent=TestAgent(),
cases=[case]
)
    # 3. Run the tests
results = await suite.run()
    # 4. Check the results
assert len(results) == 1
assert results[0].success
assert results[0].error is None
@pytest.mark.asyncio
async def test_agent_error_handling():
    # Tests how abnormal inputs are handled
    case = TestCase(
        id="test_002",
        name="Error handling test",
        description="Tests handling of abnormal inputs",
        inputs=[
            {"message": ""},  # empty message
            {"message": "x" * 10000}  # oversized message
        ],
        expected_outputs=[
            {
                "type": "error",
                "code": "EMPTY_MESSAGE"
            },
            {
                "type": "error",
                "code": "MESSAGE_TOO_LONG"
            }
        ]
    )
suite = TestSuite(
agent=TestAgent(),
cases=[case]
)
results = await suite.run()
assert results[0].success
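The examples above construct a TestAgent, which is not part of the framework itself. The sketch below is a hypothetical, minimal stand-in, just complete enough to make both test cases pass so the suite can be run locally without a real model behind it:

class TestAgent:
    """Hypothetical stub agent used only to exercise the test suite."""
    async def process(self, request: Dict[str, Any]) -> Dict[str, Any]:
        message = request.get("message", "")
        # Mirror the error contract expected by test_agent_error_handling
        if not message:
            return {"type": "error", "code": "EMPTY_MESSAGE"}
        if len(message) > 4096:
            return {"type": "error", "code": "MESSAGE_TOO_LONG"}
        # Echo-style reply that satisfies the regex assertions above
        return {"type": "text", "content": f"Hello! You said: {message}"}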
Performance Testing Framework
class PerformanceMetrics(BaseModel):
latency_p50: float
latency_p95: float
latency_p99: float
requests_per_second: float
error_rate: float
token_usage: int
import random

class LoadTest:
def __init__(
self,
agent,
        test_duration: int = 300,  # 5 minutes
concurrent_users: int = 10
):
self.agent = agent
self.duration = test_duration
self.users = concurrent_users
self.metrics: List[Dict[str, float]] = []
async def run(self) -> PerformanceMetrics:
        # 1. Prepare the test data
test_data = self._generate_test_data()
        # 2. Create simulated user sessions
sessions = [
self._simulate_user(test_data)
for _ in range(self.users)
]
        # 3. Run the test
start_time = datetime.now()
results = await asyncio.gather(*sessions)
        # 4. Compute the metrics
return self._calculate_metrics(results)
async def _simulate_user(
self,
test_data: List[Dict[str, Any]]
):
results = []
for data in test_data:
start_time = datetime.now()
try:
response = await self.agent.process(data)
latency = (
datetime.now() - start_time
).total_seconds()
results.append({
"success": True,
"latency": latency,
"tokens": self._count_tokens(
response
)
})
except Exception as e:
results.append({
"success": False,
"latency": 0,
"error": str(e)
})
            # Simulate user think time
await asyncio.sleep(
random.uniform(1, 5)
)
return results
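The helpers _generate_test_data, _count_tokens, and _calculate_metrics are omitted above. Here is one possible sketch of those methods, assuming the per-user result lists returned by asyncio.gather are flattened and percentiles are taken from the sorted latencies of successful requests; a real load test would use the model's actual tokenizer and wall-clock timing rather than these approximations:

    def _generate_test_data(self) -> List[Dict[str, Any]]:
        # Hypothetical fixed workload; in practice, sample from real traffic
        return [{"message": f"load test message {i}"} for i in range(20)]

    def _count_tokens(self, response: Dict[str, Any]) -> int:
        # Rough approximation: whitespace-separated words in the response text
        return len(str(response.get("content", "")).split())

    def _calculate_metrics(
        self,
        results: List[List[Dict[str, Any]]]
    ) -> PerformanceMetrics:
        flat = [r for per_user in results for r in per_user]
        latencies = sorted(r["latency"] for r in flat if r["success"])
        errors = sum(1 for r in flat if not r["success"])

        def percentile(p: float) -> float:
            if not latencies:
                return 0.0
            index = min(int(len(latencies) * p), len(latencies) - 1)
            return latencies[index]

        busy_time = sum(latencies) or 1.0  # throughput over busy time, ignoring think time
        return PerformanceMetrics(
            latency_p50=percentile(0.50),
            latency_p95=percentile(0.95),
            latency_p99=percentile(0.99),
            requests_per_second=len(flat) / busy_time,
            error_rate=errors / len(flat) if flat else 0.0,
            token_usage=sum(r.get("tokens", 0) for r in flat)
        )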
Implementing the Monitoring System
With the test system in place, we also need real-time monitoring:
class MetricType(Enum):
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
class Metric(BaseModel):
name: str
type: MetricType
value: float
labels: Dict[str, str]
timestamp: datetime
class MonitoringSystem:
def __init__(
self,
storage,
alert_manager
):
self.storage = storage
self.alert_manager = alert_manager
self.metrics: List[Metric] = []
async def record_metric(
self,
name: str,
type: MetricType,
value: float,
        labels: Optional[Dict[str, str]] = None
):
metric = Metric(
name=name,
type=type,
value=value,
labels=labels or {},
timestamp=datetime.now()
)
        # 1. Persist the metric
await self.storage.save_metric(metric)
        # 2. Check the alert rules
await self._check_alerts(metric)
async def _check_alerts(
self,
metric: Metric
):
        # Fetch the alert rules bound to this metric
rules = await self.alert_manager.get_rules(
metric.name
)
        # Evaluate each rule
for rule in rules:
if rule.should_alert(metric):
await self.alert_manager.send_alert(
rule,
metric
)
class AlertRule(BaseModel):
name: str
metric_name: str
    condition: str  # e.g. "> 100"
    duration: int  # how long the condition must hold, in seconds
severity: str
def should_alert(
self,
metric: Metric
) -> bool:
        # Parse and evaluate the condition
operator, threshold = self._parse_condition()
if operator == ">":
return metric.value > threshold
elif operator == "<":
return metric.value < threshold
        # ... other operators
return False
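_parse_condition is left out above. Assuming conditions always take the form "<operator> <threshold>" (as in the examples later in this post), a minimal version could look like the following; note that the duration window, i.e. requiring the condition to hold for N seconds before firing, is not handled here:

    def _parse_condition(self):
        # Assumes conditions look like "> 2.0" or "< 100"
        operator, threshold = self.condition.strip().split(maxsplit=1)
        return operator, float(threshold)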
Putting It into Practice
Here's how these components are wired into our AI Agent system:
class MonitoredAgent:
def __init__(
self,
agent,
monitoring: MonitoringSystem
):
self.agent = agent
self.monitoring = monitoring
async def process(
self,
request: Dict[str, Any]
) -> Dict[str, Any]:
start_time = datetime.now()
try:
            # 1. Record the incoming request
await self.monitoring.record_metric(
name="requests_total",
type=MetricType.COUNTER,
value=1,
labels={
"type": request.get("type", "unknown")
}
)
            # 2. Handle the request
response = await self.agent.process(
request
)
            # 3. Record success
await self.monitoring.record_metric(
name="requests_success",
type=MetricType.COUNTER,
value=1
)
            # 4. Record latency
latency = (
datetime.now() - start_time
).total_seconds()
await self.monitoring.record_metric(
name="request_latency",
type=MetricType.HISTOGRAM,
value=latency
)
            # 5. Record token usage
tokens = self._count_tokens(response)
await self.monitoring.record_metric(
name="token_usage",
type=MetricType.COUNTER,
value=tokens
)
return response
except Exception as e:
            # Record the error
await self.monitoring.record_metric(
name="requests_error",
type=MetricType.COUNTER,
value=1,
labels={"error": type(e).__name__}
)
raise
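_count_tokens is also omitted from MonitoredAgent. A placeholder sketch follows; a real implementation should take the count from the model API's usage field or its tokenizer rather than counting words:

    def _count_tokens(self, response: Dict[str, Any]) -> int:
        # Crude word count; replace with the model's actual token accounting
        return len(str(response.get("content", "")).split())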
Example Alert Configuration
# Define the alert rules
alert_rules = [
AlertRule(
name="high_latency",
metric_name="request_latency",
condition="> 2.0", # 超过2秒
duration=300, # 持续5分钟
severity="warning"
),
AlertRule(
name="error_spike",
metric_name="requests_error",
condition="> 10", # 错误数超过10
duration=60, # 持续1分钟
severity="critical"
),
AlertRule(
name="token_usage_high",
metric_name="token_usage",
condition="> 1000000", # 100万token
duration=3600, # 持续1小时
severity="warning"
)
]
# Configure alert notification channels
alert_config = {
"channels": {
"slack": {
"webhook_url": "https://hooks.slack.com/..."
},
"email": {
"recipients": ["oncall@company.com"]
}
},
"rules": {
"warning": ["slack"],
"critical": ["slack", "email"]
}
}
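Neither the metric storage nor the alert manager has been shown so far. As one possible sketch of an alert manager that honors this configuration and matches the interface MonitoringSystem expects (the Slack and email senders are placeholders, and duration-based suppression is again skipped):

class SimpleAlertManager:
    def __init__(self, rules: List[AlertRule], config: Dict[str, Any]):
        self.rules = rules
        self.config = config

    async def get_rules(self, metric_name: str) -> List[AlertRule]:
        # Only evaluate rules bound to this metric
        return [r for r in self.rules if r.metric_name == metric_name]

    async def send_alert(self, rule: AlertRule, metric: Metric):
        # Route the alert to channels according to its severity
        message = (
            f"[{rule.severity}] {rule.name}: "
            f"{metric.name}={metric.value} violates '{rule.condition}'"
        )
        for channel in self.config["rules"].get(rule.severity, []):
            if channel == "slack":
                await self._notify_slack(message)
            elif channel == "email":
                await self._notify_email(message)

    async def _notify_slack(self, message: str):
        # Placeholder: POST to self.config["channels"]["slack"]["webhook_url"]
        print(f"[slack] {message}")

    async def _notify_email(self, message: str):
        # Placeholder: mail to self.config["channels"]["email"]["recipients"]
        print(f"[email] {message}")

alert_manager = SimpleAlertManager(rules=alert_rules, config=alert_config)

With that in place, a MonitoringSystem built on top of this alert manager routes warning-level alerts to Slack and critical ones to both Slack and email, exactly as the configuration above describes.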
Lessons Learned
A few takeaways from rolling out this testing and monitoring setup:
Make testing comprehensive
- Unit tests guard basic functionality
- Integration tests verify how modules interact
- Performance tests surface bottlenecks
Make monitoring timely
- Collect metrics in real time
- Respond to alerts quickly
- Review trends regularly
Get operations right
- Automated deployment
- Disaster recovery and backups
- Incident response playbooks
Closing Thoughts
A stable AI Agent system needs a solid testing and monitoring foundation behind it. Much like a car needs a dashboard and regular maintenance, only by spotting and fixing problems early can you keep the system running reliably over the long term.
In the next post I'll walk through how to build the deployment and operations side of an AI Agent. If you have thoughts on testing and monitoring, I'd love to hear them in the comments.