如何测试模型推理性能:从零开始的Python指南
如何测试模型推理性能:从零开始的Python指南
- 什么是模型推理性能?
- 测试模型推理性能的步骤
- 1. 监测内存使用情况
- 2. 测试模型吞吐量
- 运行测试
- 总结
在机器学习和深度学习中,模型的推理性能是一个非常重要的指标。它可以帮助我们了解模型在实际应用中的表现,尤其是在处理大规模数据时。本文将带你一步步了解如何测试模型的推理性能,并使用Python编写简单的代码来实现这一目标。
什么是模型推理性能?
模型推理性能主要关注两个方面:
- 内存使用情况:模型在推理过程中占用的内存大小。
- 模型吞吐量:模型在单位时间内能够处理的token数量。吞吐量又分为两个阶段:
- Prefill阶段:模型预先计算并缓存一部分自注意力计算的过程。
- Decode阶段:模型在自回归阶段不断生成新token的过程。
测试模型推理性能的步骤
我们将使用Python编写两个脚本:monitor.py
和 benchmark.py
。monitor.py
用于监测内存使用情况,benchmark.py
用于测试模型的吞吐量。
1. 监测内存使用情况
首先,我们需要编写一个脚本来监测模型推理时的内存使用情况。我们将使用 psutil
库来实现这一功能。
import psutil
import time
import os
import signal
import sys
import atexit
import json
current_pid = os.getpid()
print(f"当前进程pid号为: {current_pid}")
monitored = sys.argv[1]
print(f"需要监测的进程为: {monitored}")
target_pid = None
# 清除残留进程
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if monitored in proc.info['cmdline'] and proc.pid != current_pid:
print(f"残留进程为: {proc.pid}")
proc.kill()
print("残留进程清除完成")
print("\r\n")
print("寻找指定进程,请打开吞吐量测试脚本,并在吞吐量测试完毕后关闭此脚本:")
# 寻找目标进程
while not target_pid:
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if monitored in proc.info['cmdline'] and proc.pid != current_pid:
target_pid = proc.pid
break
if not target_pid:
print("未找到指定进程")
time.sleep(1)
print("\r\n")
print(f"已找到指定进程pid号为: {target_pid}")
process = psutil.Process(target_pid)
process_cmdline = process.cmdline()
print(f"pid对应名称为 {process_cmdline}")
max_memory_stats = {
"max_rss": 0,
"max_vms": 0
}
# 导出内存使用情况到JSON文件
def write_memory_max_to_json():
with open('memory_results.json', 'w') as report_file:
json.dump(max_memory_stats, report_file, indent=2)
print("\n已成功将内存使用情况保存至 'memory_results.json' 文件中.")
atexit.register(write_memory_max_to_json) # 在进程结束时保存结果
# 实时监测内存使用情况
while 1:
time.sleep(0.1)
mem_info = process.memory_info()
rss = mem_info.rss / 1024 ** 2
vms = mem_info.vms / 1024 ** 2
max_memory_stats["max_rss"] = max(max_memory_stats["max_rss"], rss)
max_memory_stats["max_vms"] = max(max_memory_stats["max_vms"], vms)
print(f"Current RSS Memory: {rss:.2f} MB | Current VMS Memory: {vms:.2f} MB")
print(f"Max RSS Memory: {max_memory_stats['max_rss']:.2f} MB | Max VMS Memory: {max_memory_stats['max_vms']:.2f} MB\n")
2. 测试模型吞吐量
接下来,我们编写一个脚本来测试模型的吞吐量。我们将使用 transformers
库来加载模型并进行推理。
import torch
import time
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # 使用CPU进行推理
# 加载分词器和模型
print("加载分词器\r\n")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B", trust_remote_code=True)
print("加载模型\r\n")
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-0.5B", device_map="auto", trust_remote_code=True).eval()
print("模型加载完成\r\n")
# 加载指定文本作为prompt
print("加载prompt\r\n")
with open('prompt.txt', 'r', encoding='utf-8') as file:
prompt = file.read()
# 对prompt进行分词
print("分词器分词\r\n")
inputs = tokenizer(prompt, return_tensors='pt')
inputs = inputs.to(model.device)
# Prefill阶段吞吐量测试
print("prefill阶段吞吐量测试:\r\n")
batch_size = 1 # 单个prompt,批次大小为1
total_prompts = 10 # 测试10次
total_tokens = inputs['input_ids'].shape[1] # token数量
start_time_prefill = time.time()
for _ in range(total_prompts):
with torch.no_grad(): # 关闭梯度计算以提高推理性能
outputs = model(**inputs)
end_time_prefill = time.time()
elapsed_time_prefill = end_time_prefill - start_time_prefill # 推理总时长
throughput_prefill = total_prompts * total_tokens / elapsed_time_prefill # prefill吞吐量,每秒处理的token数
print(f"tokens总数为 {total_tokens}")
print(f"测试次数为 {total_prompts}")
print(f"总时长为 {elapsed_time_prefill}")
print(f"模型prefill阶段的吞吐量: {throughput_prefill:.2f} tokens/s\r\n")
print("prefill阶段吞吐量测试完成\r\n")
# Decode阶段吞吐量测试
print("decode阶段吞吐量测试:\r\n")
max_new_tokens = 50 # 要推理的新token总数
total_prompts = 10 # 测试10次
start_time_decode = time.time()
for _ in range(total_prompts):
with torch.no_grad(): # 关闭梯度计算以提高推理性能
outputs = model.generate(**inputs, min_new_tokens=max_new_tokens, max_new_tokens=max_new_tokens)
print(f"请确保生成new_tokens为50 {total_tokens} --> {outputs.shape}")
end_time_decode = time.time()
elapsed_time_decode = end_time_decode - start_time_decode # 推理总时长
throughput_decode = total_prompts * max_new_tokens / elapsed_time_decode # decode吞吐量,每秒生成的新token数
print(f"生成新tokens数为 {max_new_tokens}")
print(f"测试次数为 {total_prompts}")
print(f"总时长为 {elapsed_time_decode}")
print(f"模型decode的吞吐量: {throughput_decode:.2f} tokens/s\r\n")
print("decode阶段吞吐量测试完成\r\n")
# 保存吞吐量测试结果
results = {
"prefill_throughput": throughput_prefill,
"decode_throughput": throughput_decode
}
with open('throughput_results.json', 'w') as output_file:
json.dump(results, output_file, indent=4)
print("\n已成功将吞吐量结果保存至 'throughput_results.json' 文件中.")
运行测试
-
首先,在命令行中运行
monitor.py
脚本,并指定要监测的脚本为benchmark.py
:python monitor.py benchmark.py
-
接着,打开一个新的终端窗口,运行
benchmark.py
脚本:python benchmark.py
-
等待
benchmark.py
运行结束后,monitor.py
会自动结束,并将内存使用情况和吞吐量测试结果分别保存到memory_results.json
和throughput_results.json
文件中。
总结
通过以上步骤,你可以轻松地测试模型的推理性能。无论是内存使用情况还是模型吞吐量,这些指标都能帮助你更好地了解模型的实际表现。希望这篇博客能帮助你入门模型性能测试,并为你未来的项目提供有价值的参考。