基于 GPUTasker 的 GPU 使用情况钉钉推送机器人实现
引言
https://github.com/cnstark/gputasker
随着 AI 模型的广泛应用,GPU 成为团队中最重要的资源之一。然而,如何实时监控 GPU 的使用情况并及时通知团队是一个值得关注的问题。为了更好地管理显卡资源,本文基于 GPUTasker,实现了一个定期向钉钉群推送显卡使用情况的机器人。
我们通过钉钉自定义机器人 API 和 GPU 监控工具,结合 Python 脚本实现了以下功能:
- 根据设定的 工作时间 和 节假日规则,控制消息推送;
- 按指定时间间隔发送 GPU 的利用率、显存使用量以及正在使用显卡的用户信息;
- 自动跳过节假日和非工作时间,减少不必要的推送。
实现步骤
1. 获取钉钉机器人 Token 和 Secret
在钉钉群中创建一个自定义机器人,获取 Token 和 Secret。具体步骤如下:
- 登录钉钉 Web 端:
打开 钉钉开放平台 或在钉钉桌面端打开需要管理的工作群。 - 添加机器人:
- 点击群设置 -> 智能群助手 -> 添加机器人;
- 选择 自定义机器人,并设置一个名称(如:GPU 使用监控机器人);
- 配置机器人安全设置,选择 自定义关键词 或 签名校验。
- 记录 Token 和 Secret:
- 添加完成后,系统会生成一个 Token;
- 如果选择了签名校验,还会生成一个 Secret;
- 这两个字段将在脚本中用于身份验证。
2. Messenger 类的实现
Messenger 类是整个系统的核心,负责构建和发送消息到钉钉群。以下是该类的详细实现及功能介绍。
2.1 文件路径
在项目中,新建以下文件路径:
dingding/dingding.py
将 Messenger 类的代码放入 dingding.py 文件中,供其他模块调用。
2.2 核心功能
以下是 Messenger 类的关键功能:
- 节假日跳过:
使用 chinese_calendar 库判断当前日期是否为中国法定节假日。如果是节假日,机器人将自动跳过消息推送。 - 工作时间设置:
支持自定义工作时间段(如上午 8:20 到 11:50,下午 13:10 到 17:30),并在非工作时间内停止推送消息。 - 固定时间间隔推送:
支持设置推送间隔时间(如每 30 分钟推送一次),避免频繁发送消息。 - 显卡使用信息格式化:
将显卡使用情况转化为 Markdown 格式,方便在钉钉群中以表格形式展示。
以下是 Messenger 类的完整代码:
import os
import time
import hmac
import json
import base64
import hashlib
import requests
import chinese_calendar as calendar
from urllib.parse import quote_plus
from datetime import datetime
class Messenger:
def __init__(self, token=os.getenv("DD_ACCESS_TOKEN"), secret=os.getenv("DD_SECRET")):
"""
初始化方法
@param token: str, 钉钉机器人访问令牌
@param secret: str, 钉钉机器人密钥
"""
self.token = token
self.secret = secret
self.URL = "https://oapi.dingtalk.com/robot/send"
self.headers = {'Content-Type': 'application/json'}
self.params = {'access_token': self.token}
self.update_timestamp_and_sign()
# GPU 参数
self.total_memory_GB = 24
self.utilization_thred = 0.6
self.memory_used_thred = 0.5
# 时间控制参数
self.time_range = [('08:20', '11:50'), ('13:10', '17:30')]
self.last_true_time = {}
self.time_interval = 30 # 间隔30分钟推送一次
def send_md(self, message_json, server_ip):
"""
发送 Markdown 格式的消息到钉钉。
"""
self.update_timestamp_and_sign()
if self.should_call_function_during_chinese_workdays(server_ip):
if not message_json:
text = f"**服务器IP**: `{server_ip}`\n**状态**: **连接失败**"
self.send_markdown_to_dingtalk("服务器连接失败", text)
else:
content, is_free = self.format_gpu_usage_to_markdown(message_json, server_ip)
if is_free:
self.send_markdown_to_dingtalk("显卡使用情况", content)
def update_timestamp_and_sign(self):
"""
更新时间戳和签名。
"""
self.timestamp = str(round(time.time() * 1000))
secret_enc = self.secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(self.timestamp, self.secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
self.sign = quote_plus(base64.b64encode(hmac_code))
self.params['timestamp'] = self.timestamp
self.params['sign'] = self.sign
def send_markdown_to_dingtalk(self, title, text):
"""
构建并通过钉钉发送 Markdown 消息。
"""
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": text
}
}
try:
requests.post(url=self.URL, data=json.dumps(data), params=self.params, headers=self.headers)
except Exception as e:
print(f"发生错误: {e}")
def format_gpu_usage_to_markdown(self, message_json, server_ip):
"""
格式化 GPU 使用信息为 Markdown 文本。
"""
rows = []
rows.append(f"**{server_ip}**")
rows.append("")
rows.append("| ID | GPU利用率 | 显存使用量 | 用户 |")
rows.append("|:-------:|:------------:|:----------------:|:------:|")
is_any_free = False
for gpu in message_json:
index = gpu['index']
utilization = gpu['utilization.gpu']
memory_used_MB = gpu['memory.used']
memory_used_GB = memory_used_MB / 1024
memory_percentage = (memory_used_MB / (self.total_memory_GB * 1024)) * 100
users = [process['username'] for process in gpu['processes']]
users_str = ', '.join(set(users)) if users else '-'
is_free = utilization < 100 * self.utilization_thred and memory_used_MB < (self.total_memory_GB * 1024 * self.memory_used_thred)
if is_free:
is_any_free = True
row = f"| <font color='green'>**{index}**</font> | <font color='green'>**{utilization}%**</font> | <font color='green'>**{memory_used_GB:.1f}GB ({memory_percentage:.0f}%)**</font> | <font color='green'>**{users_str}**</font> |"
else:
row = f"| {index} | {utilization}% | {memory_used_GB:.1f}GB ({memory_percentage:.0f}%) | {users_str} |"
rows.append(row)
return '\n'.join(rows), is_any_free
def should_call_function_during_chinese_workdays(self, server_ip):
"""
检查是否为中国工作日以及指定时间段。
"""
now = datetime.now()
current_time = now.time()
if not calendar.is_workday(now):
return False
in_any_time_range = False
for time_range in self.time_range:
start_time = datetime.strptime(time_range[0], '%H:%M').time()
end_time = datetime.strptime(time_range[1], '%H:%M').time()
if start_time <= end_time:
in_time_range = start_time <= current_time <= end_time
else:
in_time_range = start_time <= current_time or current_time <= end_time
if in_time_range:
in_any_time_range = True
break
if in_any_time_range:
last_time = self.last_true_time.get(server_ip)
if last_time is None or (now - last_time).total_seconds() >= self.time_interval * 60:
self.last_true_time[server_ip] = now
return True
return False
# 实例化类
messager = Messenger(token="xxxxxx",
secret="xxxxxx")
2.3 调用 Messenger 类
将以下代码加入 gputasker/gpu_info/utils.py 中,通过 try 捕获异常并调用钉钉推送功能:
from dingding.dingding import messager
class GPUInfoUpdater:
def update_gpu_info(self):
server_list = GPUServer.objects.all()
for server in server_list:
try:
gpu_info_json = get_gpu_status(server.ip, self.user, server.port, self.private_key_path)
except:
gpu_info_json = None
finally:
messager.send_md(gpu_info_json, server.ip)
3. 效果展示
以下是钉钉群中接收到的 GPU 使用情况推送示例:
**172.20.3.27**
| ID | GPU利用率 | 显存使用量 | 用户 |
|:-------:|:------------:|:----------------:|:------:|
| 0 | 0% | 12.7GB (53%) | root|
| 1 | 87% | 16.7GB (70%) | root|
| 2 | 92% | 14.2GB (59%) | root|
| 3 | 87% | 14.2GB (59%) | root|
| 4 | 86% | 14.2GB (59%) | root|
| 5 | 83% | 14.2GB (59%) | root|
| 6 | 86% | 17.0GB (71%) | root|
| 7 | 0% | 2.1GB (9%) | root|
总结
通过本文的实现,可以将 GPU 使用情况实时推送到钉钉群,方便团队成员及时了解资源状态,提高显卡的利用效率。