当前位置: 首页 > article >正文

基于微信小程序的面部动作检测系统

引言

本技术文档旨在详细阐述一个基于微信小程序的面部动作检测系统的技术路线、实现方法及关键技术框架。系统的核心功能包括检测用户的左右转头、眨眼和张嘴动作,并根据检测结果逐步引导用户完成任务。为确保系统的安全性和准确性,特别是防止用户通过图片或视频欺骗系统,本文进一步深入分析并提出了相应的优化和防护措施。


系统架构概述

系统采用前后端分离的架构,前端为微信小程序,负责用户交互和界面展示;后端为基于Python的API服务,负责图像处理、动作识别和任务状态管理。系统通过HTTPS协议进行数据交互,前后端通信采用JSON格式。

系统架构图

捕捉图像/视频帧
Base64编码
HTTPS POST请求
图像解码与预处理
人脸检测与关键点提取
任务状态管理
JSON响应
HTTPS响应
解析结果
更新界面
微信小程序前端
图像采集模块
数据传输模块
后端API服务
图像处理模块
动作识别模块
状态管理模块
用户界面

前端实现细节

技术框架与组件

  • 微信小程序框架
    • 使用WXML、WXSS、JavaScript或TypeScript进行开发。
  • UI 组件库
    • 使用WeUI或第三方UI库快速搭建界面。
  • 数据交互
    • 利用wx.request接口与后端API进行通信。
  • 防欺骗措施
    • 实时性要求高,结合前端技术实现活体检测提示。

核心功能模块

  1. 任务显示模块
    • 动态显示当前任务提示(如“请向左转头”)。
  2. 实时反馈模块
    • 实时显示检测结果(成功/失败)。
  3. 进度条与状态提示
    • 使用进度条展示任务完成进度。
  4. 重新开始选项
    • 提供“重新开始”按钮,允许用户重新进行任务检测。
  5. 活体检测提示
    • 在采集图像时提示用户进行自然动作(如眨眼、张嘴)以确保活体性。

数据采集与传输

数据采集
  • 静态图片采集
    • 使用wx.chooseImage捕捉用户当前图像。
  • 视频帧采集
    • 使用摄像头实时捕捉视频流,并定时截取帧进行检测。
数据传输流程
  1. 捕捉图像/视频帧
    • 用户点击“开始检测”后,前端启动摄像头并捕捉图像或视频帧。
  2. 编码图像数据
    • 使用Base64对图像数据进行编码。
  3. 构建JSON请求
    • 包含user_idimage_data字段。
  4. 发送HTTP POST请求
    • 通过wx.request将JSON数据发送至后端API。
示例代码:捕捉静态图片并发送至后端
// pages/capture/capture.js
Page({  
  captureImage: function() {
    wx.chooseImage({
      count: 1,
      sourceType: ['camera'],
      success: function(res) {
        const tempFilePath = res.tempFilePaths[0];
        wx.getFileSystemManager().readFile({
          filePath: tempFilePath,
          encoding: 'base64',
          success: function(data) {
            const base64Data = data.data;
            wx.request({
              url: 'https://192.8.56.100/api/task/detect',
              method: 'POST',
              header: {
                'Content-Type': 'application/json'
              },
              data: {
                user_id: 'unique_user_id',
                image_data: base64Data
              },
              success: function(response) {
                // 处理后端返回的检测结果
                console.log(response.data);
                // 更新界面提示
              },
              fail: function(error) {
                console.error('请求失败', error);
                // 提示用户网络错误
              }
            });
          },
          fail: function(error) {
            console.error('读取文件失败', error);
            // 提示用户读取文件失败
          }
        });
      },
      fail: function(error) {
        console.error('选择图片失败', error);
        // 提示用户选择图片失败
      }
    });
  }
});
示例代码:实时视频帧采集并发送至后端
// pages/capture/capture.js
Page({
  data: {
    cameraContext: null,
    intervalId: null
  },
  onLoad: function() {
    this.setData({
      cameraContext: wx.createCameraContext()
    });
  },
  startCapture: function() {
    const intervalId = setInterval(() => {
      this.data.cameraContext.takePhoto({
        quality: 'low',
        success: (res) => {
          const base64Path = res.tempImagePath;
          wx.getFileSystemManager().readFile({
            filePath: base64Path,
            encoding: 'base64',
            success: (data) => {
              wx.request({
                url: 'https://192.8.56.100/api/task/detect',
                method: 'POST',
                header: {
                  'Content-Type': 'application/json'
                },
                data: {
                  user_id: 'unique_user_id',
                  image_data: data.data
                },
                success: (response) => {
                  // 处理后端返回的检测结果
                  console.log(response.data);
                  // 更新界面提示
                },
                fail: (error) => {
                  console.error('请求失败', error);
                  // 提示用户网络错误
                }
              });
            },
            fail: (error) => {
              console.error('读取文件失败', error);
              // 提示用户读取文件失败
            }
          });
        },
        fail: (error) => {
          console.error('拍照失败', error);
          // 提示用户拍照失败
        }
      });
    }, 1000); // 每秒截取一帧
    this.setData({ intervalId });
  },
  stopCapture: function() {
    clearInterval(this.data.intervalId);
  }
});

用户界面设计

  • 任务提示
    • 显示当前任务描述,如“请向左转头”、“请眨眼”、“请张嘴”。
  • 实时反馈
    • 使用颜色变化或图标显示检测结果(成功/失败)。
  • 进度条
    • 展示任务完成的进度,例如三步任务进度。
  • 重新开始按钮
    • 提供用户在检测失败时重新开始任务的选项。
  • 活体检测提示
    • 在活体检测过程中,提示用户进行自然动作(如“请自然眨眼”),防止用户使用照片或视频欺骗系统。

后端实现细节

技术选型

  • 编程语言:Python
  • Web框架:FastAPI(高性能,支持异步处理)
  • 图像处理库:OpenCV
  • 人脸检测与关键点提取:MediaPipe
  • 状态管理:Redis(高效管理用户任务状态)
  • 容器化:Docker(可选,用于部署)
  • 活体检测模型:基于动作识别的简单活体检测,结合动作提示确保用户实时互动

核心功能模块

  1. API 接口设计
    • POST /api/task/detect:接收用户图像数据,进行动作检测,返回检测结果。
    • GET /api/task/status:获取当前任务状态。
    • POST /api/task/reset:重置任务状态。
  2. 图像预处理
    • 解码Base64图像数据,转换为OpenCV图像数组。
  3. 人脸检测与关键点提取
    • 使用MediaPipe Face Mesh提取面部关键点。
  4. 动作识别
    • 分别检测左右转头、眨眼、张嘴。
    • 增加活体检测逻辑,确保用户进行实时互动。
  5. 状态管理
    • 使用Redis记录每个用户的当前任务进度和状态。
  6. 防欺骗措施
    • 结合活体检测,确保用户进行实时的动作交互,防止使用图片或视频欺骗系统。

数据处理流程

  1. 接收图像数据
    • 接收前端通过POST /api/task/detect发送的Base64编码图像数据和user_id
  2. 解码与预处理
    • 将Base64编码的图像数据解码为OpenCV图像数组。
  3. 人脸检测与关键点提取
    • 使用MediaPipe提取面部关键点,获取468个关键点。
  4. 动作识别与活体检测
    • 根据当前任务步骤,识别相应的动作。
    • 增加活体检测逻辑,通过多次动作交互确保用户为真人。
  5. 结果封装与返回
    • 根据动作识别结果和任务进度,构建JSON响应返回前端。
  6. 状态更新
    • 更新用户的任务进度和状态,存储至Redis。

示例代码

# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import base64
import cv2
import numpy as np
import mediapipe as mp
import math
import redis
import json
import time
import random

app = FastAPI()

# 初始化 MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

# 初始化 Redis 客户端
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# 定义请求数据模型
class DetectRequest(BaseModel):
    user_id: str
    image_data: str

# 定义响应数据模型
class DetectResponse(BaseModel):
    success: bool
    message: str
    next_task: str = None
    current_step: int = None

@app.post("/api/task/detect", response_model=DetectResponse)
async def detect_task(request: DetectRequest):
    user_id = request.user_id
    image_base64 = request.image_data

    if not user_id or not image_base64:
        raise HTTPException(status_code=400, detail="缺少 user_id 或 image_data")

    # 解码 Base64 图像数据
    try:
        image = decode_image(image_base64)
    except Exception as e:
        raise HTTPException(status_code=400, detail="图像解码失败")

    # 获取人脸关键点
    landmarks = get_face_landmarks(image)
    if not landmarks:
        return DetectResponse(success=False, message="未检测到人脸")

    # 获取或初始化用户状态
    state = get_user_state(user_id)

    # 识别动作
    action_results = recognize_actions(landmarks, state)

    # 评估当前步骤
    success, next_task, updated_step = evaluate_current_step(state, action_results)

    # 更新状态
    if success:
        if updated_step > 4:
            # 所有任务完成,重置状态
            reset_user_state(user_id)
            return DetectResponse(success=True, message="成功完成所有任务", next_task="完成", current_step=updated_step)
        else:
            update_user_state(user_id, 'current_step', updated_step)
            return DetectResponse(success=True, message="检测成功,进入下一步", next_task=next_task, current_step=updated_step)
    else:
        reset_user_state(user_id)
        return DetectResponse(success=False, message="检测失败,请重新开始", current_step=1)

def decode_image(image_base64: str) -> np.ndarray:
    img_data = base64.b64decode(image_base64)
    np_arr = np.frombuffer(img_data, np.uint8)
    img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
    if img is None:
        raise ValueError("无法解码图像")
    return img

def get_face_landmarks(image: np.ndarray):
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = face_mesh.process(rgb_image)
    if results.multi_face_landmarks:
        return results.multi_face_landmarks[0]
    else:
        return None

def get_user_state(user_id: str) -> dict:
    state_json = redis_client.get(f"user:{user_id}:state")
    if state_json:
        return json.loads(state_json)
    else:
        # 初始化状态
        initial_state = {
            'current_step': 1,
            'blink_counter': 0,
            'total_blinks': 0,
            'mouth_opened': False,
            'head_direction_history': [],
            'blink_history': [],
            'mouth_history': [],
            'last_action_time': time.time()
        }
        redis_client.set(f"user:{user_id}:state", json.dumps(initial_state))
        return initial_state

def update_user_state(user_id: str, key: str, value):
    state = get_user_state(user_id)
    state[key] = value
    redis_client.set(f"user:{user_id}:state", json.dumps(state))

def reset_user_state(user_id: str):
    initial_state = {
        'current_step': 1,
        'blink_counter': 0,
        'total_blinks': 0,
        'mouth_opened': False,
        'head_direction_history': [],
        'blink_history': [],
        'mouth_history': [],
        'last_action_time': time.time()
    }
    redis_client.set(f"user:{user_id}:state", json.dumps(initial_state))

def recognize_actions(landmarks, state: dict) -> dict:
    action_results = {}

    # 检测左右转头
    head_direction = detect_head_turn(landmarks)
    action_results['head_turn'] = head_direction

    # 检测眨眼
    state['blink_counter'], state['total_blinks'] = detect_blink(
        landmarks, state['blink_counter'], state['total_blinks']
    )
    action_results['blink_count'] = state['total_blinks']

    # 检测张嘴
    state['mouth_opened'] = detect_mouth_open(landmarks, state['mouth_opened'])
    action_results['mouth_opened'] = state['mouth_opened']

    # 记录历史数据用于活体检测
    state['head_direction_history'].append(head_direction)
    state['blink_history'].append(state['total_blinks'])
    state['mouth_history'].append(state['mouth_opened'])

    # 限制历史记录长度
    state['head_direction_history'] = state['head_direction_history'][-10:]
    state['blink_history'] = state['blink_history'][-10:]
    state['mouth_history'] = state['mouth_history'][-10:]

    return action_results

def evaluate_current_step(state: dict, action_results: dict):
    current_step = state['current_step']
    success = False
    next_task = ''
    updated_step = current_step

    current_time = time.time()

    if current_step == 1:
        if action_results['head_turn'] in ['left', 'right']:
            if current_time - state.get('last_action_time', current_time) <= 10:
                success = True
                next_task = '请眨眼'
                updated_step += 1
    elif current_step == 2:
        if action_results['blink_count'] >= 1:
            if current_time - state.get('last_action_time', current_time) <= 10:
                success = True
                next_task = '请张嘴'
                updated_step += 1
    elif current_step == 3:
        if action_results['mouth_opened']:
            if current_time - state.get('last_action_time', current_time) <= 10:
                success = True
                next_task = '完成所有任务'
                updated_step += 1
    elif current_step == 4:
        # 可根据需求增加更多步骤
        success = True
        next_task = '所有任务已完成'
        updated_step += 1
    else:
        # 所有任务完成
        success = True
        next_task = '所有任务已完成'

    if success:
        state['last_action_time'] = current_time

    # 活体检测逻辑
    if not perform_liveness_detection(state):
        success = False
        next_task = '未通过活体检测,请重新开始'
        updated_step = 1

    return success, next_task, updated_step

def perform_liveness_detection(state: dict) -> bool:
    """
    活体检测逻辑,通过检测用户是否进行了多次不同动作。
    """
    head_turns = state.get('head_direction_history', [])
    blinks = state.get('blink_history', [])
    mouth_ops = state.get('mouth_history', [])

    # 检测是否有多次头部转动
    head_turn_count = len([dir for dir in head_turns if dir in ['left', 'right']])
    # 检测是否有眨眼
    blink_total = sum(blinks)
    # 检测是否有张嘴
    mouth_opened_count = len([m for m in mouth_ops if m])

    # 简单阈值判断,具体阈值需根据实际测试调整
    if head_turn_count >= 2 and blink_total >= 2 and mouth_opened_count >= 1:
        return True
    else:
        return False

# 动作检测相关函数
def calculate_angle(p1, p2):
    delta_y = p2.y - p1.y
    delta_x = p2.x - p1.x
    angle = math.degrees(math.atan2(delta_y, delta_x))
    return angle

def detect_head_turn(landmarks):
    nose_tip = landmarks.landmark[1]
    left_eye_outer = landmarks.landmark[33]
    right_eye_outer = landmarks.landmark[263]

    left_angle = calculate_angle(nose_tip, left_eye_outer)
    right_angle = calculate_angle(nose_tip, right_eye_outer)

    avg_angle = (left_angle + right_angle) / 2

    TURN_LEFT_THRESHOLD = -15  # 向左转头
    TURN_RIGHT_THRESHOLD = 15  # 向右转头

    if avg_angle < TURN_LEFT_THRESHOLD:
        return 'left'
    elif avg_angle > TURN_RIGHT_THRESHOLD:
        return 'right'
    else:
        return 'straight'

def eye_aspect_ratio(landmarks, eye_indices):
    p1 = landmarks.landmark[eye_indices[1]]
    p2 = landmarks.landmark[eye_indices[5]]
    p3 = landmarks.landmark[eye_indices[2]]
    p4 = landmarks.landmark[eye_indices[4]]
    p5 = landmarks.landmark[eye_indices[0]]
    p6 = landmarks.landmark[eye_indices[3]]

    vertical_1 = math.sqrt((p2.x - p4.x)**2 + (p2.y - p4.y)**2)
    vertical_2 = math.sqrt((p3.x - p5.x)**2 + (p3.y - p5.y)**2)
    horizontal = math.sqrt((p1.x - p6.x)**2 + (p1.y - p6.y)**2)

    ear = (vertical_1 + vertical_2) / (2.0 * horizontal)
    return ear

def detect_blink(landmarks, blink_counter, total_blinks):
    LEFT_EYE = [362, 385, 387, 263, 373, 380]
    RIGHT_EYE = [33, 160, 158, 133, 153, 144]

    EAR_THRESHOLD = 0.21  # 根据实际测试调整
    CONSEC_FRAMES = 3  # 眨眼最少持续的帧数

    left_ear = eye_aspect_ratio(landmarks, LEFT_EYE)
    right_ear = eye_aspect_ratio(landmarks, RIGHT_EYE)

    ear = (left_ear + right_ear) / 2.0

    if ear < EAR_THRESHOLD:
        blink_counter += 1
    else:
        if blink_counter >= CONSEC_FRAMES:
            total_blinks += 1
        blink_counter = 0

    return blink_counter, total_blinks

def mouth_aspect_ratio(landmarks):
    upper_lip = landmarks.landmark[13]
    lower_lip = landmarks.landmark[14]
    left_mouth = landmarks.landmark[78]
    right_mouth = landmarks.landmark[308]

    vertical = math.sqrt((upper_lip.x - lower_lip.x)**2 + (upper_lip.y - lower_lip.y)**2)
    horizontal = math.sqrt((left_mouth.x - right_mouth.x)**2 + (left_mouth.y - right_mouth.y)**2)

    mar = vertical / horizontal
    return mar

def detect_mouth_open(landmarks, mouth_opened):
    MAR_THRESHOLD = 0.6  # 根据实际测试调整
    mar = mouth_aspect_ratio(landmarks)

    if mar > MAR_THRESHOLD:
        mouth_opened = True
    else:
        mouth_opened = False

    return mouth_opened

# 增强的活体检测方法
def get_next_task(current_step: int) -> str:
    tasks = {
        1: ['请向左转头', '请向右转头'],
        2: ['请眨眼', '请微笑'],
        3: ['请张嘴', '请眨眼'],
        4: ['请向左转头', '请向右转头']
    }
    return random.choice(tasks.get(current_step, ['完成所有任务']))

防止欺骗与误导的措施

为了防止用户通过图片、视频或其他手段欺骗系统,确保检测过程中的用户为真人,系统在设计中引入了以下防护措施:

  1. 活体检测

    • 逻辑:通过多次不同动作的检测,确保用户进行实时互动。
    • 实现
      • 多动作验证:要求用户在不同任务步骤中完成多种不同类型的面部动作,如转头、眨眼、张嘴等。
      • 动作多样性:系统随机提示用户进行不同的动作,增加用户实时交互的随机性,防止用户预先录制视频。
      • 动作时间验证:记录用户完成每个动作的时间,确保动作在合理的时间内完成,防止使用延迟播放的预录制视频。
      • 面部动态特征分析:利用MediaPipe提取的高精度关键点,分析面部的微动态特征,如眼睛的快速眨动、嘴部的微小动作等,进一步验证活体性。
  2. 图像质量与实时性要求

    • 图像质量控制
      • 分辨率限制:前端采集的图像分辨率需满足一定要求,确保后端能够准确提取面部关键点。
      • 图像清晰度:要求用户在良好光线下进行检测,避免因光线不足或过强导致的图像模糊。
      • 动态模糊处理:对采集的图像进行动态模糊检测,避免用户在运动中欺骗系统。
    • 实时性要求
      • 动作完成时间:系统要求用户在合理时间内完成指定动作,确保动作的实时性。
      • 数据传输效率:优化前端与后端的数据传输,减少网络延迟,提升实时性。
  3. 安全性措施

    • 数据传输安全
      • HTTPS协议:所有前后端通信均通过HTTPS协议,确保数据传输的安全性和完整性。
      • 数据加密:对敏感数据进行加密存储,防止数据泄露。
    • 输入验证与防护
      • 数据验证:后端严格验证接收到的数据格式和内容,防止注入攻击和其他安全威胁。
      • 图像大小限制:限制上传图像的大小和格式,防止恶意攻击。
    • 用户隐私保护
      • 数据最小化:仅收集必要的用户数据,避免过度收集。
      • 隐私政策:明确告知用户数据的使用和存储规则,遵守相关隐私法规。
      • 数据删除选项:提供用户数据删除选项,增强用户信任。
  4. 用户体验优化

    • 实时反馈与提示
      • 检测结果展示:实时展示检测结果,提升用户参与感。
      • 加载动画:在数据处理过程中,提供加载动画或进度指示,减少用户等待时的焦虑。
      • 错误提示:提供明确的错误提示和解决方案,如“未检测到人脸,请调整位置并重试”。
    • 界面设计优化
      • 友好的UI组件:使用清晰、简洁的UI组件,提升整体用户体验。
      • 响应式设计:确保界面在不同设备和屏幕尺寸下的良好展示效果。

前后端数据交互详细细节

API 接口定义

1. 获取任务状态
GET /api/task/status

请求参数

  • user_id(可选):标识用户的唯一ID。

响应示例

{
  "current_step": 1,
  "total_steps": 4,
  "task_description": "请向左转头"
}
2. 检测任务
POST /api/task/detect

请求参数

  • user_id:用户唯一ID。
  • image_data:Base64编码的图像数据。

请求数据格式

{
  "user_id": "unique_user_id",
  "image_data": "Base64编码的图像数据"
}

响应数据格式

  • 成功响应

    {
      "success": true,
      "message": "检测成功,进入下一步",
      "next_task": "请眨眼",
      "current_step": 2
    }
    
  • 失败响应

    {
      "success": false,
      "message": "检测失败,请重新开始",
      "current_step": 1
    }
    
3. 重置任务
POST /api/task/reset

请求参数

  • user_id:用户唯一ID。

请求数据格式

{
  "user_id": "unique_user_id"
}

响应数据格式

{
  "success": true,
  "message": "任务已重置",
  "current_step": 1
}

数据格式与标准

1. 前后端通信数据格式
  • 请求数据

    {
      "user_id": "unique_user_id",
      "image_data": "Base64编码的图像数据"
    }
    
  • 响应数据(成功):

    {
      "success": true,
      "message": "检测成功,进入下一步",
      "next_task": "请眨眼",
      "current_step": 2
    }
    
  • 响应数据(失败):

    {
      "success": false,
      "message": "检测失败,请重新开始",
      "current_step": 1
    }
    
2. 图像数据格式
  • 编码方式:Base64编码。
  • 图像格式:JPEG或PNG,根据前端配置。
  • 传输方式:嵌入在JSON请求的image_data字段中。
3. 关键点数据格式
  • MediaPipe关键点:每个关键点包含x, y, z坐标,归一化至[0,1]范围。
  • 数据结构:列表形式,每个元素为关键点对象。

示例

{
  "landmark": [
    {"x": 0.5, "y": 0.5, "z": 0.0},
    {"x": 0.6, "y": 0.5, "z": 0.0},
    ...
  ]
}

数据流转过程示意图

用户在微信小程序中启动任务
前端捕捉图像/视频帧
前端将图像编码为Base64
前端构建JSON请求数据
前端通过HTTPS POST发送至后端API
后端接收请求并解析JSON数据
后端解码Base64图像为图像数组
后端使用MediaPipe提取人脸关键点
后端根据关键点数据进行动作识别
后端评估动作与活体性
后端更新用户任务状态
后端构建JSON响应数据
后端通过HTTPS响应发送结果至前端
前端接收响应并更新用户界面
用户根据提示完成下一步或重新开始

数据处理阶段详解

1. 数据采集阶段
  • 输入:摄像头捕捉到的图像或视频帧。
  • 处理
    • 捕捉图像。
    • 读取文件并编码为Base64。
  • 输出:Base64编码的图像数据。

技术框架:微信小程序API (wx.chooseImage, wx.getFileSystemManager)

2. 数据传输阶段
  • 输入:Base64编码的图像数据。
  • 处理
    • 构建JSON请求体。
    • 通过HTTPS POST请求发送至后端。
  • 输出:通过网络传输的JSON数据。

技术框架:微信小程序API (wx.request)

3. 数据接收与解码阶段
  • 输入:后端接收到的JSON请求。
  • 处理
    • 解析JSON获取user_idimage_data
    • 解码Base64图像数据为图像数组(NumPy)。
  • 输出:解码后的图像数组。

技术框架:FastAPI, OpenCV

4. 人脸检测与关键点提取阶段
  • 输入:图像数组。
  • 处理
    • 使用MediaPipe Face Mesh提取人脸关键点。
  • 输出:人脸关键点数据(MediaPipe Landmark对象)。

技术框架:MediaPipe, OpenCV

5. 动作识别与活体检测阶段
  • 输入:人脸关键点数据。
  • 处理
    • 计算头部转动角度,判断左右转头。
    • 计算EAR,检测眨眼次数。
    • 计算MAR,检测是否张嘴。
    • 记录动作历史,用于活体检测。
    • 评估活体性,决定是否通过检测。
  • 输出:动作识别结果及活体检测结果。

技术框架:Python数学计算

6. 任务状态管理阶段
  • 输入:当前动作识别结果。
  • 处理
    • 根据当前任务步骤和检测结果更新用户状态。
    • 使用Redis存储更新后的状态。
  • 输出:更新后的任务状态。

技术框架:Redis

7. 结果封装与返回阶段
  • 输入:动作识别结果及更新后的任务状态。
  • 处理
    • 根据检测结果和任务进度构建响应消息。
    • 封装为JSON格式。
  • 输出:JSON响应数据。

技术框架:FastAPI, Pydantic

8. 前端接收与反馈阶段
  • 输入:后端返回的JSON响应。
  • 处理
    • 解析JSON数据。
    • 根据success字段更新界面提示和进度条。
  • 输出:更新后的用户界面,提示用户进行下一步任务或重新开始。

技术框架:微信小程序API (wx.request回调函数)


关键技术选型

前端

  • 微信小程序:用于跨平台用户界面开发,支持广泛的微信用户基础。
  • WeUI:提供微信风格的UI组件,提升开发效率和用户体验。
  • 关键API
    • wx.chooseImage:捕捉图像。
    • wx.getFileSystemManager:读取文件内容。
    • wx.request:发送HTTP请求。

后端

  • FastAPI:高性能、易用的Python Web框架,支持异步处理。
  • Uvicorn:ASGI服务器,用于运行FastAPI应用,提升性能。
  • MediaPipe Face Mesh:高效的人脸检测与关键点提取,提供468个面部关键点。
  • OpenCV:用于图像预处理,如颜色空间转换、图像解码等。
  • Redis:高效的内存数据存储,适用于管理用户任务状态,支持高并发访问。
  • Pydantic:用于数据验证和模型定义,确保数据的完整性和准确性。

安全与防护

  • HTTPS:确保所有前后端通信的安全性,防止数据被窃取或篡改。
  • Redis:高效管理用户状态,支持快速读写操作,适合高并发场景。
  • 数据验证:后端使用Pydantic进行严格的数据验证,防止恶意数据输入。

部署与运维

  • Docker:容器化部署后端服务,提升部署效率和环境一致性,简化运维管理。
  • Nginx:作为反向代理服务器,处理前端请求转发,提供负载均衡和安全防护。
  • 云服务:如AWS、阿里云,用于部署后端服务和Redis,确保系统的高可用性和可扩展性。

关键技术框架与算法详细说明

1. FastAPI

特点

  • 高性能,基于ASGI,支持异步处理。
  • 自动生成OpenAPI文档,便于API的调试和测试。
  • 内置数据验证与类型注解支持(Pydantic),确保数据的准确性。

优势

  • 支持异步编程,提升并发处理能力。
  • 简洁的代码结构,易于维护和扩展。
  • 兼容性强,易于与其他Python库集成。

2. MediaPipe Face Mesh

特点

  • 实时高精度人脸关键点检测,提供468个面部关键点。
  • 支持多种平台和设备,适用于移动端应用。
  • 高效,适用于实时应用场景。

优势

  • 精度高,覆盖面部各个区域,适合复杂的动作检测。
  • 易于集成,与OpenCV配合使用,提升图像处理效率。
  • 开源且持续更新,社区支持良好。

3. Redis

特点

  • 高性能的内存数据存储,支持快速读写操作。
  • 支持多种数据结构(字符串、哈希、列表、集合等)。
  • 提供持久化选项,确保数据的可靠性。

优势

  • 低延迟,适用于高并发场景,确保系统响应速度。
  • 支持分布式部署,易于扩展,提升系统的可用性和可扩展性。
  • 丰富的功能,如发布/订阅、事务处理等,满足多样化需求。

4. 动作识别算法

4.1 头部旋转检测
  • 原理:通过计算鼻尖与左右眼外角的向量角度,判断头部旋转方向。
  • 关键步骤
    1. 提取鼻尖、左眼外角和右眼外角的关键点坐标。
    2. 计算鼻尖到左右眼外角的向量角度。
    3. 根据角度差异,判断头部是否向左或向右转动。

实现代码

def calculate_angle(p1, p2):
    delta_y = p2.y - p1.y
    delta_x = p2.x - p1.x
    angle = math.degrees(math.atan2(delta_y, delta_x))
    return angle

def detect_head_turn(landmarks):
    nose_tip = landmarks.landmark[1]
    left_eye_outer = landmarks.landmark[33]
    right_eye_outer = landmarks.landmark[263]

    left_angle = calculate_angle(nose_tip, left_eye_outer)
    right_angle = calculate_angle(nose_tip, right_eye_outer)

    avg_angle = (left_angle + right_angle) / 2

    TURN_LEFT_THRESHOLD = -15  # 向左转头
    TURN_RIGHT_THRESHOLD = 15  # 向右转头

    if avg_angle < TURN_LEFT_THRESHOLD:
        return 'left'
    elif avg_angle > TURN_RIGHT_THRESHOLD:
        return 'right'
    else:
        return 'straight'
4.2 眨眼检测
  • 原理:通过计算眼睛的纵横比(EAR, Eye Aspect Ratio),检测眨眼次数。EAR是眼睛纵向距离与横向距离的比值,眨眼时EAR会显著减小。
  • 关键步骤
    1. 提取左眼和右眼的关键点。
    2. 计算EAR值。
    3. 当EAR低于阈值时,视为闭眼,计数一次眨眼。

实现代码

def eye_aspect_ratio(landmarks, eye_indices):
    p1 = landmarks.landmark[eye_indices[1]]
    p2 = landmarks.landmark[eye_indices[5]]
    p3 = landmarks.landmark[eye_indices[2]]
    p4 = landmarks.landmark[eye_indices[4]]
    p5 = landmarks.landmark[eye_indices[0]]
    p6 = landmarks.landmark[eye_indices[3]]

    vertical_1 = math.sqrt((p2.x - p4.x)**2 + (p2.y - p4.y)**2)
    vertical_2 = math.sqrt((p3.x - p5.x)**2 + (p3.y - p5.y)**2)
    horizontal = math.sqrt((p1.x - p6.x)**2 + (p1.y - p6.y)**2)

    ear = (vertical_1 + vertical_2) / (2.0 * horizontal)
    return ear

def detect_blink(landmarks, blink_counter, total_blinks):
    LEFT_EYE = [362, 385, 387, 263, 373, 380]
    RIGHT_EYE = [33, 160, 158, 133, 153, 144]

    EAR_THRESHOLD = 0.21  # 根据实际测试调整
    CONSEC_FRAMES = 3  # 眨眼最少持续的帧数

    left_ear = eye_aspect_ratio(landmarks, LEFT_EYE)
    right_ear = eye_aspect_ratio(landmarks, RIGHT_EYE)

    ear = (left_ear + right_ear) / 2.0

    if ear < EAR_THRESHOLD:
        blink_counter += 1
    else:
        if blink_counter >= CONSEC_FRAMES:
            total_blinks += 1
        blink_counter = 0

    return blink_counter, total_blinks
4.3 张嘴检测
  • 原理:通过计算嘴部的纵横比(MAR, Mouth Aspect Ratio),检测是否张嘴。MAR是嘴部纵向距离与横向距离的比值,张嘴时MAR会显著增大。
  • 关键步骤
    1. 提取上唇中点、下唇中点、左右嘴角的关键点。
    2. 计算MAR值。
    3. 当MAR超过阈值时,视为张嘴。

实现代码

def mouth_aspect_ratio(landmarks):
    upper_lip = landmarks.landmark[13]
    lower_lip = landmarks.landmark[14]
    left_mouth = landmarks.landmark[78]
    right_mouth = landmarks.landmark[308]

    vertical = math.sqrt((upper_lip.x - lower_lip.x)**2 + (upper_lip.y - lower_lip.y)**2)
    horizontal = math.sqrt((left_mouth.x - right_mouth.x)**2 + (left_mouth.y - right_mouth.y)**2)

    mar = vertical / horizontal
    return mar

def detect_mouth_open(landmarks, mouth_opened):
    MAR_THRESHOLD = 0.6  # 根据实际测试调整
    mar = mouth_aspect_ratio(landmarks)

    if mar > MAR_THRESHOLD:
        mouth_opened = True
    else:
        mouth_opened = False

    return mouth_opened

优化与注意事项

1. 性能优化

1.1 后端优化
  • 异步处理:利用FastAPI的异步特性,提升并发处理能力。
  • GPU加速:在服务器端使用GPU加速MediaPipe和OpenCV的处理速度,提升实时性。
  • 批处理请求:在高并发场景下,考虑批量处理图像请求,减少处理开销。
1.2 前端优化
  • 图像压缩:在前端对图像进行适当压缩,减少传输数据量,提升传输效率。
  • 采集频率控制:合理控制视频帧采集频率,平衡实时性与性能。

2. 安全性

2.1 数据传输安全
  • 强制使用HTTPS:确保所有数据传输通过HTTPS,防止中间人攻击。
  • 身份验证:引入用户身份验证机制,如Token验证,确保请求的合法性。
2.2 输入验证
  • 严格数据验证:后端使用Pydantic进行严格的数据验证,确保数据格式和内容的正确性。
  • 图像大小与格式限制:限制上传图像的大小和格式,防止恶意文件上传。

3. 用户体验

3.1 实时反馈
  • 检测结果展示:前端实时展示检测结果,提升用户参与感和体验。
  • 加载动画:在后端处理图像时,前端显示加载动画,减少用户等待时的焦虑。
3.2 错误处理
  • 明确错误提示:在检测失败或网络异常时,提供明确的错误提示和解决方案。
  • 重试机制:在网络请求失败时,提供重试选项,提升系统的鲁棒性。

4. 扩展性

4.1 模块化设计
  • 代码结构清晰:前后端代码结构清晰,易于维护和扩展。
  • 插件化组件:前端使用插件化的UI组件,后端使用模块化的函数库,便于添加新功能。
4.2 可配置性
  • 参数化阈值:将动作检测的阈值(如EAR_THRESHOLD、MAR_THRESHOLD)配置化,方便后期调整优化。
  • 任务步骤配置:将任务步骤和动作提示配置化,便于添加或修改检测任务。
4.3 服务扩展
  • 微服务架构:考虑将不同功能模块(如人脸检测、动作识别、状态管理)拆分为独立的微服务,提升系统的可维护性和扩展性。
  • 负载均衡:使用Nginx等负载均衡工具,提升系统的并发处理能力和稳定性。

http://www.kler.cn/a/464610.html

相关文章:

  • 理解linux内核中的几种地址
  • STLG_01_08_程序设计C语言 - 数组
  • 代码实战:基于InvSR对视频进行超分辨率重建
  • 数据挖掘入门介绍及代码实战
  • 海南省大数据发展中心:数据资产场景化评估案例手册(第二期)
  • 【Domain Generalization(2)】领域泛化在文生图领域的工作之——PromptStyler(ICCV23)
  • 几句话分析org.springframework.web.servlet.HandlerMapping体系机构
  • 在C#中,如何使用委托实现事件处理?
  • 计算机网络 (20)高速以太网
  • 【QT】:QT图形化界面概述
  • 解读一个新建的 Spring Boot 项目
  • 若依引入腾讯地图
  • FastDeploy部署paddlecls分类模型(windows)
  • element-plus大版本一样,但是小版本不一样导致页面出bug
  • 人工智能知识分享第六天-机器学习_​逻辑回归(Logistic Regression)
  • @Data
  • 关于Flutter应用国际化语言的设置
  • 复合机器人正以其高效、精准、灵活的特点,逐渐在汽车装配线上崭露头角
  • 使用XGBoost算法进行机器学习任务:从理论到实践
  • 树莓派之旅-在wsl-x86-64 上进行树莓派的交叉编译
  • 戴尔/Dell 电脑按什么快捷键可以进入 Bios 设置界面?
  • pyspark执行group by操作
  • df.drop()
  • 【剪映绿化版】剪映免费绿色版,全部功能可用
  • Centos7中安装X11vnc
  • 基于 GPUTasker 的 GPU 使用情况钉钉推送机器人实现