使用MediaPipe Face Mesh 面部动作检测
一、技术选型
-
OpenCV(Open Source Computer Vision Library)
- 用于视频流捕捉、图像预处理和基本图像处理操作。
-
MediaPipe
- 提供高效的人脸检测与关键点提取功能(Face Mesh)。
-
Python
- 作为后端开发语言,整合上述库进行图像处理和动作识别。
-
Flask/Django(可选)
- 用于构建后端API服务,处理前端请求。
二、整体流程概述
-
视频流或图片获取
- 前端通过摄像头捕捉视频流或图片,并将数据发送至后端。
-
图像预处理
- 对接收到的图像数据进行解码、缩放和颜色空间转换。
-
人脸检测与关键点提取
- 使用 MediaPipe 提取面部关键点(Face Mesh)。
-
动作识别
- 根据关键点数据,分析用户的具体动作(如转头、眨眼、张嘴)。
-
结果返回
- 将识别结果以 JSON 格式返回前端。
三、详细实现步骤
1. 视频流或图片获取
前端(微信小程序)捕捉到视频帧或图片后,通过 API 将图像数据(通常为 Base64 编码或二进制数据)发送至后端。
前端发送图像数据示例(微信小程序):
wx.chooseImage({
count: 1,
success: function(res) {
const tempFilePaths = res.tempFilePaths;
wx.getFileSystemManager().readFile({
filePath: tempFilePaths[0],
encoding: 'base64',
success: function(data) {
wx.request({
url: 'https://localhost/api/task/detect',
method: 'POST',
data: {
user_id: 'unique_user_id',
image_data: data.data
},
success: function(response) {
// 处理后端返回的检测结果
}
});
}
});
}
});
2. 图像预处理
后端接收到图像数据后,进行解码和预处理。
示例代码(Python):
import base64
import cv2
import numpy as np
def decode_image(image_base64):
# 解码 Base64 图像数据
img_data = base64.b64decode(image_base64)
# 转换为 numpy 数组
np_arr = np.frombuffer(img_data, np.uint8)
# 使用 OpenCV 解码图像
img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
return img
3. 人脸检测与关键点提取
使用 MediaPipe 的 Face Mesh 模型提取面部关键点。
安装 MediaPipe:
pip install mediapipe
示例代码(Python):
import mediapipe as mp
# 初始化 MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
def get_face_landmarks(image):
# 将图像从 BGR 转换为 RGB
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# 获取关键点
results = face_mesh.process(rgb_image)
if results.multi_face_landmarks:
# 返回第一个人脸的关键点
return results.multi_face_landmarks[0]
else:
return None
4. 动作识别
基于提取的关键点数据,分析用户的具体动作。以下分别介绍左右转头、眨眼和张嘴的检测方法。
4.1 左右转头检测
通过分析左右眼和鼻子的关键点位置,计算头部的旋转角度。
关键点选择:
- 鼻尖(例如 MediaPipe Face Mesh 的 1 号关键点)
- 左眼外角(例如 33 号关键点)
- 右眼外角(例如 263 号关键点)
实现步骤:
- 计算左眼外角与鼻尖的连线向量。
- 计算右眼外角与鼻尖的连线向量。
- 通过向量之间的角度差,判断头部是否向左或向右转动。
示例代码:
import math
def calculate_angle(p1, p2):
# 计算两点连线的角度(相对于水平线)
delta_y = p2.y - p1.y
delta_x = p2.x - p1.x
angle = math.degrees(math.atan2(delta_y, delta_x))
return angle
def detect_head_turn(landmarks):
# 关键点索引(根据 MediaPipe Face Mesh)
nose_tip = landmarks.landmark[1]
left_eye_outer = landmarks.landmark[33]
right_eye_outer = landmarks.landmark[263]
# 计算角度
left_angle = calculate_angle(nose_tip, left_eye_outer)
right_angle = calculate_angle(nose_tip, right_eye_outer)
# 计算平均角度
avg_angle = (left_angle + right_angle) / 2
# 定义阈值(根据实际测试调整)
TURN_LEFT_THRESHOLD = -15 # 向左转头
TURN_RIGHT_THRESHOLD = 15 # 向右转头
if avg_angle < TURN_LEFT_THRESHOLD:
return 'left'
elif avg_angle > TURN_RIGHT_THRESHOLD:
return 'right'
else:
return 'straight'
4.2 眨眼检测
通过监测眼睛的纵横比(EAR, Eye Aspect Ratio)来检测眨眼次数。
关键点选择:
- 左眼:关键点 362, 385, 387, 263, 373, 380
- 右眼:关键点 33, 160, 158, 133, 153, 144
实现步骤:
- 计算每只眼睛的 EAR。
- 当 EAR 低于某个阈值时,判断为闭眼。
- 记录眨眼次数。
示例代码:
def eye_aspect_ratio(landmarks, eye_indices):
# 计算 EAR
# eye_indices 包含 6 个关键点的索引
p1 = landmarks.landmark[eye_indices[1]]
p2 = landmarks.landmark[eye_indices[5]]
p3 = landmarks.landmark[eye_indices[2]]
p4 = landmarks.landmark[eye_indices[4]]
p5 = landmarks.landmark[eye_indices[0]]
p6 = landmarks.landmark[eye_indices[3]]
# 纵向距离
vertical_1 = math.sqrt((p2.x - p4.x)**2 + (p2.y - p4.y)**2)
vertical_2 = math.sqrt((p3.x - p5.x)**2 + (p3.y - p5.y)**2)
# 横向距离
horizontal = math.sqrt((p1.x - p6.x)**2 + (p1.y - p6.y)**2)
ear = (vertical_1 + vertical_2) / (2.0 * horizontal)
return ear
def detect_blink(landmarks, blink_counter, total_blinks):
LEFT_EYE = [362, 385, 387, 263, 373, 380]
RIGHT_EYE = [33, 160, 158, 133, 153, 144]
EAR_THRESHOLD = 0.21 # 根据实际测试调整
CONSEC_FRAMES = 3 # 眨眼最少持续的帧数
left_ear = eye_aspect_ratio(landmarks, LEFT_EYE)
right_ear = eye_aspect_ratio(landmarks, RIGHT_EYE)
ear = (left_ear + right_ear) / 2.0
if ear < EAR_THRESHOLD:
blink_counter += 1
else:
if blink_counter >= CONSEC_FRAMES:
total_blinks += 1
blink_counter = 0
return blink_counter, total_blinks
4.3 张嘴检测
通过计算嘴部纵横比(MAR, Mouth Aspect Ratio)来检测张嘴动作。
关键点选择:
- 上唇上方(例如 13 号关键点)
- 下唇下方(例如 14 号关键点)
- 左嘴角(78 号关键点)
- 右嘴角(308 号关键点)
实现步骤:
- 计算嘴部的 MAR。
- 当 MAR 超过某个阈值时,判断为张嘴。
示例代码:
def mouth_aspect_ratio(landmarks):
# 关键点索引(根据 MediaPipe Face Mesh)
upper_lip = landmarks.landmark[13]
lower_lip = landmarks.landmark[14]
left_mouth = landmarks.landmark[78]
right_mouth = landmarks.landmark[308]
# 纵向距离
vertical = math.sqrt((upper_lip.x - lower_lip.x)**2 + (upper_lip.y - lower_lip.y)**2)
# 横向距离
horizontal = math.sqrt((left_mouth.x - right_mouth.x)**2 + (left_mouth.y - right_mouth.y)**2)
mar = vertical / horizontal
return mar
def detect_mouth_open(landmarks, mouth_opened):
MAR_THRESHOLD = 0.6 # 根据实际测试调整
mar = mouth_aspect_ratio(landmarks)
if mar > MAR_THRESHOLD:
mouth_opened = True
else:
mouth_opened = False
return mouth_opened
5. 综合动作识别
将上述各个动作的检测方法整合,形成综合的动作识别流程。
示例代码:
def recognize_actions(landmarks, state):
# state 包含用于记录眨眼状态的变量,如 blink_counter, total_blinks, mouth_opened
action_results = {}
# 检测左右转头
head_direction = detect_head_turn(landmarks)
action_results['head_turn'] = head_direction
# 检测眨眼
state['blink_counter'], state['total_blinks'] = detect_blink(
landmarks, state['blink_counter'], state['total_blinks']
)
action_results['blink_count'] = state['total_blinks']
# 检测张嘴
state['mouth_opened'] = detect_mouth_open(landmarks, state['mouth_opened'])
action_results['mouth_opened'] = state['mouth_opened']
return action_results
6. 后端 API 实现
使用 Flask 构建后端 API,处理前端请求,执行上述图像处理和动作识别逻辑,并返回结果。
安装 Flask:
pip install Flask
示例代码(Flask 应用):
from flask import Flask, request, jsonify
import cv2
import base64
import numpy as np
app = Flask(__name__)
# 初始化 MediaPipe Face Mesh
import mediapipe as mp
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
# 状态管理(简单示例,实际应用建议使用数据库或缓存)
user_states = {}
@app.route('/api/task/detect', methods=['POST'])
def detect_task():
data = request.json
user_id = data.get('user_id')
image_base64 = data.get('image_data')
if not user_id or not image_base64:
return jsonify({'success': False, 'message': '缺少参数'}), 400
# 解码图像
try:
img = decode_image(image_base64)
except Exception as e:
return jsonify({'success': False, 'message': '图像解码失败'}), 400
# 获取关键点
landmarks = get_face_landmarks(img)
if not landmarks:
return jsonify({'success': False, 'message': '未检测到人脸'}), 200
# 初始化用户状态
if user_id not in user_states:
user_states[user_id] = {
'current_step': 1,
'blink_counter': 0,
'total_blinks': 0,
'mouth_opened': False
}
state = user_states[user_id]
current_step = state['current_step']
# 识别动作
action_results = recognize_actions(landmarks, state)
# 判断当前步骤
success = False
next_task = ''
if current_step == 1:
if action_results['head_turn'] in ['left', 'right']:
success = True
next_task = '请眨眼'
state['current_step'] += 1
elif current_step == 2:
if action_results['blink_count'] >= 1:
success = True
next_task = '请张嘴'
state['current_step'] += 1
elif current_step == 3:
if action_results['mouth_opened']:
success = True
next_task = '所有任务完成'
state['current_step'] += 1
else:
# 所有任务完成
success = True
next_task = '所有任务已完成'
if success:
if state['current_step'] > 3:
return jsonify({
'success': True,
'message': '成功完成所有任务',
'next_task': '完成',
'current_step': state['current_step']
}), 200
else:
return jsonify({
'success': True,
'message': '检测成功,进入下一步',
'next_task': next_task,
'current_step': state['current_step']
}), 200
else:
return jsonify({
'success': False,
'message': '检测失败,请重新开始',
'current_step': 1
}), 200
@app.route('/api/task/reset', methods=['POST'])
def reset_task():
data = request.json
user_id = data.get('user_id')
if not user_id:
return jsonify({'success': False, 'message': '缺少 user_id'}), 400
user_states[user_id] = {
'current_step': 1,
'blink_counter': 0,
'total_blinks': 0,
'mouth_opened': False
}
return jsonify({
'success': True,
'message': '任务已重置',
'current_step': 1
}), 200
def decode_image(image_base64):
img_data = base64.b64decode(image_base64)
np_arr = np.frombuffer(img_data, np.uint8)
img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
return img
def get_face_landmarks(image):
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = face_mesh.process(rgb_image)
if results.multi_face_landmarks:
return results.multi_face_landmarks[0]
else:
return None
# 包含上述动作检测函数的 recognize_actions 等
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
四、示例说明
假设用户正在进行 “检测用户是否向左转头” 的第一步任务:
-
前端捕捉图像:
- 用户在小程序中启动任务,摄像头捕捉当前帧,并将图像数据发送至后端的
/api/task/detect
接口。
- 用户在小程序中启动任务,摄像头捕捉当前帧,并将图像数据发送至后端的
-
后端处理:
- 解码图像数据,并使用 MediaPipe 提取面部关键点。
- 计算鼻尖与左右眼外角的角度差,判断用户是否向左或向右转头。
- 假设检测到用户向左转头,更新用户任务状态为第二步。
-
后端返回结果:
- 返回
success: true
,提示用户进入下一步任务“请眨眼”。
- 返回
-
前端反馈:
- 小程序根据后端返回的结果,更新界面提示用户“请眨眼”,并更新进度条。
具体代码执行过程:
- 用户完成向左转头动作,前端发送图像数据。
- 后端解码图像,提取关键点,计算角度。
- 检测到头部向左转动,
detect_head_turn
返回'left'
。 - 后端判断当前步骤为 1,检测成功,更新步骤为 2,提示下一步任务。
- 前端接收到
success: true
,显示“请眨眼”。
五、优化与注意事项
-
实时性与性能优化
- 后端:使用异步框架(如
FastAPI
)提升并发处理能力;使用 GPU 加速(如 NVIDIA CUDA)提升 MediaPipe 的处理速度。 - 前端:优化图像上传频率,减少网络传输延迟;使用合适的图像分辨率,平衡识别精度与传输速度。
- 后端:使用异步框架(如
-
准确性提升
- 调整动作识别的阈值(如 EAR_THRESHOLD、MAR_THRESHOLD),根据实际测试数据进行优化。
- 使用更多关键点或更复杂的算法(如深度学习模型)提升动作识别的准确性。
-
错误处理与用户体验
- 后端:处理异常情况,如未检测到人脸,返回友好的错误信息。
- 前端:根据后端返回的错误信息,提供明确的用户指引,如“未检测到人脸,请调整位置并重试”。
-
安全性
- 使用 HTTPS 协议保护数据传输安全。
- 对上传的图像数据进行限制,防止恶意攻击(如限制图像大小、格式等)。
-
扩展性
- 设计模块化的代码结构,便于后续增加更多动作识别任务。
- 使用数据库或缓存系统(如 Redis)管理用户状态,支持大规模用户同时使用。
六、扩展示例:添加“微笑”检测
假设需要增加一个新任务,检测用户是否微笑。以下为实现步骤:
-
关键点选择:
- 上嘴唇中点(例如 13 号关键点)
- 下嘴唇中点(例如 14 号关键点)
- 左嘴角(78 号关键点)
- 右嘴角(308 号关键点)
-
微笑检测逻辑:
def smile_aspect_ratio(landmarks): # 关键点索引 upper_lip = landmarks.landmark[13] lower_lip = landmarks.landmark[14] left_mouth = landmarks.landmark[78] right_mouth = landmarks.landmark[308] # 纵向距离 vertical = math.sqrt((upper_lip.x - lower_lip.x)**2 + (upper_lip.y - lower_lip.y)**2) # 横向距离 horizontal = math.sqrt((left_mouth.x - right_mouth.x)**2 + (left_mouth.y - right_mouth.y)**2) sar = vertical / horizontal return sar def detect_smile(landmarks): SAR_THRESHOLD = 0.5 # 根据实际测试调整 sar = smile_aspect_ratio(landmarks) return sar > SAR_THRESHOLD
-
集成到后端 API:
在
recognize_actions
函数中添加微笑检测逻辑,并在任务流程中增加相应步骤。
后端实现对视频流或图片的人脸检测与动作识别功能,关键在于有效利用 MediaPipe 提供的高效人脸关键点提取功能,并基于这些关键点设计合理的动作识别算法。结合前端的摄像头捕捉和后端的高效处理,可以实现实时、准确的任务检测与反馈,提升用户体验。
附录: 关键点索引参考
MediaPipe Face Mesh 提供 468 个面部关键点,常用的一些关键点索引如下:
- 鼻尖:1
- 左眼外角:33
- 右眼外角:263
- 左眼上方:159
- 左眼下方:145
- 右眼上方:386
- 右眼下方:374
- 上唇上方:13
- 下唇下方:14
- 左嘴角:78
- 右嘴角:308
详细的关键点索引可以参考 MediaPipe Face Mesh 。