当前位置: 首页 > article >正文

从头开始开发基于虹软SDK的人脸识别考勤系统(python+RTSP开源)(三)

接着前几天的继续。之前说到了添加人员,现在咱们继续后面剩余的部分。

一、人员的查删

除了增加以外,还有删改查部分的代码,不过修改其实在这里没啥多大的作用,我就没写,直接是查询和删除,修改应该是可以修改部门和重新更换照片,有兴趣的自己写,也是很简单。

    def get_staff(self, staff_id=None):
        cursor = self.conn.cursor()
        if staff_id:
            cursor.execute('SELECT * FROM staff WHERE id=? AND is_active=1', (staff_id,))
            return cursor.fetchone()
        cursor.execute('SELECT * FROM staff WHERE is_active=1')
        return cursor.fetchall()

    def delete_staff(self, staff_id):
        try:
            cursor = self.conn.cursor()
            cursor.execute('UPDATE staff SET is_active=0 WHERE id=?', (staff_id,))
            self.conn.commit()
        except sqlite3.Error as e:
            print(f"删除员工失败: {e}")
            raise

二、考勤规则

我这边考勤规则三次设置,8点30分前一次,中午11点30-14点一次,下午17点以后一次,第一次和第二次取符合时间区间的最早的记录,并且只记录一次,第三次则取时间区间内最晚的一次。也就是三个阶段

0:00-11:30第一阶段;  对应标识morning

11:30-14:00第二阶段   对应标识noon

14:00-23:59第三阶段   对应标识night

加标识非常方便后续的统计输出。

每个阶段都可打卡,这是考勤部分的可考勤时间规则不是最终统计符合考勤要求的规则。

    def record_attendance(self, staff_id, check_type, is_holiday=False):
        with self.lock:
            try:
                current_date = datetime.now().strftime('%Y-%m-%d')
                cursor = self.conn.cursor()
                # 检查该员工在当前日期、当前考勤时间段内是否已经有考勤记录
                cursor.execute('''
                    SELECT id, check_time 
                    FROM attendance 
                    WHERE staff_id =? AND check_type =? AND date =?
                ''', (staff_id, check_type, current_date))
                existing_record = cursor.fetchone()

                if existing_record:
                    record_id, existing_check_time = existing_record
                    existing_check_time = datetime.strptime(existing_check_time, "%Y-%m-%d %H:%M:%S.%f")
                    current_time = datetime.now()
                    if check_type in ('morning', 'noon'):
                        # 第一次和第二次打卡取首次打卡时间,不更新
                        pass
                    elif check_type == 'night':
                        # 第三次打卡更新为最新时间
                        cursor.execute('''
                            UPDATE attendance 
                            SET check_time =? 
                            WHERE id =?
                        ''', (current_time, record_id))
                        self.conn.commit()
                else:
                    # 没有记录则插入新记录
                    cursor.execute('''
                        INSERT INTO attendance (staff_id, check_time, date, check_type, is_holiday)
                        VALUES (?,?,?,?,?)
                    ''', (staff_id, datetime.now(), current_date, check_type, is_holiday))
                    self.conn.commit()
            except sqlite3.Error as e:
                print(f"记录考勤失败: {e}")
                raise

代码中都做了注释了,不多解释了。

考勤时间区间的规则如此,则对应查询亦如此。

# 新增查询方法
    def query_attendance(self, staff_name=None, start_date=None, end_date=None):
        cursor = self.conn.cursor()
        conditions = []
        values = []
        if staff_name:
            conditions.append("s.name =?")
            values.append(staff_name)
        if start_date:
            conditions.append("a.date >=?")
            values.append(start_date)
        if end_date:
            conditions.append("a.date <=?")
            values.append(end_date)

        query = '''
            SELECT s.name, a.date, 
                MIN(CASE WHEN a.check_type ='morning' THEN a.check_time END) AS morning_check_time,
                MIN(CASE WHEN a.check_type = 'noon' THEN a.check_time END) AS noon_check_time,
                MAX(CASE WHEN a.check_type = 'night' THEN a.check_time END) AS night_check_time
            FROM attendance a
            JOIN staff s ON a.staff_id = s.id
        '''
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " GROUP BY s.name, a.date"
        logging.info(f"执行查询: {query}, 参数: {values}")
        cursor.execute(query, tuple(values))
        return cursor.fetchall()

三、虹软SDK实现   重点内容

重点来了,虹软SDK实现,一个ArcSoftService类完整解决。一步步看。

很多代码都做了注释,直接看代码就很清晰了。

def __init__(self, db_service):
        self.db = db_service
        self.handle = None
        self.lib = None
        self.engine_mutex = QMutex()  # 添加互斥锁
        self.data_mutex = QMutex()  # 初始化data_mutex
        try:
            self._init_engine()
            self.known_ids, self.known_features = self._load_known_features()
        except Exception as e:
            print(f"ArcSoftService初始化失败: {e}")

引擎部分,包括加载SDK,激活检测等等,这里注意激活结果的代码输出,相关代码可以在虹软开发知识库上查到。

def _init_engine(self):
        self.appid = config.arcsoft_appid
        self.sdkkey = config.arcsoft_sdkkey
        try:
            self.lib = cdll.LoadLibrary(config.arcsoft_lib_path)
            print("虹软SDK库文件加载成功")
            self.lib.ASFActivation.argtypes = [c_char_p, c_char_p]
            self.lib.ASFActivation.restype = c_int
            self.lib.ASFGetActiveFileInfo.argtypes = [POINTER(ASF_ActiveFileInfo)]
            self.lib.ASFGetActiveFileInfo.restype = c_int
            self.lib.ASFInitEngine.argtypes = [c_int, c_int, c_int, c_int, c_int, POINTER(c_void_p)]
            self.lib.ASFDetectFaces.restype = c_int
            self.lib.ASFFaceFeatureExtract.argtypes = [c_void_p, c_int, c_int, c_int, c_void_p,
                                                       POINTER(ASF_SingleFaceInfo),
                                                       POINTER(ASF_FaceFeature)]
            self.lib.ASFFaceFeatureExtract.restype = c_int
            self.lib.ASFFaceFeatureCompare.argtypes = [c_void_p, POINTER(ASF_FaceFeature),
                                                       POINTER(ASF_FaceFeature),
                                                       POINTER(c_float)]
            self.lib.ASFFaceFeatureCompare.restype = c_int
            self.lib.ASFUninitEngine.argtypes = [c_void_p]
            self.lib.ASFUninitEngine.restype = c_int

        except Exception as e:
            error_msg = f"虹软SDK库文件加载失败,请检查文件路径或重新安装SDK。具体错误信息: {e}"
            print(error_msg)
            QMessageBox.critical(None, "错误", error_msg)
            raise

        # 检查是否已经激活
        active_file_info = ASF_ActiveFileInfo()
        get_info_result = self.lib.ASFGetActiveFileInfo(ctypes.byref(active_file_info))
        if get_info_result == 0:
            print("虹软SDK已激活,跳过激活步骤")
        else:
            # 激活SDK
            activation_result = self.lib.ASFActivation(
                c_char_p(self.appid),
                c_char_p(self.sdkkey)
            )
            if activation_result != 0:
                print(f"虹软SDK激活失败,错误码: {activation_result}")
                self._handle_error(activation_result)
                raise Exception(f"虹软SDK激活失败,错误码: {activation_result}")
            else:
                print("虹软SDK激活成功")

        # 初始化引擎
        combinedMask = ASF_FACE_DETECT | ASF_FACERECOGNITION | ASF_AGE | ASF_GENDER | ASF_FACE3DANGLE
        detectMode = ASF_DETECT_MODE_IMAGE
        detectFaceOrientPriority = ASF_OP_0_ONLY
        minFaceSize = 20
        maxFaceNum = 50
        self.handle = c_void_p()
        init_result = self.lib.ASFInitEngine(
            detectMode,
            detectFaceOrientPriority,
            minFaceSize,
            maxFaceNum,
            combinedMask,
            ctypes.byref(self.handle)
        )
        if init_result != 0:
            print(f"虹软引擎初始化失败,错误码: {init_result}")
            self._handle_error(init_result)
            raise Exception(f"虹软引擎初始化失败,错误码: {init_result}")
        else:
            print("虹软引擎初始化成功")

0就是激活成功了,其他错误代码如下:

def _handle_error(self, error_code):
        # 根据虹软SDK手册添加具体错误处理逻辑
        error_messages = {
            -1: "未知错误",
            -2: "参数错误",
            -3: "内存分配失败",
            -4: "引擎初始化失败",
            -5: "引擎释放失败",
            -6: "引擎未初始化",
            -7: "引擎已初始化",
            -8: "引擎激活失败",
            -9: "引擎激活已过期",
            -10: "引擎未激活",
            -11: "引擎激活次数超出限制",
            -12: "引擎激活失败,请检查激活码是否正确",
            # 添加其他错误码及其对应的错误信息
        }
        if error_code in error_messages:
            print(f"具体错误信息: {error_messages[error_code]}")
        else:
            print("未识别的错误码,请参考虹软SDK手册")

引擎释放

def _release_engine(self):
        if self.handle:
            self.engine_mutex.lock()  # 加锁
            try:
                uninit_result = self.lib.ASFUninitEngine(self.handle)
                if uninit_result != 0:
                    print(f"虹软引擎释放失败,错误码: {uninit_result}")
                    self._handle_error(uninit_result)
                self.handle = None
                print("虹软引擎已释放")
            finally:
                self.engine_mutex.unlock()  # 解锁
        if self.lib:
            del self.lib
            print("虹软SDK库已卸载")

特征库提取

def refresh_data(self):
        self.data_mutex.lock()  # 加锁
        try:
            self.known_features.clear()
            self.known_ids.clear()
            staff_list = self.db.get_staff()
            # print(f"从数据库中获取到的人员数量: {len(staff_list)}")
            for staff in staff_list:
                self.known_ids.append(staff[0])
                # 虹软特征数据直接存储bytes
                feature_bytes = staff[4]
                face_feature = ASF_FaceFeature()
                face_feature.feature = ctypes.cast(ctypes.create_string_buffer(feature_bytes), ctypes.c_void_p)
                face_feature.featureSize = len(feature_bytes)
                self.known_features.append(face_feature)
            # print(f"刷新后已知特征数量: {len(self.known_features)}")
        finally:
            self.data_mutex.unlock()  # 解锁


def _load_known_features(self):
        known_ids = []
        known_features = []
        staff_list = self.db.get_staff()
        for staff in staff_list:
            known_ids.append(staff[0])
            feature_bytes = staff[4]
            face_feature = ASF_FaceFeature()
            face_feature.feature = ctypes.cast(ctypes.create_string_buffer(feature_bytes), ctypes.c_void_p)
            face_feature.featureSize = len(feature_bytes)
            known_features.append(face_feature)
        return known_ids, known_features

把OpenCV读取摄像头获取图像帧转换为虹软SDK所需的格式

def _convert_frame_to_arcsoft_format(self, frame):
        """
        将OpenCV读取的图像帧转换为虹软SDK所需的格式
        :param frame: OpenCV读取的图像帧(numpy.ndarray)
        :return: 转换data, width, height, format
        """
        height, width, channels = frame.shape
        if channels == 3:
            format = ASF_PAF_RGB24_B8G8R8
        else:
            raise ValueError("不支持的图像通道数,仅支持3通道(RGB)图像")

        data = frame.ctypes.data_as(ctypes.POINTER(ctypes.c_uint8))
        return data, width, height, format

人脸特征比对,支持多人识别

def recognize(self, frame):
        # print("进入ArcSoft识别方法")
        multi_face_info = ASF_MultiFaceInfo()
        data, width, height, format = self._convert_frame_to_arcsoft_format(frame)

        self.engine_mutex.lock()  # 加锁
        try:
            detect_result = self.lib.ASFDetectFaces(
                self.handle,
                width,
                height,
                format,
                data,
                ctypes.byref(multi_face_info)
            )
        finally:
            self.engine_mutex.unlock()  # 解锁

        if detect_result != 0:
            print(f"虹软人脸检测失败,错误码:{detect_result}")
            self._handle_error(detect_result)
            return []
        print(f"检测到的人脸数量: {multi_face_info.faceNum}")

        recognized_ids = []
        for i in range(multi_face_info.faceNum):
            single_face_info = ASF_SingleFaceInfo()
            single_face_info.faceRect = multi_face_info.faceRect[i]
            single_face_info.faceOrient = multi_face_info.faceOrient[i]

            self.engine_mutex.lock()  # 加锁
            try:
                current_feature = ASF_FaceFeature()
                extract_result = self.lib.ASFFaceFeatureExtract(
                    self.handle,
                    width,
                    height,
                    format,
                    data,
                    ctypes.byref(single_face_info),
                    ctypes.byref(current_feature)
                )
            finally:
                self.engine_mutex.unlock()  # 解锁

            if extract_result != 0:
                print(f"虹软特征提取失败,错误码:{extract_result}")
                self._handle_error(extract_result)
                continue
            else:
                print(f"虹软特征提取成功,特征大小: {current_feature.featureSize}")

            self.data_mutex.lock()  # 加锁
            try:
                for j, known_feature in enumerate(self.known_features):
                    similarity = c_float()
                    self.engine_mutex.lock()  # 加锁
                    try:
                        compare_result = self.lib.ASFFaceFeatureCompare(
                            self.handle,
                            ctypes.byref(current_feature),
                            ctypes.byref(known_feature),
                            ctypes.byref(similarity)
                        )
                    finally:
                        self.engine_mutex.unlock()  # 解锁

                    if compare_result != 0:
                        print(f"虹软特征比对失败,错误码:{compare_result}")
                        self._handle_error(compare_result)
                        continue
                    print(f"当前人脸与已知人脸{self.known_ids[j]}的相似度: {similarity.value}")
                    print(f"比对阈值: {config.recognition_threshold}")  # 输出比对阈值
                    if similarity.value > config.recognition_threshold:
                        recognized_ids.append(self.known_ids[j])
                        break
                    else:
                        print(f"当前人脸与已知人脸{self.known_ids[j]}相似度未达到阈值")
            finally:
                self.data_mutex.unlock()  # 解锁

        print(f"最终识别出的人员ID: {recognized_ids}")
        return recognized_ids

增加人员时,导入照片自动识别是否存在人脸并且提取特征

def get_face_feature(image_path):
        engine = None
        try:
            engine = ArcSoftService(None)
            image = cv2.imread(image_path)
            data, width, height, format = engine._convert_frame_to_arcsoft_format(image)

            # 人脸检测
            multi_face_info = ASF_MultiFaceInfo()
            res = engine.lib.ASFDetectFaces(
                engine.handle,
                width,
                height,
                format,
                data,
                ctypes.byref(multi_face_info)
            )

            if res == 0 and multi_face_info.faceNum > 0:
                single_face_info = ASF_SingleFaceInfo()
                single_face_info.faceRect = multi_face_info.faceRect[0]
                single_face_info.faceOrient = multi_face_info.faceOrient[0]

                # 特征提取
                face_feature = ASF_FaceFeature()
                res = engine.lib.ASFFaceFeatureExtract(
                    engine.handle,
                    width,
                    height,
                    format,
                    data,
                    ctypes.byref(single_face_info),
                    ctypes.byref(face_feature)
                )

                if res == 0:
                    feature_bytes = ctypes.create_string_buffer(face_feature.featureSize)
                    ctypes.memmove(feature_bytes, face_feature.feature, face_feature.featureSize)
                    return feature_bytes.raw
                else:
                    print(f"虹软特征提取失败,错误码:{res}")
                    engine._handle_error(res)
            else:
                print(f"虹软人脸检测失败,错误码:{res}或未检测到人脸")
                engine._handle_error(res)
            return None
        except Exception as e:
            print(f"虹软特征提取失败: {e}")
            return None
        finally:
            if engine:
                engine._release_engine()

今天先到这里,后续还有。


http://www.kler.cn/a/580849.html

相关文章:

  • 1688商品列表商品详情API接口全面解析
  • upload-labs详解(13-20)文件上传分析
  • 大湾区经济网战略媒体澳门《红刊》访霍英东集团
  • 转自南京日报:天洑软件创新AI+仿真技术变制造为“智造
  • 从C#中的MemberwiseClone()浅拷贝说起
  • CentOS7离线部署安装Dify
  • 网络安全技术整体架构 一个中心三重防护
  • 基于架构的软件开发(ABSD)
  • 3D模型语义搜索引擎
  • 聚水潭数据集成到MySQL的高效方法
  • 51c视觉~3D~合集2
  • 笔记本电脑外接固态移动硬盘可以用于深度学习吗
  • Cryptography 与 PyCryptodome 源码级解析
  • MCP-代码解读TypeScript版本
  • (二分 数学推导区间 两个数组的距离值)leetcode 1385
  • 【第21节】C++设计模式(行为模式)-Chain of Responsibility(责任链)模式
  • Redis7——进阶篇(五)
  • Consensus 大会全观察:政策、生态与技术交汇,香港能否抢占 Web3 先机?
  • 【网络编程】WSAAsyncSelect 模型
  • redis 用来实现排行榜的功能