
Yahboom microROS Car - Native Ubuntu Support Series: 21. Color Tracking

Background

This test example uses quite a few OpenCV functions; here is one of them as an example.

# cv2.findContours finds the contours in a binary image.
# Parameters:
#     Parameter 1: the input binary image. Usually an image after thresholding,
#                  e.g. the mask produced by color filtering.
#     Parameter 2 (cv2.RETR_EXTERNAL): contour retrieval mode. Common options:
#         cv2.RETR_EXTERNAL: detect only the outermost contours.
#         cv2.RETR_LIST: detect all contours and store them in a flat list.
#         cv2.RETR_CCOMP: detect all contours and organize them into a two-level hierarchy.
#         cv2.RETR_TREE: detect all contours and reconstruct the full contour hierarchy.
#     Parameter 3 (cv2.CHAIN_APPROX_SIMPLE): contour approximation method. Common options:
#         cv2.CHAIN_APPROX_SIMPLE: compress horizontal, vertical and diagonal segments, keeping only their end points.
#         cv2.CHAIN_APPROX_NONE: keep all contour points.
# Return values:
#     contours: a list of detected contours, each a sequence of points.
#     _ (underscore): hierarchy information, sometimes needed in later processing;
#                     the underscore means we do not care about this return value here.
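
To make the parameters concrete, here is a minimal standalone sketch (the mask is synthetic and the variable names are my own, not from the car's code) of the call pattern used later in astra_common.py:

import cv2
import numpy as np

# hypothetical binary mask: a filled white rectangle on a black background
mask = np.zeros((480, 640), dtype=np.uint8)
cv2.rectangle(mask, (200, 150), (400, 330), 255, -1)

# OpenCV 4.x returns (contours, hierarchy); OpenCV 3.x returned (image, contours, hierarchy)
result = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
contours = result[0] if len(result) == 2 else result[1]

if contours:
    largest = max(contours, key=cv2.contourArea)       # pick the biggest blob
    (x, y), radius = cv2.minEnclosingCircle(largest)   # circle around the largest blob
    print("center:", (int(x), int(y)), "radius:", int(radius))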

The first time I read the code there were parts I did not understand, so I mainly used DeepSeek and some searching to add the comments below.

Create astra_common.py under src/yahboom_esp32ai_car/yahboom_esp32ai_car/:

#!/usr/bin/env python
# encoding: utf-8
import time
import cv2 as cv
import numpy as np



def write_HSV(wf_path, value):
    with open(wf_path, "w") as wf:
        wf_str = str(value[0][0]) + ', ' + str(
            value[0][1]) + ', ' + str(value[0][2]) + ', ' + str(
            value[1][0]) + ', ' + str(value[1][1]) + ', ' + str(
            value[1][2])
        wf.write(wf_str)
        wf.flush()


def read_HSV(rf_path):
    rf = open(rf_path, "r+")
    line = rf.readline()
    if len(line) == 0: return ()
    list = line.split(',')
    if len(list) != 6: return ()
    hsv = ((int(list[0]), int(list[1]), int(list[2])),
           (int(list[3]), int(list[4]), int(list[5])))
    rf.flush()
    return hsv


# Define the function: the first argument is the zoom ratio, the second is a tuple or list of images to display
def ManyImgs(scale, imgarray):
    rows = len(imgarray)  # length of the tuple or list
    cols = len(imgarray[0])  # if imgarray is a list, the channel count of the first image; if a tuple, the length of the first list it contains
    # print("rows=", rows, "cols=", cols)
    # Check whether imgarray[0] is a list:
    # if it is, imgarray is a tuple of lists and the rows need to be stacked vertically
    rowsAvailable = isinstance(imgarray[0], list)
    # width and height of the first image
    width = imgarray[0][0].shape[1]
    height = imgarray[0][0].shape[0]
    # print("width=", width, "height=", height)
    # if the input is a tuple
    if rowsAvailable:
        for x in range(0, rows):
            for y in range(0, cols):
                # if the image has the same size as the first one, just scale it
                if imgarray[x][y].shape[:2] == imgarray[0][0].shape[:2]:
                    imgarray[x][y] = cv.resize(imgarray[x][y], (0, 0), None, scale, scale)
                # otherwise resize it to the size of the first image, scaled by scale
                else:
                    imgarray[x][y] = cv.resize(imgarray[x][y], (imgarray[0][0].shape[1], imgarray[0][0].shape[0]), None,
                                               scale, scale)
                # if the image is grayscale, convert it to color for display
                if len(imgarray[x][y].shape) == 2:
                    imgarray[x][y] = cv.cvtColor(imgarray[x][y], cv.COLOR_GRAY2BGR)
        # create a blank canvas the same size as the first image
        imgBlank = np.zeros((height, width, 3), np.uint8)
        hor = [imgBlank] * rows  # one horizontal blank image per list contained in the tuple
        for x in range(0, rows):
            # arrange the x-th list of the tuple horizontally
            hor[x] = np.hstack(imgarray[x])
        ver = np.vstack(hor)  # stack the rows vertically
    # if the input is a list
    else:
        # same transformations as above
        for x in range(0, rows):
            if imgarray[x].shape[:2] == imgarray[0].shape[:2]:
                imgarray[x] = cv.resize(imgarray[x], (0, 0), None, scale, scale)
            else:
                imgarray[x] = cv.resize(imgarray[x], (imgarray[0].shape[1], imgarray[0].shape[0]), None, scale, scale)
            if len(imgarray[x].shape) == 2:
                imgarray[x] = cv.cvtColor(imgarray[x], cv.COLOR_GRAY2BGR)
        # arrange the list horizontally
        hor = np.hstack(imgarray)
        ver = hor
    return ver


class color_follow:
    def __init__(self):
        '''
        Initialize some parameters
        '''
        self.Center_x = 0
        self.Center_y = 0
        self.Center_r = 0

    def object_follow(self, img, hsv_msg):
        src = img.copy()
        # create NumPy arrays from the color range
        src = cv.cvtColor(src, cv.COLOR_BGR2HSV)
        lower = np.array(hsv_msg[0], dtype="uint8")
        upper = np.array(hsv_msg[1], dtype="uint8")
        # create a mask from the given color range; inRange binarizes by threshold:
        # pixels inside the range become white (255), pixels outside become black (0)
        mask = cv.inRange(src, lower, upper)
        color_mask = cv.bitwise_and(src, src, mask=mask)
        # convert the masked image to grayscale
        gray_img = cv.cvtColor(color_mask, cv.COLOR_RGB2GRAY)
        # get a structuring element: the "probe" used by the morphological operation,
        # determining the size and shape of the affected region (rectangular here;
        # a larger kernel smooths a larger area and may blur details)
        kernel = cv.getStructuringElement(cv.MORPH_RECT, (5, 5))
        # morphological closing: dilation followed by erosion, which removes small dark
        # holes and gaps while keeping the main shape intact
        gray_img = cv.morphologyEx(gray_img, cv.MORPH_CLOSE, kernel)
        # binarize the image
        ret, binary = cv.threshold(gray_img, 10, 255, cv.THRESH_BINARY)
        # get the contour point sets (coordinates)
        find_contours = cv.findContours(binary, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)
        # compatibility with different OpenCV versions
        if len(find_contours) == 3:  # OpenCV v3, v4-pre or v4-alpha
            contours = find_contours[1]
        else:  # OpenCV v2.4, v4-beta or v4-official
            contours = find_contours[0]
        if len(contours) != 0:  # contours detected
            areas = []
            for c in range(len(contours)): areas.append(cv.contourArea(contours[c]))
            max_id = areas.index(max(areas))
            max_rect = cv.minAreaRect(contours[max_id])  # minimum-area bounding rectangle
            max_box = cv.boxPoints(max_rect)  # rectangle corner coordinates
            max_box = np.intp(max_box)  # convert to integers for drawing (np.int0 in the original; that alias was removed in newer NumPy)
            # minimum enclosing circle of the object; the argument is the point set
            (color_x, color_y), color_radius = cv.minEnclosingCircle(max_box)
            # mark the detected color with a circle in the original image
            self.Center_x = int(color_x)
            self.Center_y = int(color_y)
            self.Center_r = int(color_radius)
            cv.circle(img, (self.Center_x, self.Center_y), self.Center_r, (255, 0, 255), 2)  # draw the circular outline of the object
            cv.circle(img, (self.Center_x, self.Center_y), 2, (0, 0, 255), -1)  # draw a solid red dot at the center
        else:
            self.Center_x = 0
            self.Center_y = 0
            self.Center_r = 0
        return img, binary, (self.Center_x, self.Center_y, self.Center_r)

    def Roi_hsv(self, img, Roi):
        '''
        Get the HSV range of a given region
        :param img: color image
        :param Roi: (x_min, y_min, x_max, y_max)
        Roi=(290,280,350,340)
        :return: the image and the HSV range, e.g. (0,0,90) (177,40,150)
        '''
        H = [];S = [];V = []
        # convert the color image to HSV
        HSV = cv.cvtColor(img, cv.COLOR_BGR2HSV)
        # draw a rectangular frame
        # cv.rectangle(img, (Roi[0], Roi[1]), (Roi[2], Roi[3]), (0, 255, 0), 2)
        # take the H, S, V values of every pixel in the ROI and put them into the lists
        for i in range(Roi[0], Roi[2]):
            for j in range(Roi[1], Roi[3]):
                H.append(HSV[j, i][0])
                S.append(HSV[j, i][1])
                V.append(HSV[j, i][2])
        # compute the minima of H, S, V and the maximum of H (S_max and V_max are fixed rather than measured)
        H_min = min(H); H_max = max(H)
        S_min = min(S); S_max = 253
        V_min = min(V); V_max = 255
        # adjust the HSV range
        if H_max + 5 > 255: H_max = 255
        else: H_max += 5
        if H_min - 5 < 0: H_min = 0
        else: H_min -= 5
        if S_min - 20 < 0: S_min = 0
        else: S_min -= 20
        if V_min - 20 < 0: V_min = 0
        else: V_min -= 20
        lowerb = 'lowerb : (' + str(H_min) + ' ,' + str(S_min) + ' ,' + str(V_min) + ')'
        upperb = 'upperb : (' + str(H_max) + ' ,' + str(S_max) + ' ,' + str(V_max) + ')'
        txt1 = 'Learning ...'
        txt2 = 'OK !!!'
        if S_min < 5 or V_min < 5:
            cv.putText(img, txt1, (30, 50), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
        else:
            cv.putText(img, txt2, (30, 50), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        cv.putText(img, lowerb, (150, 30), cv.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)
        cv.putText(img, upperb, (150, 50), cv.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)
        hsv_range = ((int(H_min), int(S_min), int(V_min)), (int(H_max), int(S_max), int(V_max)))
        return img, hsv_range


class simplePID:
    def __init__(self, kp, ki, kd):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.targetpoint = 0
        self.integral = 0
        self.derivative = 0
        self.prevError = 0

    def compute(self, target, current):
        error = target - current
        self.integral += error
        self.derivative = error - self.prevError
        self.targetpoint = self.kp * error + self.ki * self.integral + self.kd * self.derivative
        self.prevError = error
        return self.targetpoint

    def reset(self):
        self.targetpoint = 0
        self.integral = 0
        self.derivative = 0
        self.prevError = 0

class Tracker(object):
    '''
    Tracker module, used to track a specified target
    '''
    def __init__(self, tracker_type="BOOSTING"):
        '''
        Initialize the type of tracker
        '''
        # get the OpenCV version
        (major_ver, minor_ver, subminor_ver) = (cv.__version__).split('.')
        self.tracker_type = tracker_type
        self.isWorking = False
        # construct the tracker
        # (note: on OpenCV 4.5+ several of these constructors only exist under cv.legacy in the contrib build)
        if int(major_ver) < 3: self.tracker = cv.Tracker_create(tracker_type)
        else:
            if tracker_type == 'BOOSTING': self.tracker = cv.TrackerBoosting_create()
            if tracker_type == 'MIL': self.tracker = cv.TrackerMIL_create()
            if tracker_type == 'KCF': self.tracker = cv.TrackerKCF_create()
            if tracker_type == 'TLD': self.tracker = cv.TrackerTLD_create()
            if tracker_type == 'MEDIANFLOW': self.tracker = cv.TrackerMedianFlow_create()
            if tracker_type == 'GOTURN': self.tracker = cv.TrackerGOTURN_create()
            if tracker_type == 'MOSSE': self.tracker = cv.TrackerMOSSE_create()
            if tracker_type == "CSRT": self.tracker = cv.TrackerCSRT_create()

    def initWorking(self, frame, box):
        '''
        Tracker work initialization
        frame: the frame used to initialize tracking
        box: the region to track
        '''
        if not self.tracker: raise Exception("Tracker is not initialized")
        status = self.tracker.init(frame, box)
        #if not status: raise Exception("Failed to initialize tracker job")
        self.coord = box
        self.isWorking = True

    def track(self, frame):
        if self.isWorking:
            status, self.coord = self.tracker.update(frame)
            if status:
                p1 = (int(self.coord[0]), int(self.coord[1]))
                p2 = (int(self.coord[0] + self.coord[2]), int(self.coord[1] + self.coord[3]))
                cv.rectangle(frame, p1, p2, (255, 0, 0), 2, 1)
                return frame, p1, p2
            else:
                # tracking failed
                cv.putText(frame, "Tracking failure detected", (100, 80), cv.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
                return frame, (0, 0), (0, 0)
        else: return frame, (0, 0), (0, 0)
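
Before wiring this into ROS, astra_common.py can be sanity-checked on its own with a synthetic image. This is only a hedged sketch: the test colors below are made up, and the HSV range is the same red-ish default that colorHSV.py declares, not a calibrated value.

# test_astra_common.py - standalone check, run from the same directory as astra_common.py
import numpy as np
import cv2 as cv
from astra_common import color_follow

# synthetic frame: a slightly desaturated red disc on a black background
img = np.zeros((480, 640, 3), dtype=np.uint8)
cv.circle(img, (320, 240), 60, (40, 40, 230), -1)    # BGR

follower = color_follow()
hsv_range = ((0, 85, 126), (9, 253, 253))             # default HSV range declared by colorHSV.py
out, binary, (cx, cy, r) = follower.object_follow(img, hsv_range)
print("center:", (cx, cy), "radius:", r)              # center near (320, 240); r is the circle around the
                                                      # min-area rectangle's corners, so a bit larger than 60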

Create colorHSV.py under src/yahboom_esp32ai_car/yahboom_esp32ai_car/:

#ros lib
import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
from geometry_msgs.msg import Twist
from sensor_msgs.msg import CompressedImage, LaserScan, Image
from yahboomcar_msgs.msg import Position
from cv_bridge import CvBridge
#common lib
import os
import threading
import math
from yahboom_esp32ai_car.astra_common import *  # custom helper functions
from yahboomcar_msgs.msg import Position

from rclpy.time import Time
import datetime
from ament_index_python.packages import get_package_share_directory  # absolute path of the package share directory

print("import finish")
cv_edition = cv.__version__
print("cv_edition: ",cv_edition)
class Color_Identify(Node):
    def __init__(self,name):
        super().__init__(name)
        #create publishers
        self.pub_position = self.create_publisher(Position,"/Current_point", 10)  # publishes the target position
        self.pub_cmdVel = self.create_publisher(Twist, '/cmd_vel', 10)  # publishes velocity commands
        self.pub_img = self.create_publisher(Image,'/image_raw',500)  # publishes the processed image
        self.bridge = CvBridge()  # converter between OpenCV and ROS images
        # initialize variables
        self.index = 2
        self.Roi_init = ()  # ROI region coordinates
        self.hsv_range = ()  # HSV color range
        self.end = 0  # used for FPS calculation
        self.circle = (0, 0, 0)  # detected target circle (x, y, radius)
        self.point_pose = (0, 0, 0)  # target point position (x, y, z)
        self.dyn_update = True
        self.Start_state = True
        self.select_flags = False
        self.gTracker_state = False
        self.windows_name = 'frame'
        self.Track_state = 'identify'  # tracking state, one of ['init', 'identify', 'tracking']
        self.color = color_follow()  # color tracking helper
        self.cols, self.rows = 0, 0  # mouse-selected region coordinates
        self.Mouse_XY = (0, 0)  # mouse click coordinates
        self.declare_param()  # declare the HSV parameters

        
        self.hsv_text =    get_package_share_directory('yahboom_esp32ai_car')+'/resource/colorHSV.text'
        #self.timer = self.create_timer(0.001, self.on_timer)
        
    def declare_param(self):
        #HSV
        self.declare_parameter("Hmin",0)
        self.Hmin = self.get_parameter('Hmin').get_parameter_value().integer_value
        self.declare_parameter("Smin",85)
        self.Smin = self.get_parameter('Smin').get_parameter_value().integer_value
        self.declare_parameter("Vmin",126)
        self.Vmin = self.get_parameter('Vmin').get_parameter_value().integer_value
        self.declare_parameter("Hmax",9)
        self.Hmax = self.get_parameter('Hmax').get_parameter_value().integer_value
        self.declare_parameter("Smax",253)
        self.Smax = self.get_parameter('Smax').get_parameter_value().integer_value
        self.declare_parameter("Vmax",253)
        self.Vmax = self.get_parameter('Vmax').get_parameter_value().integer_value
        self.declare_parameter('refresh',False)
        self.refresh = self.get_parameter('refresh').get_parameter_value().bool_value

    # moved into MY_Picture below; kept here for reference
    # def on_timer(self):
    #     ret, frame = self.capture.read()
    #     action = cv.waitKey(10) & 0xFF
    #     frame, binary =self.process(frame, action)
    #     start = time.time()
    #     fps = 1 / (start - self.end)
    #     text = "FPS : " + str(int(fps))
    #     self.end = start
    #     cv.putText(frame, text, (30, 30), cv.FONT_HERSHEY_SIMPLEX, 0.6, (100, 200, 200), 1)
    #     if len(binary) != 0: cv.imshow('frame', ManyImgs(1, ([frame, binary])))
    #     else:cv.imshow('frame', frame)
    #     msg = self.bridge.cv2_to_imgmsg(frame, "bgr8")
    #     self.pub_img.publish(msg)
    #     if action == ord('q') or action == 113:
    #         self.capture.release()
    #         cv.destroyAllWindows()
    # main logic: color recognition and tracking
    def process(self, rgb_img, action):
        self.get_param()
        rgb_img = cv.resize(rgb_img, (640, 480))
        binary = []
        if action != 255: self.get_logger().info(f'action={action}')
        if action == 32: self.Track_state = 'tracking'  # space key
        elif action == ord('i') or action == ord('I'): self.Track_state = "identify"  # identify
        elif action == ord('r') or action == ord('R'): self.Reset()  # reset
        elif action == ord('q') or action == ord('Q'): self.cancel()  # quit
        self.get_logger().info(f'Track_state={self.Track_state}')
        if self.Track_state == 'init':
            cv.namedWindow(self.windows_name, cv.WINDOW_AUTOSIZE)
            cv.setMouseCallback(self.windows_name, self.onMouse, 0)  # mouse callback
            if self.select_flags == True:  # draw the ROI region
                cv.line(rgb_img, self.cols, self.rows, (255, 0, 0), 2)
                cv.rectangle(rgb_img, self.cols, self.rows, (0, 255, 0), 2)  # draw the selection box
                if self.Roi_init[0] != self.Roi_init[2] and self.Roi_init[1] != self.Roi_init[3]:
                    rgb_img, self.hsv_range = self.color.Roi_hsv(rgb_img, self.Roi_init)  # extract the HSV range of the ROI
                    self.gTracker_state = True
                    self.dyn_update = True
                else: self.Track_state = 'init'
        elif self.Track_state == "identify":
            if os.path.exists(self.hsv_text): self.hsv_range = read_HSV(self.hsv_text)  # load the HSV range from file
            else: self.Track_state = 'init'
        if self.Track_state != 'init':
            if len(self.hsv_range) != 0:
                rgb_img, binary, self.circle = self.color.object_follow(rgb_img, self.hsv_range)  # color tracking
                if self.dyn_update == True:  # parameter update
                    write_HSV(self.hsv_text, self.hsv_range)  # save the HSV range to file
                    self.Hmin  = rclpy.parameter.Parameter('Hmin',rclpy.Parameter.Type.INTEGER,self.hsv_range[0][0])
                    self.Smin  = rclpy.parameter.Parameter('Smin',rclpy.Parameter.Type.INTEGER,self.hsv_range[0][1])
                    self.Vmin  = rclpy.parameter.Parameter('Vmin',rclpy.Parameter.Type.INTEGER,self.hsv_range[0][2])
                    self.Hmax  = rclpy.parameter.Parameter('Hmax',rclpy.Parameter.Type.INTEGER,self.hsv_range[1][0])
                    self.Smax  = rclpy.parameter.Parameter('Smax',rclpy.Parameter.Type.INTEGER,self.hsv_range[1][1])
                    self.Vmax  = rclpy.parameter.Parameter('Vmax',rclpy.Parameter.Type.INTEGER,self.hsv_range[1][2])
                    all_new_parameters = [self.Hmin,self.Smin,self.Vmin,self.Hmax,self.Smax,self.Vmax]
                    self.set_parameters(all_new_parameters)
                    
                    self.dyn_update = False     
        if self.Track_state == 'tracking':
            self.Start_state = True
            if self.circle[2] != 0: threading.Thread(
                target=self.execute, args=(self.circle[0], self.circle[1], self.circle[2])).start()  # start a thread that publishes the center coordinates
            if self.point_pose[0] != 0 and self.point_pose[1] != 0: threading.Thread(
                target=self.execute, args=(self.point_pose[0], self.point_pose[1], self.point_pose[2])).start()
        else:
            if self.Start_state == True:
                self.pub_cmdVel.publish(Twist())  # stop
                self.Start_state = False
        return rgb_img, binary
    # publish the target position
    def execute(self, x, y, z):
        position = Position()
        position.anglex = x * 1.0
        position.angley = y * 1.0
        position.distance = z * 1.0
        self.pub_position.publish(position)

    def get_param(self):
        #hsv
        self.Hmin = self.get_parameter('Hmin').get_parameter_value().integer_value
        self.Smin = self.get_parameter('Smin').get_parameter_value().integer_value
        self.Vmin = self.get_parameter('Vmin').get_parameter_value().integer_value
        self.Hmax = self.get_parameter('Hmax').get_parameter_value().integer_value
        self.Smax = self.get_parameter('Smax').get_parameter_value().integer_value
        self.Vmax = self.get_parameter('Vmax').get_parameter_value().integer_value
        self.refresh = self.get_parameter('refresh').get_parameter_value().bool_value
        
    def Reset(self):
        self.hsv_range = ()
        self.circle = (0, 0, 0)
        self.Mouse_XY = (0, 0)
        self.Track_state = 'init'
        for i in range(3): self.pub_position.publish(Position())
        print("succes!!!")
        
    def cancel(self):
        print("Shutting down this node.")
        cv.destroyAllWindows()
    # mouse callback; all arguments except param are filled in automatically
    def onMouse(self, event, x, y, flags, param):
        if event == 1:  # left button pressed
            self.Track_state = 'init'
            self.select_flags = True
            self.Mouse_XY = (x, y)
        if event == 4:  # left button released
            self.select_flags = False
            self.Track_state = 'mouse'
        if self.select_flags == True:
            self.cols = min(self.Mouse_XY[0], x), min(self.Mouse_XY[1], y)
            self.rows = max(self.Mouse_XY[0], x), max(self.Mouse_XY[1], y)
            self.Roi_init = (self.cols[0], self.cols[1], self.rows[0], self.rows[1])
        




class MY_Picture(Node):
    def __init__(self, name):
        super().__init__(name)
        self.bridge = CvBridge()
        self.sub_img = self.create_subscription(
            CompressedImage, '/espRos/esp32camera', self.handleTopic, 1)  # image sent from the ESP32

        self.color_identify = Color_Identify("ColorIdentify")
        self.last_stamp = None
        self.new_seconds = 0
        self.fps_seconds = 1

    # image callback
    def handleTopic(self, msg):
        self.last_stamp = msg.header.stamp  
        if self.last_stamp:
            total_secs = Time(nanoseconds=self.last_stamp.nanosec, seconds=self.last_stamp.sec).nanoseconds
            delta = datetime.timedelta(seconds=total_secs * 1e-9)
            seconds = delta.total_seconds()*100

            if self.new_seconds != 0:
                self.fps_seconds = seconds - self.new_seconds

            self.new_seconds = seconds  # keep this value for the next frame
        start = time.time()
        frame = self.bridge.compressed_imgmsg_to_cv2(msg)  # convert the compressed image
        frame = cv.resize(frame, (640, 480))

        action = cv.waitKey(10) & 0xFF
        frame, binary = self.color_identify.process(frame, action)  # run the processing logic
        if len(binary) != 0: cv.imshow('frame', ManyImgs(1, ([frame, binary])))
        else:cv.imshow('frame', frame)
        msg = self.bridge.cv2_to_imgmsg(frame, "bgr8")
        self.color_identify.pub_img.publish(msg)       
        
        end = time.time()
        fps = 1/((end - start)+self.fps_seconds) 
        
        text = "FPS : " + str(int(fps))
        
        cv.putText(frame, text, (10,20), cv.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,255), 2)
        if len(binary) != 0: 
            cv.imshow('frame', ManyImgs(1, ([frame, binary])))
        else:
            cv.imshow('frame', frame)

        if action == ord('q') or action == 113:
            cv.destroyAllWindows()

        #rclpy.spin(self.QRdetect)
        



def main():
    rclpy.init()
    esp_img = MY_Picture("My_Picture")
    print("start it")
    try:
        rclpy.spin(esp_img)
    except KeyboardInterrupt:
        pass
    finally:
        esp_img.destroy_node()
        rclpy.shutdown()
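
For ros2 run yahboom_esp32ai_car colorHSV (used further down) to find the new scripts, they have to be registered as console scripts in the package's setup.py. This is only a hedged sketch of the excerpt to merge into the existing entry_points of this ament_python package; the colorTracker entry assumes the second file created below keeps that name:

# setup.py (excerpt) - entry point names must match the ros2 run commands
entry_points={
    'console_scripts': [
        'colorHSV = yahboom_esp32ai_car.colorHSV:main',
        'colorTracker = yahboom_esp32ai_car.colorTracker:main',
    ],
},

After editing setup.py, rebuild the workspace with colcon build and source it again. Note that read_HSV/write_HSV expect a resource/colorHSV.text path under the installed package share directory, so that directory has to be installed as well (e.g. via data_files in setup.py).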

Main functional modules:

  1. Color_Identify class

    • Implements the core color recognition and tracking logic

    • Supports dynamic ROI selection and HSV parameter adjustment

    • Manages the HSV range through the ROS parameter server

    • Publishes the target position and provides motion control

  2. MY_Picture class

    • Main image processing node

    • Subscribes to the camera image topic

    • Integrates the color recognition function

    • Computes FPS and displays the image

  3. Key techniques

    • OpenCV color space conversion (BGR -> HSV)

    • Morphological operations (opening/closing for noise removal)

    • Contour detection and minimum enclosing circle calculation

    • ROS 2 dynamic parameter management

    • Multi-threaded publishing of control commands

  4. Workflow

    1. Initialize the node and its subscribers/publishers

    2. Receive camera images

    3. Color recognition (with optional ROI selection)

    4. Target position calculation

    5. Control command generation and publishing

    6. Real-time image display and status feedback

Create colorTracker.py under src/yahboom_esp32ai_car/yahboom_esp32ai_car/:

#ros lib
import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
from geometry_msgs.msg import Twist
from sensor_msgs.msg import CompressedImage, LaserScan, Image
from yahboomcar_msgs.msg import Position
from std_msgs.msg import Int32, Bool,UInt16
from cv_bridge import CvBridge
#common lib
import os
import threading
import math
import time
from yahboom_esp32ai_car.astra_common import *
from yahboomcar_msgs.msg import Position

print("import done")

class color_Tracker(Node):
    def __init__(self,name):
        super().__init__(name)
        #create the publishers
        self.pub_cmdVel = self.create_publisher(Twist,'/cmd_vel',10)  # velocity control
        self.pub_Servo1 = self.create_publisher(Int32,"servo_s1" , 10)  # servo 1 angle control
        self.pub_Servo2 = self.create_publisher(Int32,"servo_s2" , 10)  # servo 2 angle control
        #create the subscribers
        self.sub_depth = self.create_subscription(Image,"/image_raw", self.depth_img_Callback, 1)  # image
        self.sub_JoyState = self.create_subscription(Bool,'/JoyState',  self.JoyStateCallback,1)
        self.sub_position = self.create_subscription(Position,"/Current_point",self.positionCallback,1)  # target position
        self.bridge = CvBridge()
        # initialize variables
        self.minDist = 1500  # minimum tracking distance (mm)
        self.Center_x = 0  # target center x coordinate
        self.Center_y = 0  # target center y coordinate
        self.Center_r = 0  # target radius
        self.Center_prevx = 0  # target x coordinate in the previous frame
        self.Center_prevr = 0  # target radius in the previous frame
        self.prev_time = 0  # timestamp
        self.prev_dist = 0
        self.prev_angular = 0
        self.Joy_active = False
        self.Robot_Run = False  # running state
        self.img_flip = False  # image flip flag
        self.dist = [] 
        self.encoding = ['8UC3']
        self.linear_PID = (8.0, 0.0, 1.0)  # linear PID parameters (kp, ki, kd)
        self.angular_PID = (0.5, 0.0, 2.0)  # angular PID parameters
        self.scale = 1000  # PID scaling factor
        self.PID_init()
        self.PWMServo_X = 0  # current angle of servo 1
        self.PWMServo_Y = 10  # current angle of servo 2
        self.s1_init_angle = Int32()  # servo 1 initial angle
        self.s1_init_angle.data = self.PWMServo_X
        self.s2_init_angle = Int32()  # servo 2 initial angle
        self.s2_init_angle.data = self.PWMServo_Y
        self.declare_param()  # declare parameters

        # make sure the gimbal starts centered
        for i in range(10):
            self.pub_Servo1.publish(self.s1_init_angle)
            self.pub_Servo2.publish(self.s2_init_angle)
            time.sleep(0.1)  # publish every 0.1 s
            
        print("init done")
        
    def declare_param(self):
        self.declare_parameter("linear_Kp",20.0)
        self.linear_Kp = self.get_parameter('linear_Kp').get_parameter_value().double_value
        self.declare_parameter("linear_Ki",0.0)
        self.linear_Ki = self.get_parameter('linear_Ki').get_parameter_value().double_value
        self.declare_parameter("linear_Kd",2.0)
        self.linear_Kd = self.get_parameter('linear_Kd').get_parameter_value().double_value
        self.declare_parameter("angular_Kp",0.5)
        self.angular_Kp = self.get_parameter('angular_Kp').get_parameter_value().double_value
        self.declare_parameter("angular_Ki",0.0)
        self.angular_Ki = self.get_parameter('angular_Ki').get_parameter_value().double_value
        self.declare_parameter("angular_Kd",2.0)
        self.angular_Kd = self.get_parameter('angular_Kd').get_parameter_value().double_value
        self.declare_parameter("scale",1000)
        self.scale = self.get_parameter('scale').get_parameter_value().integer_value
        self.declare_parameter("minDistance",1.0)
        self.minDistance = self.get_parameter('minDistance').get_parameter_value().double_value
        
        self.declare_parameter('angular_speed_limit',5.0)
        self.angular_speed_limit = self.get_parameter('angular_speed_limit').get_parameter_value().double_value
        self.pub_Servo1.publish(self.s1_init_angle)
        self.pub_Servo2.publish(self.s2_init_angle)
    # fetch the latest parameter values from the parameter server
    def get_param(self):
        self.linear_Kp = self.get_parameter('linear_Kp').get_parameter_value().double_value
        self.linear_Ki = self.get_parameter('linear_Ki').get_parameter_value().double_value
        self.linear_Kd = self.get_parameter('linear_Kd').get_parameter_value().double_value
        self.angular_Kp = self.get_parameter('angular_Kp').get_parameter_value().double_value
        self.angular_Ki = self.get_parameter('angular_Ki').get_parameter_value().double_value
        self.angular_Kd = self.get_parameter('angular_Kd').get_parameter_value().double_value
        self.scale = self.get_parameter('scale').get_parameter_value().integer_value
        self.minDistance = self.get_parameter('minDistance').get_parameter_value().double_value
        
        self.linear_PID = (self.linear_Kp, self.linear_Ki, self.linear_Kd)
        self.angular_PID = (self.angular_Kp, self.angular_Ki, self.angular_Kd)
        self.minDist = self.minDistance * 1000  # convert the minimum distance from meters to millimeters
        

    def PID_init(self):
       # self.linear_pid = simplePID(self.linear_PID[0] / 1000.0, self.linear_PID[1] / 1000.0, self.linear_PID[2] / 1000.0)
        #self.angular_pid = simplePID(self.angular_PID[0] / 100.0, self.angular_PID[1] / 100.0, self.angular_PID[2] / 100.0)
        # the PID gains are normalized by dividing by scale
        self.linear_pid = simplePID(
            [0, 0],  # target value placeholder: the offset from the image center should be zero
            [self.linear_PID[0]  / float(self.scale), self.linear_PID[0]  / float(self.scale)],
            [self.linear_PID[1]  / float(self.scale), self.linear_PID[1]  / float(self.scale)],
            [self.linear_PID[2]  / float(self.scale), self.linear_PID[2]  / float(self.scale)])
    def depth_img_Callback(self, msg):  # image callback
        if not isinstance(msg, Image): return
        depthFrame = self.bridge.imgmsg_to_cv2(msg, desired_encoding=self.encoding[0])  # convert the image
        self.action = cv.waitKey(1)  # read keyboard input
        if self.Center_r != 0:  # a valid target was detected (radius != 0)
            now_time = time.time()
            if now_time - self.prev_time > 5:  # reset if the target has not been updated for 5 seconds
                if self.Center_prevx == self.Center_x and self.Center_prevr == self.Center_r: self.Center_r = 0
                self.prev_time = now_time
            distance = [0, 0, 0, 0, 0]  # distance samples, averaged over 5 points
            if 0 < int(self.Center_y - 3) and int(self.Center_y + 3) < 480 and 0 < int(
                self.Center_x - 3) and int(self.Center_x + 3) < 640:  # check that the target region lies inside the image
                print("depthFrame: ", len(depthFrame), len(depthFrame[0]))  # sample the distance at 5 points around the target
                distance[0] = depthFrame[int(self.Center_y - 3)][int(self.Center_x - 3)]
                distance[1] = depthFrame[int(self.Center_y + 3)][int(self.Center_x - 3)]
                distance[2] = depthFrame[int(self.Center_y - 3)][int(self.Center_x + 3)]
                distance[3] = depthFrame[int(self.Center_y + 3)][int(self.Center_x + 3)]
                distance[4] = depthFrame[int(self.Center_y)][int(self.Center_x)]
                distance_ = 1000.0  # default value
                num_depth_points = 5
                for i in range(5):
                    # filter valid distances (note: with an 8UC3 frame distance[i] is a 3-element array,
                    # so .all() yields a bool and this check never passes and distance_ falls back to
                    # minDist; kept as in the original code)
                    if 40 < distance[i].all() < 80000: distance_ += distance[i]
                    else: num_depth_points -= 1  # one less valid point
                if num_depth_points == 0: distance_ = self.minDist  # no valid points, fall back to the minimum distance
                else: distance_ /= num_depth_points  # average distance
                #print("Center_x: {}, Center_y: {}, distance_: {}".format(self.Center_x, self.Center_y, distance_))
                self.execute(self.Center_x, self.Center_y)  # drive the servos
                self.Center_prevx = self.Center_x
                self.Center_prevr = self.Center_r
        else:  # no valid target, stop
            if self.Robot_Run ==True:
                self.pub_cmdVel.publish(Twist()) 
                self.Robot_Run = False
        if self.action == ord('q') or self.action == 113: self.cleanup()
        
    def JoyStateCallback(self, msg):  # joystick state
        if not isinstance(msg, Bool): return
        self.Joy_active = msg.data
        self.pub_cmdVel.publish(Twist())
        
    def positionCallback(self, msg):  # target position callback
        if not isinstance(msg, Position): return
        self.get_logger().info(f'positionCallback msg:{str(msg)}')
        self.Center_x = msg.anglex
        self.Center_y = msg.angley
        self.Center_r = msg.distance
        
    def cleanup(self):
        self.pub_cmdVel.publish(Twist())
        print ("Shutting down this node.")
        cv.destroyAllWindows()
    # servo control
    def execute(self, point_x, point_y):
        # compute the servo adjustment with the PID controller
        # (the input is the offset of the target from the image center at (320, 240))
        [x_Pid, y_Pid] = self.linear_pid.update([point_x - 320, point_y - 240])
        if self.img_flip == True:  # adjust the direction according to the image flip flag
            self.PWMServo_X += x_Pid
            self.PWMServo_Y += y_Pid
        else:
            self.PWMServo_X  -= x_Pid
            self.PWMServo_Y  += y_Pid
        # angle limits (to protect the servos)
        if self.PWMServo_X  >= 45:
            self.PWMServo_X  = 45
        elif self.PWMServo_X  <= -45:
            self.PWMServo_X  = -45
        if self.PWMServo_Y >= 20:
            self.PWMServo_Y = 20
        elif self.PWMServo_Y <= -90:
            self.PWMServo_Y = -90

        #rospy.loginfo("target_servox: {}, target_servoy: {}".format(self.target_servox, self.target_servoy))
        print("servo1",self.PWMServo_X)   
        servo1_angle = Int32()
        servo1_angle.data = int(self.PWMServo_X)
        servo2_angle = Int32()
        servo2_angle.data = int(self.PWMServo_Y)
        self.pub_Servo1.publish(servo1_angle)
        self.pub_Servo2.publish(servo2_angle)
        
        
# PID controller
class simplePID:
    '''very simple discrete PID controller'''

    def __init__(self, target, P, I, D):  # initialize the PID controller
        '''Create a discrete PID controller
        each of the parameters may be a vector if they have the same length
        Args:
        target (double) -- the target value(s)
        P, I, D (double)-- the PID parameters
        '''
        # check if parameter shapes are compatible.
        if (not (np.size(P) == np.size(I) == np.size(D)) or ((np.size(target) == 1) and np.size(P) != 1) or (
                np.size(target) != 1 and (np.size(P) != np.size(target) and (np.size(P) != 1)))):
            raise TypeError('input parameters shape is not compatible')
       # rospy.loginfo('P:{}, I:{}, D:{}'.format(P, I, D))
        self.Kp = np.array(P)
        self.Ki = np.array(I)
        self.Kd = np.array(D)
        self.last_error = 0
        self.integrator = 0
        self.timeOfLastCall = None
        self.setPoint = np.array(target)
        self.integrator_max = float('inf')

    def update(self, current_value):
        '''Updates the PID controller and returns the control signal.
        Args:
            current_value (double): vector/number of the same length as the target given in the constructor
        Returns:
            control signal (double): vector of the same length as the target
        '''
        current_value = np.array(current_value)
        if (np.size(current_value) != np.size(self.setPoint)):
            raise TypeError('current_value and target do not have the same shape')
        if (self.timeOfLastCall is None):
            # the PID was called for the first time; we don't know deltaT yet,
            # so no control signal is applied
            self.timeOfLastCall = time.perf_counter()
            return np.zeros(np.size(current_value))
        error = self.setPoint - current_value
        P = error
        currentTime = time.perf_counter()
        deltaT = (currentTime - self.timeOfLastCall)
        # integral of the error is current error * time since last update
        self.integrator = self.integrator + (error * deltaT)
        I = self.integrator
        # derivative is difference in error / time since last update
        D = (error - self.last_error) / deltaT
        self.last_error = error
        self.timeOfLastCall = currentTime
        # return the control signal
        return self.Kp * P + self.Ki * I + self.Kd * D
        

def main():
    rclpy.init()
    color_tracker = color_Tracker("ColorTracker")
    print("start it")
    rclpy.spin(color_tracker)
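
To get a feel for what this vector PID does with the pixel offset from the image center, here is a small standalone sketch. It assumes the 4-argument simplePID class above has been defined in the same session; the gains mirror PID_init (linear_PID = (8.0, 0.0, 1.0) divided by scale = 1000) and the pixel error values are made up:

import time

pid = simplePID([0, 0], [8.0 / 1000.0] * 2, [0.0] * 2, [1.0 / 1000.0] * 2)

print(pid.update([0, 0]))              # first call only records the timestamp and returns zeros
time.sleep(0.05)
x_Pid, y_Pid = pid.update([60, -40])   # target seen 60 px right of and 40 px above the image center
print(x_Pid, y_Pid)                    # small corrections that execute() applies to the pan/tilt servo angles
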
  • Control flow

    • Subscribes to the /Current_point target position published by the color recognition node

    • Computes the target distance from the depth image

    • Uses the PID controller to compute the servo adjustment

    • Publishes servo commands so the gimbal follows the target

  • Key techniques

    • Depth data processing: sample several depth values around the target and average them

    • Anti-jitter design: the target state is reset automatically after 5 seconds without updates

    • Smooth servo control: the PID output is clamped to avoid sudden jumps

    • Dynamic configuration: the PID parameters can be adjusted at runtime through the ROS parameter server

  • colorHSV.py

    Handles the image processing and computes the center coordinates of the tracked object.

  • colorTracker.py

    Uses the center coordinates and the depth information of the tracked object to compute the servo angles that are sent to the chassis.

 

Program description

The microROS robot's color tracking can recognize many colors on the fly, store the currently recognized color on its own, and drive the car's gimbal to follow the detected color.

The color tracking also supports real-time HSV tuning: by adjusting the lower and upper HSV thresholds, interfering colors are filtered out so that the colored block can be recognized reliably even in a cluttered environment. If the color sampling does not work well, move the car to a different environment and recalibrate until the desired color can be recognized in complex surroundings.
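
That HSV tuning ultimately comes down to the cv.inRange call inside object_follow: everything between the lower and upper bound becomes white in the mask, everything else black. A hedged, self-contained illustration (the two colored blocks and the threshold values are just examples, not values stored on the car):

import cv2 as cv
import numpy as np

# synthetic frame: a red block and an orange block side by side (stand-ins for target and distractor)
frame = np.zeros((240, 320, 3), dtype=np.uint8)
cv.rectangle(frame, (30, 60), (130, 180), (40, 40, 230), -1)    # red-ish block (BGR)
cv.rectangle(frame, (190, 60), (290, 180), (40, 150, 230), -1)  # orange block (BGR)
hsv = cv.cvtColor(frame, cv.COLOR_BGR2HSV)

loose = cv.inRange(hsv, np.array((0, 43, 46)), np.array((25, 255, 255)))    # wide range: both blocks pass
tight = cv.inRange(hsv, np.array((0, 120, 120)), np.array((9, 253, 253)))   # narrow range: only the red block remains

print("loose mask pixels:", cv.countNonZero(loose))
print("tight mask pixels:", cv.countNonZero(tight))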

Run:

# micro-ROS agent for the car
sudo docker run -it --rm -v /dev:/dev -v /dev/shm:/dev/shm --privileged --net=host microros/micro-ros-agent:humble udp4 --port 8090 -v4
# camera agent (start the agents first, then switch on the car)
docker run -it --rm -v /dev:/dev -v /dev/shm:/dev/shm --privileged --net=host microros/micro-ros-agent:humble udp4 --port 9999 -v4
# servo calibration program: run this when the gimbal servos are not centered
ros2 run yahboom_esp32_mediapipe control_servo 
Start the color tracking program:
ros2 run yahboom_esp32ai_car colorHSV
bohu@bohu-TM1701:~/yahboomcar/yahboomcar_ws$ ros2 run yahboom_esp32ai_car colorHSV
import finish
cv_edition:  4.11.0
start it
[INFO] [1738674982.309149874] [ColorIdentify]: Track_state=identify
[INFO] [1738674982.524295927] [ColorIdentify]: Track_state=identify
[INFO] [1738674982.803851561] [ColorIdentify]: Track_state=identify
[INFO] [1738674982.973336708] [ColorIdentify]: Track_state=identify
...
[INFO] [1738675048.670016124] [ColorIdentify]: action=114
succes!!!
[INFO] [1738675048.673837152] [ColorIdentify]: Track_state=init
[INFO] [1738675048.985173614] [ColorIdentify]: Track_state=init
[INFO] [1738675053.159352295] [ColorIdentify]: Track_state=mouse
[INFO] [1738675053.755473605] [ColorIdentify]: Track_state=mouse
[INFO] [1738675054.460844705] [ColorIdentify]: Track_state=mouse
[INFO] [1738675054.804728880] [ColorIdentify]: Track_state=mouse
[INFO] [1738675055.570826483] [ColorIdentify]: Track_state=mouse
[INFO] [1738675251.874084436] [ColorIdentify]: action=32
[INFO] [1738675251.875798862] [ColorIdentify]: Track_state=tracking
[INFO] [1738675252.089742888] [ColorIdentify]: Track_state=tracking
[INFO] [1738675252.282038043] [ColorIdentify]: Track_state=tracking
[INFO] [1738675252.507568662] [ColorIdentify]: Track_state=tracking
[INFO] [1738675252.734285724] [ColorIdentify]: Track_state=tracking
[INFO] [1738675252.956037454] [ColorIdentify]: Track_state=tracking
[INFO] [1738675253.283054540] [ColorIdentify]: Track_state=tracking

After the program starts, the following window appears.

Press r/R on the keyboard to enter color-selection mode and drag a box with the mouse around a region (the region must contain only one color).

The result after selection:

Then press the space bar to enter tracking mode. Move the object slowly and the robot's gimbal will follow the colored block.

Problems encountered

The OpenCV waitKey() function not responding

Key presses were not being picked up, so I could not switch modes. I asked DeepSeek.

The answer was spot on: the OpenCV GUI window was not focused.
waitKey() only works while the window has focus; if the focus is on some other window, OpenCV will not receive the key events. Clicking the GUI window with the mouse gives it focus.

Comparing my debugging with DeepSeek's answer, I really felt silly. The code from the Yahboom site may contain bugs, so verify it yourself.

Also, do not move the target too fast during testing; if it moves quickly the camera cannot keep up and loses it, so move it slowly for good results.

That's all.

