激光雷达不够用,怎么办?Ubuntu如何用一个激光雷达实现两个激光雷达的扫描点云效果?点云配准ICP,点云拼接、话题转换、ROS重录制bag包。
1.首先至少得有一个激光雷达,如果没有的话,可以考虑花呗买一个,毕竟研究这东西,有个实物还是比较稳妥,这里我选择的是Livox Mid-360,哈哈哈,公司的,大概长这样:
2. 比如我们想要用激光雷达去扫描一个货车,由于激光雷达扫到的面有限,无法获得具体的形状比如:
单独从上述图片不能很好的知道货车的三维形状,怎么办,但是如果有两个激光雷达进行拼接就好了,按照这个思路,首先分别录制上述两个图片所示的bag包。
3.首先对其中一个bag进行重新录制,因为我们只有一个激光雷达,其ip地址是一样的,就算换不同的位置录制,其bag包话题不能自动更改,对于我这个雷达,其话题一直是/livox/lidar.呢么我们就要想办法给他转成其它话题,比如我转到/livox/lidar2,可以先启动核心,循环播放自己已有激光雷达的bag包,然后运行以下代码:
roscore
rosbag play xxx.bag -l
import rospy
from sensor_msgs.msg import PointCloud2
class PointCloudRelay:
def __init__(self):
# 初始化节点
rospy.init_node('point_cloud_relay', anonymous=True)
# 设置固定的订阅和发布话题名称
self.input_topic = '/livox/lidar'
self.output_topic = '/livox/lidar2'
# 创建订阅者和发布者
self.point_cloud_sub = rospy.Subscriber(self.input_topic, PointCloud2, self.callback)
self.point_cloud_pub = rospy.Publisher(self.output_topic, PointCloud2, queue_size=10)
rospy.loginfo(f"订阅话题: {self.input_topic}")
rospy.loginfo(f"发布话题: {self.output_topic}")
def callback(self, msg):
# 收到点云消息时,直接发布出去
rospy.loginfo("收到点云数据,重新发布中...")
self.point_cloud_pub.publish(msg)
def run(self):
rospy.spin()
if __name__ == '__main__':
try:
relay = PointCloudRelay()
relay.run()
except rospy.ROSInterruptException:
pass
4.这样当前bag包的话题/livox/lidar就转成了/livox/lidar2,然后运行以下命令录制该话题为一个新的bag包
rosbag record -o 包名.bag /话题topic
rosbag record -o 11.bag /livox/lidar2
5.此时你已经拥有两个激光雷达的数据了,一个话题为 /livox/lidar的bag包和一个话题为/livox/lidar2的bag包,呢么该如何对此点云进行拼接呢,接下来先对bag包生成pcd文件,对pcd文件利用icp算法进行对准,其中初始矩阵可以通过两个测量位置进行推算旋转平移矩阵,可简单,百度一下就行,比如旋转平移矩阵是一种将物体在空间中进行旋转和移动的数学表示,通常用于三维几何变换。它将旋转矩阵(3x3)和位移向量(3x1)结合成一个4x4的矩阵,其中旋转矩阵描述物体的旋转,位移向量表示平移,我百度的。然后运行以下代码进行标定:
import open3d as o3d
import numpy as np
def load_point_cloud(file_path):
"""加载 .pcd 格式的点云文件"""
point_cloud = o3d.io.read_point_cloud(file_path)
if point_cloud.is_empty():
raise ValueError(f"点云文件 {file_path} 为空或加载失败。")
return point_cloud
def preprocess_point_cloud(pcd, voxel_size):
"""
预处理点云以加速配准。
参数:
- pcd: 输入的点云
- voxel_size: 体素化大小,用于法向量估计
返回:
- 体素化并估计法向量后的点云
"""
pcd = pcd.voxel_down_sample(voxel_size)
pcd.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30))
return pcd
def apply_icp(source, target, threshold=0.02, init_transformation=None, use_point_to_plane=False):
"""
使用 ICP 进行点云配准。
参数:
- source: 源点云 (o3d.geometry.PointCloud)
- target: 目标点云 (o3d.geometry.PointCloud)
- threshold: 配准的距离阈值
- init_transformation: 初始对齐矩阵 (4x4)
- use_point_to_plane: 是否使用点到平面 ICP 方法 (默认 False)
返回:
- 精确配准后的变换矩阵 (4x4)
"""
print("运行 ICP 配准...")
# 选择 ICP 估计方法:点到点或点到平面
estimation_method = o3d.pipelines.registration.TransformationEstimationPointToPoint()
if use_point_to_plane:
estimation_method = o3d.pipelines.registration.TransformationEstimationPointToPlane()
# 多尺度 ICP 配准
max_iterations = [60, 30, 10] # 多尺度迭代次数
thresholds = [threshold * 5, threshold * 2, threshold] # 多尺度阈值
transformation = init_transformation
for scale, max_iter, th in zip([5, 2, 1], max_iterations, thresholds):
print(f"Scale {scale}: 距离阈值 = {th}, 最大迭代次数 = {max_iter}")
reg_p2p = o3d.pipelines.registration.registration_icp(
source, target, th, transformation, estimation_method,
o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=max_iter)
)
transformation = reg_p2p.transformation
print(f"Scale {scale} 结果变换矩阵:\n", transformation)
return transformation
def main():
# 加载点云文件 (替换为实际路径)
source_pcd_file = "/home/xsl/Downloads/pcl点云处理学习/ct货车车厢识别/pcd1/1.pcd" # 源点云文件路径
target_pcd_file = "/home/xsl/Downloads/pcl点云处理学习/ct货车车厢识别/pcd2/2.pcd" # 目标点云文件路径
source_cloud = load_point_cloud(source_pcd_file)
target_cloud = load_point_cloud(target_pcd_file)
# 预处理点云以估计法向量
voxel_size = 0.2 # 根据点云密度调整体素大小
source_cloud = preprocess_point_cloud(source_cloud, voxel_size)
target_cloud = preprocess_point_cloud(target_cloud, voxel_size)
# 可视化初始状态
print("初始点云对齐可视化")
o3d.visualization.draw_geometries([source_cloud.paint_uniform_color([1, 0, 0]),
target_cloud.paint_uniform_color([0, 1, 0])])
# 初始变换矩阵
initial_transformation = np.array([
[0.319, 0.947, 0.016, 5.343],
[-0.948, 0.319, 0.004, 9.281],
[-0.001, -0.017, 1, -0.296],
[0, 0, 0, 1]
])
# ICP 配准
transformation_matrix = apply_icp(source_cloud, target_cloud, threshold=0.5,
init_transformation=initial_transformation, use_point_to_plane=True)
# 将源点云变换到目标点云坐标系
source_cloud.transform(transformation_matrix)
# 可视化配准结果
print("配准结果可视化")
o3d.visualization.draw_geometries([source_cloud.paint_uniform_color([1, 0, 0]),
target_cloud.paint_uniform_color([0, 1, 0])])
# 保存结果
np.savetxt("transformation_matrix.txt", transformation_matrix, fmt="%.6f")
print("外参矩阵已保存到 transformation_matrix.txt")
if __name__ == "__main__":
main()
会生成点云配准前后的对比效果,比如:
也会生成相应的旋转平移矩阵,因为是多尺度,所以越往下配准精度较高,结果如下图所示:
6.得到这个旋转平移矩阵,有大用,因为可以以这个为依据将其中一个点云数据配准拼接到另一个点云,怎么做,很简单,修改以下代码中的旋转平移矩阵,为你自己的bag包用icp跑出来的旋转平移矩阵。代码如下:
import rospy
import numpy as np
import sensor_msgs.point_cloud2 as pc2
from sensor_msgs.msg import PointCloud2
import std_msgs.msg
class PointCloudMerger:
def __init__(self):
# 初始化节点
rospy.init_node('point_cloud_merger', anonymous=True)
# 设置固定的订阅和发布话题名称
self.input_topic1 = '/livox/lidar'
self.input_topic2 = '/livox/lidar2'
self.output_topic = '/merged_point_cloud'
# 变换矩阵
self.transformation_matrix = np.array([
[0.296464, 0.954431, -0.001073, 5.280927],
[-0.955159, 0.296485, 0.015795, 9.223657],
[0.015717, -0.004231, 1.000011, -0.151635],
[0.000000, 0.000000, 0.000000, 1.000000]
])
# 初始化点云数据缓存
self.point_cloud1 = None
self.point_cloud2 = None
# 创建订阅者和发布者
self.point_cloud_sub1 = rospy.Subscriber(self.input_topic1, PointCloud2, self.callback_point_cloud1)
self.point_cloud_sub2 = rospy.Subscriber(self.input_topic2, PointCloud2, self.callback_point_cloud2)
self.point_cloud_pub = rospy.Publisher(self.output_topic, PointCloud2, queue_size=10)
rospy.loginfo(f"订阅话题: {self.input_topic1} 和 {self.input_topic2}")
rospy.loginfo(f"发布话题: {self.output_topic}")
def callback_point_cloud1(self, msg):
# 缓存第一个点云消息
self.point_cloud1 = msg
self.try_publish_merged_cloud()
def callback_point_cloud2(self, msg):
# 缓存第二个点云消息
self.point_cloud2 = msg
self.try_publish_merged_cloud()
def transform_point_cloud(self, point_cloud_msg, transformation_matrix):
"""使用外参矩阵转换点云坐标"""
# 转换为点云坐标的生成器
points = pc2.read_points(point_cloud_msg, field_names=("x", "y", "z"), skip_nans=True)
# 应用外参矩阵进行坐标变换
transformed_points = [
tuple(np.dot(transformation_matrix, np.array([x, y, z, 1.0]))[:3]) for x, y, z in points
]
# 创建新的点云消息
header = std_msgs.msg.Header()
header.stamp = rospy.Time.now()
header.frame_id = point_cloud_msg.header.frame_id
return pc2.create_cloud_xyz32(header, transformed_points)
def try_publish_merged_cloud(self):
# 如果两个点云数据均已接收到,进行合并并发布
if self.point_cloud1 and self.point_cloud2:
rospy.loginfo("正在合并点云...")
# 转换第二个点云到第一个点云的坐标系
transformed_cloud2 = self.transform_point_cloud(self.point_cloud2, self.transformation_matrix)
# 合并两个点云
points1 = list(pc2.read_points(self.point_cloud1, field_names=("x", "y", "z"), skip_nans=True))
points2 = list(pc2.read_points(transformed_cloud2, field_names=("x", "y", "z"), skip_nans=True))
merged_points = points1 + points2
# 创建并发布合并后的点云消息
header = std_msgs.msg.Header()
header.stamp = rospy.Time.now()
header.frame_id = self.point_cloud1.header.frame_id
merged_cloud_msg = pc2.create_cloud_xyz32(header, merged_points)
self.point_cloud_pub.publish(merged_cloud_msg)
rospy.loginfo("合并后的点云已发布。")
def run(self):
rospy.spin()
if __name__ == '__main__':
try:
merger = PointCloudMerger()
merger.run()
except rospy.ROSInterruptException:
pass
7.然后打开rviz,订阅点云拼接配准后的话题/merged_point_cloud,结果如下所示:
8.呢么怎么知道具体的大小呢,后续考虑用聚类方法获取其位置、倾斜角以及长宽高,下班下班。
最最最最最最重要的是一定保证:新录制bag包的播放,以及另一个未录制bag包的播放,不要播放错了,我研究半天,哈哈哈哈,最后一定改写相应的旋转平移矩阵。