当前位置: 首页 > article >正文

ROS2服务通信与通信接口

ROS服务通信

一、核心特性

特性服务通信话题通信
模式请求-响应发布-订阅
方向双向单向
同步性同步异步
典型应用执行动作/获取数据持续数据流

在这里插入图片描述

二、通信接口定义(.srv文件)

# 示例:AddTwoInts.srv
int64 a   # 请求参数
int64 b
---
int64 sum  # 响应参数

三、服务端实现

import rclpy
from example_interfaces.srv import AddTwoInts

def handle_add(request, response):
    response.sum = request.a + request.b
    return response

def main():
    rclpy.init()
    node = rclpy.create_node('add_two_ints_server')
    srv = node.create_service(
        AddTwoInts, 
        'add_two_ints', 
        handle_add)
    rclpy.spin(node)

四、客户端实现

from example_interfaces.srv import AddTwoInts

async def call_service(node):
    client = node.create_client(AddTwoInts, 'add_two_ints')
    while not client.wait_for_service(timeout_sec=1.0):
        node.get_logger().info('等待服务上线...')
    
    req = AddTwoInts.Request()
    req.a = 3
    req.b = 4
    future = client.call_async(req)
    await future
    return future.result()

五、通信过程详解

  1. 服务发现:通过DDS实现自动发现
  2. 序列化:使用CDR格式进行数据编码
  3. 超时处理:客户端默认等待1分钟
# 带超时的调用示例
try:
    response = client.call(request, timeout_sec=5.0)
except rclpy.exceptions.ServiceException:
    print("服务调用超时")

六、常用调试命令

ros2 service list              # 查看所有服务
ros2 service type /service_name  # 查看服务类型
ros2 interface show srv/AddTwoInts  # 查看服务定义
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 5, b: 3}"

七、高级特性

  1. 服务QoS配置
from rclpy.qos import QoSProfile

qos = QoSProfile(
    depth=10,
    reliability=QoSReliabilityPolicy.RELIABLE
)

node.create_service(
    AddTwoInts,
    'add_two_ints',
    handle_add,
    qos_profile=qos)
  1. 服务超载保护
# 设置并发处理数
executor = rclpy.executors.MultiThreadedExecutor(
    num_threads=4)

八、最佳实践

  1. 设计原则

    • 服务响应时间应 < 1秒
    • 避免在回调函数中执行耗时操作
    • 使用唯一且明确的服务命名
  2. 错误处理机制

class AddTwoIntsService(Node):
    def handle_add(self, request, response):
        try:
            response.sum = request.a + request.b
        except Exception as e:
            self.get_logger().error(f"计算失败: {str(e)}")
            response.sum = -1  # 错误码
        return response

通过服务通信机制,可以实现精确的远程过程调用(RPC),非常适合需要确认执行结果的操作场景,如机械臂控制、状态查询等关键任务。实际开发中建议结合动作(Action)使用,处理更复杂的长时间任务。


ROS2通信接口

1. 核心通信机制

类型模式特点典型应用场景
Topic发布/订阅单向异步,多对多传感器数据流,状态广播
Service请求/响应同步双向,1对1即时指令执行,状态查询
Action目标+反馈+结果异步双向,带进度反馈导航任务,机械臂控制
Parameter键值对存储节点配置管理参数调整,运行配置

2. 接口定义规范

2.1 消息接口(.msg)

# geometry_msgs/msg/Twist.msg
Vector3 linear
Vector3 angular

2.2 服务接口(.srv)

# example_interfaces/srv/AddTwoInts.srv
int64 a
int64 b
---
int64 sum

2.3 动作接口(.action)

# action_tutorials_interfaces/action/Fibonacci.action
int32 order
---
int32[] sequence
---
int32[] partial_sequence

3. 接口代码生成

# 在package.xml中添加依赖
<build_depend>rosidl_default_generators</build_depend>

# CMakeLists.txt配置
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/MyMessage.msg"
  "srv/MyService.srv"
  "action/MyAction.action"
)

4. 通信机制实现对比

4.1 Topic通信示例

# 发布者
pub = node.create_publisher(Twist, 'cmd_vel', 10)
pub.publish(Twist(linear=Vector3(x=0.5)))

# 订阅者
def callback(msg):
    print(f'Received: {msg}')
sub = node.create_subscription(Twist, 'cmd_vel', callback, 10)

4.2 Action通信示例

# 服务端
class FibonacciActionServer(Node):
    def __init__(self):
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)
    
    async def execute_callback(self, goal_handle):
        feedback_msg = Fibonacci.Feedback()
        # 发送进度反馈
        goal_handle.publish_feedback(feedback_msg)
        # 返回结果
        return Fibonacci.Result()

# 客户端
client = ActionClient(node, Fibonacci, 'fibonacci')
goal = Fibonacci.Goal(order=10)
future = client.send_goal_async(goal)

5. QoS服务质量配置

from rclpy.qos import QoSProfile, QoSReliabilityPolicy

qos_profile = QoSProfile(
    depth=10,
    reliability=QoSReliabilityPolicy.RELIABLE  # 或BEST_EFFORT
)

pub = node.create_publisher(Image, 'camera/image', qos_profile)

6. 参数服务使用

# 声明参数
node.declare_parameter('max_speed', 2.0)
node.declare_parameter('sensor_enabled', True)

# 获取参数
max_speed = node.get_parameter('max_speed').value

# 设置参数回调
node.add_on_set_parameters_callback(param_callback)

7. 接口设计

  1. 版本控制:接口变更时升级版本号
  2. 语义明确:字段命名使用lowerCamelCase
  3. 兼容性
    • 新增字段在末尾添加
    • 不删除已存在的字段
  4. 性能优化
    • 高频数据使用平坦数据结构
    • 避免嵌套复杂类型
  5. 文档规范:在接口文件中添加注释说明字段含义

8. 接口调试工具

# 查看接口列表
ros2 interface list

# 显示接口定义
ros2 interface show sensor_msgs/msg/Image

# 手动发布消息
ros2 topic pub /test_topic std_msgs/msg/String "{data: 'hello'}"

9. 高级特性

9.1 接口组合

# 复合消息定义
Header header
Point center
float32 radius
Color color

9.2 动态接口

from rosidl_runtime_py import set_message_fields
msg = Twist()
set_message_fields(msg, {'linear': {'x': 1.0}})

9.3 接口扩展

# 自定义服务错误码
int32 ERROR_NONE=0
int32 ERROR_TIMEOUT=1
int32 error_code

💡 建议:优先使用标准接口(如sensor_msgs),自定义接口应在功能包内保持高内聚,跨功能包接口建议创建独立interface包。


故事是有生命的,但是它如何生长,取决于读它的人。 —约翰·康诺利


http://www.kler.cn/a/547436.html

相关文章:

  • 嵌入式 Linux 驱动开发:点灯大法
  • 【Java】ArrayList与LinkedList的性能对比深度解析
  • 探秘 Map 和 Set 底层:二叉搜索树与哈希表的深度解析,解锁高效数据存储秘密!
  • 石子合并
  • 深入探究Linux树状目录结构
  • Ae:常见的光照控件和材质控件
  • DeepSeek API 调用教程(基于 Ollama 实现)
  • 华为IEC104协议对接华为超充小程序
  • DeepSeek的开源核爆:当技术民主化重构AI权力版图
  • 封装neo4j的持久层和服务层
  • 牛客小白月赛110 —— D 智乃与长短期主义者博弈 python 补题 + 题解
  • 多个用户如何共用一根网线传输数据
  • 价目表终止无法内抛成功
  • STM32 I2C通信协议说明
  • 超纯水设备的智能化控制系统为用户带来安全简便的操作体验
  • 数组_有序数组的平方
  • ELK组成及实现原理
  • 【第7章:注意力机制与Transformer模型—7.4 NLP领域的BERT、GPT系列模型】
  • 高精度四则运算
  • 3.从零开始学会Vue--{{生命周期,工程化,组件化}}