【原创】使用ElasticSearch存储向量实现大模型RAG
一、概述
检索增强生成(Retrieval-Augmented Generation,RAG)已成为大型语言模型(LLM)应用的重要架构,通过结合外部知识库来增强模型的回答能力,特别是在处理专业领域知识、最新信息或企业私有数据时。本报告将系统梳理使用 Elasticsearch(ES)作为向量数据库实现 RAG 系统的优缺点,与传统向量数据库及其他存储解决方案的对比,以及基于 Deepseek V3 和 Qwen2.5 大模型的实现方案。
二、Elasticsearch 作为 RAG 向量数据库的优缺点分析
1. 优点
低门槛的独立技术栈
- 一站式解决方案:ES 能够一站式完成向量生成、存储、索引和检索,通过配置即可实现大部分功能
- 成熟的生态系统:作为成熟的搜索引擎,拥有丰富的文档、社区支持和工具集
- 简化的部署和维护:相比需要部署多个组件的解决方案,ES 可以作为单一系统处理所有 RAG 相关任务
高性能和扩展性
- 分布式架构:支持百万级 QPS 和千亿级数据量
- 灵活的扩展能力:可以通过添加节点水平扩展,满足不断增长的数据需求
- 高可用性:内置的分片和复制机制确保系统的可靠性
混合检索能力
- 文本与向量的结合:同时支持传统的全文检索和向量相似性搜索
- 提高检索精度:混合检索策略能够显著提升搜索结果的准确性和多样性
- 处理短查询优势:对于短查询,传统关键词搜索可以弥补纯向量搜索的不足
丰富的数据处理能力
- 强大的文本分析:内置多种语言分析器,支持分词、同义词、停用词等处理
- 结构化和非结构化数据支持:可以同时处理结构化字段和非结构化文本
- 聚合和分析功能:提供丰富的聚合功能,可用于数据分析和可视化
多种相似度计算算法支持
- 余弦相似度(Cosine Similarity):适用于文本语义搜索,不受向量长度影响
- 点积(Dot Product):适用于推荐系统,考虑向量长度和方向
- 欧几里得距离(L2 Norm):适用于图像特征、地理位置等场景
- 脚本评分(Script Score):支持自定义脚本实现更复杂的相似度计算
- 函数评分(Function Score):允许结合衰减函数、字段值等因素调整相似度分数
与大模型的无缝集成
- 简化的集成流程:提供 API 和工具,便于与各种大模型集成
- 灵活的检索配置:可以根据不同大模型的特点调整检索策略
2. 不足
向量搜索性能限制
- 非专用架构:ES 的架构不是专为向量搜索设计的,在大规模向量搜索时性能可能不如专用向量数据库
- 延迟问题:在大规模向量集上,搜索延迟通常为毫秒级,而专用向量数据库可达微秒级
- 资源消耗较高:向量操作可能需要更多的内存和计算资源
向量功能相对有限
- 算法支持有限:支持的向量索引和搜索算法相对较少,主要是 HNSW
- 缺乏专业优化:缺少针对向量操作的专门优化,如 GPU 加速
- 向量维度限制:在处理超高维向量时可能存在效率问题
学习和配置复杂性
- 配置复杂:需要正确配置索引映射、分片策略等
- 调优难度:优化 ES 性能需要专业知识和经验
- 维护成本:需要定期维护和监控集群状态
存储效率问题
- 存储开销:存储向量数据可能需要更多空间
- 索引大小:包含向量的索引通常比纯文本索引大得多
三、Elasticsearch 向量存储与检索原理
1. 向量数据存储原理
Elasticsearch 使用 dense_vector 字段类型来存储向量数据,其工作原理如下:
基本存储结构
- 文档结构:向量被存储为文档的一个字段,与其他字段(如文本、数字等)一起构成完整的文档。
- 向量表示:向量以浮点数数组的形式存储,每个数组元素对应向量的一个维度。
- 索引映射:通过索引映射定义向量字段的属性,包括维度大小、索引方式和相似度计算方法。
存储过程简述
-
1. 向量生成:通过嵌入模型将文本、图像等内容转换为固定维度的向量。
-
2. 文档创建:创建包含向量字段的文档,将向量数据与其他元数据一起存储。
-
3. 分片分配:ES 将文档分配到不同的分片中,每个分片可以位于不同的节点上。
-
4. 磁盘存储:向量数据最终以 Lucene 索引格式存储在磁盘上,同时部分热数据会缓存在内存中。
2. 向量检索原理
Elasticsearch 主要使用 HNSW(Hierarchical Navigable Small World)算法进行向量检索,这是一种近似最近邻(ANN)搜索算法:
HNSW 算法简介
- 多层图结构:HNSW 构建一个多层的图结构,顶层包含少量节点,底层包含所有节点。
- 导航原理:搜索从顶层开始,通过"贪心"策略快速找到大致方向,然后在下层进行更精细的搜索。
- 近似搜索:通过牺牲一定的精确度来换取显著的性能提升。
检索过程
-
1. 查询向量生成:将用户查询转换为向量表示。
-
2. 分片搜索:在每个相关分片上执行向量搜索。
-
3. 相似度计算:根据配置的相似度方法(余弦、点积或 L2 范数)计算查询向量与索引向量的相似度。
-
4. 结果合并:将各分片的搜索结果合并,并按相似度排序。
-
5. 返回结果:返回最相似的文档作为搜索结果。
性能优化参数
- ef_construction:构建索引时的精度参数,值越大索引质量越高但构建越慢。
- ef_search:搜索时的精度参数,值越大搜索结果越精确但速度越慢。
- m:每个节点的最大连接数,影响图的连通性和搜索效率。
3. 多模态向量存储支持
Elasticsearch 不仅可以存储文本向量,还可以存储来自图片、音频和视频等多模态数据的向量表示:
图像向量存储
- 实现原理:使用图像嵌入模型(如 ResNet、ViT 等)将图像转换为向量表示。
- 存储方式:与文本向量相同,使用 dense_vector 字段存储图像向量。
- 应用场景:图像相似度搜索、以图搜图、视觉内容检索。
音频向量存储
- 实现原理:使用音频嵌入模型(如 Wav2Vec、CLAP 等)将音频转换为向量表示。
- 存储方式:同样使用 dense_vector 字段,维度根据音频嵌入模型而定。
- 应用场景:音乐推荐、语音搜索、音频内容分类。
视频向量存储
- 实现原理:可以通过两种方式处理:
- 将视频分解为关键帧,然后使用图像嵌入模型处理每个关键帧。
- 使用专门的视频嵌入模型(如 Video Transformers)直接生成视频向量。
- 存储方式:可以存储单个视频的整体向量,或者存储多个关键帧的向量序列。
- 应用场景:视频内容搜索、相似视频推荐、视频分类。
多模态向量存储的简单示例
# 图像向量存储示例
from PIL import Image
from transformers import AutoFeatureExtractor, AutoModel
import torch
# 加载图像嵌入模型
image_model = AutoModel.from_pretrained("openai/clip-vit-base-patch32")
feature_extractor = AutoFeatureExtractor.from_pretrained("openai/clip-vit-base-patch32")
# 处理图像
image = Image.open("example.jpg")
inputs = feature_extractor(images=image, return_tensors="pt")
with torch.no_grad():
image_features = image_model.get_image_features(**inputs)
image_vector = image_features[0].tolist()
# 存储到Elasticsearch
doc = {
"title": "示例图片",
"description": "这是一张示例图片",
"image_path": "example.jpg",
"image_vector": image_vector # 使用dense_vector字段存储
}
es.index(index="image-vectors", document=doc)
四、Elasticsearch 与其他向量存储解决方案的对比
1. 各向量存储解决方案综合对比
特性 | Elasticsearch | PGVector (PostgreSQL) | Redis Vector | MongoDB Atlas | FAISS | Milvus | Pinecone | Qdrant | Chroma |
架构类型 | 分布式搜索引擎 | 关系型数据库扩展 | 内存数据库扩展 | 文档数据库扩展 | 库/嵌入式 | 分布式向量数据库 | 托管向量数据库 | 向量数据库 | 嵌入式向量数据库 |
部署模式 | 自托管/云服务 | 自托管/云服务 | 自托管/云服务 | 自托管/云服务 | 嵌入式 | 自托管/云服务 | 仅云服务 | 自托管/云服务 | 嵌入式/自托管 |
向量索引算法 | HNSW | HNSW, IVF | HNSW | HNSW | HNSW, IVF, PQ 等多种 | HNSW, IVF, FLAT 等多种 | HNSW 变体 | HNSW | HNSW |
查询性能 | 毫秒级 | 毫秒级 | 亚毫秒级 | 毫秒级 | 微秒-毫秒级 | 微秒-毫秒级 | 毫秒级 | 毫秒级 | 毫秒级 |
扩展性 | 优秀 | 有限 | 有限 | 优秀 | 有限 | 优秀 | 优秀 | 良好 | 有限 |
混合查询 | 原生支持 | 支持 SQL+向量 | 有限支持 | 支持文档+向量 | 不支持 | 支持 | 有限支持 | 支持 | 支持 |
元数据过滤 | 强大 | 强大(SQL) | 有限 | 强大(文档查询) | 有限 | 支持 | 支持 | 支持 | 支持 |
GPU 加速 | 不支持 | 不支持 | 不支持 | 不支持 | 支持 | 支持 | 内部支持 | 不支持 | 不支持 |
存储容量 | TB-PB 级 | TB 级 | GB-TB 级 | TB-PB 级 | 受内存限制 | TB 级 | TB 级 | TB 级 | GB-TB 级 |
全文搜索 | 优秀 | 基础 | 良好 | 基础 | 不支持 | 有限 | 不支持 | 有限 | 有限 |
相似度算法 | 余弦、点积、L2 范数 | 余弦、L2 范数 | 余弦、L2 范数 | 余弦、点积、欧几里得 | 多种(余弦、内积、L2 等) | 多种(余弦、内积、L2 等) | 余弦、点积 | 余弦、点积、欧几里得 | 余弦 |
事务支持 | 有限 | 完整 ACID | 有限 | 文档级事务 | 不支持 | 有限 | 不支持 | 不支持 | 不支持 |
易用性 | 中等 | 高(SQL 熟悉度) | 高 | 高(MongoDB 用户) | 复杂 | 中等 | 简单 | 简单 | 简单 |
维护成本 | 高 | 中等 | 低 | 中等 | 低(嵌入式) | 中 | 低(托管) | 中 | 低 |
社区支持 | 强大 | 强大 | 强大 | 强大 | 活跃 | 活跃 | 有限 | 活跃 | 活跃 |
成本 | 开源,自托管 | 开源,自托管 | 开源+商业版 | 开源+商业版 | 开源,免费 | 开源+商业版 | 商业版,按量付费 | 开源+商业版 | 开源,免费 |
适用场景 | 混合搜索 | 结构化数据+向量 | 高性能、低延迟 | 半结构化数据+向量 | 高性能向量搜索 | 大规模向量管理 | 简单部署、高可用 | 中小规模应用 | 快速原型开发 |
2. 不同相似度计算方法对比
相似度方法 | 数学表达式 | 值域 | 适用场景 | 特点 | 支持的存储系统 |
余弦相似度 | cos(θ) = A·B / (‖A‖·‖B‖) | [-1, 1] | 文本语义搜索 | 不受向量长度影响,只考虑方向 | 全部 |
点积 | A·B = ∑(A_i × B_i) | 无限制 | 推荐系统 | 考虑向量长度和方向,通常需要归一化 | ES, MongoDB, FAISS, Milvus, Pinecone, Qdrant |
欧几里得距离 | ‖A-B‖ = √(∑(A_i - B_i)²) | [0, ∞) | 图像特征、地理位置 | 值越小表示越相似,需要转换为相似度 | ES, PGVector, Redis, MongoDB, FAISS, Milvus, Qdrant |
曼哈顿距离 | ∑|A_i - B_i| | [0, ∞) | 网格空间、特征差异 | 计算简单,对异常值不敏感 | FAISS, Milvus |
杰卡德相似度 | J(A,B) = |A∩B| / |A∪B| | [0, 1] | 集合比较、文档相似度 | 适用于二进制特征或集合 | ES(脚本), PGVector(SQL) |
汉明距离 | H(A,B) = ∑(A_i ⊕ B_i) | [0, dim] | 二进制特征、错误检测 | 计算二进制向量中不同位的数量 | FAISS |
3. 适用场景对比总结
解决方案 | 最适合场景 | 不适合场景 |
Elasticsearch | 需要混合搜索(文本+向量)、已有 ES 基础设施、需要复杂文本处理 | 超大规模纯向量搜索、极低延迟要求、GPU 加速需求 |
PGVector | 已有 PostgreSQL 基础设施、需要事务支持、结构化数据+向量 | 超大规模向量集、分布式部署需求 |
Redis Vector | 超低延迟需求、缓存层向量搜索、临时数据 | 持久化要求高、复杂查询、大规模数据 |
MongoDB Atlas | 半结构化数据、文档+向量混合、MongoDB 用户 | 复杂文本分析、极高性能要求 |
FAISS | 纯向量搜索、算法研究、GPU 加速、嵌入式应用 | 需要持久化、分布式部署、元数据过滤 |
Milvus | 大规模向量管理、需要丰富索引算法、混合查询 | 简单应用、资源受限环境 |
Pinecone | 快速部署、无运维需求、按需扩展 | 自托管需求、成本敏感、复杂查询 |
Qdrant | 中小规模应用、需要良好元数据过滤、开源需求 | 超大规模、GPU 加速需求 |
Chroma | 快速原型开发、简单应用、本地部署 | 企业级应用、大规模数据、高并发 |
五、基于 Elasticsearch 的 RAG 实现方案
1. 系统架构设计
基于 Elasticsearch 实现 RAG 系统的整体架构如下:
-
1. 数据处理层:负责文档的收集、清洗、分块和向量化
-
2. 存储层:使用 Elasticsearch 存储文本内容和向量表示
-
3. 检索层:实现混合检索策略,结合文本和向量搜索
-
4. 生成层:集成 Deepseek V3 或 Qwen2.5 大模型,基于检索结果生成回答
-
5. 应用层:提供用户界面和 API 接口
2. 实现步骤详解
2.1 环境准备
首先,我们需要安装必要的依赖:
# 安装必要的库
pip install elasticsearch langchain langchain-elasticsearch langchain-community langchain-openai
pip install deepseek-ai qwen-api
2.2 Elasticsearch 配置
设置 Elasticsearch 集群并创建适合 RAG 的索引,展示不同相似度计算方法:
from elasticsearch import Elasticsearch
# 连接到Elasticsearch
es = Elasticsearch(
hosts=["http://localhost:9200"],
basic_auth=("user", "password"), # 如果有认证
request_timeout=60
)
# 创建索引映射,使用cosine相似度
cosine_index_mapping = {
"mappings": {
"properties": {
"title": {"type": "text", "analyzer": "standard"},
"content": {"type": "text", "analyzer": "standard"},
"source": {"type": "keyword"},
"content_vector": {
"type": "dense_vector",
"dims": 1536, # 根据嵌入模型调整维度
"index": True,
"similarity": "cosine" # 余弦相似度
}
}
},
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
}
}
# 创建使用余弦相似度的索引
es.indices.create(index="rag-knowledge-cosine", body=cosine_index_mapping)
# 创建索引映射,使用dot_product相似度
dot_product_index_mapping = {
"mappings": {
"properties": {
"title": {"type": "text", "analyzer": "standard"},
"content": {"type": "text", "analyzer": "standard"},
"source": {"type": "keyword"},
"content_vector": {
"type": "dense_vector",
"dims": 1536,
"index": True,
"similarity": "dot_product" # 点积相似度
}
}
},
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
}
}
# 创建使用点积相似度的索引
es.indices.create(index="rag-knowledge-dot-product", body=dot_product_index_mapping)
# 创建索引映射,使用l2_norm相似度
l2_norm_index_mapping = {
"mappings": {
"properties": {
"title": {"type": "text", "analyzer": "standard"},
"content": {"type": "text", "analyzer": "standard"},
"source": {"type": "keyword"},
"content_vector": {
"type": "dense_vector",
"dims": 1536,
"index": True,
"similarity": "l2_norm" # 欧几里得距离(L2范数)
}
}
},
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
}
}
# 创建使用L2范数相似度的索引
es.indices.create(index="rag-knowledge-l2-norm", body=l2_norm_index_mapping)
2.3 文档处理与索引
实现文档的处理、分块和向量化,并分别存储到不同相似度计算方法的索引中:
import os
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings # 也可以使用其他嵌入模型
# 加载文档
loader = DirectoryLoader("./documents", glob="**/*.txt", loader_cls=TextLoader)
documents = loader.load()
# 文档分块
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
)
chunks = text_splitter.split_documents(documents)
# 初始化嵌入模型
embeddings = OpenAIEmbeddings() # 可替换为其他嵌入模型
# 处理文档并索引到不同的Elasticsearch索引
for i, chunk in enumerate(chunks):
# 生成向量嵌入
vector = embeddings.embed_query(chunk.page_content)
# 准备索引文档
doc = {
"title": f"Chunk {i} from {chunk.metadata.get('source', 'unknown')}",
"content": chunk.page_content,
"source": chunk.metadata.get("source", "unknown"),
"content_vector": vector
}
# 索引文档到不同相似度计算方法的索引
es.index(index="rag-knowledge-cosine", document=doc)
# 对于dot_product,通常需要归一化向量
es.index(index="rag-knowledge-dot-product", document=doc)
# 对于l2_norm,直接使用原始向量
es.index(index="rag-knowledge-l2-norm", document=doc)
# 刷新索引
es.indices.refresh(index="rag-knowledge-cosine")
es.indices.refresh(index="rag-knowledge-dot-product")
es.indices.refresh(index="rag-knowledge-l2-norm")
2.4 使用 LangChain 与 Elasticsearch 实现 RAG
下面是使用 LangChain 与 Elasticsearch 结合实现 RAG 的代码,支持不同相似度计算方法:
from langchain_elasticsearch import ElasticsearchStore
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain.schema import Document
# 初始化不同相似度计算方法的Elasticsearch向量存储
es_store_cosine = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="rag-knowledge-cosine",
embedding=embeddings,
es_user="user", # 如果有认证
es_password="password" # 如果有认证
)
es_store_dot_product = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="rag-knowledge-dot-product",
embedding=embeddings,
es_user="user",
es_password="password"
)
es_store_l2_norm = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="rag-knowledge-l2-norm",
embedding=embeddings,
es_user="user",
es_password="password"
)
# 创建不同相似度计算方法的检索器
retriever_cosine = es_store_cosine.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
retriever_dot_product = es_store_dot_product.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
retriever_l2_norm = es_store_l2_norm.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
# 创建多相似度混合检索器
def multi_similarity_retriever(query, top_k=5):
# 从三种不同相似度方法获取结果
cosine_docs = retriever_cosine.get_relevant_documents(query)
dot_product_docs = retriever_dot_product.get_relevant_documents(query)
l2_norm_docs = retriever_l2_norm.get_relevant_documents(query)
# 合并结果
all_docs = cosine_docs + dot_product_docs + l2_norm_docs
# 去重
unique_docs = list({doc.page_content: doc for doc in all_docs}.values())
# 限制返回数量
return unique_docs[:top_k]
# 定义提示模板
template = """
你是一个专业的AI助手。请基于以下提供的上下文信息,回答用户的问题。
如果你无法从上下文中找到答案,请直接说"我无法从提供的信息中找到答案",不要编造信息。
上下文信息:
{context}
用户问题: {question}
回答:
"""
prompt = PromptTemplate(
template=template,
input_variables=["context", "question"]
)
# 格式化文档函数
def format_docs(docs):
return "\n\n".join([doc.page_content for doc in docs])
2.5 集成 Deepseek V3 模型
from langchain_deepseek import DeepseekChat
# 初始化Deepseek模型
deepseek_llm = DeepseekChat(
model_name="deepseek-ai/deepseek-v3",
api_key="your_api_key",
temperature=0.1
)
# 构建RAG链(使用余弦相似度)
deepseek_rag_chain_cosine = (
{"context": retriever_cosine | format_docs, "question": RunnablePassthrough()}
| prompt
| deepseek_llm
| StrOutputParser()
)
# 构建RAG链(使用点积相似度)
deepseek_rag_chain_dot_product = (
{"context": retriever_dot_product | format_docs, "question": RunnablePassthrough()}
| prompt
| deepseek_llm
| StrOutputParser()
)
# 构建RAG链(使用L2范数相似度)
deepseek_rag_chain_l2_norm = (
{"context": retriever_l2_norm | format_docs, "question": RunnablePassthrough()}
| prompt
| deepseek_llm
| StrOutputParser()
)
# 构建RAG链(使用多相似度混合检索)
def answer_with_deepseek_multi_similarity(question):
# 获取多相似度混合检索结果
docs = multi_similarity_retriever(question)
context = format_docs(docs)
# 构建RAG链
chain = (
{"context": RunnablePassthrough(), "question": RunnablePassthrough()}
| prompt
| deepseek_llm
| StrOutputParser()
)
# 生成回答
return chain.invoke({"context": context, "question": question})
# 示例
response = answer_with_deepseek_multi_similarity("什么是向量数据库?")
print(response)
2.6 集成 Qwen2.5 模型
from langchain_community.llms import QianfanLLMEndpoint
# 初始化Qwen模型
qwen_llm = QianfanLLMEndpoint(
model_name="qwen-2.5",
api_key="your_api_key",
secret_key="your_secret_key",
temperature=0.1
)
# 构建RAG链(使用余弦相似度)
qwen_rag_chain_cosine = (
{"context": retriever_cosine | format_docs, "question": RunnablePassthrough()}
| prompt
| qwen_llm
| StrOutputParser()
)
# 构建RAG链(使用点积相似度)
qwen_rag_chain_dot_product = (
{"context": retriever_dot_product | format_docs, "question": RunnablePassthrough()}
| prompt
| qwen_llm
| StrOutputParser()
)
# 构建RAG链(使用L2范数相似度)
qwen_rag_chain_l2_norm = (
{"context": retriever_l2_norm | format_docs, "question": RunnablePassthrough()}
| prompt
| qwen_llm
| StrOutputParser()
)
# 构建RAG链(使用多相似度混合检索)
def answer_with_qwen_multi_similarity(question):
# 获取多相似度混合检索结果
docs = multi_similarity_retriever(question)
context = format_docs(docs)
# 构建RAG链
chain = (
{"context": RunnablePassthrough(), "question": RunnablePassthrough()}
| prompt
| qwen_llm
| StrOutputParser()
)
# 生成回答
return chain.invoke({"context": context, "question": question})
# 示例
response = answer_with_qwen_multi_similarity("Elasticsearch的主要优势是什么?")
print(response)
3. 高级功能实现
3.1 实现混合搜索策略
def enhanced_hybrid_search(query, es_client, index_name, embedding_model, similarity_type="cosine"):
# 生成查询向量
query_vector = embedding_model.embed_query(query)
# 根据不同相似度类型构建脚本
if similarity_type == "cosine":
similarity_script = "cosineSimilarity(params.query_vector, 'content_vector') + 1.0"
elif similarity_type == "dot_product":
similarity_script = "dotProduct(params.query_vector, 'content_vector')"
elif similarity_type == "l2_norm":
# 对于L2范数,较小的值表示更相似,所以我们使用负值或倒数
similarity_script = "1 / (1 + l2Norm(params.query_vector, 'content_vector'))"
else:
raise ValueError(f"不支持的相似度类型: {similarity_type}")
# 构建混合查询
hybrid_query = {
"query": {
"bool": {
"should": [
# 向量搜索部分
{
"script_score": {
"query": {"match_all": {}},
"script": {
"source": similarity_script,
"params": {"query_vector": query_vector}
}
}
},
# 文本搜索部分
{
"match": {
"content": {
"query": query,
"boost": 0.5 # 调整文本搜索的权重
}
}
}
]
}
},
"size": 5
}
# 执行搜索
response = es_client.search(index=index_name, body=hybrid_query)
# 处理结果
results = []
for hit in response["hits"]["hits"]:
results.append({
"score": hit["_score"],
"title": hit["_source"]["title"],
"content": hit["_source"]["content"],
"source": hit["_source"]["source"]
})
return results
3.2 实现自适应相似度选择
def adaptive_similarity_search(query, es_client, embedding_model):
# 分析查询特征,决定使用哪种相似度方法
query_length = len(query.split())
# 短查询(1-3个词)可能更适合关键词搜索
if query_length <= 3:
# 使用混合搜索,但给文本搜索更高权重
results = enhanced_hybrid_search(
query,
es_client,
"rag-knowledge-cosine",
embedding_model,
similarity_type="cosine"
)
# 中等长度查询(4-10个词)
elif query_length <= 10:
# 使用余弦相似度,适合一般语义搜索
results = enhanced_hybrid_search(
query,
es_client,
"rag-knowledge-cosine",
embedding_model,
similarity_type="cosine"
)
# 长查询(>10个词)
else:
# 对于长查询,点积可能更适合捕捉更多语义信息
results = enhanced_hybrid_search(
query,
es_client,
"rag-knowledge-dot-product",
embedding_model,
similarity_type="dot_product"
)
return results
3.3 实现多步检索策略
def multi_step_retrieval(query, es_client, embedding_model):
# 第一步:使用文本搜索获取初步结果
text_query = {
"query": {
"match": {
"content": query
}
},
"size": 20
}
text_response = es_client.search(index="rag-knowledge-cosine", body=text_query)
# 提取文档ID和内容
candidate_docs = []
for hit in text_response["hits"]["hits"]:
candidate_docs.append({
"id": hit["_id"],
"content": hit["_source"]["content"]
})
# 如果文本搜索没有足够结果,添加向量搜索
if len(candidate_docs) < 10:
# 生成查询向量
query_vector = embedding_model.embed_query(query)
# 向量搜索
vector_query = {
"query": {
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'content_vector') + 1.0",
"params": {"query_vector": query_vector}
}
}
},
"size": 20 - len(candidate_docs)
}
vector_response = es_client.search(index="rag-knowledge-cosine", body=vector_query)
# 添加到候选文档
for hit in vector_response["hits"]["hits"]:
# 避免重复
if not any(doc["id"] == hit["_id"] for doc in candidate_docs):
candidate_docs.append({
"id": hit["_id"],
"content": hit["_source"]["content"]
})
# 第二步:对候选文档进行重新排序
# 生成查询向量
query_vector = embedding_model.embed_query(query)
# 为每个候选文档计算相似度分数
for doc in candidate_docs:
# 生成文档向量
doc_vector = embedding_model.embed_query(doc["content"])
# 计算余弦相似度
similarity = cosine_similarity(
[query_vector],
[doc_vector]
)[0][0]
doc["similarity_score"] = similarity
# 按相似度排序
ranked_docs = sorted(candidate_docs, key=lambda x: x["similarity_score"], reverse=True)
# 返回前5个最相关文档
return ranked_docs[:5]
3.4 实现查询扩展和改写
def query_expansion(original_query, llm):
# 使用大模型扩展查询
expansion_prompt = f"""
请基于以下原始查询,生成3个不同的扩展查询,以便更全面地搜索相关信息。
每个扩展查询应该保留原始查询的核心意图,但可以添加相关术语、同义词或上下文。
原始查询: {original_query}
扩展查询(每行一个):
"""
# 获取扩展查询
response = llm.invoke(expansion_prompt)
# 解析响应
expanded_queries = [q.strip() for q in response.split("\n") if q.strip()]
# 过滤掉可能的空行或无关行
expanded_queries = [q for q in expanded_queries if len(q) > 5]
# 确保至少有一个查询(原始查询)
if not expanded_queries:
expanded_queries = [original_query]
else:
# 添加原始查询
expanded_queries.append(original_query)
return expanded_queries
def multi_query_retrieval(original_query, es_client, embedding_model, llm):
# 扩展查询
expanded_queries = query_expansion(original_query, llm)
# 对每个扩展查询执行搜索
all_results = []
for query in expanded_queries:
results = enhanced_hybrid_search(
query,
es_client,
"rag-knowledge-cosine",
embedding_model
)
all_results.extend(results)
# 去重
unique_results = []
seen_contents = set()
for result in all_results:
if result["content"] not in seen_contents:
unique_results.append(result)
seen_contents.add(result["content"])
# 重新排序(可以基于原始查询的相似度)
query_vector = embedding_model.embed_query(original_query)
# 为每个结果计算与原始查询的相似度
for result in unique_results:
# 假设我们可以从ES获取内容的向量
content_vector_query = {
"query": {
"term": {
"_id": result["id"]
}
},
"_source": ["content_vector"]
}
vector_response = es_client.search(index="rag-knowledge-cosine", body=content_vector_query)
content_vector = vector_response["hits"]["hits"][0]["_source"]["content_vector"]
# 计算相似度
similarity = cosine_similarity(
[query_vector],
[content_vector]
)[0][0]
result["final_score"] = similarity
# 按最终分数排序
ranked_results = sorted(unique_results, key=lambda x: x["final_score"], reverse=True)
# 返回前5个最相关结果
return ranked_results[:5]
六、总结与结论
1. Elasticsearch 作为 RAG 向量数据库的优势总结
-
1. 全栈解决方案:Elasticsearch 提供了从数据索引到检索的完整解决方案,无需集成多个系统。
-
2. 混合检索能力:结合传统文本搜索和向量搜索的能力是其最大优势,能够显著提高检索质量。
-
3. 成熟的生态系统:作为成熟的搜索引擎,拥有丰富的文档、工具和社区支持。
-
4. 多模态支持:能够存储和检索文本、图像、音频和视频的向量表示,支持多模态 RAG 应用。
-
5. 灵活的相似度计算:支持多种相似度计算方法(余弦、点积、L2 范数),适应不同应用场景。
2. 实施建议
-
1. 选择合适的相似度计算方法:
- 文本语义搜索:优先使用余弦相似度
- 推荐系统:考虑使用点积
- 图像或地理位置搜索:考虑使用 L2 范数
-
2. 优化索引配置:
- 根据数据量和查询频率调整分片数量
- 为频繁查询的索引配置足够的副本
- 合理设置 HNSW 参数,平衡搜索精度和性能
-
3. 实现混合检索策略:
- 结合向量搜索和关键词搜索
- 根据查询特征动态调整搜索策略
- 考虑使用查询扩展和多步检索提高召回率
- 4. 性能优化:
- 对于大规模部署,考虑使用专用节点
- 监控和调整 JVM 堆大小
- 定期优化索引
3. 未来发展方向
-
1. 与专用向量数据库的集成:
- 考虑 Elasticsearch 与专用向量数据库(如 FAISS、Milvus)的混合架构
- 利用 ES 的文本处理和混合查询能力,结合专用向量数据库的高性能向量搜索
-
2. 知识图谱增强:
- 将向量搜索与知识图谱结合,实现 GraphRAG
- 利用实体关系增强检索结果的相关性和可解释性
-
3. 多模态 RAG 应用:
- 扩展到图像、音频和视频内容的检索增强生成
- 实现跨模态检索,如以文搜图、以图搜文
- 4. 自适应检索策略:
- 开发能够根据查询特征和上下文自动选择最佳检索策略的系统
- 利用强化学习优化检索参数
4. 结论
Elasticsearch 作为 RAG 系统的向量数据库具有显著优势,特别是在需要混合检索、多模态支持和成熟生态系统的场景下。虽然在纯向量搜索性能上可能不如专用向量数据库,但其全面的功能和灵活性使其成为构建企业级 RAG 系统的有力选择。
通过合理配置和优化,结合本报告提供的实现方案,可以充分发挥 Elasticsearch 在 RAG 系统中的潜力,为大模型应用提供高质量的知识检索支持。
对于需要构建 RAG 系统的团队,建议根据具体需求和现有技术栈选择合适的向量存储解决方案,Elasticsearch 特别适合已有 ES 基础设施、需要混合检索能力以及处理多模态数据的场景。
【大模型介绍电子书】
快速揭秘DeepSeek背后的AI工作原理
要获取本书全文PDF内容,请在VX后台留言:“AI大模型基础” 或者 “大模型基础” 就会获得电子书的PDF。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/593528.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!