Milvus 数据批量导出实战:Python 代码解析
1 引言
由于 Milvus 在单次查询中所能返回的数据量存在固有约束,当处理数据量庞大的 Collection 时,需考虑采用多次查询的策略。本文详细阐述了如何运用多次查询的方式,将 Milvus 中的数据进行分批导出,以有效应对数据量过大带来的挑战 。
2 代码整体概述
我们的目标是从 Milvus 中导出指定集合的数据,并将其保存为 JSON 文件。代码主要分为以下几个部分:
- 连接到 Milvus 服务。
- 定义数据导出函数,包括数据查询和保存逻辑。
- 在主程序中调用导出函数,实现批量导出。
- 释放资源并断开连接。
3 关键代码解析
3.1 连接到 Milvus 服务
connections.connect(
alias="default",
host='127.0.0.1',
port='19530',
user='root',
password='Milvus'
)
通过 connections.connect 方法连接到本地的 Milvus 服务,指定了主机地址、端口号、用户名和密码。
3.2 数据导出函数 export_milvus_colleciton
def export_milvus_colleciton(begin_pos, out_dir):
expr = f'id>{
str(begin_pos)}'
output_fields = [field.name for field in collection.schema.fields]
batch_size = 1000
offset = 0
all_data = []
max_pos = 0
while True:
results = collection.query(
expr=expr,
output_fields=output_fields,
limit=batch_size,
offset=offset
)
if not results:
break
batch_data = []
for result in results:
item = {
}
for field_name in output_fields:
item[field_name] = result[field_name