当前位置: 首页 > article >正文

基于华为atlas环境下的OpenPose人体关键点检测的人员跨越、坐立检测

整体思路:

收集数据集,数据集中包含3种类型的数据,分别是跨越、坐立、其他(站立、睡着等等)。3种类型的数据样本量持平。

首先基于OpenPose进行人体关键点的检测,得到人体的18个关键点。然后基于该算法将上面的数据集跑一遍,得到所有数据的人体关键点和类别。基于该识别结果输入xgboost模型进行3分类训练,得到最终的输出结果。

数据集准备:

跨越数据,类别为0

坐立数据,类别为1

其他类型数据,类别为2

mxpiOpenposeProto库的编译:

这里使用的protoc是3.14.0的版本,protobuf是3.19.0的版本

cd proto
bash build.sh

后处理插件编译:

cd plugins
bash build.sh
chmod 440 build/libmxpi_openposepostprocess.so
cp build/libmxpi_openposepostprocess.so ${SDK_INSTALL_PATH}/mxVision/lib/plugins/ # ${SDK_INSTALL_PATH}替换为用户的SDK安装路径

模型转化:

atc --model=./simplified_560_openpose_pytorch.onnx --framework=5 --output=openpose_pytorch_560 --soc_version=Ascend310P3 --input_shape="data:1, 3, 560, 560" --input_format=NCHW --insert_op_conf=./insert_op.cfg

推理代码实现:

import sys
import os
import enum

import numpy as np
import cv2
import time

from StreamManagerApi import StreamManagerApi, MxDataInput, StringVector
sys.path.append("../proto")
import mxpiOpenposeProto_pb2 as mxpiOpenposeProto



from xgb import XGB


COCO_PAIRS = [(1, 2), (1, 5), (2, 3), (3, 4), (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (1, 11),
             (11, 12), (12, 13), (1, 0), (0, 14), (14, 16), (0, 15), (15, 17), (2, 16), (5, 17)]  # = 19

COCO_PAIRS_RENDER = COCO_PAIRS[:-2]

COCO_COLORS = [[255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], [0, 255, 0],
              [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255],
              [170, 0, 255], [255, 0, 255], [255, 0, 170], [255, 0, 85]]


class OPENPOSE(object):
    def __init__(self):

        # init stream manager
        self.stream_manager_api = StreamManagerApi()
        ret = self.stream_manager_api.InitManager()
        if ret != 0:
            print("Failed to init Stream manager, ret=%s" % str(ret))
            exit()

        # create streams by pipeline config file
        with open("./pipeline/Openpose.pipeline", "rb") as f:
            pipeline_str = f.read()
        ret = self.stream_manager_api.CreateMultipleStreams(pipeline_str)
        if ret != 0:
            print("Failed to create Stream, ret=%s" % str(ret))
            exit()


        #跨越、坐立预测
        self.xgb = XGB()


    
    def preproc(self, img, img_size, swap=(2, 0, 1)):
        h,w = img.shape[:2]
        h_ratio = img_size[0] / img.shape[0]
        w_ratio = img_size[1] / img.shape[1]
        resized_img = cv2.resize(
            img,
            (img_size[1], img_size[0]),
            interpolation=cv2.INTER_AREA,
        ).astype(np.uint8)
    
        return resized_img, h_ratio, w_ratio

    def get_pose_bbox(self, person_list, h_ratio, w_ratio):
        joints, xcenter = [], []
        for person in person_list:
            skeletons = person.skeletonInfoVec
            x_coords, y_coords, centers = [], [], {}
            seen_idx = []
            for skele in skeletons:
                part_idx1 = skele.cocoSkeletonIndex1
                part_idx2 = skele.cocoSkeletonIndex2
                if part_idx1 not in seen_idx:
                    seen_idx.append(part_idx1)
                    center = (int(skele.x0 / w_ratio), int(skele.y0 / h_ratio))
                    centers[part_idx1] = center
                    x_coords.append(center[0])
                    y_coords.append(center[1])
    
                if part_idx2 not in seen_idx:
                    seen_idx.append(part_idx2)
                    center = (int(skele.x1 / w_ratio), int(skele.y1 / h_ratio))
                    centers[part_idx2] = center
                    x_coords.append(center[0])
                    y_coords.append(center[1])
            
            joints.append(centers)
        return joints
    
    
    def draw(self, npimg, results):
    
        for joint in results:
            minx, miny = -1,-1
            for key, value in joint["joint"].items():
                # draw keypoints
                center = value
                cv2.circle(npimg, center, 3, COCO_COLORS[key], thickness=3, lineType=8, shift=0)
                
                if minx==-1 and miny==-1:
                    minx = center[0]
                    miny = center[1]
    
    
    
            # draw skeletons
            for pair_order, pair in enumerate(COCO_PAIRS_RENDER):
                if pair[0] not in joint["joint"].keys() or pair[1] not in joint["joint"].keys():
                    continue
                cv2.line(npimg, joint["joint"][pair[0]], joint["joint"][pair[1]], COCO_COLORS[pair_order], 3, cv2.LINE_AA)

            label = joint["pred_label"] +" {:.3f}".format(joint["prob"])
            cv2.putText(npimg, label, (minx, miny), 0, 0.6, (255,255,255), thickness=1, lineType=cv2.LINE_AA)
    
        return npimg
    


    def process(self, image):
        stream_name = b"classification+detection"
        in_plugin_id = 0
        
 
        h0, w0 = image.shape[:2]

        input_shape = (560, 560)
        pre_img, h_ratio, w_ratio = self.preproc(image, input_shape)
        pre_img = np.ascontiguousarray(pre_img)

        image_bytes = cv2.imencode('.jpg', pre_img)[1].tobytes()
        
        
        data_input = MxDataInput()
        data_input.data = image_bytes



        unique_id = self.stream_manager_api.SendData(stream_name, in_plugin_id, data_input)
        if unique_id < 0:
            print("Failed to send data to stream.")
            exit()



        keys = [b"mxpi_openposepostprocess0"]
        key_vec = StringVector()
        for key in keys:
            key_vec.push_back(key)
        infer_result = self.stream_manager_api.GetProtobuf(stream_name, in_plugin_id, key_vec)
        if infer_result.size() == 0:
            print("infer_result is null")
            exit()
        if infer_result[0].errorCode != 0:
            print("infer_result error. errorCode=%d" % (infer_result[0].errorCode))
            exit()
        result_personlist = mxpiOpenposeProto.MxpiPersonList()
        result_personlist.ParseFromString(infer_result[0].messageBuf)
        detect_person_list = result_personlist.personInfoVec
    

        joints  = self.get_pose_bbox(detect_person_list, h_ratio, w_ratio)

        results = []
        for joint in joints:
            joint_np = np.ones((1,36))*(-1)
            for i in range(18):
                if i in joint.keys():
                    joint_np[0,2*i] = joint[i][0]
                    joint_np[0,2*i+1] = joint[i][1]
            pred, pred_prob, pred_label = self.xgb.pred(joint_np)
            
            results.append({"joint":joint, "pred":pred, "prob":pred_prob, "pred_label":pred_label})



        return results

    
    def __del__(self):
        # destroy streams
        self.stream_manager_api.DestroyAllStreams()
   



def test_image():
    openpose = OPENPOSE()

    file_name = "./images/1.jpg"
    image = cv2.imread(file_name, 1)
    

    results = openpose.process(image)
    print("#####", results)
    image_show = openpose.draw(image, results)
    cv2.imwrite(file_name.split('.')[0] + "_detect_result.jpg", image_show)


def test_images():
    openpose = OPENPOSE()


    data_dir = "./data/other/"
    #data_dir = "./data/sit/"
    #data_dir = "./data/span/"

    for name in os.listdir(data_dir):
        fullname = os.path.join(data_dir, name)
        image = cv2.imread(fullname, 1)

        joints = openpose.process(image)
        #print(name, joints)
        for joint in joints:
            out = ""+name +" 2"
            for i in range(18):
                if i in joint["joint"].keys():
                    out = out + " "+ str(joint["joint"][i][0]) + " " + str(joint["joint"][i][1])
                else:
                    out = out+" -1 -1"
            print(out)

        #image_show = openpose.draw(image, joints)
        #cv2.imwrite(name, image_show)


def test_video():
    openpose = OPENPOSE()

    # Open the video file
    video_path = "./images/span.mp4"
    cap = cv2.VideoCapture(video_path)

    fourcc = cv2.VideoWriter_fourcc('X', 'V', 'I', 'D') # 确定视频被保存后的编码格式
    output = cv2.VideoWriter("output.mp4", fourcc, 20, (640, 480)) # 创建VideoWriter类对象

    # Loop through the video frames
    while cap.isOpened():
        # Read a frame from the video
        success, frame = cap.read()
    
        if success:
            # Run YOLOv8 tracking on the frame, persisting tracks between frames
            t1 = time.time()
            results = openpose.process(frame)
            t2 = time.time()
    
    
            annotated_frame = openpose.draw(frame, results)
            print("time", t2-t1)
            
            output.write(annotated_frame)
    
            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
        else:
            # Break the loop if the end of the video is reached
            break
    
    # Release the video capture object and close the display window
    cap.release()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    #test_image()
    test_images()
    #test_video()

模型识别效果:

Xgboost数据集准备:

基于OpenPose模型将数据集跑一遍,得到关键点坐标数据集,数据集保存在txt里面,每一行格式为(图片名 类别 关键点xy坐标),如果身体遮挡没有关键点的使用-1代替。

Xgboost模型训练:

训练代码,

import xgboost
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
#from sklearn.externals import joblib
import joblib
import numpy as np
from matplotlib import pyplot as plt
import os

from matrix import DrawConfusionMatrix
data_label = ["span", "sit", "other"]
drawconfusionmatrix = DrawConfusionMatrix(labels_name=data_label)

def train():
    # 载入数据集
    dataset = np.loadtxt('./data/all.txt', delimiter=" ", usecols=list(range(1,38)))

    X = dataset[:, 1:]
    Y = dataset[:, 0]


    """
    #数据归一化
    x = X[:,0:36:2].copy()
    y = X[:,1:36:2].copy()
    maxx = np.max(x, axis=1).reshape(-1,1)
    x[x==-1]=10000
    minx = np.min(x, axis=1).reshape(-1,1)
    
    maxy = np.max(y, axis=1).reshape(-1,1)
    y[y==-1]=10000
    miny = np.min(y, axis=1).reshape(-1,1)

    minxy = np.hstack([minx, miny])
    maxxy = np.hstack([maxx, maxy])
    
    minxy = np.tile(minxy,(1,18))
    maxxy = np.tile(maxxy,(1,18))


    X = (X - minxy)/(maxxy-minxy)
    """
 
    # 把数据集拆分成训练集和测试集
    seed = 7
    test_size = 0.15
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=test_size, random_state=seed)


    evalset = [(X_train, y_train), (X_test, y_test)]
    model = XGBClassifier(max_depth=2, num_class=3, learning_rate=0.1, n_estimators=300, silent=True, reg_alpha=0.01, objective='multi:softprob')
    
    
    model.fit(X_train, y_train, eval_metric=['mlogloss','merror'], eval_set = evalset, verbose=True)
    
   
    results = model.evals_result()
    
    plt.plot(results["validation_0"]["mlogloss"], label="train", linestyle="solid", color='k')
    plt.plot(results["validation_1"]["mlogloss"], label="test", linestyle="dotted", color='k')
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.savefig("loss.png")
    
    
    plt.plot(1.0-np.array(results["validation_0"]["merror"]), label="train", linestyle="solid", color='k')
    plt.plot(1.0-np.array(results["validation_1"]["merror"]), label="test", linestyle="dotted", color='k')
    plt.legend()
    
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.savefig("acc.png")
    
    

    # 保存模型
    joblib.dump(model, "xg.model") 
    # 加载模型
    model = joblib.load("xg.model")
    # 对测试集做预测
    
    y_pred = model.predict(X_test)
    predictions = [round(value) for value in y_pred]
    
    
    y_pred_prob = model.predict_proba(X_test)
    
    #混淆矩阵
    drawconfusionmatrix.update(np.array(predictions,np.int32), y_test.astype(np.int32))
    drawconfusionmatrix.drawMatrix()
 
    # 评估预测结果
    accuracy = accuracy_score(y_test, predictions)
    print("Accuracy: %.2f%%" % (accuracy * 100.0))


def test():
    xgb = XGB()
    data = "283 183 271 225 244 224 226 285 272 309 297 226 296 282 300 304 246 323 333 345 296 435 281 318 350 348 294 435 275 175 289 176 263 182 297 183"
    data = data.split(" ")
    x_test = np.array(data,np.float32).reshape(-1, 36)
    predictions, predictions_label = xgb.pred(x_test)
    print(predictions, predictions_label)


class XGB():
    def __init__(self):
        self.model = joblib.load("./models/xg.model")
        self.data_label = {0:"span", 1:"sit", 2:"other"}

    def pred(self, X_test):
        #pred = self.model.predict(X_test)[0]
        pred_prob = self.model.predict_proba(X_test)[0]
        pred = int(np.argmax(pred_prob))
        pred_prob = pred_prob[pred]
        pred_label = self.data_label[pred]
        return pred, pred_prob, pred_label

if __name__ == "__main__":
    train()
    #test()

训练结果,

整体测试:

整体感受:

(1)目前基于该方法有一定的效果,精度不高主要还是因为训练数据太少。关键点模型也是直接使用的开源的模型,没有在自己私有数据上微调,等等问题都会对最终的结果有影响。

(2)本质来看,跨越、坐立还是一个时序问题,基于时序的思路解答这个问题效果应该是会高一个量级的。但是时序的模型一般都大,过程复杂,效率偏低,工程视频实时推理使用又不实际。

参考链接:

https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch

https://gitee.com/ascend/mindxsdk-referenceapps/tree/master/contrib/OpenposeKeypointDetection

快速安装昇腾环境 — 昇腾开源 1.0 文档


http://www.kler.cn/a/371919.html

相关文章:

  • 数据库1-4讲
  • Flutter鸿蒙化 在鸿蒙应用中添加Flutter页面
  • 【unity调用c++动态库,c++和c#相互调用】
  • 密码学原理技术-第十一章-Hash Functions
  • git理解记录
  • 网络安全抓包
  • Mybatis-15.动态SQL-if
  • 【Hadoop之hdfs】hdfs一些简单明了的总结(一篇足以,字少但都是精华)
  • pytest 单元框架里,前置条件
  • MySQL数据集成至金蝶云星空的解决方案
  • 【Fastjson反序列化漏洞:深入了解与防范】
  • 类加载机制123
  • HTML入门教程9:HTML引用
  • java 大集合切分成一个集合中有多个小集合
  • Java程序设计基础 第十七章:反射和设计模式
  • 大话PM | 从项目管理软件看项目管理的三个原则两个思维两个工具
  • 深入 Prometheus 监控生态 - 第五篇:利用 API 信息进行监控(NAS 备份任务监控 + 解决思路)
  • 【约束优化】一次搞定拉格朗日,对偶问题,弱对偶定理,Slater条件和KKT条件
  • 画思维导图的app有哪些?5个软件让你轻松画思维导图不求人
  • PostgreSQL 不同模式之间的数据迁移
  • Python小游戏18——中国象棋
  • 安卓13 连接usb设备后不更新ui
  • Android 应用权限管理详解
  • 【Linux】线程锁同步互斥生产消费模型
  • Windows: 如何实现CLIPTokenizer.from_pretrained`本地加载`stable-diffusion-2-1-base`
  • 网络爬虫的基本原理是什么?