当前位置: 首页 > article >正文

ros2-7.5 做一个自动巡检机器人

7.5.1 需求及设计

又到了小鱼老师带着做最佳实践项目了。需求:做一个在各个房间不断巡逻并记录图像的机器人。

到达目标点后首先通过语音播放到达目标点信息,

再通过摄像头拍摄一张图片保存到本地。

7.5.2 编写巡检控制节点

在chapt7_ws/src下新建功能包,添加rclpy和nav2_simple_commander依赖。

目录src/autopartol_robot/autopartol_robot/新建文件patrol_node.py

from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换


class PatrolNode(BasicNavigator):
    def __init__(self,node_name='patrol_node'):
        super().__init__(node_name)
        self.buffer_ = Buffer()
        self.listner_ = TransformListener(self.buffer_,self)
        #声明相关参数
        self.declare_parameter('initial_point',[0.0, 0.0, 0.0])
        self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])
        self.initial_point_ = self.get_parameter('initial_point').value
        self.initial_points_= self.get_parameter('target_points').value

    def get_pose_by_xyyaw(self, x, y, yaw):
        # 通过x,y,yaw return PoseStamped对象     
        pose = PoseStamped()
        pose.header.frame_id = 'map'
        pose.pose.position.x = x
        pose.pose.position.y = y
        quat = quaternion_from_euler(0,0,yaw)
        pose.pose.orientation.x =quat[0]
        pose.pose.orientation.y =quat[1]
        pose.pose.orientation.z =quat[2]
        pose.pose.orientation.w =quat[3]
        return pose
    def init_robot_pose(self):
        #初始化机器人位姿
        self.initial_point_ = self.get_parameter('initial_point').value
        init_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2]) 
        self.setInitialPose(init_pose)
        self.waitUntilNav2Active()#等待导航可用

    def get_target_points(self):
        #通过参数值获取目标点集合
        points = []
        self.target_points_ = self.get_parameter('target_points').value
        for index in range(int(len(self.target_points_)/3)):
            x = self.target_points_[index*3]
            y = self.target_points_[index*3+1]
            yaw = self.target_points_[index*3+2]
            points.append([x,y,yaw])
            self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')
        return points    

    def nav_to_pose(self,target_pose):
        #导航到指定位姿
        self.waitUntilNav2Active()
        self.goToPose(target_pose)
        while not self.isTaskComplete():
            feedback = self.getFeedback()
            self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')
        reslut = self.getResult()
        self.get_logger().info(f'导航结果:{reslut}')    

    def get_cuurent_pose(self):
        #通过TF获取当前位姿
        while rclpy.ok():
            try:
                result = self.buffer_.lookup_transform('map','base_footprint',
                        rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))
                transform = result.transform
                self.get_logger().info(f'平移:{transform.translation}')
                # self.get_logger().info(f'旋转:{transform.rotation}')
                # ratation_euler = euler_from_quaternion([
                #     transform.rotation.x,
                #     transform.rotation.y,
                #     transform.rotation.z,
                #     transform.rotation.w]
                # )
                # self.get_logger().info(f'平移rpy:{ratation_euler}')
                return transform
            except Exception as e:
                self.get_logger().warn(f'获取坐标转换异常{str(e)}')

def main():
    rclpy.init()
    patrol = PatrolNode() #节点
    patrol.initial_pose()

    while rclpy.ok():
        points = patrol.get_target_points()
        for point in points:
            x,y,yaw = point[0],point[1],point[2]
            target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)
            patrol.nav_to_pose(target_pose)
    rclpy.shutdown()        

基本上吧之前的单接口调用综合起来。

为方便参数配置,在src/autopartol_robot/新建目录config,新增配置文件

patrol_config.yaml

patrol_node:
  ros__parameters:
    initial_point: [0.0, 0.0, 0.0]
    target_points: [
      0.0,0.0,0.0,
      1.0,2.0,3.14,
      -4.5,1.5,1.57,
      -8.0,-5.0,1.57,
      1.0,-5.0,3.14,
      ]

注意目标点的选取是机器人可达的位置,不能选到障碍物。

启动gazebo模拟器,启动nav2.

再启动patrol_node

 ros2 run autopartol_robot patrol_node --ros-args --params-file /home/bohu/chapt7/chapt7_ws/install/autopartol_robot/share/autopartol_robot/config/patrol_config.yaml

会发现等一会开始初始化地图后,开始沿着设定目标点运动。效果如下

也有异常情况,机器人靠墙太近,gazebo看出距离墙还有段距离,但是在rviz里面局部代价地图有一点变形,导致机器以为有障碍物卡住的现象。

7.5.3 添加语音播报功能

在chatpt7_ws/src下新建功能包autopatrol_interfaces,功能包下新建目录srv,srv新新建接口文件

speachText.srv

string text    # 合成文字
---
bool result    # 合成结果

在CMakeLists.txt注册,并在package.xml添加标签

<member_of_group>rosidl_interface_packages</member_of_group>

重新构建

src/autopartol_robot/autopartol_robot/目录下新建文件speaker.py

import rclpy
from rclpy.node import Node
from autopatrol_interfaces.srv import SpeachText
import espeakng


class Speaker(Node):
    def __init__(self):
        super().__init__('speaker')
        self.speech_service_ = self.create_service(SpeachText,'speech_text',
                                                   self.speech_text_callback)
        self.speaker_ = espeakng.Speaker()
        self.speaker_.voice ='zh'

    def speech_text_callback(self,request,response):
        self.get_logger().info(f'正在朗读 {request.text}')
        self.speaker_.say(request.text)
        self.speaker_.wait()
        response.result = True
        return response
    
def main():
    rclpy.init()
    node = Speaker()
    rclpy.spin(node)
    rclpy.shutdown()

修改patrol_node.py,增加语音合成调用

from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换
from autopatrol_interfaces.srv import SpeachText


class PatrolNode(BasicNavigator):
    def __init__(self,node_name='patrol_node'):
        super().__init__(node_name)
        self.buffer_ = Buffer()
        self.listner_ = TransformListener(self.buffer_,self)
        #声明相关参数
        self.declare_parameter('initial_point',[0.0, 0.0, 0.0])
        self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])
        self.initial_point_ = self.get_parameter('initial_point').value
        self.initial_points_= self.get_parameter('target_points').value
        self.speech_client_ = self.create_client(SpeachText,'speech_text')

    def get_pose_by_xyyaw(self, x, y, yaw):
        # 通过x,y,yaw return PoseStamped对象     
        pose = PoseStamped()
        pose.header.frame_id = 'map'
        pose.pose.position.x = x
        pose.pose.position.y = y
        quat = quaternion_from_euler(0,0,yaw)
        pose.pose.orientation.x =quat[0]
        pose.pose.orientation.y =quat[1]
        pose.pose.orientation.z =quat[2]
        pose.pose.orientation.w =quat[3]
        return pose
    def init_robot_pose(self):
        #初始化机器人位姿
        self.initial_point_ = self.get_parameter('initial_point').value
        init_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2]) 
        self.setInitialPose(init_pose)
        self.waitUntilNav2Active()#等待导航可用

    def get_target_points(self):
        #通过参数值获取目标点集合
        points = []
        self.target_points_ = self.get_parameter('target_points').value
        for index in range(int(len(self.target_points_)/3)):
            x = self.target_points_[index*3]
            y = self.target_points_[index*3+1]
            yaw = self.target_points_[index*3+2]
            points.append([x,y,yaw])
            self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')
        return points    

    def nav_to_pose(self,target_pose):
        #导航到指定位姿
        self.waitUntilNav2Active()
        self.goToPose(target_pose)
        while not self.isTaskComplete():
            feedback = self.getFeedback()
            self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')
        reslut = self.getResult()
        self.get_logger().info(f'导航结果:{reslut}')    

    def get_cuurent_pose(self):
        #通过TF获取当前位姿
        while rclpy.ok():
            try:
                result = self.buffer_.lookup_transform('map','base_footprint',
                        rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))
                transform = result.transform
                self.get_logger().info(f'平移:{transform.translation}')
                # self.get_logger().info(f'旋转:{transform.rotation}')
                # ratation_euler = euler_from_quaternion([
                #     transform.rotation.x,
                #     transform.rotation.y,
                #     transform.rotation.z,
                #     transform.rotation.w]
                # )
                # self.get_logger().info(f'平移rpy:{ratation_euler}')
                return transform
            except Exception as e:
                self.get_logger().warn(f'获取坐标转换异常{str(e)}')
    def speech_text(self,text):
        #调用服务合成语音
        while not self.speech_client_.wait_for_service(timeout_sec=1):
            self.get_logger().info("wait for speech service")
        request = SpeachText.Request()
        request.text = text
        future = self.speech_client_.call_async(request)
        rclpy.spin_until_future_complete(self,future)
        if future.result() is not None:
            result = future.result().result
            if result:
                self.get_logger().info(f'语音合成成功:{text}')
            else:
                self.get_logger().warn(f'语音合成失败:{text}')
        else:
            self.get_logger().warn('语音合成服务请求失败')

def main():
    rclpy.init()
    patrol = PatrolNode() #节点
    patrol.speech_text('正在准备初始化位置')
    patrol.init_robot_pose()
    patrol.speech_text('初始化位置完成')

    while rclpy.ok():
        points = patrol.get_target_points()
        for point in points:
            x,y,yaw = point[0],point[1],point[2]
            target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)
            patrol.speech_text(text=f'准备前往目标点{x},{y}')
            patrol.nav_to_pose(target_pose)
    rclpy.shutdown()        

效果跟上面类似,日志输出多了语音的

[speaker-2] [INFO] [1737017187.818829250] [speaker]: 正在朗读 准备前往目标点1.0,2.0
[patrol_node-1] [INFO] [1737017191.194245817] [patrol_node]: 语音合成成功:准备前往目标点1.0,2.0
[patrol_node-1] [INFO] [1737017195.311579068] [patrol_node]: Nav2 is ready for use!
[patrol_node-1] [INFO] [1737017195.314096555] [patrol_node]: Navigating to goal: 1.0 2.0...
[patrol_node-1] [INFO] [1737017195.509438991] [patrol_node]: 剩余距离:0.2445448786020279
[patrol_node-1] [INFO] [1737017195.612344544] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.716231929] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.819218225] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.923079415] [patrol_node]: 剩余距离:2.230710744857788 

7.5.4订阅图像并记录

from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换
from autopatrol_interfaces.srv import SpeachText
from sensor_msgs.msg import Image #消息接口
from cv_bridge import CvBridge #图像转换
import cv2 #保存图像


class PatrolNode(BasicNavigator):
    def __init__(self,node_name='patrol_node'):
        super().__init__(node_name)
        self.buffer_ = Buffer()
        self.listner_ = TransformListener(self.buffer_,self)
        #声明相关参数
        self.declare_parameter('initial_point',[0.0, 0.0, 0.0])
        self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])
        self.initial_point_ = self.get_parameter('initial_point').value
        self.initial_points_= self.get_parameter('target_points').value
        self.speech_client_ = self.create_client(SpeachText,'speech_text')
         # 订阅与保存图像相关定义
        self.declare_parameter('image_save_path', '')
        self.image_save_path = self.get_parameter('image_save_path').value
        self.bridge = CvBridge()
        self.latest_img_ = None
        self.image_sub_ = self.create_subscription(Image,'/camera_sensor/image_raw',self.img_callback,1)

    def img_callback(self,msg):
        self.latest_img_ = msg

    def record_img(self):
        if self.latest_img_ is not Node:
           pose = self.get_cuurent_pose()
           cv_image = self.bridge.imgmsg_to_cv2(self.latest_img_)
           cv2.imwrite(f'{self.image_save_path}img_{pose.translation.x:3.2f}_{pose.translation.y:3.2f}.png',
                       cv_image)   

    def get_pose_by_xyyaw(self, x, y, yaw):
        # 通过x,y,yaw return PoseStamped对象     
        pose = PoseStamped()
        pose.header.frame_id = 'map'
        pose.pose.position.x = x
        pose.pose.position.y = y
        quat = quaternion_from_euler(0,0,yaw)
        pose.pose.orientation.x =quat[0]
        pose.pose.orientation.y =quat[1]
        pose.pose.orientation.z =quat[2]
        pose.pose.orientation.w =quat[3]
        return pose
    def init_robot_pose(self):
        #初始化机器人位姿
        self.initial_point_ = self.get_parameter('initial_point').value
        init_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2]) 
        self.setInitialPose(init_pose)
        self.waitUntilNav2Active()#等待导航可用

    def get_target_points(self):
        #通过参数值获取目标点集合
        points = []
        self.target_points_ = self.get_parameter('target_points').value
        for index in range(int(len(self.target_points_)/3)):
            x = self.target_points_[index*3]
            y = self.target_points_[index*3+1]
            yaw = self.target_points_[index*3+2]
            points.append([x,y,yaw])
            self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')
        return points    

    def nav_to_pose(self,target_pose):
        #导航到指定位姿
        self.waitUntilNav2Active()
        self.goToPose(target_pose)
        while not self.isTaskComplete():
            feedback = self.getFeedback()
            self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')
        reslut = self.getResult()
        self.get_logger().info(f'导航结果:{reslut}')    

    def get_cuurent_pose(self):
        #通过TF获取当前位姿
        while rclpy.ok():
            try:
                result = self.buffer_.lookup_transform('map','base_footprint',
                        rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))
                transform = result.transform
                self.get_logger().info(f'平移:{transform.translation}')
                # self.get_logger().info(f'旋转:{transform.rotation}')
                # ratation_euler = euler_from_quaternion([
                #     transform.rotation.x,
                #     transform.rotation.y,
                #     transform.rotation.z,
                #     transform.rotation.w]
                # )
                # self.get_logger().info(f'平移rpy:{ratation_euler}')
                return transform
            except Exception as e:
                self.get_logger().warn(f'获取坐标转换异常{str(e)}')
    def speech_text(self,text):
        #调用服务合成语音
        while not self.speech_client_.wait_for_service(timeout_sec=1):
            self.get_logger().info("wait for speech service")
        request = SpeachText.Request()
        request.text = text
        future = self.speech_client_.call_async(request)
        rclpy.spin_until_future_complete(self,future)
        if future.result() is not None:
            result = future.result().result
            if result:
                self.get_logger().info(f'语音合成成功:{text}')
            else:
                self.get_logger().warn(f'语音合成失败:{text}')
        else:
            self.get_logger().warn('语音合成服务请求失败')

def main():
    rclpy.init()
    patrol = PatrolNode() #节点
    patrol.speech_text('正在准备初始化位置')
    patrol.init_robot_pose()
    patrol.speech_text('初始化位置完成')

    while rclpy.ok():
        points = patrol.get_target_points()
        for point in points:
            x,y,yaw = point[0],point[1],point[2]
            target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)
            patrol.speech_text(text=f'准备前往目标点{x},{y}')
            patrol.nav_to_pose(target_pose)
            patrol.speech_text(text=f'已到达目标点{x},{y},准备记录图像')
            patrol.record_img()
            patrol.speech_text(text=f'图像记录完成')
    rclpy.shutdown()        

重新构建后运行,拍照效果如下


http://www.kler.cn/a/510480.html

相关文章:

  • Amazon MSK 开启 Public 访问 SASL 配置的方法
  • ReactiveSwift 简单使用
  • 数据结构漫游记:动态实现栈(stack)
  • Android SystemUI——CarSystemBar视图解析(十一)
  • 蓝桥杯刷题第二天——背包问题
  • 02.02、返回倒数第 k 个节点
  • 【Spring MVC】第二站-Spring MVC请求
  • 快速上手 Spring Boot:基础使用详解
  • 生物识别技术是否可以成为应对安全挑战的最佳选择?
  • 嵌入式硬件篇---PID控制
  • 【部署】将项目部署到云服务器
  • 阿九的python 爬虫进阶课18.3 学习笔记
  • 条件决策树(Conditional Decision Trees)算法详解
  • 基于JavaWeb的宠物救助及领养平台的设计与实现
  • Safari常用快捷键
  • 1166 Summit (25)
  • web前端2--标签
  • C# OpenCV机器视觉:常用滤波算法
  • ASP.NET Core 实战:JWT 身份验证
  • mysql官方文档翻译02-一致性非锁定读与一致性锁定读
  • k8s 容器反复重启
  • 配置管理与动态调整:ShardingSphere 的配置方式与实时调整能力
  • 使用pytorch从头实现一个vit
  • 大数据相关组件介绍
  • 第148场双周赛:循环数组中相邻元素的最大差值、将数组变相同的最小代价、最长特殊路径、所有安放棋子方案的曼哈顿距离
  • 第1章:Python TDD基础与乘法功能测试