当前位置: 首页 > article >正文

如何测试模型推理性能:从零开始的Python指南

如何测试模型推理性能:从零开始的Python指南

    • 什么是模型推理性能?
    • 测试模型推理性能的步骤
      • 1. 监测内存使用情况
      • 2. 测试模型吞吐量
    • 运行测试
    • 总结

在机器学习和深度学习中,模型的推理性能是一个非常重要的指标。它可以帮助我们了解模型在实际应用中的表现,尤其是在处理大规模数据时。本文将带你一步步了解如何测试模型的推理性能,并使用Python编写简单的代码来实现这一目标。

什么是模型推理性能?

模型推理性能主要关注两个方面:

  1. 内存使用情况:模型在推理过程中占用的内存大小。
  2. 模型吞吐量:模型在单位时间内能够处理的token数量。吞吐量又分为两个阶段:
    • Prefill阶段:模型预先计算并缓存一部分自注意力计算的过程。
    • Decode阶段:模型在自回归阶段不断生成新token的过程。

测试模型推理性能的步骤

我们将使用Python编写两个脚本:monitor.pybenchmark.pymonitor.py 用于监测内存使用情况,benchmark.py 用于测试模型的吞吐量。

1. 监测内存使用情况

首先,我们需要编写一个脚本来监测模型推理时的内存使用情况。我们将使用 psutil 库来实现这一功能。

import psutil
import time
import os
import signal
import sys
import atexit
import json

current_pid = os.getpid()
print(f"当前进程pid号为: {current_pid}")

monitored = sys.argv[1]
print(f"需要监测的进程为: {monitored}")
target_pid = None

# 清除残留进程
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
    if monitored in proc.info['cmdline'] and proc.pid != current_pid:
        print(f"残留进程为: {proc.pid}")
        proc.kill()
        print("残留进程清除完成")

print("\r\n")
print("寻找指定进程,请打开吞吐量测试脚本,并在吞吐量测试完毕后关闭此脚本:")

# 寻找目标进程
while not target_pid:
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        if monitored in proc.info['cmdline'] and proc.pid != current_pid:
            target_pid = proc.pid
            break

    if not target_pid:
        print("未找到指定进程")
        time.sleep(1)

print("\r\n")
print(f"已找到指定进程pid号为: {target_pid}")
process = psutil.Process(target_pid)
process_cmdline = process.cmdline()
print(f"pid对应名称为 {process_cmdline}")

max_memory_stats = {
    "max_rss": 0,
    "max_vms": 0
}

# 导出内存使用情况到JSON文件
def write_memory_max_to_json():
    with open('memory_results.json', 'w') as report_file:
        json.dump(max_memory_stats, report_file, indent=2)
    print("\n已成功将内存使用情况保存至 'memory_results.json' 文件中.")

atexit.register(write_memory_max_to_json)  # 在进程结束时保存结果

# 实时监测内存使用情况
while 1:
    time.sleep(0.1)
    mem_info = process.memory_info()
    rss = mem_info.rss / 1024 ** 2
    vms = mem_info.vms / 1024 ** 2

    max_memory_stats["max_rss"] = max(max_memory_stats["max_rss"], rss)
    max_memory_stats["max_vms"] = max(max_memory_stats["max_vms"], vms)

    print(f"Current RSS Memory: {rss:.2f} MB | Current VMS Memory: {vms:.2f} MB")
    print(f"Max RSS Memory: {max_memory_stats['max_rss']:.2f} MB | Max VMS Memory: {max_memory_stats['max_vms']:.2f} MB\n")

2. 测试模型吞吐量

接下来,我们编写一个脚本来测试模型的吞吐量。我们将使用 transformers 库来加载模型并进行推理。

import torch
import time
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # 使用CPU进行推理

# 加载分词器和模型
print("加载分词器\r\n")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B", trust_remote_code=True)
print("加载模型\r\n")
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-0.5B", device_map="auto", trust_remote_code=True).eval()
print("模型加载完成\r\n")

# 加载指定文本作为prompt
print("加载prompt\r\n")
with open('prompt.txt', 'r', encoding='utf-8') as file:
    prompt = file.read()

# 对prompt进行分词
print("分词器分词\r\n")
inputs = tokenizer(prompt, return_tensors='pt')
inputs = inputs.to(model.device)

# Prefill阶段吞吐量测试
print("prefill阶段吞吐量测试:\r\n")
batch_size = 1  # 单个prompt,批次大小为1
total_prompts = 10  # 测试10次
total_tokens = inputs['input_ids'].shape[1]  # token数量

start_time_prefill = time.time()
for _ in range(total_prompts):
    with torch.no_grad():  # 关闭梯度计算以提高推理性能
        outputs = model(**inputs)
end_time_prefill = time.time()
elapsed_time_prefill = end_time_prefill - start_time_prefill  # 推理总时长
throughput_prefill = total_prompts * total_tokens / elapsed_time_prefill  # prefill吞吐量,每秒处理的token数
print(f"tokens总数为 {total_tokens}")
print(f"测试次数为 {total_prompts}")
print(f"总时长为 {elapsed_time_prefill}")
print(f"模型prefill阶段的吞吐量: {throughput_prefill:.2f} tokens/s\r\n")
print("prefill阶段吞吐量测试完成\r\n")

# Decode阶段吞吐量测试
print("decode阶段吞吐量测试:\r\n")
max_new_tokens = 50  # 要推理的新token总数
total_prompts = 10  # 测试10次

start_time_decode = time.time()
for _ in range(total_prompts):
    with torch.no_grad():  # 关闭梯度计算以提高推理性能
        outputs = model.generate(**inputs, min_new_tokens=max_new_tokens, max_new_tokens=max_new_tokens)
    print(f"请确保生成new_tokens为50 {total_tokens} --> {outputs.shape}")
end_time_decode = time.time()
elapsed_time_decode = end_time_decode - start_time_decode  # 推理总时长
throughput_decode = total_prompts * max_new_tokens / elapsed_time_decode  # decode吞吐量,每秒生成的新token数
print(f"生成新tokens数为 {max_new_tokens}")
print(f"测试次数为 {total_prompts}")
print(f"总时长为 {elapsed_time_decode}")
print(f"模型decode的吞吐量: {throughput_decode:.2f} tokens/s\r\n")
print("decode阶段吞吐量测试完成\r\n")

# 保存吞吐量测试结果
results = {
    "prefill_throughput": throughput_prefill,
    "decode_throughput": throughput_decode
}
with open('throughput_results.json', 'w') as output_file:
    json.dump(results, output_file, indent=4)
print("\n已成功将吞吐量结果保存至 'throughput_results.json' 文件中.")

运行测试

  1. 首先,在命令行中运行 monitor.py 脚本,并指定要监测的脚本为 benchmark.py

    python monitor.py benchmark.py
    
  2. 接着,打开一个新的终端窗口,运行 benchmark.py 脚本:

    python benchmark.py
    
  3. 等待 benchmark.py 运行结束后,monitor.py 会自动结束,并将内存使用情况和吞吐量测试结果分别保存到 memory_results.jsonthroughput_results.json 文件中。

总结

通过以上步骤,你可以轻松地测试模型的推理性能。无论是内存使用情况还是模型吞吐量,这些指标都能帮助你更好地了解模型的实际表现。希望这篇博客能帮助你入门模型性能测试,并为你未来的项目提供有价值的参考。


http://www.kler.cn/a/457076.html

相关文章:

  • 借助 FinClip 跨端技术探索鸿蒙原生应用开发之旅
  • Tabby设计架构介绍
  • 风力涡轮机缺陷检测数据集,86.6%准确识别率,11921张图片,支持yolo,PASICAL VOC XML,COCO JSON格式的标注
  • 初识 Conda:一站式包管理和环境管理工具
  • 爬虫后的数据处理与使用(处理篇)
  • LeetCode每日三题(六)数组
  • 32位MCU主控智能电表方案
  • Linux下编译安装libMesh
  • (带源码)宠物主题商场系统 计算机项目 P10083
  • uni-app(优医咨询)项目实战 - 第7天
  • word无法创建工作文件,检查临时环境变量。
  • 精密缝纫的科技搭档——霍尔传感器
  • 【项目日记(5)】第二层:中心缓存的具体实现(上)
  • HDLBits训练7
  • java使用外部配置文件,springboot使用外部配置文件
  • 小程序基础 —— 08 文件和目录结构
  • 【Android】项目升级时报错 android:style/Holo.Widget
  • 毫米波雷达技术:(九)快时间窗和慢时间窗的概念
  • 强化学习蘑菇书笔记
  • 三、Vue 模板语法
  • 【PLL】电荷泵锁相环各个环路参数意义
  • Vulnhub靶场-Empire_LupinOne(至获取shell)
  • 【Qt】编辑框/按钮控件---实现HelloWorld
  • Qt资源文件复制路径与实际路径不一致
  • 威胁建模助力企业“建防御 抓运营”
  • 物联网防爆小型气象站