
AI Agent Testing and Monitoring: Practical Lessons in Ensuring Stability

In previous articles, we covered the core modules of an AI Agent. Today I want to talk about how to keep an AI Agent stable. To be honest, this is a topic I care a lot about, because in production, stability often matters more than features.

It Started with a Production Incident

I still remember being woken up by an urgent call late one night last year:

Ops: Dude, the AI assistant has gone crazy!
Me: What happened?
Ops: It's hammering the API nonstop and has already triggered the cost alert...
Me: ... (jumped online to investigate)

The post-mortem found that one user's unusual input had sent the AI Agent into a self-dialogue loop. That incident made me realize we needed a proper testing and monitoring system to catch and prevent this kind of problem early.
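A self-dialogue loop like this is a classic agent failure mode. One simple safeguard is a hard cap on steps per request. The sketch below is illustrative only: the `step`-based agent interface and the `MAX_STEPS` value are my assumptions, not the article's actual agent API.

```python
MAX_STEPS = 10  # hypothetical hard cap on agent iterations per request

class LoopLimitExceeded(Exception):
    """Raised when the agent exceeds its step budget."""

async def run_with_budget(agent, request, max_steps: int = MAX_STEPS):
    """Drive an agent step-by-step, aborting once max_steps is hit
    so a self-dialogue loop can't burn through the API budget."""
    state = request
    for _ in range(max_steps):
        state, done = await agent.step(state)
        if done:
            return state
    raise LoopLimitExceeded(f"aborted after {max_steps} steps")
```

With a cap like this in place, a runaway loop fails fast with a single clear error instead of an open-ended stream of API calls.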

Designing the Test Framework

First, let's look at the design of the test framework:

from typing import List, Dict, Any, Optional
from enum import Enum
from datetime import datetime
from pydantic import BaseModel
import asyncio
import pytest

class TestCase(BaseModel):
    id: str
    name: str
    description: str
    inputs: List[Dict[str, Any]]
    expected_outputs: List[Dict[str, Any]]
    setup: Optional[Dict[str, Any]] = None
    cleanup: Optional[Dict[str, Any]] = None

class TestResult(BaseModel):
    case_id: str
    success: bool
    actual_outputs: List[Dict[str, Any]]
    error: Optional[str]
    execution_time: float
    timestamp: datetime

class TestSuite:
    def __init__(
        self,
        agent,
        cases: List[TestCase]
    ):
        self.agent = agent
        self.cases = cases
        self.results: List[TestResult] = []

    async def run(self) -> List[TestResult]:
        for case in self.cases:
            # 1. Set up the test environment
            if case.setup:
                await self._setup_case(case)

            try:
                # 2. Run the test
                start_time = datetime.now()
                outputs = []

                for input_data in case.inputs:
                    output = await self.agent.process(
                        input_data
                    )
                    outputs.append(output)

                # 3. Verify the results
                success = self._verify_outputs(
                    outputs,
                    case.expected_outputs
                )

                # 4. Record the result
                self.results.append(TestResult(
                    case_id=case.id,
                    success=success,
                    actual_outputs=outputs,
                    error=None,
                    execution_time=(
                        datetime.now() - start_time
                    ).total_seconds(),
                    timestamp=datetime.now()
                ))

            except Exception as e:
                # Record the failure
                self.results.append(TestResult(
                    case_id=case.id,
                    success=False,
                    actual_outputs=[],
                    error=str(e),
                    execution_time=0,
                    timestamp=datetime.now()
                ))

            finally:
                # 5. Clean up the environment
                if case.cleanup:
                    await self._cleanup_case(case)

        return self.results

    def _verify_outputs(
        self,
        actual: List[Dict[str, Any]],
        expected: List[Dict[str, Any]]
    ) -> bool:
        if len(actual) != len(expected):
            return False

        for a, e in zip(actual, expected):
            if not self._match_output(a, e):
                return False

        return True

    def _match_output(
        self,
        actual: Dict[str, Any],
        expected: Dict[str, Any]
    ) -> bool:
        # Support regex pattern matching and tolerant comparison
        for key, value in expected.items():
            if key not in actual:
                return False

            if isinstance(value, str) and value.startswith("regex:"):
                import re
                pattern = value[6:]
                if not re.match(pattern, str(actual[key])):
                    return False
            elif actual[key] != value:
                return False

        return True
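The `regex:` prefix convention in `_match_output` lets expected outputs match patterns rather than exact values. Distilled into a standalone function, the matching logic behaves like this:

```python
import re
from typing import Any, Dict

def match_output(actual: Dict[str, Any], expected: Dict[str, Any]) -> bool:
    """Every expected key must exist in actual; string values with a
    'regex:' prefix are matched as patterns, others compared exactly.
    Keys present in actual but absent from expected are ignored."""
    for key, value in expected.items():
        if key not in actual:
            return False
        if isinstance(value, str) and value.startswith("regex:"):
            if not re.match(value[6:], str(actual[key])):
                return False
        elif actual[key] != value:
            return False
    return True
```

Note the asymmetry: the expected dict is a partial specification, so an agent response can carry extra fields (timestamps, metadata) without breaking the test.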

Unit Test Examples

@pytest.mark.asyncio
async def test_agent_basic_conversation():
    # 1. Prepare the test data
    case = TestCase(
        id="test_001",
        name="Basic conversation test",
        description="Tests the basic question-answer flow",
        inputs=[
            {"message": "Hello"},
            {"message": "What's the weather like today?"}
        ],
        expected_outputs=[
            {
                "type": "text",
                "content": "regex:^Hello.*"
            },
            {
                "type": "text",
                "content": "regex:.*weather.*"
            }
        ]
    )

    # 2. Create the test suite
    suite = TestSuite(
        agent=TestAgent(),
        cases=[case]
    )

    # 3. Run the tests
    results = await suite.run()

    # 4. Verify the results
    assert len(results) == 1
    assert results[0].success
    assert results[0].error is None

@pytest.mark.asyncio
async def test_agent_error_handling():
    # Test error handling
    case = TestCase(
        id="test_002",
        name="Error handling test",
        description="Tests handling of abnormal inputs",
        inputs=[
            {"message": ""},  # empty message
            {"message": "x" * 10000}  # oversized message
        ],
        expected_outputs=[
            {
                "type": "error",
                "code": "EMPTY_MESSAGE"
            },
            {
                "type": "error",
                "code": "MESSAGE_TOO_LONG"
            }
        ]
    )

    suite = TestSuite(
        agent=TestAgent(),
        cases=[case]
    )

    results = await suite.run()
    assert results[0].success
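For the error-handling test above to pass, the agent has to validate input up front. Here is a sketch of that validation; the error codes come from the test case, but the `validate_message` helper and the 5000-character limit are assumptions of mine, not the article's code.

```python
from typing import Any, Dict, Optional

MAX_MESSAGE_LEN = 5000  # assumed limit; pick one that fits your model context

def validate_message(request: Dict[str, Any]) -> Optional[Dict[str, str]]:
    """Return an error payload of the shape the test expects,
    or None when the input is acceptable."""
    message = request.get("message", "")
    if not message:
        return {"type": "error", "code": "EMPTY_MESSAGE"}
    if len(message) > MAX_MESSAGE_LEN:
        return {"type": "error", "code": "MESSAGE_TOO_LONG"}
    return None
```

Rejecting bad input before it reaches the model is cheap; letting an empty or oversized message through wastes tokens and produces confusing failures downstream.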

Performance Testing Framework

import random

class PerformanceMetrics(BaseModel):
    latency_p50: float
    latency_p95: float
    latency_p99: float
    requests_per_second: float
    error_rate: float
    token_usage: int

class LoadTest:
    def __init__(
        self,
        agent,
        test_duration: int = 300,  # 5 minutes
        concurrent_users: int = 10
    ):
        self.agent = agent
        self.duration = test_duration
        self.users = concurrent_users
        self.metrics: List[Dict[str, float]] = []

    async def run(self) -> PerformanceMetrics:
        # 1. Prepare the test data
        test_data = self._generate_test_data()

        # 2. Create user sessions
        sessions = [
            self._simulate_user(test_data)
            for _ in range(self.users)
        ]

        # 3. Run the sessions concurrently
        results = await asyncio.gather(*sessions)

        # 4. Compute the metrics
        return self._calculate_metrics(results)

    async def _simulate_user(
        self,
        test_data: List[Dict[str, Any]]
    ):
        results = []
        for data in test_data:
            start_time = datetime.now()
            try:
                response = await self.agent.process(data)
                latency = (
                    datetime.now() - start_time
                ).total_seconds()

                results.append({
                    "success": True,
                    "latency": latency,
                    "tokens": self._count_tokens(
                        response
                    )
                })

            except Exception as e:
                results.append({
                    "success": False,
                    "latency": 0,
                    "error": str(e)
                })

            # Simulate user think time
            await asyncio.sleep(
                random.uniform(1, 5)
            )

        return results
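The helpers `_generate_test_data`, `_count_tokens`, and `_calculate_metrics` are left out above. As one possible shape for `_calculate_metrics` (my own assumption, working from the per-request dicts `_simulate_user` collects), a standalone sketch could aggregate percentiles like this:

```python
from typing import Any, Dict, List

def calculate_metrics(results: List[List[Dict[str, Any]]],
                      duration: float) -> Dict[str, float]:
    """Flatten per-user result lists and compute latency percentiles,
    throughput, and error rate. Sketch of LoadTest._calculate_metrics."""
    flat = [r for user in results for r in user]
    latencies = sorted(r["latency"] for r in flat if r["success"])
    errors = sum(1 for r in flat if not r["success"])

    def pct(p: float) -> float:
        # Nearest-rank percentile over the sorted latency list
        if not latencies:
            return 0.0
        idx = min(len(latencies) - 1, int(p / 100 * len(latencies)))
        return latencies[idx]

    return {
        "latency_p50": pct(50),
        "latency_p95": pct(95),
        "latency_p99": pct(99),
        "requests_per_second": len(flat) / duration if duration else 0.0,
        "error_rate": errors / len(flat) if flat else 0.0,
    }
```

Failed requests are excluded from the latency percentiles (their latency is recorded as 0 above) but counted in the error rate, so a spike of fast failures doesn't make the p99 look artificially good.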

Monitoring System Implementation

With the test framework in place, we still need a real-time monitoring system:

class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"

class Metric(BaseModel):
    name: str
    type: MetricType
    value: float
    labels: Dict[str, str]
    timestamp: datetime

class MonitoringSystem:
    def __init__(
        self,
        storage,
        alert_manager
    ):
        self.storage = storage
        self.alert_manager = alert_manager
        self.metrics: List[Metric] = []

    async def record_metric(
        self,
        name: str,
        type: MetricType,
        value: float,
        labels: Optional[Dict[str, str]] = None
    ):
        metric = Metric(
            name=name,
            type=type,
            value=value,
            labels=labels or {},
            timestamp=datetime.now()
        )

        # 1. Persist the metric
        await self.storage.save_metric(metric)

        # 2. Check alert rules
        await self._check_alerts(metric)

    async def _check_alerts(
        self,
        metric: Metric
    ):
        # Fetch the alert rules for this metric
        rules = await self.alert_manager.get_rules(
            metric.name
        )

        # Evaluate each rule
        for rule in rules:
            if rule.should_alert(metric):
                await self.alert_manager.send_alert(
                    rule,
                    metric
                )

class AlertRule(BaseModel):
    name: str
    metric_name: str
    condition: str  # e.g. "> 100"
    duration: int  # duration in seconds
    severity: str

    def should_alert(
        self,
        metric: Metric
    ) -> bool:
        # Parse and evaluate the condition
        operator, threshold = self._parse_condition()

        if operator == ">":
            return metric.value > threshold
        elif operator == "<":
            return metric.value < threshold
        # ... other operators

        return False
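`_parse_condition` isn't shown above. A minimal sketch, assuming its contract is to split a string like `"> 100"` into an operator and a float threshold, might look like:

```python
from typing import Tuple

def parse_condition(condition: str) -> Tuple[str, float]:
    """Split a condition such as '> 100' or '< 2.0' into
    (operator, threshold). Sketch of AlertRule._parse_condition."""
    operator, _, threshold = condition.strip().partition(" ")
    if operator not in (">", "<", ">=", "<=", "==", "!="):
        raise ValueError(f"unsupported operator: {operator!r}")
    return operator, float(threshold)
```

Validating the operator up front means a typo in an alert rule fails loudly at load time rather than silently never firing.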

Practical Application

Now let's wire these components into our AI Agent system:

class MonitoredAgent:
    def __init__(
        self,
        agent,
        monitoring: MonitoringSystem
    ):
        self.agent = agent
        self.monitoring = monitoring

    async def process(
        self,
        request: Dict[str, Any]
    ) -> Dict[str, Any]:
        start_time = datetime.now()

        try:
            # 1. Record the request
            await self.monitoring.record_metric(
                name="requests_total",
                type=MetricType.COUNTER,
                value=1,
                labels={
                    "type": request.get("type", "unknown")
                }
            )

            # 2. Process the request
            response = await self.agent.process(
                request
            )

            # 3. Record success
            await self.monitoring.record_metric(
                name="requests_success",
                type=MetricType.COUNTER,
                value=1
            )

            # 4. Record latency
            latency = (
                datetime.now() - start_time
            ).total_seconds()
            await self.monitoring.record_metric(
                name="request_latency",
                type=MetricType.HISTOGRAM,
                value=latency
            )

            # 5. Record token usage
            tokens = self._count_tokens(response)
            await self.monitoring.record_metric(
                name="token_usage",
                type=MetricType.COUNTER,
                value=tokens
            )

            return response

        except Exception as e:
            # Record the error
            await self.monitoring.record_metric(
                name="requests_error",
                type=MetricType.COUNTER,
                value=1,
                labels={"error": type(e).__name__}
            )
            raise
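`_count_tokens` is also left undefined above. As a rough stand-in (my own assumption; a real implementation would use the model's tokenizer, e.g. tiktoken for OpenAI models), a whitespace-based estimate looks like:

```python
from typing import Any, Dict

def count_tokens(response: Dict[str, Any]) -> int:
    """Crude token estimate: whitespace-split word count of the
    textual content. Stand-in for MonitoredAgent._count_tokens."""
    content = str(response.get("content", ""))
    return len(content.split())
```

An estimate like this is fine for trend monitoring and alert thresholds; for billing-accurate numbers you'd take the usage counts the model API itself returns.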

Alert Configuration Example

# Configure the alert rules
alert_rules = [
    AlertRule(
        name="high_latency",
        metric_name="request_latency",
        condition="> 2.0",  # over 2 seconds
        duration=300,  # sustained for 5 minutes
        severity="warning"
    ),
    AlertRule(
        name="error_spike",
        metric_name="requests_error",
        condition="> 10",  # more than 10 errors
        duration=60,  # sustained for 1 minute
        severity="critical"
    ),
    AlertRule(
        name="token_usage_high",
        metric_name="token_usage",
        condition="> 1000000",  # 1M tokens
        duration=3600,  # sustained for 1 hour
        severity="warning"
    )
]

# Configure alert notifications
alert_config = {
    "channels": {
        "slack": {
            "webhook_url": "https://hooks.slack.com/..."
        },
        "email": {
            "recipients": ["oncall@company.com"]
        }
    },
    "rules": {
        "warning": ["slack"],
        "critical": ["slack", "email"]
    }
}
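The `rules` mapping above routes severities to notification channels. A tiny resolver over that config structure (the `channels_for` helper is my own sketch, not part of the article's code) makes the routing explicit:

```python
from typing import Any, Dict, List

def channels_for(alert_config: Dict[str, Any], severity: str) -> List[str]:
    """Resolve which notification channels a given severity maps to,
    based on the alert_config structure shown above. Unknown
    severities route nowhere rather than raising."""
    return alert_config.get("rules", {}).get(severity, [])
```

Routing critical alerts to both Slack and email, as the config does, gives you a redundant path if one channel is down during an incident.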

Lessons Learned

While building out this testing and monitoring system, I've distilled a few lessons:

  1. Make testing comprehensive

    • Unit tests safeguard core functionality
    • Integration tests verify component interactions
    • Performance tests surface bottlenecks
  2. Make monitoring timely

    • Collect metrics in real time
    • Respond to alerts quickly
    • Analyze trends regularly
  3. Make operations solid

    • Automate deployments
    • Plan for disaster recovery and backups
    • Keep incident response playbooks ready

Closing Thoughts

A stable AI Agent system needs a solid testing and monitoring foundation behind it. Much like driving a car requires a dashboard and regular maintenance, only by detecting and fixing problems promptly can you keep the system running reliably over the long term.

In the next article, I'll walk through building deployment and operations for an AI Agent. If you have thoughts on testing and monitoring, feel free to share them in the comments.

