[ROS 系列学习教程] rosbag Python API
ROS 系列学习教程(总目录)
本文目录
- 1. 构造函数与关闭文件
- 2. 属性值
- 3. 写bag文件内容
- 4. 读bag文件内容
- 5. 将bag文件缓存写入磁盘
- 6. 重建 bag 文件索引
- 7. 获取bag文件的压缩信息
- 8. 获取bag文件的消息数量
- 9. 获取bag文件记录的起止时间
- 10. 获取话题信息与消息类型
rosbag 的 Python API 主要位于 rosbag
包的 Bag
类中,通过 import rosbag
导入。
Bag
类中的常用接口如下:
1. 构造函数与关闭文件
class Bag(
f: Any,
mode: str = 'r',
compression: str = Compression.NONE,
chunk_threshold: int = 768 * 1024,
allow_unindexed: bool = False,
options: Any | None = None,
skip_index: bool = False
)
class Compression:
NONE = 'none'
BZ2 = 'bz2'
LZ4 = 'lz4'
close(self)
其中,
-
f
:bag文件 -
mode
:文件操作模式(r, w, a) -
compression
:文件压缩模式,见如上Compression
,默认Compression.NONE
-
chunk_threshold
:Bag 文件中每个块的最大字节数,默认768 * 1024
-
allow_unindexed
:是否允许打开未建立索引的bag文件。说明:在实际使用中,如果你只是想查看或播放bag文件中的所有消息,而不需要基于时间戳进行精确查询,那么你可以将allow_unindexed
设置为True
。但是,如果你打算对bag文件进行基于时间的查询或其他高级操作,最好先使用rosbag index
命令建立索引,并确保在打开bag文件时allow_unindexed
为False
(或者简单地省略该参数,因为它默认为 False)。 -
options
:字典格式,用于统一设置Bag
的参数,目前只支持compression
和chunk_threshold
,源码处理如下:-
if options is not None: if type(options) is not dict: raise ValueError('options must be of type dict') if 'compression' in options: compression = options['compression'] if 'chunk_threshold' in options: chunk_threshold = options['chunk_threshold']
-
-
skip_index
:打开bag文件时是否跳过文件的索引,这可以节省一些内存和加载时间,特别是在处理非常大的bag文件时。然而,这也意味着将无法使用基于索引的高级查询功能,比如按时间戳搜索特定的消息。
2. 属性值
# 只能获取
options # 同上options
filename # bag文件名
version # rosbag的版本号
mode # 文件操作模式(r, w, a)
size # Bag文件的大小(bytes)
# 可设置可获取
compression # 文件压缩模式
chunk_threshold # Bag 文件中每个块的最大字节数
3. 写bag文件内容
write(self, topic, msg, t=None, raw=False, connection_header=None)
其中,
topic
:写入的topic名称msg
:写入的msgt
:时间戳,默认None以当前时间为时间戳raw
:是否已原始数据格式写入,通常不推荐这样做。connection_header
:连接头信息,通常不需要手动设置。通常用于内部处理,不是常规用法的一部分。
代码示例:
import rosbag
import rospkg
from std_msgs.msg import Int32, String
rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"
bag = rosbag.Bag(bags_path+'/pytest.bag', 'w')
try:
s = String()
s.string = 'hello'
i = Int32()
i.int = 42
bag.write('/chatter', s)
bag.write('/number', i)
finally:
bag.close()
4. 读bag文件内容
read_messages(self,
topics=None,
start_time=None,
end_time=None,
connection_filter=None,
raw=False,
return_connection_header=False)
其中,
topics
:指定读取的topic,可以是一个topic列表,如果不指定,默认读取全部topic(可选)start_time
:通过时间戳过滤消息(消息的最早时间戳,rospy.Time对象)(可选)end_time
:通过时间戳过滤消息(消息的最晚时间戳,rospy.Time对象)(可选)connection_filter
:一个函数,用于过滤连接。它应该接受一个连接对象并返回一个布尔值,以决定是否保留该连接的消息。如果为 None,则不过滤连接。raw
:一个布尔值,指定是否以原始字节形式返回消息。如果为 True,则返回原始字节数据;如果为 False(默认值),则返回解析后的 ROS 消息对象。return_connection_header
: 一个布尔值,如果为 True,则返回的每条消息都会是一个元组,其中第二个元素是连接头信息。如果为 False(默认值),则只返回消息本身。- 返回值:以(topic, message, timestamp)格式返回
代码示例:
import rosbag
import rospkg
rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"
bag = rosbag.Bag(bags_path+'/pytest.bag')
for topic, msg, t in bag.read_messages(topics=['/chatter', '/number']):
print(f"Received message on topic {topic} at time {t}: {msg}")
bag.close()
结果如下:
5. 将bag文件缓存写入磁盘
flush(self)
当你使用 write()
写入数据到 bag 文件时,数据可能不会立即被写入磁盘,而是会先被缓存起来以提高性能。调用 flush()
方法可以强制将这些缓存的数据写入到磁盘中,以确保所有挂起的数据都被写入到 bag 文件中。
它没有参数,并且执行后没有返回值。
在以下情况下,可能需要调用 flush()
方法:
-
确保数据持久化:当你想要确保所有已经写入 rosbag.Bag 对象的数据都已经持久化到磁盘上时,可以调用
flush()
。这在你准备关闭 bag 文件或程序即将退出时特别有用,以确保不会有数据丢失。 -
实时备份:如果你正在实时记录数据到 bag 文件,并且想要定期备份这些数据,你可以在备份之前调用
flush()
,以确保备份时所有需要的数据都已经写入到 bag 文件中。
代码示例:
import rospy
import rosbag
import rospkg
from std_msgs.msg import String, Int32
rospy.init_node('bag_writer')
rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"
# 创建一个bag文件用于写入
with rosbag.Bag(bags_path + '/flush.bag', 'w') as bag:
# 写入一条消息
msg = String(data='Hello, ROSbag!')
bag.write('/chatter', msg, rospy.Time.now())
# 在写入更多消息之前调用flush()
bag.flush()
# 这里可以继续写入更多消息
msg = Int32(data=25)
bag.write('/number', msg, rospy.Time.now())
# 在退出with块之前,flush()会被自动调用(如果需要的话)
查看bag文件信息如下:
6. 重建 bag 文件索引
reindex(self)
当使用 rosbag 记录或播放数据时,rosbag 会维护一个内部索引,以便能够高效地检索和访问数据。然而,有时索引可能会损坏或变得不一致,这时就需要使用 reindex
方法来重新构建索引。
reindex(self)
方法没有参数,它会对当前打开的 bag 文件执行索引重建操作。重建索引可能需要一些时间,具体取决于 bag 文件的大小和内容。
以下是可能需要使用 reindex
方法的一些场景:
-
索引损坏:如果你怀疑 bag 文件的索引已经损坏或不一致,导致无法正常访问数据,你可以尝试使用
reindex
方法来修复它。 -
添加新数据:如果你在 bag 文件关闭后向其中添加了新数据,但没有重新构建索引,那么这些新数据将不会被包含在旧的索引中。在这种情况下,你需要调用
reindex
方法来更新索引,以便能够访问这些新数据。 -
优化性能:有时,即使索引没有损坏,重新构建索引也可能有助于提高访问数据的性能。特别是当 bag 文件非常大或包含大量数据时,重建索引可以帮助优化数据结构,提高检索速度。
代码示例:
import rosbag
import rospkg
rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:
try:
for topic, msg, t in bag.read_messages():
print(f"Received message on topic {topic} at time {t}: {msg}")
except Exception as e:
print(f"An error occurred while reading the bag file: {e}")
print("Reindexing the bag file...")
bag.reindex() # 尝试重新索引 bag 文件
print("Reindexing completed.")
# 现在可以正常使用 bag 文件中的数据了
7. 获取bag文件的压缩信息
get_compression_info(self)
这个方法返回一个tuple(str, int, int),其中包含了关于 bag 文件压缩的详细信息,每一位表示如下:
tuple[0]
:压缩格式,例如none
(表示没有压缩)或bz2
(表示使用了 bzip2 压缩)。tuple[1]
:未压缩的数据大小(以字节为单位)tuple[2]
:压缩后的数据大小(以字节为单位)
代码示例:
import rosbag
import rospkg
rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"
# 打开一个存在的 bag 文件
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:
# 获取压缩信息
compression_info = bag.get_compression_info()
# 打印压缩信息
if compression_info:
print("Compression Type:", compression_info[0])
print("UnCompressed Size:", compression_info[1])
print("compressed Size:", compression_info[2])
else:
print("The bag file is not compressed.")
8. 获取bag文件的消息数量
get_message_count(self, topic_filters=None)
这个方法允许你指定一个或多个话题过滤器(topic_filters),以便只计算特定话题的消息数。该参数接收一个字符串或字符串列表,用于指定要计算消息数的话题。如果未提供或设置为 None,则计算 bag 文件中所有话题的消息数。
代码示例:
import rosbag
import rospkg
rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"
# 打开一个存在的 bag 文件
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:
# 定义我们想要计算消息数的话题过滤器列表
topic_filters = ['/chatter', '/number']
# 获取指定话题的消息数量
message_counts = bag.get_message_count(topic_filters=topic_filters)
# 打印话题的消息数量
print(f"Message Count: {message_counts}")
9. 获取bag文件记录的起止时间
get_start_time(self)
get_end_time(self)
get_start_time
函数的返回类型是 float,表示以秒为单位的时间戳,其中的小数部分,表示秒的分数。
代码示例:
import rosbag
import rospkg
from datetime import datetime
rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"
# 打开一个存在的 bag 文件
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:
# 获取 bag 文件的开始结束时间
start_time = bag.get_start_time()
end_time = bag.get_end_time()
# 将时间转换为更易读的字符串格式
start_time_str = datetime.fromtimestamp(start_time)
end_time_str = datetime.fromtimestamp(end_time)
# 打印开始结束时间
print(f"Bag file start time: {start_time_str}")
print(f"Bag file end time: {end_time_str}")
运行结果如下:
10. 获取话题信息与消息类型
get_type_and_topic_info(self, topic_filters=None)
其中,topic_filters
用于过滤指定的话题,如果没有提供,则分析所有话题。
函数返回值类型如下:
TypesAndTopicsTuple(dict(str, str),
dict(str, TopicTuple(str, int, int, float)))
其中各值说明如下:
TypesAndTopicsTuple(
msg_types{key:type name, val: md5hash},
topics{key: topic name,
value: TopicTuple(msg_type: msg type (Ex. "std_msgs/String"),
message_count: the number of messages of the particular type,
connections: the number of connections,
frequency: the data frequency)})
其中,
msg_types
:是一个字典,key为msg类型,value为msgMD5值。topics
:是一个字典,key为topic名称,value是一个元组,其中:msg_type
:该topic的消息类型message_count
:该topic的消息数量connections
:该topic的连接数量frequency
:该topic的数据频率
代码示例:
import rosbag
import rospkg
from datetime import datetime
rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"
# 打开一个存在的 bag 文件
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:
topic_filters = ['/chatter', '/number']
msg_types, topics = bag.get_type_and_topic_info(topic_filters)
print("Message Types:")
for type_name, md5_hash in msg_types.items():
print(f" {type_name}: {md5_hash}")
print("Topics:")
for type_name, topic_info in topics.items():
print(" Topic: {}, Type: {}, MessageCount: {}, Connections: {}, Frequency: {}".format(
type_name,
topic_info.msg_type,
topic_info.message_count,
topic_info.connections,
topic_info.frequency))
运行结果: