从头开始开发基于虹软SDK的人脸识别考勤系统(python+RTSP开源)(三)
接着前几天的继续。之前说到了添加人员,现在咱们继续后面剩余的部分。
一、人员的查删
除了增加以外,还有删改查部分的代码,不过修改其实在这里没啥多大的作用,我就没写,直接是查询和删除,修改应该是可以修改部门和重新更换照片,有兴趣的自己写,也是很简单。
def get_staff(self, staff_id=None):
cursor = self.conn.cursor()
if staff_id:
cursor.execute('SELECT * FROM staff WHERE id=? AND is_active=1', (staff_id,))
return cursor.fetchone()
cursor.execute('SELECT * FROM staff WHERE is_active=1')
return cursor.fetchall()
def delete_staff(self, staff_id):
try:
cursor = self.conn.cursor()
cursor.execute('UPDATE staff SET is_active=0 WHERE id=?', (staff_id,))
self.conn.commit()
except sqlite3.Error as e:
print(f"删除员工失败: {e}")
raise
二、考勤规则
我这边考勤规则三次设置,8点30分前一次,中午11点30-14点一次,下午17点以后一次,第一次和第二次取符合时间区间的最早的记录,并且只记录一次,第三次则取时间区间内最晚的一次。也就是三个阶段
0:00-11:30第一阶段; 对应标识morning
11:30-14:00第二阶段 对应标识noon
14:00-23:59第三阶段 对应标识night
加标识非常方便后续的统计输出。
每个阶段都可打卡,这是考勤部分的可考勤时间规则不是最终统计符合考勤要求的规则。
def record_attendance(self, staff_id, check_type, is_holiday=False):
with self.lock:
try:
current_date = datetime.now().strftime('%Y-%m-%d')
cursor = self.conn.cursor()
# 检查该员工在当前日期、当前考勤时间段内是否已经有考勤记录
cursor.execute('''
SELECT id, check_time
FROM attendance
WHERE staff_id =? AND check_type =? AND date =?
''', (staff_id, check_type, current_date))
existing_record = cursor.fetchone()
if existing_record:
record_id, existing_check_time = existing_record
existing_check_time = datetime.strptime(existing_check_time, "%Y-%m-%d %H:%M:%S.%f")
current_time = datetime.now()
if check_type in ('morning', 'noon'):
# 第一次和第二次打卡取首次打卡时间,不更新
pass
elif check_type == 'night':
# 第三次打卡更新为最新时间
cursor.execute('''
UPDATE attendance
SET check_time =?
WHERE id =?
''', (current_time, record_id))
self.conn.commit()
else:
# 没有记录则插入新记录
cursor.execute('''
INSERT INTO attendance (staff_id, check_time, date, check_type, is_holiday)
VALUES (?,?,?,?,?)
''', (staff_id, datetime.now(), current_date, check_type, is_holiday))
self.conn.commit()
except sqlite3.Error as e:
print(f"记录考勤失败: {e}")
raise
代码中都做了注释了,不多解释了。
考勤时间区间的规则如此,则对应查询亦如此。
# 新增查询方法
def query_attendance(self, staff_name=None, start_date=None, end_date=None):
cursor = self.conn.cursor()
conditions = []
values = []
if staff_name:
conditions.append("s.name =?")
values.append(staff_name)
if start_date:
conditions.append("a.date >=?")
values.append(start_date)
if end_date:
conditions.append("a.date <=?")
values.append(end_date)
query = '''
SELECT s.name, a.date,
MIN(CASE WHEN a.check_type ='morning' THEN a.check_time END) AS morning_check_time,
MIN(CASE WHEN a.check_type = 'noon' THEN a.check_time END) AS noon_check_time,
MAX(CASE WHEN a.check_type = 'night' THEN a.check_time END) AS night_check_time
FROM attendance a
JOIN staff s ON a.staff_id = s.id
'''
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " GROUP BY s.name, a.date"
logging.info(f"执行查询: {query}, 参数: {values}")
cursor.execute(query, tuple(values))
return cursor.fetchall()
三、虹软SDK实现 重点内容
重点来了,虹软SDK实现,一个ArcSoftService类完整解决。一步步看。
很多代码都做了注释,直接看代码就很清晰了。
def __init__(self, db_service):
self.db = db_service
self.handle = None
self.lib = None
self.engine_mutex = QMutex() # 添加互斥锁
self.data_mutex = QMutex() # 初始化data_mutex
try:
self._init_engine()
self.known_ids, self.known_features = self._load_known_features()
except Exception as e:
print(f"ArcSoftService初始化失败: {e}")
引擎部分,包括加载SDK,激活检测等等,这里注意激活结果的代码输出,相关代码可以在虹软开发知识库上查到。
def _init_engine(self):
self.appid = config.arcsoft_appid
self.sdkkey = config.arcsoft_sdkkey
try:
self.lib = cdll.LoadLibrary(config.arcsoft_lib_path)
print("虹软SDK库文件加载成功")
self.lib.ASFActivation.argtypes = [c_char_p, c_char_p]
self.lib.ASFActivation.restype = c_int
self.lib.ASFGetActiveFileInfo.argtypes = [POINTER(ASF_ActiveFileInfo)]
self.lib.ASFGetActiveFileInfo.restype = c_int
self.lib.ASFInitEngine.argtypes = [c_int, c_int, c_int, c_int, c_int, POINTER(c_void_p)]
self.lib.ASFDetectFaces.restype = c_int
self.lib.ASFFaceFeatureExtract.argtypes = [c_void_p, c_int, c_int, c_int, c_void_p,
POINTER(ASF_SingleFaceInfo),
POINTER(ASF_FaceFeature)]
self.lib.ASFFaceFeatureExtract.restype = c_int
self.lib.ASFFaceFeatureCompare.argtypes = [c_void_p, POINTER(ASF_FaceFeature),
POINTER(ASF_FaceFeature),
POINTER(c_float)]
self.lib.ASFFaceFeatureCompare.restype = c_int
self.lib.ASFUninitEngine.argtypes = [c_void_p]
self.lib.ASFUninitEngine.restype = c_int
except Exception as e:
error_msg = f"虹软SDK库文件加载失败,请检查文件路径或重新安装SDK。具体错误信息: {e}"
print(error_msg)
QMessageBox.critical(None, "错误", error_msg)
raise
# 检查是否已经激活
active_file_info = ASF_ActiveFileInfo()
get_info_result = self.lib.ASFGetActiveFileInfo(ctypes.byref(active_file_info))
if get_info_result == 0:
print("虹软SDK已激活,跳过激活步骤")
else:
# 激活SDK
activation_result = self.lib.ASFActivation(
c_char_p(self.appid),
c_char_p(self.sdkkey)
)
if activation_result != 0:
print(f"虹软SDK激活失败,错误码: {activation_result}")
self._handle_error(activation_result)
raise Exception(f"虹软SDK激活失败,错误码: {activation_result}")
else:
print("虹软SDK激活成功")
# 初始化引擎
combinedMask = ASF_FACE_DETECT | ASF_FACERECOGNITION | ASF_AGE | ASF_GENDER | ASF_FACE3DANGLE
detectMode = ASF_DETECT_MODE_IMAGE
detectFaceOrientPriority = ASF_OP_0_ONLY
minFaceSize = 20
maxFaceNum = 50
self.handle = c_void_p()
init_result = self.lib.ASFInitEngine(
detectMode,
detectFaceOrientPriority,
minFaceSize,
maxFaceNum,
combinedMask,
ctypes.byref(self.handle)
)
if init_result != 0:
print(f"虹软引擎初始化失败,错误码: {init_result}")
self._handle_error(init_result)
raise Exception(f"虹软引擎初始化失败,错误码: {init_result}")
else:
print("虹软引擎初始化成功")
0就是激活成功了,其他错误代码如下:
def _handle_error(self, error_code):
# 根据虹软SDK手册添加具体错误处理逻辑
error_messages = {
-1: "未知错误",
-2: "参数错误",
-3: "内存分配失败",
-4: "引擎初始化失败",
-5: "引擎释放失败",
-6: "引擎未初始化",
-7: "引擎已初始化",
-8: "引擎激活失败",
-9: "引擎激活已过期",
-10: "引擎未激活",
-11: "引擎激活次数超出限制",
-12: "引擎激活失败,请检查激活码是否正确",
# 添加其他错误码及其对应的错误信息
}
if error_code in error_messages:
print(f"具体错误信息: {error_messages[error_code]}")
else:
print("未识别的错误码,请参考虹软SDK手册")
引擎释放
def _release_engine(self):
if self.handle:
self.engine_mutex.lock() # 加锁
try:
uninit_result = self.lib.ASFUninitEngine(self.handle)
if uninit_result != 0:
print(f"虹软引擎释放失败,错误码: {uninit_result}")
self._handle_error(uninit_result)
self.handle = None
print("虹软引擎已释放")
finally:
self.engine_mutex.unlock() # 解锁
if self.lib:
del self.lib
print("虹软SDK库已卸载")
特征库提取
def refresh_data(self):
self.data_mutex.lock() # 加锁
try:
self.known_features.clear()
self.known_ids.clear()
staff_list = self.db.get_staff()
# print(f"从数据库中获取到的人员数量: {len(staff_list)}")
for staff in staff_list:
self.known_ids.append(staff[0])
# 虹软特征数据直接存储bytes
feature_bytes = staff[4]
face_feature = ASF_FaceFeature()
face_feature.feature = ctypes.cast(ctypes.create_string_buffer(feature_bytes), ctypes.c_void_p)
face_feature.featureSize = len(feature_bytes)
self.known_features.append(face_feature)
# print(f"刷新后已知特征数量: {len(self.known_features)}")
finally:
self.data_mutex.unlock() # 解锁
def _load_known_features(self):
known_ids = []
known_features = []
staff_list = self.db.get_staff()
for staff in staff_list:
known_ids.append(staff[0])
feature_bytes = staff[4]
face_feature = ASF_FaceFeature()
face_feature.feature = ctypes.cast(ctypes.create_string_buffer(feature_bytes), ctypes.c_void_p)
face_feature.featureSize = len(feature_bytes)
known_features.append(face_feature)
return known_ids, known_features
把OpenCV读取摄像头获取图像帧转换为虹软SDK所需的格式
def _convert_frame_to_arcsoft_format(self, frame):
"""
将OpenCV读取的图像帧转换为虹软SDK所需的格式
:param frame: OpenCV读取的图像帧(numpy.ndarray)
:return: 转换data, width, height, format
"""
height, width, channels = frame.shape
if channels == 3:
format = ASF_PAF_RGB24_B8G8R8
else:
raise ValueError("不支持的图像通道数,仅支持3通道(RGB)图像")
data = frame.ctypes.data_as(ctypes.POINTER(ctypes.c_uint8))
return data, width, height, format
人脸特征比对,支持多人识别
def recognize(self, frame):
# print("进入ArcSoft识别方法")
multi_face_info = ASF_MultiFaceInfo()
data, width, height, format = self._convert_frame_to_arcsoft_format(frame)
self.engine_mutex.lock() # 加锁
try:
detect_result = self.lib.ASFDetectFaces(
self.handle,
width,
height,
format,
data,
ctypes.byref(multi_face_info)
)
finally:
self.engine_mutex.unlock() # 解锁
if detect_result != 0:
print(f"虹软人脸检测失败,错误码:{detect_result}")
self._handle_error(detect_result)
return []
print(f"检测到的人脸数量: {multi_face_info.faceNum}")
recognized_ids = []
for i in range(multi_face_info.faceNum):
single_face_info = ASF_SingleFaceInfo()
single_face_info.faceRect = multi_face_info.faceRect[i]
single_face_info.faceOrient = multi_face_info.faceOrient[i]
self.engine_mutex.lock() # 加锁
try:
current_feature = ASF_FaceFeature()
extract_result = self.lib.ASFFaceFeatureExtract(
self.handle,
width,
height,
format,
data,
ctypes.byref(single_face_info),
ctypes.byref(current_feature)
)
finally:
self.engine_mutex.unlock() # 解锁
if extract_result != 0:
print(f"虹软特征提取失败,错误码:{extract_result}")
self._handle_error(extract_result)
continue
else:
print(f"虹软特征提取成功,特征大小: {current_feature.featureSize}")
self.data_mutex.lock() # 加锁
try:
for j, known_feature in enumerate(self.known_features):
similarity = c_float()
self.engine_mutex.lock() # 加锁
try:
compare_result = self.lib.ASFFaceFeatureCompare(
self.handle,
ctypes.byref(current_feature),
ctypes.byref(known_feature),
ctypes.byref(similarity)
)
finally:
self.engine_mutex.unlock() # 解锁
if compare_result != 0:
print(f"虹软特征比对失败,错误码:{compare_result}")
self._handle_error(compare_result)
continue
print(f"当前人脸与已知人脸{self.known_ids[j]}的相似度: {similarity.value}")
print(f"比对阈值: {config.recognition_threshold}") # 输出比对阈值
if similarity.value > config.recognition_threshold:
recognized_ids.append(self.known_ids[j])
break
else:
print(f"当前人脸与已知人脸{self.known_ids[j]}相似度未达到阈值")
finally:
self.data_mutex.unlock() # 解锁
print(f"最终识别出的人员ID: {recognized_ids}")
return recognized_ids
增加人员时,导入照片自动识别是否存在人脸并且提取特征
def get_face_feature(image_path):
engine = None
try:
engine = ArcSoftService(None)
image = cv2.imread(image_path)
data, width, height, format = engine._convert_frame_to_arcsoft_format(image)
# 人脸检测
multi_face_info = ASF_MultiFaceInfo()
res = engine.lib.ASFDetectFaces(
engine.handle,
width,
height,
format,
data,
ctypes.byref(multi_face_info)
)
if res == 0 and multi_face_info.faceNum > 0:
single_face_info = ASF_SingleFaceInfo()
single_face_info.faceRect = multi_face_info.faceRect[0]
single_face_info.faceOrient = multi_face_info.faceOrient[0]
# 特征提取
face_feature = ASF_FaceFeature()
res = engine.lib.ASFFaceFeatureExtract(
engine.handle,
width,
height,
format,
data,
ctypes.byref(single_face_info),
ctypes.byref(face_feature)
)
if res == 0:
feature_bytes = ctypes.create_string_buffer(face_feature.featureSize)
ctypes.memmove(feature_bytes, face_feature.feature, face_feature.featureSize)
return feature_bytes.raw
else:
print(f"虹软特征提取失败,错误码:{res}")
engine._handle_error(res)
else:
print(f"虹软人脸检测失败,错误码:{res}或未检测到人脸")
engine._handle_error(res)
return None
except Exception as e:
print(f"虹软特征提取失败: {e}")
return None
finally:
if engine:
engine._release_engine()
今天先到这里,后续还有。