ROS2 python编写 intel realsense D405相机节点通过launch.py启动多个相机并发送图像话题,基于pyrealsense2库
环境配置
ROS2 humble
安装rviz工具:(用于可视化测试我们的脚本)
sudo apt install ros-humble-rviz-visual-tools
ros2 run rviz2 rviz2
文件总体结构树
在工作空间的src/目录下创建了dobot_camera功能包,在该功能包目录下创建了dobot_camera/文件夹用于存放功能包源码及驱动依赖,其中dobot_camera.py为功能包节点源码,定义了一个节点初始化并不断接收一个相机的图像帧并往外发送数据;camera.py及realsense_camera.py是白嫖的越疆机器人 xTrainer基于pyrealsense2的相机驱动代码。
在该功能包目录下创建了launch/文件夹用于存放*.launch.py文件于py/文件夹(注意是放在src/dobot_camera/目录下,不是放在工作空间目录下)
下面给出各个脚本及setup.py文件配置(安装编译后的文件到install/目录下 可以通过ros2 launch 、ros2 run等命令识别)
camera.py
from pathlib import Path
from typing import Optional, Protocol, Tuple
import numpy as np
class CameraDriver(Protocol):
"""Camera protocol.
A protocol for a camera driver. This is used to abstract the camera from the rest of the code.
"""
def read(
self,
img_size: Optional[Tuple[int, int]] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""Read a frame from the camera.
Args:
img_size: The size of the image to return. If None, the original size is returned.
farthest: The farthest distance to map to 255.
Returns:
np.ndarray: The color image.
np.ndarray: The depth image.
"""
class DummyCamera(CameraDriver):
"""A dummy camera for testing."""
def read(
self,
img_size: Optional[Tuple[int, int]] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""Read a frame from the camera.
Args:
img_size: The size of the image to return. If None, the original size is returned.
farthest: The farthest distance to map to 255.
Returns:
np.ndarray: The color image.
np.ndarray: The depth image.
"""
if img_size is None:
return (
np.random.randint(255, size=(480, 640, 3), dtype=np.uint8),
np.random.randint(255, size=(480, 640, 1), dtype=np.uint16),
)
else:
return (
np.random.randint(
255, size=(img_size[0], img_size[1], 3), dtype=np.uint8
),
np.random.randint(
255, size=(img_size[0], img_size[1], 1), dtype=np.uint16
),
)
class SavedCamera(CameraDriver):
def __init__(self, path: str = "example"):
self.path = str(Path(__file__).parent / path)
from PIL import Image
self._color_img = Image.open(f"{self.path}/image.png")
self._depth_img = Image.open(f"{self.path}/depth.png")
def read(
self,
img_size: Optional[Tuple[int, int]] = None,
) -> Tuple[np.ndarray, np.ndarray]:
if img_size is not None:
color_img = self._color_img.resize(img_size)
depth_img = self._depth_img.resize(img_size)
else:
color_img = self._color_img
depth_img = self._depth_img
return np.array(color_img), np.array(depth_img)[:, :, 0:1]
realsense_camera.py
import time
from typing import List, Optional, Tuple
import numpy as np
from .camera import CameraDriver
import cv2
import pyrealsense2 as rs
def get_device_ids() -> List[str]:
ctx = rs.context()
devices = ctx.query_devices()
# print(devices)
device_ids = []
for dev in devices:
dev.hardware_reset()
device_ids.append(dev.get_info(rs.camera_info.serial_number))
time.sleep(2)
return device_ids
class RealSenseCamera(CameraDriver):
def __repr__(self) -> str:
return f"RealSenseCamera(device_id={self._device_id})"
def __init__(self, device_id: Optional[str] = None, flip: bool = False):
import pyrealsense2 as rs
print("init", device_id)
self._device_id = device_id
self._pipeline = rs.pipeline()
config = rs.config()
config.enable_device(device_id)
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 90)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 90)
self._pipeline.start(config)
self._flip = flip
# print(device_id)
for _ in range(50):
self.read()
def read(
self,
img_size: Optional[Tuple[int, int]] = None, # farthest: float = 0.12
) -> Tuple[np.ndarray, np.ndarray]:
"""Read a frame from the camera.
Args:
img_size: The size of the image to return. If None, the original size is returned.
farthest: The farthest distance to map to 255.
Returns:
np.ndarray: The color image, shape=(H, W, 3)
np.ndarray: The depth image, shape=(H, W, 1)
"""
frames = self._pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
color_image = np.asanyarray(color_frame.get_data())
depth_frame = frames.get_depth_frame()
depth_image = np.asanyarray(depth_frame.get_data())
# depth_image = cv2.convertScaleAbs(depth_image, alpha=0.03)
if img_size is None:
image = color_image[:, :, ::-1]
depth = depth_image
else:
image = cv2.resize(color_image, img_size)[:, :, ::-1]
depth = cv2.resize(depth_image, img_size)
# rotate 180 degree's because everything is upside down in order to center the camera
if self._flip:
image = cv2.rotate(image, cv2.ROTATE_180)
depth = cv2.rotate(depth, cv2.ROTATE_180)[:, :, None]
else:
depth = depth[:, :, None]
return image, depth
def _debug_read(camera, save_datastream=False):
cv2.namedWindow("image")
# cv2.namedWindow("depth")
counter = 0
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
while True:
# time.sleep(0.1)
tic = time.time()
image, depth = camera.read()
_, image_ = cv2.imencode('.jpg', image, encode_param)
key = cv2.waitKey(1)
cv2.imshow("image", image[:, :, ::-1])
# cv2.imshow("depth", depth)
toc = time.time()
print(image_.shape, "img time(ms): ",(toc - tic)*1000)
counter += 1
if __name__ == "__main__":
device_ids = get_device_ids()
print(f"Found {len(device_ids)} devices")
print(device_ids)
rs = RealSenseCamera(flip=True, device_id=device_ids[0])
im, depth = rs.read()
_debug_read(rs, save_datastream=True)
dobot_camera.py
import rclpy
from rclpy.node import Node
import cv2
import numpy as np
from .realsense_camera import RealSenseCamera
from pathlib import Path
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image, CompressedImage
import configparser
import os
from ament_index_python.packages import get_package_share_directory
#修改setup.py的entry_points配置即可 "dobot_camera = dobot_camera.xxx:main" #修改此处配置即可
# Thread: camera
npy_list = np.array([np.zeros(480*640*3), np.zeros(480*640*3), np.zeros(480*640*3)])
npy_len_list = np.array([0, 0, 0])
img_list = np.array([np.zeros((480, 640, 3)), np.zeros((480, 640, 3)), np.zeros((480, 640, 3))])
def load_ini_data_camera():
camera_dict = {"top": None, "left": None, "right": None}
package_dic = get_package_share_directory("dobot_camera")
# ini_file_path = str(Path(__file__).parent) + "../config/dobot_settings.ini"
ini_file_path = os.path.join(package_dic,"config","dobot_settings.ini")
ini_file = configparser.ConfigParser()
print(ini_file_path)
ini_file.read(ini_file_path)
print(ini_file)
for _cam in camera_dict.keys():
camera_dict[_cam] = ini_file.get("CAMERA", _cam)
return camera_dict
def run_thread_cam(rs_cam, which_cam):
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
while 1:
image_cam, _ = rs_cam.read()
image_cam = image_cam[:, :, ::-1]
img_list[which_cam] = image_cam
_, image_ = cv2.imencode('.jpg', image_cam, encode_param)
npy_list[which_cam][:len(image_)] = image_[:1]
npy_len_list[which_cam] = len(image_)
class Camera_node(Node):
def __init__(
self,
name,
camera_device='front'
):
super().__init__(name)
self.camera_dict = load_ini_data_camera()
self.declare_parameter('camera_device',camera_device)
self.camera_device = self.get_parameter('camera_device').get_parameter_value().string_value
self.publisher = self.create_publisher(Image,f'/camera_{self.camera_device}/image_raw',50)
self.bridge = CvBridge()
self.encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
self.rs_cam = RealSenseCamera(flip=True, device_id=self.camera_dict[self.camera_device])
while 1:
image_cam, _ = self.rs_cam.read()
image_cam = image_cam[:, :, ::-1]
# cv2.imshow("1",image_cam)
# cv2.waitKey(1)
imagemsag = self.bridge.cv2_to_imgmsg(image_cam,encoding = 'bgr8')
imagemsag.header.frame_id = "camera_frame"
self.publisher.publish(imagemsag)
# _, image_ = cv2.imencode('.jpg', image_cam, self.encode_param)
def main(args=None):
"""
ros2运行该节点的入口函数
编写ROS2节点的一般步骤
1. 导入库文件
2. 初始化客户端库
3. 新建节点对象
4. spin循环节点
5. 关闭客户端库
"""
rclpy.init(args=args) # 初始化rclpy
node = Camera_node("node_02") # 新建一个节点
# camera init
print("camera init")
# camera_dict = load_ini_data_camera()
# rs1 = RealSenseCamera(flip=True, device_id=camera_dict["top"])
# rs2 = RealSenseCamera(flip=False, device_id=camera_dict["left"])
# rs3 = RealSenseCamera(flip=True, device_id=camera_dict["right"])
# node.get_logger().info("大家好,我是node_02.")
rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
rclpy.shutdown() # 关闭rclpy
dobot_camera.launch.py
'''
Node里面的参数:
executable: 可执行程序
package: 被执行程序所属的功能包
name: 节点名称
namespace: 设置命名空间
exec_name: 设置程序标签
parameters: 设置参数
remappings: 实现话题重映射
arguments: 为节点传参
ros_arguments: 为节点传参
'''
from debian.debtags import output
from launch import LaunchDescription
from launch_ros.actions import Node
from sympy.physics.vector.printing import params
#封装终端指令相关类
#from launch.actions import ExecuteProcess
#from launch.substitutions import FindExecutable
#参数声明与获取
#from launch.actions import DeclareLaunchArgument
#from launch.substitutions import LaunchConfiguration
#文件包含相关
#from launch.actions import IncludeLaunchDescription
#from launch.launch_description_sources import PythonLaunchDescriptionSource
#分组相关
#from launch_ros.actions import PushRosNamespace
#from launch.actions import GroupAction"
#事件相关
#from launch.event_handlers import OnProcessStart, OnProcessExit
#from launch.actions import ExecuteProcess, RegisterEventHandler,LogInfo
#获取功能包下share目录路径
#from ament_index_python.packages import get_package_share_directory
'''
功能:启动多个节点
'''
def generate_launch_description():
turtle1 = Node(package="dobot_camera", #功能包名称
executable="dobot_camera", #节点的可执行程序
namespace='top',
name="top",
output = 'screen',
parameters = [
{'camera_device': "top"} ,
{'publish_freq': "50"},
],
# remappings=[
# ('/turtlesim1/turtle1/cmd_vel', '/turtle1/cmd_vel'),
# ]
) #命名节点名字
turtle2 = Node(package="dobot_camera", #功能包名称
executable="dobot_camera", #节点的可执行程序
namespace='left',
name="left",
output = 'screen',
parameters = [
{'camera_device': "left"} ,
{'publish_freq': "50"},
],
# remappings=[
# ('/turtlesim1/turtle1/cmd_vel', '/turtle1/cmd_vel'),
# ]
) #命名节点名字
turtle3 = Node(package="dobot_camera", #功能包名称
executable="dobot_camera", #节点的可执行程序
namespace='right',
name="right",
output = 'screen',
parameters = [
{'camera_device': "right"} ,
{'publish_freq': "50"},
],
# remappings=[
# ('/turtlesim1/turtle1/cmd_vel', '/turtle1/cmd_vel'),
# ]
) #命名节点名字
#
# turtle2 = Node(package="turtlesim", #功能包名称
# namespace='turtlesim1',
# executable="turtlesim_node", #节点的可执行程序
# name="t2") #命名节点名字
return LaunchDescription([turtle1,turtle2,turtle3]) #自动生成launch文件的函数 [turtle1, turtle2]
setup.py
from setuptools import find_packages, setup
from glob import glob
package_name = 'dobot_camera'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
#config相关
('share/' + package_name + "/config", glob("config/*.ini")),
# launch 文件相关配置
('share/' + package_name + "/launch/py", glob("launch/py/*.launch.py")),
('share/' + package_name + "/launch/xml", glob("launch/xml/*.launch.xml")),
('share/' + package_name + "/launch/yaml", glob("launch/yaml/*.launch.yaml"))
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='robot',
maintainer_email='robot@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
"dobot_camera = dobot_camera.dobot_camera:main" #修改此处配置即可
],
},
)
package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>dobot_camera</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="robot@todo.todo">robot</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>cv-bridge</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<exec_depend>ros2launch</exec_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
编译运行
开一个终端:
colcon build --packages-select dobot_camera
source install/setup.bash
ros2 launch dobot_camera dobot_camera.launch.py
新开一个终端运行rviz2并在rviz中添加三个Image图像进行可视化
ros2 run rviz2 rviz2
其余ros2指令集:
ros2 topic list #查看话题
ros2 topic echo /camera_top/image_raw #查看特定话题
ros2 topic info /camera_top/image_raw
ros2 run dobot_camera dobot_camera