当前位置: 首页 > article >正文

【拥抱AI】Milvus 如何处理 TB 级别的大规模向量数据?

处理 TB 级别的大规模向量数据是 Milvus 的核心优势之一。Milvus 通过分布式架构、高效的索引算法和优化的数据管理策略来实现这一目标。下面将详细介绍 Milvus 如何处理 TB 级别向量数据的流程,包括插入代码示例、指令以及流程图。
在这里插入图片描述

1. 分布式架构

Milvus 使用分布式架构来处理大规模数据。它支持水平扩展,可以将数据分布在多个节点上,从而提高存储容量和查询性能。主要组件包括:

  • Milvus Server:负责数据管理和查询。
  • Etcd:用于服务发现和配置管理。
  • MinIO/S3:持久化存储。
  • Pulsar:消息队列,用于数据同步和异步操作。

2. 数据分片

为了处理大规模数据,Milvus 将数据分成多个分片(segments)。每个分片是一个独立的文件,包含一定数量的向量数据。这样可以并行处理数据,提高查询效率。

3. 高效索引

Milvus 支持多种高效的向量索引算法,如 ANNOY、HNSW、IVF-PQ、IVF-SQ8 等。这些索引算法可以在高维空间中快速找到近似最近邻。

4. 插入数据

4.1 创建集合

首先,需要创建一个集合来存储向量数据。集合定义了向量的维度、索引类型等属性。

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection

# 连接到 Milvus 服务器
connections.connect("default", host="localhost", port="19530")

# 定义字段
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)
]

# 创建集合模式
schema = CollectionSchema(fields, "Example collection for large-scale vectors")

# 创建集合
collection = Collection("example_collection", schema)
4.2 插入数据

使用 insert 方法将向量数据插入到集合中。

import numpy as np

# 生成随机向量数据
data = [
    [np.random.rand(128).tolist() for _ in range(10000)]  # 10000 条 128 维向量
]

# 插入数据
mr = collection.insert(data)

# 获取插入结果
print(f"Inserted {mr.insert_count} entities with primary keys: {mr.primary_keys}")

5. 建立索引

在插入大量数据后,建立索引以提高查询效率。

# 定义索引参数
index_params = {
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128},
    "metric_type": "L2"
}

# 建立索引
collection.create_index(field_name="embedding", index_params=index_params)

6. 加载集合

加载集合到内存中,以便进行查询。

# 加载集合
collection.load()

7. 查询数据

使用 search 方法进行相似性搜索。

# 定义查询向量
query_vectors = [np.random.rand(128).tolist()]

# 定义搜索参数
search_params = {
    "metric_type": "L2",
    "params": {"nprobe": 10}
}

# 执行搜索
results = collection.search(query_vectors, "embedding", search_params, limit=10)

# 输出结果
for result in results:
    print(f"Top 10 similar vectors: {result.ids}")

8. 流程图

以下是处理 TB 级别向量数据的流程图:

+-------------------+       +-----------------+       +--------------------+       +------------------+
|                   |       |                 |       |                    |       |                  |
|  用户输入         |  -->  |  创建集合       |  -->  |  插入数据          |  -->  |  建立索引        |  -->  |  加载集合        |  -->  |  查询数据        |
|  (向量数据)       |       |  (定义模式)     |       |  (批量插入)        |       |  (选择索引类型)  |       |  (加载到内存)    |       |  (相似性搜索)    |
|                   |       |                 |       |                    |       |                  |       |                  |       |                  |
+-------------------+       +-----------------+       +--------------------+       +------------------+       +------------------+       +------------------+
        |                           |                               |                           |                           |                           |
        |                           |                               |                           |                           |                           |
        |                           v                               v                           v                           v                           v
        |                       +-----------------+           +-----------------+           +-----------------+           +-----------------+           +-----------------+
        |                       |  Etcd (配置)    | <---------+  MinIO/S3 (存储)| <--------+  Pulsar (消息队列)| <-------+  Milvus Server   | <-------+  应用程序      |
        |                       +-----------------+           +-----------------+           +-----------------+           +-----------------+           +-----------------+

9. 详细步骤说明

9.1 连接 Milvus 服务器
from pymilvus import connections

# 连接到 Milvus 服务器
connections.connect("default", host="localhost", port="19530")
9.2 创建集合
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection

# 定义字段
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)
]

# 创建集合模式
schema = CollectionSchema(fields, "Example collection for large-scale vectors")

# 创建集合
collection = Collection("example_collection", schema)
9.3 插入数据
import numpy as np

# 生成随机向量数据
data = [
    [np.random.rand(128).tolist() for _ in range(10000)]  # 10000 条 128 维向量
]

# 插入数据
mr = collection.insert(data)

# 获取插入结果
print(f"Inserted {mr.insert_count} entities with primary keys: {mr.primary_keys}")
9.4 建立索引
# 定义索引参数
index_params = {
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128},
    "metric_type": "L2"
}

# 建立索引
collection.create_index(field_name="embedding", index_params=index_params)
9.5 加载集合
# 加载集合
collection.load()
9.6 查询数据
# 定义查询向量
query_vectors = [np.random.rand(128).tolist()]

# 定义搜索参数
search_params = {
    "metric_type": "L2",
    "params": {"nprobe": 10}
}

# 执行搜索
results = collection.search(query_vectors, "embedding", search_params, limit=10)

# 输出结果
for result in results:
    print(f"Top 10 similar vectors: {result.ids}")

10. 性能优化

  • 调整索引参数:根据数据特性和查询需求调整索引参数,如 nlistnprobe
  • 增加节点数量:通过增加 Milvus 节点数量来提高处理能力和查询速度。
  • 优化硬件配置:使用高性能的 CPU、GPU 和大容量内存来加速计算和存储。
  • 数据预处理:对数据进行归一化、降维等预处理,以减少存储空间和提高查询效率。

11. 监控与维护

  • 监控工具:使用 Prometheus 和 Grafana 监控 Milvus 的运行状态和性能指标。
  • 定期备份:定期备份数据,防止数据丢失。
  • 日志分析:分析日志文件,及时发现和解决问题。

通过上述步骤,Milvus 可以高效地处理 TB 级别的大规模向量数据,并提供高性能的向量检索能力。


http://www.kler.cn/a/415572.html

相关文章:

  • uniapp在App端定义全局弹窗,当打开关闭弹窗会触发onShow、onHide生命周期怎么解决?
  • SpringBoot小知识(2):日志
  • Java设计模式 —— 【创建型模式】工厂模式(简单工厂、工厂方法模式、抽象工厂)详解
  • Docker pull镜像拉取失败
  • 对智能电视直播App的恶意监控
  • 提升数据分析效率:Excel Power Query和Power Pivot的妙用
  • Vue 前端 el-input 如何实现输入框内容始终添加在尾部%
  • 深入探索Facebook的技术生态:社交网络背后的科技创新
  • 【ruby on rails】dup、deep_dup、clone的区别
  • GitLab CVE-2024-8114 漏洞解决方案
  • 自由学习记录(26)
  • 03-11、SpringCloud第十一章,升级篇,分布式链路跟踪Sleuth
  • ML 系列:第 35 节 - 机器学习中的数据可视化
  • 基于单片机的智能药箱设计
  • ESP32开发板在micropython里直接用requests向web服务器发送请求
  • Hive | Hive 表如何查看所有分区
  • Linux环境变量与本地变量
  • 随笔20241126 Kafka 消费者的自动提交与手动提交偏移量详解
  • 【金猿案例展】无锡征信——百望云绿色金融数据要素+数据资产入表服务方案...
  • React进阶面试题目(二)
  • 基于时间维度优化“开源 AI 智能名片 S2B2C 商城小程序”运营策略:提升触达与转化效能
  • 数据分析流程中的Lambda架构,以及数据湖基于Hadoop、Spark的实现
  • 实例讲解MATLAB绘图坐标轴标签旋转
  • 网络安全运行与维护高级 - 题库汇总百题
  • Linux中创建SFTP用户并设置默认文件权限为775的三种方法
  • 虚拟机ubuntu-20.04.6-live-server搭建OpenStack:Victoria(一:工具、环境准备-controller node)