当前位置: 首页 > article >正文

360AI平台资源可视化建设

1.引言

在当前高性能计算(HPC)、人工智能训练以及大规模分布式系统的快速发展中,计算集群成为了核心基础设施。然而,集群规模的不断扩张和复杂性的提升使得资源利用和优化变得愈发重要。集群资源可视化作为一种直观且高效的分析手段,为用户和管理员提供了对计算资源状态、节点健康状况和任务性能的深刻洞察,从而助力系统优化和业务决策。
本文从集群资源可视化的整体概念出发,详细探讨360AI平台集群、节点、任务以及人维度可视化的实践与优化思路,并同时结合实践调整DCGM和kube-state-metrics的方法不断优化。

2.资源可视化实现

2.1 集群资源可视化

2.1.1 集群可视化重要意义
集群资源可视化是现代计算系统中不可或缺的一环,其核心在于帮助用户全面了解计算资源的整体使用状况,包括关键的 CPU、GPU、内存、存储和网络等资源。这些资源的使用情况直接决定了集群的运行效率和性能。通过可视化技术,用户可以快速掌握资源分配的全貌,并深入分析各类资源的使用趋势,从而实现更高效的集群管理。    

首先,资源可视化能够帮助用户快速识别资源的使用瓶颈。例如,在计算密集型任务中,CPU 和 GPU 的使用率往往是性能的关键指标。通过可视化界面,用户可以实时监测这些指标,快速发现异常,如 CPU 长时间高负载或 GPU 使用率过低等问题。这种实时洞察能力使得用户能够在问题恶化之前采取措施,避免系统性能下降或资源浪费。     

其次,可视化为资源调度和任务分配提供了可靠的数据支持。在多任务环境中,合理的资源调度直接影响任务的完成效率。通过可视化展示各节点的资源使用情况,调度系统可以更智能地分配任务,避免资源分配不均的情况。例如,对于高优先级任务,系统可以通过可视化数据找到当前负载最轻的节点,将任务分配给该节点,从而优化任务执行效率并提高集群的整体吞吐量。    

此外,资源可视化还能为成本优化提供有效的支持。在云计算和大规模集群场景中,资源的使用直接与成本挂钩。通过对资源使用趋势的可视化分析,用户可以精确评估资源的利用率,减少闲置资源的浪费,进而降低运营成本。例如,用户可以根据使用数据优化节点配置,关闭长时间闲置的实例,或调整资源配额以避免不必要的支出。    

更重要的是,基于历史数据和可视化趋势的分析,用户可以预测系统的潜在问题。例如,通过观察资源使用的增长趋势,用户可以预测存储容量即将耗尽或网络流量压力即将上升,从而提前规划扩容或优化方案。这种预测能力对于保障系统的高可用性和稳定性具有重要意义。   

总之,集群资源可视化不仅是了解资源使用现状的有效工具,更是优化资源分配、提升性能、降低成本和预防潜在问题的重要手段。通过直观的可视化展示,用户可以更高效地管理集群资源,从而为复杂计算任务和企业业务的发展提供有力支持。

2.1.2 集群资源可视化实现过程    

针对DCGM中的GPU指标和记录本身中就包含集群信息,所以不需要额外的工作,但对于ES中的故障节点信息需要查询,相关代码如下:

func GetGpuFaultRateAndTimes(ctx context.Context, req gpuReqType.FilterReq, p *middleware.Permissions) (float64, int, int64, float64, error) {
  client, err := pool.EsBorrow()
  defer pool.EsReturnBack(client)
  if err != nil || client == nil {
    logrus.Errorf("GetGpuFaultRateAndTimes get client err %v", err)
    return 0, 0, 0, 0, errors.New("获取es客户端异常")
  }
  //var tasks []gpuReqType.NodeNum
  var subGroup errgroup.Group
  total := 0
  m := make(map[string]int)
  var totalUnavailableTime int64
  var selfHealingRate float64
  subGroup.Go(func() error {
    beginTime := time.Now()
    start, end := getTimeRange(req.IntervalTime, req.StartTime, req.EndTime)
    q := buildElasticQuery(req)
    startTime, endTime := time.Unix(start, 0).Format(gpuReqType.LayoutFormat), time.Unix(end, 0).Format(gpuReqType.LayoutFormat)
    q.Filter(elastic.NewRangeQuery("timestamp").Gte(startTime).Lte(endTime))
    search := client.Search().Index("qihoo-smi*").Query(q).Size(500)
    searchResult, err := search.Do(context.Background())
    if err != nil {
      logrus.Errorf("GetGpuFaultRateAndTimes query es error: %v", err)
      return errors.New("查询es指标异常:" + err.Error())
    }
    logrus.Debugf("GetGpuFaultRateAndTimes query searchResult data  %v, spend time %v", searchResult.Hits.Hits, time.Since(beginTime))
    var repaireCnt, cnt float64
    var nodeNames []string
    for _, hit := range searchResult.Hits.Hits {
      var nodeFaultMetric gpuReqType.NodeFaultMetric
      if err = json.Unmarshal(hit.Source, &nodeFaultMetric); err != nil {
        logrus.Errorf("GetGpuFaultRateAndTimes unmarshall err: %v", err)
        return errors.New("反序列化异常:" + err.Error())
      }
      if _, ok := m[nodeFaultMetric.Node]; !ok {
        nodeNames = append(nodeNames, nodeFaultMetric.Node)
        total++
        if nodeFaultMetric.IsSelfHealing {
          repaireCnt++
        }
        cnt++
      }
      m[nodeFaultMetric.Node]++
      logrus.Debugf("GetGpuFaultRateAndTimes node %s fault %d", nodeFaultMetric.Node, m[nodeFaultMetric.Node])
    }
    logrus.Infof("[controller/GetGpuFaultRateAndTimes] query data len %d, spend time %v", len(searchResult.Hits.Hits), time.Since(beginTime))
    if cnt > 0 {
      selfHealingRate = math.Round(repaireCnt/cnt*10000) / 100
    }
    if len(nodeNames) == 0 {
      return nil
    }
    totalUnavailableTime = dealNodeTime(nodeNames, start, end)
    return nil
  })
  newNodesMap := make(map[string]bool)
  subGroup.Go(func() error {
    beginTime := time.Now()
    _, newNodesMap, err = GetPrometheusNodes(ctx, req, p)
    if err != nil {
      logrus.Errorf("[controller/GetGpuFaultRateAndTimes] get node err:%v", err)
      return err
    }
    logrus.Infof("[controller/GetGpuFaultRateAndTimes] GetPrometheusNodes spend time %v", time.Since(beginTime))
    return nil
  })
  if err = subGroup.Wait(); err != nil {
    logrus.Errorf("[controller/GetGpuFaultRateAndTimes] wait err %v", err)
    return 0, 0, 0, 0, err
  }
  nodeTotal := len(newNodesMap)
  rate := 0.0
  if nodeTotal > 0 {
    rate = float64(total) / float64(nodeTotal) * 100
  }
  logrus.Infof("GetGpuFaultRateAndTimes node total %d, nodeTotal %d, nodes %v", total, nodeTotal, m)
  return rate, total, totalUnavailableTime, selfHealingRate, nil
}

该函数的目标是获取 GPU 故障率、故障节点总数、总不可用时间以及自愈率,主要通过 Elasticsearch 和 Prometheus 数据源实现以下几项工作:

  1. 查询 GPU 节点故障的相关数据。

  2. 计算故障节点总数及自愈率。

  3. 获取节点总数,并计算 GPU 故障率。

  4. 计算总的不可用时间。

以下是对代码逐段分析:

  • 获取es的Client

client, err := pool.EsBorrow()
defer pool.EsReturnBack(client)

从 Elasticsearch 客户端池中借用一个客户端,如果借用失败或客户端为空,则直接返回错误。确保在函数结束时归还客户端。关键点:客户端池设计提升了连接复用效率;错误处理日志记录了客户端获取的异常。

  • 并行任务处理

使用 errgroup.Group 处理并行任务,主要包括两部分:从 Elasticsearch 查询故障相关数据;从 Prometheus 获取节点列表,并行执行能有效缩短数据获取和处理的耗时。

var subGroup errgroup.Group
subGroup.Go(func() error {
    ....
  })
subGroup.Go(func() error {
    ....
  })
  • Elasticsearch 查询

start, end := getTimeRange(req.IntervalTime, req.StartTime, req.EndTime)
q := buildElasticQuery(req)
q.Filter(elastic.NewRangeQuery("timestamp").Gte(startTime).Lte(endTime))
search := client.Search().Index("qihoo-smi*").Query(q).Size(500)

构建时间范围和查询条件:使用 getTimeRange 获取时间范围(startend);构建基于时间范围的查询,过滤时间戳在范围内的数据。

执行查询:限制返回数据条数为 500;查询的索引模式为 qihoo-smi*

  • Elasticsearch 数据处理

for _, hit := range searchResult.Hits.Hits {
    var nodeFaultMetric gpuReqType.NodeFaultMetric
    if err = json.Unmarshal(hit.Source, &nodeFaultMetric); err != nil {
        logrus.Errorf("GetGpuFaultRateAndTimes unmarshall err: %v", err)
        return errors.New("反序列化异常:" + err.Error())
    }
    ...
}

查询结果逐条反序列化为 NodeFaultMetric 结构体;检查节点故障是否已记录:如果节点为新节点,则计入总节点数 total;统计自愈节点数 repaireCnt 以及故障节点总数 cnt;计算不可用时间:调用 dealNodeTime 处理节点时间范围;计算自愈率:

selfHealingRate = math.Round(repaireCnt/cnt*10000) / 100
  • Prometheus 查询

第二个并行任务从 Prometheus 中获取所有节点信息:

_, newNodesMap, err = GetPrometheusNodes(ctx, req, p)

返回节点总数以及节点映射;映射数据结构方便后续计算节点总数。


  • 整体计算

在所有并行任务完成后,计算最终结果:

nodeTotal := len(newNodesMap)
rate := 0.0
if nodeTotal > 0 {
    rate = float64(total) / float64(nodeTotal) * 100
}

故障率:如果 Prometheus 查询的节点总数不为零,则计算故障率:

rate = (故障节点数 / 总节点数) * 100

返回值:故障率 rate;故障节点总数 total;总不可用时间 totalUnavailableTime;自愈率 selfHealingRate

  • 错误处理

在任务执行失败时,及时返回错误并记录日志:

if err = subGroup.Wait(); err != nil {
    logrus.Errorf("[controller/GetGpuFaultRateAndTimes] wait err %v", err)
    return 0, 0, 0, 0, err
}

2.1.3 集群资源可视化技术实现效果
显卡利用率指标(HBOX_DCGM_FI_DEV_GPU_UTIL是优化后的DCGM指标,后面有专门介绍)计算:

avg(HBOX_DCGM_FI_DEV_GPU_UTIL{node !=\"\"%s})

显卡分配率指标(hbox_kube_pod_container_resource_requests和hbox_kube_node_status_allocatable为kube-state-metric优化后的指标,后面有专门介绍)计算:

sum_over_time(sum(max(hbox_kube_pod_container_resource_requests{resource=\"nvidia_com_gpu\",kubernetes_name!=\"kube-state-metrics-test\",unit=\"integer\"%s})by(pod)"+
    " * on(pod) group_right hbox_kube_pod_status_ready{kubernetes_name!=\"kube-state-metrics-test\", condition=\"true\"%s})[1h])/sum_over_time(sum(hbox_kube_node_status_allocatable{resource=\"nvidia_com_gpu\",kubernetes_name!=\"kube-state-metrics-test\",unit=\"integer\"%s})[1h])

空置率指标计算:

count(HBOX_DCGM_FI_DEV_GPU_UTIL{node!=""%s}==0)[1h]/sum(hbox_kube_pod_container_resource_requests{node!="",resource="nvidia_com_gpu",unit="integer"%s})[1h]

下图为各个指标实际展示效果:

37ad80d640e77fe78e03639c1bb4ddb3.pngdf326017441760dfd6956d540e62654b.png

2.2 节点可视化

2.2.1 节点维度可视化意义

节点可视化通过对功耗、利用率、GPU状态、温度以及故障情况的直观展示,能够全面提升资源使用效率和硬件管理能力。未来,可进一步结合机器学习技术,预测资源使用趋势并智能优化调度,为高性能计算平台提供更高效的支持。

节点功耗是评估计算资源运行效率的重要指标。通过可视化工具,将各节点的功耗以折线图实时展示,可以快速识别功耗异常的节点。例如,通过设置功耗阈值,当节点功耗超过预期范围时通过Grafana配置触发报警机制,提醒运维人员及时优化工作负载分配或检查硬件健康。

GPU利用率反映了节点资源使用的效率。在可视化界面中,可以使用折线图或柱状图对各节点的GPU利用率进行实时展示。同时,通过结合时间轴分析,可以识别低效利用的趋势,并进一步通过任务调度优化资源分配,提升整体性能。

对GPU空闲卡和繁忙卡的统计是资源优化调度的基础。通过热力图标注每张GPU的状态(空闲或繁忙),用户能够快速定位空闲资源,从而高效分配新的任务。同时,对于繁忙的GPU卡,可进一步深入分析其负载,避免资源瓶颈导致的任务延迟。

GPU温度的异常可能导致硬件损耗或运行中断。通过可视化温度分布图,可以实时掌握各节点GPU的温度情况。针对温度过高的节点,系统可以自动建议负载迁移或主动降频,确保硬件安全。

通过结合GPU状态数据(如掉卡、内存错误等),在故障节点中高亮故障节点,及时提供Grafana配置告警并提供详细的故障信息。此外,还可引入自动化运维脚本,在检测到故障时触发修复流程,如重新挂载GPU或重启节点,减少人工干预,提高系统的可靠性。

2.2.2 节点维度可视化实现过程

节点可视化过程中主要困难点在于实时按项目和节点维度查询功耗、GPU和GPU热力图,尤其是GPU热力图。其中GPU热力图主要实现如下:

查询语句:

countQuery := fmt.Sprintf("avg by(node, device, creator, displayname, gpu_product, modelName, trace_sid, pod) (HBOX_DCGM_FI_DEV_GPU_UTIL{node!=\"\"%s%s})", filter, unShared)

查询结果解析:

func resolveMachineResult(data string, dataSource string, isPower bool) (gpuReqType.MachineInfos, error) {
  var ret gpuReqType.MachineInfos
  realResult, err := resolveData(data)
  if err != nil || len(realResult) == 0 {
    logrus.Errorf("resolveMachineResult resolveData err %v, data %s", err, data)
    return ret, err
  }
  ret.MachineInfo = make(map[string][]gpuReqType.MachineInfo)
  for _, v := range realResult {
    metrics := v.(map[string]interface{})
    metric := getMetric(metrics["metric"].(map[string]interface{}))
    if len(metric.Node) == 0 {
      continue
    }
    num := getMetricValue(metrics["value"].([]interface{}), false)
    info := gpuReqType.MachineInfo{
      Series:      metric.GpuProduct,
      Radio:       num,
      Device:      metric.Device,
      DisplayName: metric.DisplayName,
      Creator:     metric.Creator,
    }
    if isPower {
      info.Radio = convertPower(metric.ModelName, num)
    }
    if metric.Pod == "" && metric.Creator == "" {
      info.IsEmpty = true
    }
    if _, ok := ret.MachineInfo[metric.Node]; !ok {
      ret.MachineInfo[metric.Node] = []gpuReqType.MachineInfo{info}
    } else {
      ret.MachineInfo[metric.Node] = append(ret.MachineInfo[metric.Node], info)
    }
  }
  logrus.Debugf("resolveMachineResult ret: %v\n", ret)
  return ret, nil
}
func resolveData(data string) ([]interface{}, error) {
  var result map[string]interface{}
  if err := json.Unmarshal([]byte(data), &result); err != nil {
    logrus.Errorf("resolveData unmarshal result: %v, err %v, data %s", result, err, data)
    return nil, err
  }
  if resultData, ok := result["data"]; ok {
    logrus.Debugf("resolveData data type %v, data %v\n", reflect.TypeOf(resultData), resultData)
    resultDataMap, ok := resultData.(map[string]interface{})
    if !ok {
      logrus.Errorf("resultData type %v, data %v\n", reflect.TypeOf(resultDataMap), resultDataMap)
      return nil, errors.New("resolveData invalid data format")
    }
    realResult, ok := resultDataMap["result"].([]interface{})
    if !ok || len(realResult) == 0 {
      logrus.Errorf("resolveData error  type %v, data %v\n", reflect.TypeOf(resultDataMap), resultDataMap)
      return nil, nil
    }
    return realResult, nil
  }
  return nil, nil
}

函数 1:resolveMachineResult的功能

  • 解析原始数据 data,提取并构造 GPU 信息。

  • 以节点为key封装 GPU 资源的详细信息。

  • 根据 isPower 判断是不是GPU 的功率数据。

关键逻辑分析

  1. 输入参数

  • data: Prometheus查询语句返回的结果,包含GPU相关信息。

  • isPower: 决定是否对GPU的功率进行转换。

解析原始数据

  • 调用 resolveData(data) 解析Prometheus查询返回的JSON数据;如果解析失败或结果为空,记录错误日志并返回空结果。

初始化返回值

  • 创建 ret(类型为 gpuReqType.MachineInfos),其核心字段 MachineInfo 是一个按节点分组的 map,结构为 map[string][]gpuReqType.MachineInfo,主要保存节点及节点上每张卡的信息。

遍历解析关键字段

  • 每条记录 v 是一个 map[string]interface{},表示一条 GPU 信息。

  • 调用 getMetric 提取关键字段(如节点、设备、显示名称、创建者等),。

  • 获取 value 字段的数值,通过 getMetricValue 转换为浮点数,获取查询结果。

生成 GPU 信息对象

  • 根据解析出的字段构造 gpuReqType.MachineInfo 对象,包含 GPU 产品系列、gpu利用率、设备名等。

  • 如果 isPower 为真,通过 convertPower 转换功率数据得到功耗情况。

判断是否为空 GPU 信息

  • 如果记录中 PodCreator 均为空,标记该信息为 IsEmpty = true

按节点分组存储

  • 如果当前节点 metric.Node 尚未存在于 ret.MachineInfo,初始化一个新列表。

  • 否则将该 GPU 信息追加到对应节点的列表中。

返回结果

  • 返回ret包含每个节点及对应gpu利用率、设备号、创建者、卡型号的结果。


函数 2:resolveData

功能

  • 解析Prometheus查询返回的JSON 数据,提取 result 字段的内容,返回切片形式的 GPU指标数据。

关键逻辑分析

  1. 输入参数

  • data: JSON 格式的字符串,即Prometheus查询结果。

反序列化 JSON 数据

  • 将 JSON 字符串解析为 map[string]interface{};如果解析失败,记录错误日志并返回错误。

提取数据

  • 检查反序列化结果是否包含 data 字段。

  • data 字段解析为 map[string]interface{},进一步检查是否包含 result 字段。

返回结果

  1. 如果 result 存在且为切片类型,则返回该切片。

否则记录日志并返回 nil

2.2.3 节点维度可视化实现效果

节点的功耗(HBOX_DCGM_FI_DEV_POWER_USAGE为优化后指标)和GPU利用率(HBOX_DCGM_FI_DEV_GPU_UTIL为优化后指标)

#GPU利用率
avg(avg_over_time(HBOX_DCGM_FI_DEV_GPU_UTIL{node!=""%s}[10m])) by (node,creator,displayname, gpu_product)
#功耗
avg(avg_over_time(HBOX_DCGM_FI_DEV_POWER_USAGE{node!=""%s}[10m])) by (node, creator,displayname, modelName)

bb7dec390feb1f76524a7c859a056d05.png445bbb5346fd78eef9a330b3a6134819.png

热力图指标查询及实现效果如下,获取节点、设备、创建者、gpu型号、容器名称等维度信息。

unShared := fmt.Sprintf(",shared_scheduling!=\"true\"", os.Getenv("mode"))
  countQuery := fmt.Sprintf("avg by(node, device, creator, displayname, gpu_product, modelName, trace_sid, pod) (HBOX_DCGM_FI_DEV_POWER_USAGE{node!=\"\"%s%s})", filter, unShared)

2.3 任务维度可视化

2.3.1 任务维度可视化的意义

通过对近7天任务总数、运行中任务数、任务提交量、任务成功率、以及GPU利用率和功耗等指标的全面监控,可以实现以下几方面的价值:

首先通过统计近7天创建的任务总数和当前运行中的任务数,运维人员可以快速了解系统的整体任务负载情况。当任务量出现异常波动时,监控系统可以提供预警,便于及时调整资源。例如,任务总数的持续增加可能预示着系统资源即将不足,而运行中的任务异常减少可能提示系统出现故障。

其次任务最近10分钟的GPU利用率和功耗监控,可以精确掌握每个任务对硬件资源的使用情况。通过分析GPU资源利用率,发现资源过度分配或利用不足的任务,从而对资源进行动态调整。例如,对于GPU利用率较低的任务,可以分配较低性能的GPU,释放更多资源给高优先级任务;而对于高功耗任务,可以采取节能优化策略。

接着监控任务提交量和任务成功率,能够帮助团队快速发现任务失败的根本原因。通过分析任务失败率的变化趋势,可以定位可能的系统瓶颈(如网络故障、存储问题或计算资源不足)。对于任务成功率低的系统,可以结合历史数据优化任务调度逻辑或改进硬件环境,从而提升整体任务完成率。

最后任务级别监控还可以作为系统健康状态的核心指标之一。GPU利用率、功耗及运行中任务数的异常变化往往是系统问题的前兆。例如,GPU利用率持续低于阈值可能提示GPU设备故障,功耗异常升高可能与硬件老化或散热问题有关。通过实时监控这些指标,可以大幅减少故障发生的概率。

2.3.2 任务维度可视化实现过程

func GetGpuUsageBySid(projectId []string, sids []string) (map[string]TaskGpuInfo, error) {
  beginTime := time.Now()
  filter := dealTaskGpu(TaskFilterReq{ProjectId: projectId, Sids: sids})
  var g errgroup.Group
  var taskGpuUsage, powUsage map[string]TaskGpuInfo
  g.Go(func() error {
    startTime := time.Now()
    countQuery := ""
    if len(projectId) > 0 {
      countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_GPU_UTIL{node!=\"\"%s}[10m])) by (trace_app, trace_pid, trace_sid, product_type,trace_uid)", filter)
    } else {
      countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_GPU_UTIL{node!=\"\"%s}[10m])) by (trace_app, trace_sid, product_type,trace_uid)", filter)
    }
    logrus.Infof("[taskstatistcis/GetGpuUsageBySid] PrometheusQuery gpu usage query: %s", countQuery)
    data, err := prometheusquery.PrometheusQuery(countQuery, time.Now().Unix(), PrometheusVM)
    if err != nil {
      logrus.Errorf("[taskstatistcis/GetGpuUsageBySid] PrometheusQuery gpu usage failed: %s", err)
      return err
    }
    taskGpuUsage, err = resolveNodeResult(data, false)
    if err != nil {
      logrus.Errorf("[taskstatistcis/GetGpuUsageBySid] resolveNodeResult gpu usage failed: %s", err)
      return err
    }
    logrus.Infof("[taskstatistcis/GetGpuUsageBySid] get gpu spend time %v", time.Since(startTime))
    return nil
  })
  g.Go(func() error {
    startTime := time.Now()
    countQuery := ""
    if len(projectId) > 0 {
      countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_POWER_USAGE{node!=\"\"%s}[10m])) by (trace_app, trace_pid, trace_sid, product_type,trace_uid, modelName)", filter)
    } else {
      countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_POWER_USAGE{node!=\"\"%s}[10m])) by (trace_app, trace_sid, product_type,trace_uid, modelName)", filter)
    }
    powData, err := prometheusquery.PrometheusQuery(countQuery, time.Now().Unix(), PrometheusVM)
    if err != nil {
      logrus.Errorf("[taskstatistcis/GetGpuUsageBySid] PrometheusQuery power failed: %s", err)
      return err
    }
    powUsage, err = resolveNodeResult(powData, true)
    if err != nil {
      logrus.Errorf("[taskstatistcis/GetGpuUsageBySid] resolveNodeResult power failed: %s", err)
      return err
    }
    logrus.Infof("[taskstatistcis/GetGpuUsageBySid] get pow spend time %v", time.Since(startTime))
    return nil
  })
  if err := g.Wait(); err != nil {
    logrus.Errorf("[taskstatistcis/GetGpuUsageBySid] get usage and power goroutine failed: %s", err)
    return nil, err
  }
  //logrus.Debugf("[taskstatistcis/GetGpuUsageBySid] taskGpuUsage %v, ----- powUsage %v", taskGpuUsage, powUsage)
  for k, v := range taskGpuUsage {
    if usage, ok := powUsage[k]; ok {
      v.GpuUsages.GpuPower = usage.GpuUsages.GpuPower
    }
    taskGpuUsage[k] = v
  }
  logrus.Infof("[taskstatistcis/GetGpuUsageBySid] GetGpuUsageBySid cost %v", time.Now().Sub(beginTime))
  return taskGpuUsage, nil
}

函数 GetGpuUsageBySid 的主要功能是获取与指定任务 (sid) 或项目 (projectId) 相关的 GPU利用率和功耗信息,并将其整合返回。它通过 Prometheus 查询GPU指标,采用并发优化执行查询操作。

函数逻辑详细说明

  • 开始计时

beginTime := time.Now()

记录函数执行的起始时间,用于后续计算整体耗时。

  • 构建 Prometheus 查询过滤条件

filter := dealTaskGpu(TaskFilterReq{ProjectId: projectId, Sids: sids})

调用 dealTaskGpu 函数生成 PromQL 查询的过滤条件(filter),根据 projectIdsids 构建条件字符串。

  • 声明并发任务组

var g errgroup.Group
var taskGpuUsage, powUsage map[string]TaskGpuInfo

errgroup.Group:用于管理两个并发任务的执行,确保在一个任务失败时可以统一处理错误;taskGpuUsagepowUsage:分别存储 GPU 使用率和功耗查询的结果。

  • 并发查询 GPU 使用率

g.Go(func() error {
  // 定义 Prometheus 查询语句
  countQuery := ""
  if len(projectId) > 0 {
    countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_GPU_UTIL{node!=\"\"%s}[10m])) by (trace_app, trace_pid, trace_sid, product_type,trace_uid)", filter)
  } else {
    countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_GPU_UTIL{node!=\"\"%s}[10m])) by (trace_app, trace_sid, product_type,trace_uid)", filter)
  }


       .....
})

主要逻辑

  1. 构建 Prometheus 查询语句,使用 avg_over_time 计算过去 10 分钟的平均 GPU 使用率。

  • 如果 projectId 不为空,则按 trace_pidtrace_sid 进行细粒度查询。

  • 否则只按 trace_sid 进行查询。

调用 prometheusquery.PrometheusQuery 执行 Prometheus 查询。

调用 resolveNodeResult 解析查询结果,返回键值对形式的数据。

  • 并发查询 GPU 功耗

g.Go(func() error {  // 构建 Prometheus 查询语句  countQuery := ""  if len(projectId) > 0 {    countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_POWER_USAGE{node!=\"\"%s}[10m])) by (trace_app, trace_pid, trace_sid, product_type,trace_uid, modelName)", filter)  } else {    countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_POWER_USAGE{node!=\"\"%s}[10m])) by (trace_app, trace_sid, product_type,trace_uid, modelName)", filter)  }  .....})

主要逻辑

  1. 查询 GPU 功耗指标 HBOX_DCGM_FI_DEV_POWER_USAGE,计算过去 10 分钟的平均值。

  2. 与 GPU 使用率类似,调用 Prometheus 查询接口,并解析返回结果。

  • 等待所有并发任务完成

if err := g.Wait(); err != nil {  logrus.Errorf("get usage and power goroutine failed: %s", err)  return nil, err}

g.Wait() 会阻塞,直到所有 goroutines 完成或有任务出错。如果出错,统一返回错误信息。

  • 合并 GPU 使用率和功耗,并返回结果

for k, v := range taskGpuUsage {
  if usage, ok := powUsage[k]; ok {
    v.GpuUsages.GpuPower = usage.GpuUsages.GpuPower
  }
  taskGpuUsage[k] = v
}
return taskGpuUsage, nil


2.3.3 任务维度可视化的实现效果

8e87251d80b7c408c2ce2a8259df088f.png

任务维度查询语句及实现效果如下:

countQuery = fmt.Sprintf("avg(avg_over_time(HBOX_DCGM_FI_DEV_GPU_UTIL{node!=\"\"%s}[10m])) by (trace_app, trace_pid, trace_sid, product_type,trace_uid)", filter)

426ad2687a8f81a0a5bc868a04a4eb88.png

3.DCGM和kube-state-metric的优化

3.1 DCGM(NVIDIA Data Center GPU Manager)

3.1.1 简介

  • DCGM 是由NVIDIA提供的一款GPU管理和监控工具,专为数据中心环境设计。

  • DCGM用于简化和增强对GPU的监控、管理和诊断的能力。

  • DCGM支持NVIDIA GPU的性能监控、健康检查、运行时诊断等,是GPU集群管理中的核心工具之一。

3.1.2 DCGM的优化改造

主要针对DCM_FI_DEV_GPU_UTIL、DCGM_FI_DEV_GPU_TEMP、DCGM_FI_DEV_POWER_USAGE、DCGM_FI_PROF_SM_ACTIVE等常用的DCGM指标扩展。

  • 提供metrics接口,metrics接口访问dcgm的/metric接口获取节点指标信息

var res stringurl := fmt.Sprintf("http://%s:%s%s", nvidiaDcgmExporterIP, config.Config.NvidiaExporter.Port, config.Config.NvidiaExporter.Path)err := requests.URL(url).ToString(&res).Fetch(context.TODO())if err != nil {    panic(err)}
  • 解析相关指标并拓展label、annatations

for scanner.Scan() {
    ...
    metricInfo := model.ParseMetric(line)
    metricInfo.ExtendLabels()
    ....
}
  • 返回解析结果

3.1.3 最终实现采集指标效果

[
  {
    "metric": {
      "__name__": "HBOX_DCGM_FI_DEV_GPU_UTIL",
      "DCGM_FI_DRIVER_VERSION": "470.161.03",
      "Hostname": "nvidia-dcgm-exporter-2s5l4",
      "UUID": "GPU-4231541d-0f79-b22f-d173-55c2b27ce684",
      "cluster": "xxx-prd",
      "container": "xxx-0",
      "creator": "xxx",
      "device": "nvidia2",
      "gpu": "2",
      "gpu_product": "k40m-2",
      "instance": "xxxx:9400",
      "job": "hbox-dcgm-exporter",
      "modelName": "Tesla K40m",
      "namespace": "xxxx",
      "node": "dlgpu16xxxx",
      "node_ip": "xxx",
      "node_label_dev1_hbox_qihoo_net_owner": "15",
      "node_label_dev1_hbox_qihoo_net_products_training": "true",
      "node_label_dev1_hbox_qihoo_net_usage_online_status": "online",
      "pci_bus_id": "00000000:83:00.0",
      "pod": "xxxxx",
      "pod_annotation_hbox_qihoo_net_trace_pname": "xxxx",
      "pod_annotation_hbox_qihoo_net_user_mail": "xxx",
      "pod_ip": "xxxx",
      "pod_label_hbox_qihoo_net_priority": "PO",
      "pod_label_hbox_qihoo_net_product_creator": "HBoxServing",
      "pod_label_hbox_qihoo_net_trace_env": "prdp",
      "pod_label_hbox_qihoo_net_trace_pid": "xxxx",
      "product_type": "perception",
      "prometheus_replica": "prometheus-[0|1]",
      "xx_app": "xxx-http-online-gpu",
      "xx_pid": "xxx",
      "xx_sid": "xxx",
      "xx_uid": "213",
      "xHawk_collector_id": "159"
    },
    "value": [
      1735540179.101,
      "16"
    ],
    "group": 1
  }
]

3.2 kube-state-metrics

3.2.1 简介

  • kube-state-metrics 是 Kubernetes 官方提供的开源工具,用于从 Kubernetes 的 API Server 收集集群的状态信息,并将其暴露为 Prometheus 格式的指标。

  • 它并不直接与资源的实时使用数据打交道(例如 CPU 或内存的实时使用情况),而是专注于 Kubernetes 对象的状态信息

3.2.2 kube-state-metrics的优化改造

主要针对kube_pod_container_resource_requests、kube_node_status_allocatable等常用的kube-state-metrics指标扩展。

  • 提供metrics接口

r.GET("metrics", GetStateMetrcs)
  • metrics接口访问dcgm的/metric接口获取节点指标信息

url := fmt.Sprintf("http://%s:%s%s", metricUrl, config.StateExporter.Port, config.StateExporter.Path)
resp, err := http.Get(url)
if err != nil {
  logrus.Errorf("Error fetching metrics: %v", err)
  c.JSON(200, fmt.Sprintf("Error fetching metrics: %v", err))
  return
}
defer resp.Body.Close()
  • 解析相关指标并拓展label、annatations

for scanner.Scan() {
    line := scanner.Text()
    if strings.HasPrefix(line, "#") || len(strings.TrimSpace(line)) == 0 {
      continue
    }
    metricInfo := model.ParseMetric(line)
    if _, ok := allowPodLabel[metricInfo.Name]; ok {
  metricInfo.ExtendPodMessage(podMap)
    }
    metricInfo.ExtendLabels()
    ....
}
  • 具体解析过程

func (m *Metric) extendPodLabel(pod v1.Pod) {
  podLabels := pod.Labels


  for k, v := range podLabels {
    if util.AllowCollect(k, config.Config.Common.GlobalAllow) || util.AllowCollect(k, config.Config.Common.AllowPodLabels) {
      k = strings.ReplaceAll(k, ".", "_")
      k = strings.ReplaceAll(k, "/", "_")
      k = strings.ReplaceAll(k, "-", "_")
      m.Label["pod_label_"+k] = v
    }
  }
  gpuType, ok := podLabels["hbox.qihoo.net/product.series"]
  if !ok || len(gpuType) == 0 {
    gpuType = getGpuProduct(pod)
    if len(gpuType) == 0 {
      gpuType = podLabels["hbox.qihoo.net/gpu.product"]
    }
  }
  if len(gpuType) > 0 {
    m.Label["gpu_product"] = gpuType
  }
  v, ok := podLabels["hbox.qihoo.net/product.type"]
  if ok && v == "perception" {
    m.Label["old_sid"] = m.Label["xxx_sid"]
    m.Label["xxx_sid"] = podLabels["global/index"]
    m.Label["xxx_app"] = podLabels["app"]
  } else if v == "hbox2pro" || v == "hbox2proSub" {
    m.Label["xxx_app"] = podLabels["volcano.sh/job-name"]
  } else {
    logrus.Debugf("[extendPodLabel] values %s, podLabels: %v", v, podLabels)
  }
}

3.1.3 最终实现采集指标效果

[
  {
    [
  {
    "metric": {
      "__name__": "hbox_kube_pod_container_resource_requests",
      "app_kubernetes_io_component": "metrics",
      "app_kubernetes_io_instance": "hbox-state-metrics",
      "app_kubernetes_io_managed_by": "Helm",
      "app_kubernetes_io_name": "hbox-state-metrics",
      "app_kubernetes_io_part_of": "hbox-state-metrics",
      "app_kubernetes_io_version": "2.3.0",
      "cluster": "xxx-prd",
      "container": "xxxx-0",
      "gpu_product": "800-2",
      "helm_sh_chart": "hbox-state-metrics-4.6.0",
      "instance": "xxx:xxx",
      "kubernetes_name": "hbox-state-metrics",
      "kubernetes_namespace": "monitoring",
      "namespace": "xxxx",
      "node": "800-gpuxxx",
      "pod": "xxxxx",
      "pod_annotation_hbox_qihoo_net_trace_pid": "xxx",
      "pod_annotation_hbox_qihoo_net_user_displayname": "xxx",
      "pod_ip": "xxx",
      "pod_label_hbox_qihoo_net_priority": "P5",
      "pod_label_hbox_qihoo_net_product_creator": "hbox-serving",
      "pod_label_hbox_qihoo_net_trace_app": "xxx-0",
      "pod_label_hbox_qihoo_net_trace_env": "prdp",
      "pod_label_hbox_qihoo_net_trace_sid": "xxxx",
      "pod_label_hbox_qihoo_net_trace_uid": "xxx",
      "pod_label_hbox_qihoo_net_trace_uname": "xxx",
      "product_type": "deployment",
      "prometheus_replica": "prometheus-[0|1]",
      "resource": "cpu",
      "trace_pid": "xxx",
      "uid": "6e11f590-8912-42c2-94a6-9d4b6899793c",
      "unit": "core"
    },
    "value": [
      1735547103.019,
      "120"
],
    "group": 1
  }
]

4.总结

集群资源可视化不仅是提高资源利用率的重要工具,更是推动系统优化和技术创新的核心手段。随着技术的不断进步,未来可视化将融入更多智能分析与自动化优化能力,为计算集群释放更大潜力,持续提升 GPU 利用率并减少资源浪费。

本文深入探讨了集群资源可视化的四个关键维度:集群资源监控、节点、任务和人维度分析,并结合实际案例,展示了这些技术在提升集群性能和节约资源成本中的重要价值。通过优化DCGM和kube-state-metrics的指标,我们不仅实现了资源使用的透明化,还为集群性能调优提供了强有力的支持。

END

天纪AI开发平台TAI,是面向业务开发者、算法工程师、AI初学者以及AI项目负责人等相关人员提供的一站式AI开发平台。可快速创建和部署模型,管理AI生产全周期,覆盖数据加工标注、模型构建、模型开发、模型训练、模型部署、训练推理优化等AI开发全链路服务。底层对接智算基座,算力强劲,支持多种调度策略,支持异构算力。内置行业场景模型和案例,为初级用户提供低门槛接入方式,同时具备专业、深度、可扩展性,构建机器学习、深度学习和大模型场景所需的系统平台。

产品使用地址:https://console.zyun.360.cn/product/tai

欢迎使用和合作!


http://www.kler.cn/a/509748.html

相关文章:

  • Spring Security(maven项目) 3.0.2.5版本中改
  • 汽车网络信息安全-ISO/SAE 21434解析(上)
  • 企业分类相似度筛选实战:基于规则与向量方法的对比分析
  • 基于Python+Gurobi的库存分配问题建模求解
  • 【服务治理中间件】consul介绍和基本原理
  • 指针的进阶
  • Java开发提效秘籍:巧用Apache Commons IO工具库
  • 【力扣Hot 100】子串
  • 力扣动态规划-3【算法学习day.97】
  • React 中hooks之useLayoutEffect 用法总结以及与useEffect的区别
  • 多种vue前端框架介绍
  • 【项目推荐】CakeMu-RV:一个开放的 RISC-V 处理器模拟器学习项目
  • 服务器卡顿是否等同于遭受CC攻击?
  • Windows 下 Postgres 安装 TimescaleDB 插件
  • (RAG系列) FastGPT通过API调用工作流问答
  • ESP8266-01S的TCP/IP相关的AT指令
  • 《深入浅出HTTPS​​​​​​​​​​​​​​​​​》读书笔记(29):TLS/SSL协议
  • 新星杯-ESP32智能硬件开发--ESP32开发环境
  • R语言的并发编程
  • 【华为OD-E卷 - 字符串加密 100分(python、java、c++、js、c)】
  • python初学者需要注意的几个基础点
  • 弱口令漏洞+文件上传漏洞
  • 【深度学习】1.深度学习解决问题与应用领域
  • MuJoCo仿真中的两轮平衡小车项目【问题集合】
  • 安卓java端service如何在native进程进行访问-跨进程通讯高端知识
  • 数据结构(初阶)(一)----算法复杂度