ROS2服务通信与通信接口
ROS服务通信
一、核心特性
特性 | 服务通信 | 话题通信 |
---|---|---|
模式 | 请求-响应 | 发布-订阅 |
方向 | 双向 | 单向 |
同步性 | 同步 | 异步 |
典型应用 | 执行动作/获取数据 | 持续数据流 |
二、通信接口定义(.srv文件)
# 示例:AddTwoInts.srv
int64 a # 请求参数
int64 b
---
int64 sum # 响应参数
三、服务端实现
import rclpy
from example_interfaces.srv import AddTwoInts
def handle_add(request, response):
response.sum = request.a + request.b
return response
def main():
rclpy.init()
node = rclpy.create_node('add_two_ints_server')
srv = node.create_service(
AddTwoInts,
'add_two_ints',
handle_add)
rclpy.spin(node)
四、客户端实现
from example_interfaces.srv import AddTwoInts
async def call_service(node):
client = node.create_client(AddTwoInts, 'add_two_ints')
while not client.wait_for_service(timeout_sec=1.0):
node.get_logger().info('等待服务上线...')
req = AddTwoInts.Request()
req.a = 3
req.b = 4
future = client.call_async(req)
await future
return future.result()
五、通信过程详解
- 服务发现:通过DDS实现自动发现
- 序列化:使用CDR格式进行数据编码
- 超时处理:客户端默认等待1分钟
# 带超时的调用示例
try:
response = client.call(request, timeout_sec=5.0)
except rclpy.exceptions.ServiceException:
print("服务调用超时")
六、常用调试命令
ros2 service list # 查看所有服务
ros2 service type /service_name # 查看服务类型
ros2 interface show srv/AddTwoInts # 查看服务定义
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 5, b: 3}"
七、高级特性
- 服务QoS配置:
from rclpy.qos import QoSProfile
qos = QoSProfile(
depth=10,
reliability=QoSReliabilityPolicy.RELIABLE
)
node.create_service(
AddTwoInts,
'add_two_ints',
handle_add,
qos_profile=qos)
- 服务超载保护:
# 设置并发处理数
executor = rclpy.executors.MultiThreadedExecutor(
num_threads=4)
八、最佳实践
-
设计原则:
- 服务响应时间应 < 1秒
- 避免在回调函数中执行耗时操作
- 使用唯一且明确的服务命名
-
错误处理机制:
class AddTwoIntsService(Node):
def handle_add(self, request, response):
try:
response.sum = request.a + request.b
except Exception as e:
self.get_logger().error(f"计算失败: {str(e)}")
response.sum = -1 # 错误码
return response
通过服务通信机制,可以实现精确的远程过程调用(RPC),非常适合需要确认执行结果的操作场景,如机械臂控制、状态查询等关键任务。实际开发中建议结合动作(Action)使用,处理更复杂的长时间任务。
ROS2通信接口
1. 核心通信机制
类型 | 模式 | 特点 | 典型应用场景 |
---|---|---|---|
Topic | 发布/订阅 | 单向异步,多对多 | 传感器数据流,状态广播 |
Service | 请求/响应 | 同步双向,1对1 | 即时指令执行,状态查询 |
Action | 目标+反馈+结果 | 异步双向,带进度反馈 | 导航任务,机械臂控制 |
Parameter | 键值对存储 | 节点配置管理 | 参数调整,运行配置 |
2. 接口定义规范
2.1 消息接口(.msg)
# geometry_msgs/msg/Twist.msg
Vector3 linear
Vector3 angular
2.2 服务接口(.srv)
# example_interfaces/srv/AddTwoInts.srv
int64 a
int64 b
---
int64 sum
2.3 动作接口(.action)
# action_tutorials_interfaces/action/Fibonacci.action
int32 order
---
int32[] sequence
---
int32[] partial_sequence
3. 接口代码生成
# 在package.xml中添加依赖
<build_depend>rosidl_default_generators</build_depend>
# CMakeLists.txt配置
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/MyMessage.msg"
"srv/MyService.srv"
"action/MyAction.action"
)
4. 通信机制实现对比
4.1 Topic通信示例
# 发布者
pub = node.create_publisher(Twist, 'cmd_vel', 10)
pub.publish(Twist(linear=Vector3(x=0.5)))
# 订阅者
def callback(msg):
print(f'Received: {msg}')
sub = node.create_subscription(Twist, 'cmd_vel', callback, 10)
4.2 Action通信示例
# 服务端
class FibonacciActionServer(Node):
def __init__(self):
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
async def execute_callback(self, goal_handle):
feedback_msg = Fibonacci.Feedback()
# 发送进度反馈
goal_handle.publish_feedback(feedback_msg)
# 返回结果
return Fibonacci.Result()
# 客户端
client = ActionClient(node, Fibonacci, 'fibonacci')
goal = Fibonacci.Goal(order=10)
future = client.send_goal_async(goal)
5. QoS服务质量配置
from rclpy.qos import QoSProfile, QoSReliabilityPolicy
qos_profile = QoSProfile(
depth=10,
reliability=QoSReliabilityPolicy.RELIABLE # 或BEST_EFFORT
)
pub = node.create_publisher(Image, 'camera/image', qos_profile)
6. 参数服务使用
# 声明参数
node.declare_parameter('max_speed', 2.0)
node.declare_parameter('sensor_enabled', True)
# 获取参数
max_speed = node.get_parameter('max_speed').value
# 设置参数回调
node.add_on_set_parameters_callback(param_callback)
7. 接口设计
- 版本控制:接口变更时升级版本号
- 语义明确:字段命名使用lowerCamelCase
- 兼容性:
- 新增字段在末尾添加
- 不删除已存在的字段
- 性能优化:
- 高频数据使用平坦数据结构
- 避免嵌套复杂类型
- 文档规范:在接口文件中添加注释说明字段含义
8. 接口调试工具
# 查看接口列表
ros2 interface list
# 显示接口定义
ros2 interface show sensor_msgs/msg/Image
# 手动发布消息
ros2 topic pub /test_topic std_msgs/msg/String "{data: 'hello'}"
9. 高级特性
9.1 接口组合
# 复合消息定义
Header header
Point center
float32 radius
Color color
9.2 动态接口
from rosidl_runtime_py import set_message_fields
msg = Twist()
set_message_fields(msg, {'linear': {'x': 1.0}})
9.3 接口扩展
# 自定义服务错误码
int32 ERROR_NONE=0
int32 ERROR_TIMEOUT=1
int32 error_code
💡 建议:优先使用标准接口(如sensor_msgs),自定义接口应在功能包内保持高内聚,跨功能包接口建议创建独立interface包。
故事是有生命的,但是它如何生长,取决于读它的人。 —约翰·康诺利