Person crossing and sitting detection with OpenPose human keypoint detection on Huawei Atlas
Overall approach:
Collect a dataset containing three classes of samples: crossing, sitting, and other (standing, sleeping, etc.), with roughly equal sample counts per class.
First, OpenPose is used to detect the 18 human body keypoints. The whole dataset is then run through this model to obtain the keypoints and class label of every sample. These keypoint results are fed into an XGBoost model for 3-class training, which produces the final classifier.
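Concretely, each detected person is encoded as a 36-dimensional feature vector (x and y for each of the 18 keypoints, with -1 for keypoints that were not detected), which is what the inference code below passes to XGBoost. A minimal sketch of that encoding (joints_to_feature is an illustrative helper, not part of the project code; the project builds the vector inline in process()):

import numpy as np

def joints_to_feature(joint_dict, num_keypoints=18):
    # Flatten {coco_index: (x, y)} into a 1x36 vector; missing joints stay at -1.
    feat = np.ones((1, 2 * num_keypoints)) * (-1)
    for i in range(num_keypoints):
        if i in joint_dict:
            feat[0, 2 * i] = joint_dict[i][0]      # x coordinate
            feat[0, 2 * i + 1] = joint_dict[i][1]  # y coordinate
    return feat

# e.g. a person with only the nose (0) and neck (1) detected
print(joints_to_feature({0: (120, 80), 1: (118, 130)}).shape)  # (1, 36)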
Dataset preparation:
Crossing data, class 0
Sitting data, class 1
Other data, class 2
Building the mxpiOpenposeProto library:
protoc 3.14.0 and protobuf 3.19.0 are used here.
cd proto
bash build.sh
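After the build, a quick way to confirm that the generated Python bindings work is to import them (an optional check; the "./proto" path is an assumption, the inference script below appends "../proto" from its own directory):

import sys
sys.path.append("./proto")  # adjust to wherever build.sh placed mxpiOpenposeProto_pb2.py
import mxpiOpenposeProto_pb2 as mxpiOpenposeProto
print(mxpiOpenposeProto.MxpiPersonList)  # should print the generated message class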
Building the post-processing plugin:
cd plugins
bash build.sh
chmod 440 build/libmxpi_openposepostprocess.so
cp build/libmxpi_openposepostprocess.so ${SDK_INSTALL_PATH}/mxVision/lib/plugins/ # replace ${SDK_INSTALL_PATH} with your SDK installation path
Model conversion:
atc --model=./simplified_560_openpose_pytorch.onnx --framework=5 --output=openpose_pytorch_560 --soc_version=Ascend310P3 --input_shape="data:1, 3, 560, 560" --input_format=NCHW --insert_op_conf=./insert_op.cfg
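Before conversion it can be worth confirming that the simplified ONNX model really expects the 1x3x560x560 NCHW input declared in the atc command. A small optional check with onnxruntime (not part of the original workflow):

import onnxruntime as ort

sess = ort.InferenceSession("./simplified_560_openpose_pytorch.onnx", providers=["CPUExecutionProvider"])
for inp in sess.get_inputs():
    print(inp.name, inp.shape)  # expect: data [1, 3, 560, 560]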
Inference code:
import sys
import os
import enum
import numpy as np
import cv2
import time
from StreamManagerApi import StreamManagerApi, MxDataInput, StringVector
sys.path.append("../proto")
import mxpiOpenposeProto_pb2 as mxpiOpenposeProto
from xgb import XGB
COCO_PAIRS = [(1, 2), (1, 5), (2, 3), (3, 4), (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (1, 11),
(11, 12), (12, 13), (1, 0), (0, 14), (14, 16), (0, 15), (15, 17), (2, 16), (5, 17)] # = 19
COCO_PAIRS_RENDER = COCO_PAIRS[:-2]
COCO_COLORS = [[255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], [0, 255, 0],
[0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255],
[170, 0, 255], [255, 0, 255], [255, 0, 170], [255, 0, 85]]
class OPENPOSE(object):
    def __init__(self):
        # init stream manager
        self.stream_manager_api = StreamManagerApi()
        ret = self.stream_manager_api.InitManager()
        if ret != 0:
            print("Failed to init Stream manager, ret=%s" % str(ret))
            exit()
        # create streams by pipeline config file
        with open("./pipeline/Openpose.pipeline", "rb") as f:
            pipeline_str = f.read()
        ret = self.stream_manager_api.CreateMultipleStreams(pipeline_str)
        if ret != 0:
            print("Failed to create Stream, ret=%s" % str(ret))
            exit()
        # crossing / sitting classifier
        self.xgb = XGB()

    def preproc(self, img, img_size, swap=(2, 0, 1)):
        h, w = img.shape[:2]
        h_ratio = img_size[0] / h
        w_ratio = img_size[1] / w
        resized_img = cv2.resize(
            img,
            (img_size[1], img_size[0]),
            interpolation=cv2.INTER_AREA,
        ).astype(np.uint8)
        return resized_img, h_ratio, w_ratio
    def get_pose_bbox(self, person_list, h_ratio, w_ratio):
        # Collect the keypoint coordinates of every detected person, mapped back to the original image
        joints = []
        for person in person_list:
            skeletons = person.skeletonInfoVec
            x_coords, y_coords, centers = [], [], {}
            seen_idx = []
            for skele in skeletons:
                part_idx1 = skele.cocoSkeletonIndex1
                part_idx2 = skele.cocoSkeletonIndex2
                if part_idx1 not in seen_idx:
                    seen_idx.append(part_idx1)
                    center = (int(skele.x0 / w_ratio), int(skele.y0 / h_ratio))
                    centers[part_idx1] = center
                    x_coords.append(center[0])
                    y_coords.append(center[1])
                if part_idx2 not in seen_idx:
                    seen_idx.append(part_idx2)
                    center = (int(skele.x1 / w_ratio), int(skele.y1 / h_ratio))
                    centers[part_idx2] = center
                    x_coords.append(center[0])
                    y_coords.append(center[1])
            joints.append(centers)
        return joints
    def draw(self, npimg, results):
        for joint in results:
            minx, miny = -1, -1
            for key, value in joint["joint"].items():
                # draw keypoints
                center = value
                cv2.circle(npimg, center, 3, COCO_COLORS[key], thickness=3, lineType=8, shift=0)
                if minx == -1 and miny == -1:
                    minx = center[0]
                    miny = center[1]
            # draw skeletons
            for pair_order, pair in enumerate(COCO_PAIRS_RENDER):
                if pair[0] not in joint["joint"].keys() or pair[1] not in joint["joint"].keys():
                    continue
                cv2.line(npimg, joint["joint"][pair[0]], joint["joint"][pair[1]], COCO_COLORS[pair_order], 3, cv2.LINE_AA)
            label = joint["pred_label"] + " {:.3f}".format(joint["prob"])
            cv2.putText(npimg, label, (minx, miny), 0, 0.6, (255, 255, 255), thickness=1, lineType=cv2.LINE_AA)
        return npimg
    def process(self, image):
        stream_name = b"classification+detection"
        in_plugin_id = 0
        h0, w0 = image.shape[:2]
        input_shape = (560, 560)
        pre_img, h_ratio, w_ratio = self.preproc(image, input_shape)
        pre_img = np.ascontiguousarray(pre_img)
        # encode the resized frame as JPEG and send it into the stream
        image_bytes = cv2.imencode('.jpg', pre_img)[1].tobytes()
        data_input = MxDataInput()
        data_input.data = image_bytes
        unique_id = self.stream_manager_api.SendData(stream_name, in_plugin_id, data_input)
        if unique_id < 0:
            print("Failed to send data to stream.")
            exit()
        # fetch the protobuf result produced by the post-processing plugin
        keys = [b"mxpi_openposepostprocess0"]
        key_vec = StringVector()
        for key in keys:
            key_vec.push_back(key)
        infer_result = self.stream_manager_api.GetProtobuf(stream_name, in_plugin_id, key_vec)
        if infer_result.size() == 0:
            print("infer_result is null")
            exit()
        if infer_result[0].errorCode != 0:
            print("infer_result error. errorCode=%d" % (infer_result[0].errorCode))
            exit()
        result_personlist = mxpiOpenposeProto.MxpiPersonList()
        result_personlist.ParseFromString(infer_result[0].messageBuf)
        detect_person_list = result_personlist.personInfoVec
        joints = self.get_pose_bbox(detect_person_list, h_ratio, w_ratio)
        results = []
        for joint in joints:
            # build the 36-dim feature vector (x, y for 18 keypoints), -1 for missing joints
            joint_np = np.ones((1, 36)) * (-1)
            for i in range(18):
                if i in joint.keys():
                    joint_np[0, 2 * i] = joint[i][0]
                    joint_np[0, 2 * i + 1] = joint[i][1]
            pred, pred_prob, pred_label = self.xgb.pred(joint_np)
            results.append({"joint": joint, "pred": pred, "prob": pred_prob, "pred_label": pred_label})
        return results

    def __del__(self):
        # destroy streams
        self.stream_manager_api.DestroyAllStreams()
def test_image():
    openpose = OPENPOSE()
    file_name = "./images/1.jpg"
    image = cv2.imread(file_name, 1)
    results = openpose.process(image)
    print("#####", results)
    image_show = openpose.draw(image, results)
    cv2.imwrite(os.path.splitext(file_name)[0] + "_detect_result.jpg", image_show)

def test_images():
    # Dump keypoints for every image in one class directory; the printed lines are used
    # to build the XGBoost training set (adjust the class id to match the directory).
    openpose = OPENPOSE()
    data_dir = "./data/other/"
    #data_dir = "./data/sit/"
    #data_dir = "./data/span/"
    for name in os.listdir(data_dir):
        fullname = os.path.join(data_dir, name)
        image = cv2.imread(fullname, 1)
        joints = openpose.process(image)
        #print(name, joints)
        for joint in joints:
            out = "" + name + " 2"  # class id: 0 = span (crossing), 1 = sit, 2 = other
            for i in range(18):
                if i in joint["joint"].keys():
                    out = out + " " + str(joint["joint"][i][0]) + " " + str(joint["joint"][i][1])
                else:
                    out = out + " -1 -1"
            print(out)
        #image_show = openpose.draw(image, joints)
        #cv2.imwrite(name, image_show)
def test_video():
    openpose = OPENPOSE()
    # Open the video file
    video_path = "./images/span.mp4"
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc('X', 'V', 'I', 'D')  # codec used for the saved video
    # the declared frame size must match the frames that are written, otherwise the output file is unusable
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    output = cv2.VideoWriter("output.mp4", fourcc, 20, (width, height))  # create the VideoWriter object
    # Loop through the video frames
    while cap.isOpened():
        # Read a frame from the video
        success, frame = cap.read()
        if success:
            # Run OpenPose keypoint detection and crossing/sitting classification on the frame
            t1 = time.time()
            results = openpose.process(frame)
            t2 = time.time()
            annotated_frame = openpose.draw(frame, results)
            print("time", t2 - t1)
            output.write(annotated_frame)
            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
        else:
            # Break the loop if the end of the video is reached
            break
    # Release the video objects and close the display window
    cap.release()
    output.release()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    #test_image()
    test_images()
    #test_video()
Model detection results:
XGBoost dataset preparation:
Run the dataset through the OpenPose model to obtain the keypoint coordinates. The results are saved to a txt file, one line per sample in the format (image name, class label, keypoint x/y coordinates); keypoints that are occluded or not detected are filled with -1.
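A hypothetical example line (the image name and class id are made up; the 36 coordinates reuse the sample values from the test() function in the training script below):

1.jpg 1 283 183 271 225 244 224 226 285 272 309 297 226 296 282 300 304 246 323 333 345 296 435 281 318 350 348 294 435 275 175 289 176 263 182 297 183

The training code loads this file with usecols=range(1, 38), i.e. it skips the image name in column 0 and reads the class label plus the 36 coordinates.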
XGBoost model training:
Training code:
import xgboost
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
#from sklearn.externals import joblib
import joblib
import numpy as np
from matplotlib import pyplot as plt
import os
from matrix import DrawConfusionMatrix
data_label = ["span", "sit", "other"]
drawconfusionmatrix = DrawConfusionMatrix(labels_name=data_label)
def train():
    # load the dataset (skip column 0, which is the image name)
    dataset = np.loadtxt('./data/all.txt', delimiter=" ", usecols=list(range(1, 38)))
    X = dataset[:, 1:]
    Y = dataset[:, 0]
    """
    # coordinate normalization (currently disabled)
    x = X[:, 0:36:2].copy()
    y = X[:, 1:36:2].copy()
    maxx = np.max(x, axis=1).reshape(-1, 1)
    x[x == -1] = 10000
    minx = np.min(x, axis=1).reshape(-1, 1)
    maxy = np.max(y, axis=1).reshape(-1, 1)
    y[y == -1] = 10000
    miny = np.min(y, axis=1).reshape(-1, 1)
    minxy = np.hstack([minx, miny])
    maxxy = np.hstack([maxx, maxy])
    minxy = np.tile(minxy, (1, 18))
    maxxy = np.tile(maxxy, (1, 18))
    X = (X - minxy) / (maxxy - minxy)
    """
    # split into train and test sets
    seed = 7
    test_size = 0.15
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=test_size, random_state=seed)
    evalset = [(X_train, y_train), (X_test, y_test)]
    model = XGBClassifier(max_depth=2, num_class=3, learning_rate=0.1, n_estimators=300, silent=True, reg_alpha=0.01, objective='multi:softprob')
    model.fit(X_train, y_train, eval_metric=['mlogloss', 'merror'], eval_set=evalset, verbose=True)
    results = model.evals_result()
    # plot the train / test loss curves
    plt.figure()
    plt.plot(results["validation_0"]["mlogloss"], label="train", linestyle="solid", color='k')
    plt.plot(results["validation_1"]["mlogloss"], label="test", linestyle="dotted", color='k')
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.savefig("loss.png")
    # plot the accuracy curves in a new figure so they do not overlap the loss curves
    plt.figure()
    plt.plot(1.0 - np.array(results["validation_0"]["merror"]), label="train", linestyle="solid", color='k')
    plt.plot(1.0 - np.array(results["validation_1"]["merror"]), label="test", linestyle="dotted", color='k')
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.savefig("acc.png")
    # save the model
    joblib.dump(model, "xg.model")
    # load the model
    model = joblib.load("xg.model")
    # predict on the test set
    y_pred = model.predict(X_test)
    predictions = [round(value) for value in y_pred]
    y_pred_prob = model.predict_proba(X_test)
    # confusion matrix
    drawconfusionmatrix.update(np.array(predictions, np.int32), y_test.astype(np.int32))
    drawconfusionmatrix.drawMatrix()
    # evaluate predictions
    accuracy = accuracy_score(y_test, predictions)
    print("Accuracy: %.2f%%" % (accuracy * 100.0))
def test():
    xgb = XGB()
    data = "283 183 271 225 244 224 226 285 272 309 297 226 296 282 300 304 246 323 333 345 296 435 281 318 350 348 294 435 275 175 289 176 263 182 297 183"
    data = data.split(" ")
    x_test = np.array(data, np.float32).reshape(-1, 36)
    pred, pred_prob, pred_label = xgb.pred(x_test)  # pred() returns three values
    print(pred, pred_prob, pred_label)
class XGB():
    def __init__(self):
        self.model = joblib.load("./models/xg.model")
        self.data_label = {0: "span", 1: "sit", 2: "other"}

    def pred(self, X_test):
        # return the class index, its probability, and the readable label
        #pred = self.model.predict(X_test)[0]
        pred_prob = self.model.predict_proba(X_test)[0]
        pred = int(np.argmax(pred_prob))
        pred_prob = pred_prob[pred]
        pred_label = self.data_label[pred]
        return pred, pred_prob, pred_label

if __name__ == "__main__":
    train()
    #test()
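The DrawConfusionMatrix helper imported from matrix is not shown above. A minimal sketch that matches the constructor and the update()/drawMatrix() calls used in train() (the original implementation may differ):

# matrix.py -- minimal confusion-matrix helper (sketch, not the original implementation)
import numpy as np
from matplotlib import pyplot as plt

class DrawConfusionMatrix:
    def __init__(self, labels_name):
        self.labels_name = labels_name
        self.num_classes = len(labels_name)
        self.matrix = np.zeros((self.num_classes, self.num_classes), dtype=np.int64)

    def update(self, predicts, labels):
        # accumulate counts: rows = ground truth, columns = prediction
        for pred, label in zip(predicts, labels):
            self.matrix[label, pred] += 1

    def drawMatrix(self):
        plt.figure()
        plt.imshow(self.matrix, cmap=plt.cm.Blues)
        plt.xticks(range(self.num_classes), self.labels_name, rotation=45)
        plt.yticks(range(self.num_classes), self.labels_name)
        plt.xlabel("Predicted")
        plt.ylabel("Ground truth")
        for i in range(self.num_classes):
            for j in range(self.num_classes):
                plt.text(j, i, str(self.matrix[i, j]), ha="center", va="center")
        plt.colorbar()
        plt.savefig("confusion_matrix.png", bbox_inches="tight")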
Training results:
End-to-end test:
Overall impressions:
(1) The method works to some extent. The limited accuracy is mainly due to the small amount of training data; in addition, the keypoint model is an off-the-shelf open-source model that has not been fine-tuned on our own data. These factors all affect the final results.
(2) Fundamentally, crossing and sitting are temporal actions, and a sequence-based approach should give substantially better results. However, temporal models are usually large, the pipeline is more complex, and the efficiency is lower, which makes real-time video inference in production impractical.
References:
https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch
https://gitee.com/ascend/mindxsdk-referenceapps/tree/master/contrib/OpenposeKeypointDetection
Quick Ascend environment setup - Ascend Open Source 1.0 documentation