基于yolov8调用本地摄像头并将读取的信息传入jsonl中
最近在做水面垃圾识别的智能船 用到了yolov8进行目标检测 修改并添加了SEAttention注意力机制
详情见其他大神
【保姆级教程|YOLOv8添加注意力机制】【1】添加SEAttention注意力机制步骤详解、训练及推理使用_yolov8添加se-CSDN博客
并且修改传统的iou方法改为添加了wise-iou的方法 ,对于小目标,传统的IoU可能不够敏感,因为即使是微小的偏移也可能导致IoU显著下降。Wise-IoU通过加权可以更公平地对待小目标,从而提高小目标检测的性能 ,这对于我们船体的摄像头 查找远处或较小漂浮物起到了一定作用。
好了,回归正题。我们写了一个脚本 用于收集识别后的标框和参数信息 将这些信息存储进一个jsonl文件中 启用两个线程 在jetson nano b01 4gb的板子上进行运行 。
目的: 通过存储这些信息我们可以用于计算 例如计算到屏幕正下方的距离 可以做些简单的计算和路径规划等问题 后续我们还在完成这份工作
话不多说,我们先上传代码。 该代码结合gpt添加了许多注释 (真的很多,组里有人看不懂代码 所以写的时候只能加很多注释并让gpt规范格式)不过这样也方便大家的阅读和使用
以下是源码环节:
import cv2
from ultralytics import YOLO
import datetime
import json
import threading
import queue
import time # 导入 time 模块
# 队列用于线程间通信
data_queue = queue.Queue()
# 事件用于通知其他线程停止
stop_event = threading.Event()
# 将 id_counter 定义为全局变量
id_counter = 0
# 修改 detection_data 的定义,去掉 timestamp 并添加 id 作为第一个元素
def process_frames(model, cap):
"""
对摄像头捕捉到的视频帧进行处理,使用YOLO模型进行目标检测,并将结果放入队列中。
Args:
model (YOLO): YOLO目标检测模型实例。
cap (cv2.VideoCapture): 摄像头视频流对象。
Returns:
None
"""
global id_counter # 确保在函数内部使用的是全局变量id_counter
while not stop_event.is_set(): # 当停止事件未设置时,循环继续
ret, frame = cap.read() # 从摄像头读取一帧
if not ret: # 如果无法读取帧(摄像头可能已断开)
print("无法接收帧(可能是摄像头断开)")
break # 跳出循环
results = model(frame) # 使用模型处理帧
for result in results: # 遍历模型检测结果
boxes = result.boxes # 获取检测到的边界框
for box in boxes: # 遍历每个边界框
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 提取边界框坐标
confidence = round(float(box.conf[0]), 3) # 提取置信度
cls = int(box.cls[0]) # 提取类别索引
label = model.names[cls] # 获取类别名称
detection_data = { # 构造检测数据字典
"id": id_counter,
"x1": x1,
"y1": y1,
"x2": x2,
"y2": y2,
"confidence": confidence,
"label": label
}
data_queue.put(detection_data) # 将检测数据放入队列
# 绘制检测框和标签
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # 绘制绿色矩形框
cv2.putText(frame, f"{label} {confidence:.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
(0, 255, 0), 2) # 在框上方绘制标签和置信度
id_counter += 1 # 在每次处理后增加 ID 计数器
now = datetime.datetime.now() # 获取当前时间
time_str = now.strftime("%Y-%m-%d %H:%M:%S") # 格式化时间字符串
cv2.putText(frame, time_str, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 在画面上显示时间
cv2.imshow('Camera', frame) # 显示画面
if cv2.waitKey(1) & 0xFF == ord('q'): # 如果用户按下 'q' 键
stop_event.set() # 设置停止事件
break # 跳出循环
# 添加 sleep 以控制帧率
time.sleep(1) # 每隔1秒处理一帧
cap.release() # 释放摄像头资源
cv2.destroyAllWindows() # 关闭所有OpenCV窗口
# write_to_file 函数不需要再处理 detection_data 中的 timestamp
def write_to_file(output_file, max_lines=1000):
line_count = 0 # 记录写入的行数
with open(output_file, 'w') as f: # 初始以写模式打开文件,清空文件内容
while True:
try:
detection_data = data_queue.get(timeout=1) # 尝试从队列获取数据
f.write(json.dumps(detection_data) + '\n')
line_count += 1
if line_count >= max_lines:
print(f"已达到{max_lines}行数据,清空文件并继续运行。")
f.close() # 关闭当前文件句柄
f = open(output_file, 'w') # 重新打开文件,清空内容
line_count = 0 # 重置行数计数器
except queue.Empty: # 如果队列为空,则等待下一次尝试
if stop_event.is_set(): # 检查是否需要退出
return
continue
def run_yolov8_detection(model_path="./yolov8n.pt", camera_id=0, output_file="ultralytics-main/detector.jsonl"):
"""
运行 YOLOv8 目标检测算法。
Args:
model_path (str, optional): YOLOv8 模型文件路径,默认为 "./yolov8n.pt"。
camera_id (int, optional): 摄像头设备 ID,默认为 0。
output_file (str, optional): 输出文件路径,默认为 "ultralytics-main/detector.jsonl"。
Returns:
None
"""
# 初始化YOLO模型
model = YOLO(model_path)
# 打开摄像头
cap = cv2.VideoCapture(camera_id)
# 检查摄像头是否成功打开
if not cap.isOpened():
print("无法打开摄像头")
return
# 创建并启动两个线程
# 第一个线程用于处理摄像头捕捉到的帧
processing_thread = threading.Thread(target=process_frames, args=(model, cap))
# 第二个线程用于将处理后的帧写入文件
writing_thread = threading.Thread(target=write_to_file, args=(output_file,))
# 启动两个线程
processing_thread.start()
writing_thread.start()
# 等待两个线程完成
# 等待处理帧的线程完成
processing_thread.join()
# 等待写入文件的线程完成
writing_thread.join()
# 调用函数
run_yolov8_detection()
如有问题请及时私信,欢迎大家指正!!!