【拥抱AI】Milvus 如何处理 TB 级别的大规模向量数据?
处理 TB 级别的大规模向量数据是 Milvus 的核心优势之一。Milvus 通过分布式架构、高效的索引算法和优化的数据管理策略来实现这一目标。下面将详细介绍 Milvus 如何处理 TB 级别向量数据的流程,包括插入代码示例、指令以及流程图。
1. 分布式架构
Milvus 使用分布式架构来处理大规模数据。它支持水平扩展,可以将数据分布在多个节点上,从而提高存储容量和查询性能。主要组件包括:
- Milvus Server:负责数据管理和查询。
- Etcd:用于服务发现和配置管理。
- MinIO/S3:持久化存储。
- Pulsar:消息队列,用于数据同步和异步操作。
2. 数据分片
为了处理大规模数据,Milvus 将数据分成多个分片(segments)。每个分片是一个独立的文件,包含一定数量的向量数据。这样可以并行处理数据,提高查询效率。
3. 高效索引
Milvus 支持多种高效的向量索引算法,如 ANNOY、HNSW、IVF-PQ、IVF-SQ8 等。这些索引算法可以在高维空间中快速找到近似最近邻。
4. 插入数据
4.1 创建集合
首先,需要创建一个集合来存储向量数据。集合定义了向量的维度、索引类型等属性。
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection
# 连接到 Milvus 服务器
connections.connect("default", host="localhost", port="19530")
# 定义字段
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)
]
# 创建集合模式
schema = CollectionSchema(fields, "Example collection for large-scale vectors")
# 创建集合
collection = Collection("example_collection", schema)
4.2 插入数据
使用 insert
方法将向量数据插入到集合中。
import numpy as np
# 生成随机向量数据
data = [
[np.random.rand(128).tolist() for _ in range(10000)] # 10000 条 128 维向量
]
# 插入数据
mr = collection.insert(data)
# 获取插入结果
print(f"Inserted {mr.insert_count} entities with primary keys: {mr.primary_keys}")
5. 建立索引
在插入大量数据后,建立索引以提高查询效率。
# 定义索引参数
index_params = {
"index_type": "IVF_FLAT",
"params": {"nlist": 128},
"metric_type": "L2"
}
# 建立索引
collection.create_index(field_name="embedding", index_params=index_params)
6. 加载集合
加载集合到内存中,以便进行查询。
# 加载集合
collection.load()
7. 查询数据
使用 search
方法进行相似性搜索。
# 定义查询向量
query_vectors = [np.random.rand(128).tolist()]
# 定义搜索参数
search_params = {
"metric_type": "L2",
"params": {"nprobe": 10}
}
# 执行搜索
results = collection.search(query_vectors, "embedding", search_params, limit=10)
# 输出结果
for result in results:
print(f"Top 10 similar vectors: {result.ids}")
8. 流程图
以下是处理 TB 级别向量数据的流程图:
+-------------------+ +-----------------+ +--------------------+ +------------------+
| | | | | | | |
| 用户输入 | --> | 创建集合 | --> | 插入数据 | --> | 建立索引 | --> | 加载集合 | --> | 查询数据 |
| (向量数据) | | (定义模式) | | (批量插入) | | (选择索引类型) | | (加载到内存) | | (相似性搜索) |
| | | | | | | | | | | |
+-------------------+ +-----------------+ +--------------------+ +------------------+ +------------------+ +------------------+
| | | | | |
| | | | | |
| v v v v v
| +-----------------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+
| | Etcd (配置) | <---------+ MinIO/S3 (存储)| <--------+ Pulsar (消息队列)| <-------+ Milvus Server | <-------+ 应用程序 |
| +-----------------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+
9. 详细步骤说明
9.1 连接 Milvus 服务器
from pymilvus import connections
# 连接到 Milvus 服务器
connections.connect("default", host="localhost", port="19530")
9.2 创建集合
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection
# 定义字段
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)
]
# 创建集合模式
schema = CollectionSchema(fields, "Example collection for large-scale vectors")
# 创建集合
collection = Collection("example_collection", schema)
9.3 插入数据
import numpy as np
# 生成随机向量数据
data = [
[np.random.rand(128).tolist() for _ in range(10000)] # 10000 条 128 维向量
]
# 插入数据
mr = collection.insert(data)
# 获取插入结果
print(f"Inserted {mr.insert_count} entities with primary keys: {mr.primary_keys}")
9.4 建立索引
# 定义索引参数
index_params = {
"index_type": "IVF_FLAT",
"params": {"nlist": 128},
"metric_type": "L2"
}
# 建立索引
collection.create_index(field_name="embedding", index_params=index_params)
9.5 加载集合
# 加载集合
collection.load()
9.6 查询数据
# 定义查询向量
query_vectors = [np.random.rand(128).tolist()]
# 定义搜索参数
search_params = {
"metric_type": "L2",
"params": {"nprobe": 10}
}
# 执行搜索
results = collection.search(query_vectors, "embedding", search_params, limit=10)
# 输出结果
for result in results:
print(f"Top 10 similar vectors: {result.ids}")
10. 性能优化
- 调整索引参数:根据数据特性和查询需求调整索引参数,如
nlist
和nprobe
。 - 增加节点数量:通过增加 Milvus 节点数量来提高处理能力和查询速度。
- 优化硬件配置:使用高性能的 CPU、GPU 和大容量内存来加速计算和存储。
- 数据预处理:对数据进行归一化、降维等预处理,以减少存储空间和提高查询效率。
11. 监控与维护
- 监控工具:使用 Prometheus 和 Grafana 监控 Milvus 的运行状态和性能指标。
- 定期备份:定期备份数据,防止数据丢失。
- 日志分析:分析日志文件,及时发现和解决问题。
通过上述步骤,Milvus 可以高效地处理 TB 级别的大规模向量数据,并提供高性能的向量检索能力。