当前位置: 首页 > article >正文

【原创】使用ElasticSearch存储向量实现大模型RAG

一、概述

检索增强生成(Retrieval-Augmented Generation,RAG)已成为大型语言模型(LLM)应用的重要架构,通过结合外部知识库来增强模型的回答能力,特别是在处理专业领域知识、最新信息或企业私有数据时。本报告将系统梳理使用 Elasticsearch(ES)作为向量数据库实现 RAG 系统的优缺点,与传统向量数据库及其他存储解决方案的对比,以及基于 Deepseek V3 和 Qwen2.5 大模型的实现方案。

二、Elasticsearch 作为 RAG 向量数据库的优缺点分析

1. 优点

低门槛的独立技术栈
  • 一站式解决方案:ES 能够一站式完成向量生成、存储、索引和检索,通过配置即可实现大部分功能
  • 成熟的生态系统:作为成熟的搜索引擎,拥有丰富的文档、社区支持和工具集
  • 简化的部署和维护:相比需要部署多个组件的解决方案,ES 可以作为单一系统处理所有 RAG 相关任务
高性能和扩展性
  • 分布式架构:支持百万级 QPS 和千亿级数据量
  • 灵活的扩展能力:可以通过添加节点水平扩展,满足不断增长的数据需求
  • 高可用性:内置的分片和复制机制确保系统的可靠性
混合检索能力
  • 文本与向量的结合:同时支持传统的全文检索和向量相似性搜索
  • 提高检索精度:混合检索策略能够显著提升搜索结果的准确性和多样性
  • 处理短查询优势:对于短查询,传统关键词搜索可以弥补纯向量搜索的不足
丰富的数据处理能力
  • 强大的文本分析:内置多种语言分析器,支持分词、同义词、停用词等处理
  • 结构化和非结构化数据支持:可以同时处理结构化字段和非结构化文本
  • 聚合和分析功能:提供丰富的聚合功能,可用于数据分析和可视化
多种相似度计算算法支持
  • 余弦相似度(Cosine Similarity):适用于文本语义搜索,不受向量长度影响
  • 点积(Dot Product):适用于推荐系统,考虑向量长度和方向
  • 欧几里得距离(L2 Norm):适用于图像特征、地理位置等场景
  • 脚本评分(Script Score):支持自定义脚本实现更复杂的相似度计算
  • 函数评分(Function Score):允许结合衰减函数、字段值等因素调整相似度分数
与大模型的无缝集成
  • 简化的集成流程:提供 API 和工具,便于与各种大模型集成
  • 灵活的检索配置:可以根据不同大模型的特点调整检索策略

2. 不足

向量搜索性能限制
  • 非专用架构:ES 的架构不是专为向量搜索设计的,在大规模向量搜索时性能可能不如专用向量数据库
  • 延迟问题:在大规模向量集上,搜索延迟通常为毫秒级,而专用向量数据库可达微秒级
  • 资源消耗较高:向量操作可能需要更多的内存和计算资源
向量功能相对有限
  • 算法支持有限:支持的向量索引和搜索算法相对较少,主要是 HNSW
  • 缺乏专业优化:缺少针对向量操作的专门优化,如 GPU 加速
  • 向量维度限制:在处理超高维向量时可能存在效率问题
学习和配置复杂性
  • 配置复杂:需要正确配置索引映射、分片策略等
  • 调优难度:优化 ES 性能需要专业知识和经验
  • 维护成本:需要定期维护和监控集群状态
存储效率问题
  • 存储开销:存储向量数据可能需要更多空间
  • 索引大小:包含向量的索引通常比纯文本索引大得多

三、Elasticsearch 向量存储与检索原理

1. 向量数据存储原理

Elasticsearch 使用 dense_vector 字段类型来存储向量数据,其工作原理如下:

基本存储结构
  • 文档结构:向量被存储为文档的一个字段,与其他字段(如文本、数字等)一起构成完整的文档。
  • 向量表示:向量以浮点数数组的形式存储,每个数组元素对应向量的一个维度。
  • 索引映射:通过索引映射定义向量字段的属性,包括维度大小、索引方式和相似度计算方法。

存储过程简述
  1. 1. 向量生成:通过嵌入模型将文本、图像等内容转换为固定维度的向量。

  2. 2. 文档创建:创建包含向量字段的文档,将向量数据与其他元数据一起存储。

  3. 3. 分片分配:ES 将文档分配到不同的分片中,每个分片可以位于不同的节点上。

  4. 4. 磁盘存储:向量数据最终以 Lucene 索引格式存储在磁盘上,同时部分热数据会缓存在内存中。

2. 向量检索原理

Elasticsearch 主要使用 HNSW(Hierarchical Navigable Small World)算法进行向量检索,这是一种近似最近邻(ANN)搜索算法:

HNSW 算法简介
  • 多层图结构:HNSW 构建一个多层的图结构,顶层包含少量节点,底层包含所有节点。
  • 导航原理:搜索从顶层开始,通过"贪心"策略快速找到大致方向,然后在下层进行更精细的搜索。
  • 近似搜索:通过牺牲一定的精确度来换取显著的性能提升。

检索过程
  1. 1. 查询向量生成:将用户查询转换为向量表示。

  2. 2. 分片搜索:在每个相关分片上执行向量搜索。

  3. 3. 相似度计算:根据配置的相似度方法(余弦、点积或 L2 范数)计算查询向量与索引向量的相似度。

  4. 4. 结果合并:将各分片的搜索结果合并,并按相似度排序。

  5. 5. 返回结果:返回最相似的文档作为搜索结果。

性能优化参数
  • ef_construction:构建索引时的精度参数,值越大索引质量越高但构建越慢。
  • ef_search:搜索时的精度参数,值越大搜索结果越精确但速度越慢。
  • m:每个节点的最大连接数,影响图的连通性和搜索效率。

3. 多模态向量存储支持

Elasticsearch 不仅可以存储文本向量,还可以存储来自图片、音频和视频等多模态数据的向量表示:

图像向量存储
  • 实现原理:使用图像嵌入模型(如 ResNet、ViT 等)将图像转换为向量表示。
  • 存储方式:与文本向量相同,使用 dense_vector 字段存储图像向量。
  • 应用场景:图像相似度搜索、以图搜图、视觉内容检索。

音频向量存储
  • 实现原理:使用音频嵌入模型(如 Wav2Vec、CLAP 等)将音频转换为向量表示。
  • 存储方式:同样使用 dense_vector 字段,维度根据音频嵌入模型而定。
  • 应用场景:音乐推荐、语音搜索、音频内容分类。

视频向量存储
  • 实现原理:可以通过两种方式处理:
    1. 将视频分解为关键帧,然后使用图像嵌入模型处理每个关键帧。
    2. 使用专门的视频嵌入模型(如 Video Transformers)直接生成视频向量。
  • 存储方式:可以存储单个视频的整体向量,或者存储多个关键帧的向量序列。
  • 应用场景:视频内容搜索、相似视频推荐、视频分类。

多模态向量存储的简单示例
# 图像向量存储示例
from PIL import Image
from transformers import AutoFeatureExtractor, AutoModel
import torch

# 加载图像嵌入模型
image_model = AutoModel.from_pretrained("openai/clip-vit-base-patch32")
feature_extractor = AutoFeatureExtractor.from_pretrained("openai/clip-vit-base-patch32")

# 处理图像
image = Image.open("example.jpg")
inputs = feature_extractor(images=image, return_tensors="pt")
with torch.no_grad():
    image_features = image_model.get_image_features(**inputs)
image_vector = image_features[0].tolist()

# 存储到Elasticsearch
doc = {
    "title": "示例图片",
    "description": "这是一张示例图片",
    "image_path": "example.jpg",
    "image_vector": image_vector  # 使用dense_vector字段存储
}

es.index(index="image-vectors", document=doc)

四、Elasticsearch 与其他向量存储解决方案的对比

1. 各向量存储解决方案综合对比

特性ElasticsearchPGVector (PostgreSQL)Redis VectorMongoDB AtlasFAISSMilvusPineconeQdrantChroma
架构类型分布式搜索引擎关系型数据库扩展内存数据库扩展文档数据库扩展库/嵌入式分布式向量数据库托管向量数据库向量数据库嵌入式向量数据库
部署模式自托管/云服务自托管/云服务自托管/云服务自托管/云服务嵌入式自托管/云服务仅云服务自托管/云服务嵌入式/自托管
向量索引算法HNSWHNSW, IVFHNSWHNSWHNSW, IVF, PQ 等多种HNSW, IVF, FLAT 等多种HNSW 变体HNSWHNSW
查询性能毫秒级毫秒级亚毫秒级毫秒级微秒-毫秒级微秒-毫秒级毫秒级毫秒级毫秒级
扩展性优秀有限有限优秀有限优秀优秀良好有限
混合查询原生支持支持 SQL+向量有限支持支持文档+向量不支持支持有限支持支持支持
元数据过滤强大强大(SQL)有限强大(文档查询)有限支持支持支持支持
GPU 加速不支持不支持不支持不支持支持支持内部支持不支持不支持
存储容量TB-PB 级TB 级GB-TB 级TB-PB 级受内存限制TB 级TB 级TB 级GB-TB 级
全文搜索优秀基础良好基础不支持有限不支持有限有限
相似度算法余弦、点积、L2 范数余弦、L2 范数余弦、L2 范数余弦、点积、欧几里得多种(余弦、内积、L2 等)多种(余弦、内积、L2 等)余弦、点积余弦、点积、欧几里得余弦
事务支持有限完整 ACID有限文档级事务不支持有限不支持不支持不支持
易用性中等高(SQL 熟悉度)高(MongoDB 用户)复杂中等简单简单简单
维护成本中等中等低(嵌入式)低(托管)
社区支持强大强大强大强大活跃活跃有限活跃活跃
成本开源,自托管开源,自托管开源+商业版开源+商业版开源,免费开源+商业版商业版,按量付费开源+商业版开源,免费
适用场景混合搜索结构化数据+向量高性能、低延迟半结构化数据+向量高性能向量搜索大规模向量管理简单部署、高可用中小规模应用快速原型开发

2. 不同相似度计算方法对比

相似度方法数学表达式值域适用场景特点支持的存储系统
余弦相似度cos(θ) = A·B / (‖A‖·‖B‖)[-1, 1]文本语义搜索不受向量长度影响,只考虑方向全部
点积A·B = ∑(A_i × B_i)无限制推荐系统考虑向量长度和方向,通常需要归一化ES, MongoDB, FAISS, Milvus, Pinecone, Qdrant
欧几里得距离‖A-B‖ = √(∑(A_i - B_i)²)[0, ∞)图像特征、地理位置值越小表示越相似,需要转换为相似度ES, PGVector, Redis, MongoDB, FAISS, Milvus, Qdrant
曼哈顿距离∑|A_i - B_i|[0, ∞)网格空间、特征差异计算简单,对异常值不敏感FAISS, Milvus
杰卡德相似度J(A,B) = |A∩B| / |A∪B|[0, 1]集合比较、文档相似度适用于二进制特征或集合ES(脚本), PGVector(SQL)
汉明距离H(A,B) = ∑(A_i ⊕ B_i)[0, dim]二进制特征、错误检测计算二进制向量中不同位的数量FAISS

3. 适用场景对比总结

解决方案最适合场景不适合场景
Elasticsearch需要混合搜索(文本+向量)、已有 ES 基础设施、需要复杂文本处理超大规模纯向量搜索、极低延迟要求、GPU 加速需求
PGVector已有 PostgreSQL 基础设施、需要事务支持、结构化数据+向量超大规模向量集、分布式部署需求
Redis Vector超低延迟需求、缓存层向量搜索、临时数据持久化要求高、复杂查询、大规模数据
MongoDB Atlas半结构化数据、文档+向量混合、MongoDB 用户复杂文本分析、极高性能要求
FAISS纯向量搜索、算法研究、GPU 加速、嵌入式应用需要持久化、分布式部署、元数据过滤
Milvus大规模向量管理、需要丰富索引算法、混合查询简单应用、资源受限环境
Pinecone快速部署、无运维需求、按需扩展自托管需求、成本敏感、复杂查询
Qdrant中小规模应用、需要良好元数据过滤、开源需求超大规模、GPU 加速需求
Chroma快速原型开发、简单应用、本地部署企业级应用、大规模数据、高并发

五、基于 Elasticsearch 的 RAG 实现方案

1. 系统架构设计

基于 Elasticsearch 实现 RAG 系统的整体架构如下:

  1. 1. 数据处理层:负责文档的收集、清洗、分块和向量化

  2. 2. 存储层:使用 Elasticsearch 存储文本内容和向量表示

  3. 3. 检索层:实现混合检索策略,结合文本和向量搜索

  4. 4. 生成层:集成 Deepseek V3 或 Qwen2.5 大模型,基于检索结果生成回答

  5. 5. 应用层:提供用户界面和 API 接口

2. 实现步骤详解

2.1 环境准备

首先,我们需要安装必要的依赖:

# 安装必要的库
pip install elasticsearch langchain langchain-elasticsearch langchain-community langchain-openai
pip install deepseek-ai qwen-api
2.2 Elasticsearch 配置

设置 Elasticsearch 集群并创建适合 RAG 的索引,展示不同相似度计算方法:

from elasticsearch import Elasticsearch

# 连接到Elasticsearch
es = Elasticsearch(
    hosts=["http://localhost:9200"],
    basic_auth=("user", "password"),  # 如果有认证
    request_timeout=60
)

# 创建索引映射,使用cosine相似度
cosine_index_mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text", "analyzer": "standard"},
            "content": {"type": "text", "analyzer": "standard"},
            "source": {"type": "keyword"},
            "content_vector": {
                "type": "dense_vector",
                "dims": 1536,  # 根据嵌入模型调整维度
                "index": True,
                "similarity": "cosine"  # 余弦相似度
            }
        }
    },
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    }
}

# 创建使用余弦相似度的索引
es.indices.create(index="rag-knowledge-cosine", body=cosine_index_mapping)

# 创建索引映射,使用dot_product相似度
dot_product_index_mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text", "analyzer": "standard"},
            "content": {"type": "text", "analyzer": "standard"},
            "source": {"type": "keyword"},
            "content_vector": {
                "type": "dense_vector",
                "dims": 1536,
                "index": True,
                "similarity": "dot_product"  # 点积相似度
            }
        }
    },
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    }
}

# 创建使用点积相似度的索引
es.indices.create(index="rag-knowledge-dot-product", body=dot_product_index_mapping)

# 创建索引映射,使用l2_norm相似度
l2_norm_index_mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text", "analyzer": "standard"},
            "content": {"type": "text", "analyzer": "standard"},
            "source": {"type": "keyword"},
            "content_vector": {
                "type": "dense_vector",
                "dims": 1536,
                "index": True,
                "similarity": "l2_norm"  # 欧几里得距离(L2范数)
            }
        }
    },
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    }
}

# 创建使用L2范数相似度的索引
es.indices.create(index="rag-knowledge-l2-norm", body=l2_norm_index_mapping)

2.3 文档处理与索引

实现文档的处理、分块和向量化,并分别存储到不同相似度计算方法的索引中:

import os
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings  # 也可以使用其他嵌入模型

# 加载文档
loader = DirectoryLoader("./documents", glob="**/*.txt", loader_cls=TextLoader)
documents = loader.load()

# 文档分块
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
)
chunks = text_splitter.split_documents(documents)

# 初始化嵌入模型
embeddings = OpenAIEmbeddings()  # 可替换为其他嵌入模型

# 处理文档并索引到不同的Elasticsearch索引
for i, chunk in enumerate(chunks):
    # 生成向量嵌入
    vector = embeddings.embed_query(chunk.page_content)
    
    # 准备索引文档
    doc = {
        "title": f"Chunk {i} from {chunk.metadata.get('source', 'unknown')}",
        "content": chunk.page_content,
        "source": chunk.metadata.get("source", "unknown"),
        "content_vector": vector
    }
    
    # 索引文档到不同相似度计算方法的索引
    es.index(index="rag-knowledge-cosine", document=doc)
    
    # 对于dot_product,通常需要归一化向量
    es.index(index="rag-knowledge-dot-product", document=doc)
    
    # 对于l2_norm,直接使用原始向量
    es.index(index="rag-knowledge-l2-norm", document=doc)

# 刷新索引
es.indices.refresh(index="rag-knowledge-cosine")
es.indices.refresh(index="rag-knowledge-dot-product")
es.indices.refresh(index="rag-knowledge-l2-norm")

2.4 使用 LangChain 与 Elasticsearch 实现 RAG

下面是使用 LangChain 与 Elasticsearch 结合实现 RAG 的代码,支持不同相似度计算方法:

from langchain_elasticsearch import ElasticsearchStore
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain.schema import Document

# 初始化不同相似度计算方法的Elasticsearch向量存储
es_store_cosine = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="rag-knowledge-cosine",
    embedding=embeddings,
    es_user="user",  # 如果有认证
    es_password="password"  # 如果有认证
)

es_store_dot_product = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="rag-knowledge-dot-product",
    embedding=embeddings,
    es_user="user",
    es_password="password"
)

es_store_l2_norm = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="rag-knowledge-l2-norm",
    embedding=embeddings,
    es_user="user",
    es_password="password"
)

# 创建不同相似度计算方法的检索器
retriever_cosine = es_store_cosine.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

retriever_dot_product = es_store_dot_product.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

retriever_l2_norm = es_store_l2_norm.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# 创建多相似度混合检索器
def multi_similarity_retriever(query, top_k=5):
    # 从三种不同相似度方法获取结果
    cosine_docs = retriever_cosine.get_relevant_documents(query)
    dot_product_docs = retriever_dot_product.get_relevant_documents(query)
    l2_norm_docs = retriever_l2_norm.get_relevant_documents(query)
    
    # 合并结果
    all_docs = cosine_docs + dot_product_docs + l2_norm_docs
    
    # 去重
    unique_docs = list({doc.page_content: doc for doc in all_docs}.values())
    
    # 限制返回数量
    return unique_docs[:top_k]

# 定义提示模板
template = """
你是一个专业的AI助手。请基于以下提供的上下文信息,回答用户的问题。
如果你无法从上下文中找到答案,请直接说"我无法从提供的信息中找到答案",不要编造信息。

上下文信息:
{context}

用户问题: {question}

回答:
"""

prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

# 格式化文档函数
def format_docs(docs):
    return "\n\n".join([doc.page_content for doc in docs])

2.5 集成 Deepseek V3 模型

from langchain_deepseek import DeepseekChat

# 初始化Deepseek模型
deepseek_llm = DeepseekChat(
    model_name="deepseek-ai/deepseek-v3",
    api_key="your_api_key",
    temperature=0.1
)

# 构建RAG链(使用余弦相似度)
deepseek_rag_chain_cosine = (
    {"context": retriever_cosine | format_docs, "question": RunnablePassthrough()}
    | prompt
    | deepseek_llm
    | StrOutputParser()
)

# 构建RAG链(使用点积相似度)
deepseek_rag_chain_dot_product = (
    {"context": retriever_dot_product | format_docs, "question": RunnablePassthrough()}
    | prompt
    | deepseek_llm
    | StrOutputParser()
)

# 构建RAG链(使用L2范数相似度)
deepseek_rag_chain_l2_norm = (
    {"context": retriever_l2_norm | format_docs, "question": RunnablePassthrough()}
    | prompt
    | deepseek_llm
    | StrOutputParser()
)

# 构建RAG链(使用多相似度混合检索)
def answer_with_deepseek_multi_similarity(question):
    # 获取多相似度混合检索结果
    docs = multi_similarity_retriever(question)
    context = format_docs(docs)
    
    # 构建RAG链
    chain = (
        {"context": RunnablePassthrough(), "question": RunnablePassthrough()}
        | prompt
        | deepseek_llm
        | StrOutputParser()
    )
    
    # 生成回答
    return chain.invoke({"context": context, "question": question})

# 示例
response = answer_with_deepseek_multi_similarity("什么是向量数据库?")
print(response)

2.6 集成 Qwen2.5 模型

from langchain_community.llms import QianfanLLMEndpoint

# 初始化Qwen模型
qwen_llm = QianfanLLMEndpoint(
    model_name="qwen-2.5",
    api_key="your_api_key",
    secret_key="your_secret_key",
    temperature=0.1
)

# 构建RAG链(使用余弦相似度)
qwen_rag_chain_cosine = (
    {"context": retriever_cosine | format_docs, "question": RunnablePassthrough()}
    | prompt
    | qwen_llm
    | StrOutputParser()
)

# 构建RAG链(使用点积相似度)
qwen_rag_chain_dot_product = (
    {"context": retriever_dot_product | format_docs, "question": RunnablePassthrough()}
    | prompt
    | qwen_llm
    | StrOutputParser()
)

# 构建RAG链(使用L2范数相似度)
qwen_rag_chain_l2_norm = (
    {"context": retriever_l2_norm | format_docs, "question": RunnablePassthrough()}
    | prompt
    | qwen_llm
    | StrOutputParser()
)

# 构建RAG链(使用多相似度混合检索)
def answer_with_qwen_multi_similarity(question):
    # 获取多相似度混合检索结果
    docs = multi_similarity_retriever(question)
    context = format_docs(docs)
    
    # 构建RAG链
    chain = (
        {"context": RunnablePassthrough(), "question": RunnablePassthrough()}
        | prompt
        | qwen_llm
        | StrOutputParser()
    )
    
    # 生成回答
    return chain.invoke({"context": context, "question": question})

# 示例
response = answer_with_qwen_multi_similarity("Elasticsearch的主要优势是什么?")
print(response)

3. 高级功能实现

3.1 实现混合搜索策略
def enhanced_hybrid_search(query, es_client, index_name, embedding_model, similarity_type="cosine"):
    # 生成查询向量
    query_vector = embedding_model.embed_query(query)
    
    # 根据不同相似度类型构建脚本
    if similarity_type == "cosine":
        similarity_script = "cosineSimilarity(params.query_vector, 'content_vector') + 1.0"
    elif similarity_type == "dot_product":
        similarity_script = "dotProduct(params.query_vector, 'content_vector')"
    elif similarity_type == "l2_norm":
        # 对于L2范数,较小的值表示更相似,所以我们使用负值或倒数
        similarity_script = "1 / (1 + l2Norm(params.query_vector, 'content_vector'))"
    else:
        raise ValueError(f"不支持的相似度类型: {similarity_type}")
    
    # 构建混合查询
    hybrid_query = {
        "query": {
            "bool": {
                "should": [
                    # 向量搜索部分
                    {
                        "script_score": {
                            "query": {"match_all": {}},
                            "script": {
                                "source": similarity_script,
                                "params": {"query_vector": query_vector}
                            }
                        }
                    },
                    # 文本搜索部分
                    {
                        "match": {
                            "content": {
                                "query": query,
                                "boost": 0.5  # 调整文本搜索的权重
                            }
                        }
                    }
                ]
            }
        },
        "size": 5
    }
    
    # 执行搜索
    response = es_client.search(index=index_name, body=hybrid_query)
    
    # 处理结果
    results = []
    for hit in response["hits"]["hits"]:
        results.append({
            "score": hit["_score"],
            "title": hit["_source"]["title"],
            "content": hit["_source"]["content"],
            "source": hit["_source"]["source"]
        })
    
    return results

3.2 实现自适应相似度选择
def adaptive_similarity_search(query, es_client, embedding_model):
    # 分析查询特征,决定使用哪种相似度方法
    query_length = len(query.split())
    
    # 短查询(1-3个词)可能更适合关键词搜索
    if query_length <= 3:
        # 使用混合搜索,但给文本搜索更高权重
        results = enhanced_hybrid_search(
            query, 
            es_client, 
            "rag-knowledge-cosine", 
            embedding_model,
            similarity_type="cosine"
        )
    # 中等长度查询(4-10个词)
    elif query_length <= 10:
        # 使用余弦相似度,适合一般语义搜索
        results = enhanced_hybrid_search(
            query, 
            es_client, 
            "rag-knowledge-cosine", 
            embedding_model,
            similarity_type="cosine"
        )
    # 长查询(>10个词)
    else:
        # 对于长查询,点积可能更适合捕捉更多语义信息
        results = enhanced_hybrid_search(
            query, 
            es_client, 
            "rag-knowledge-dot-product", 
            embedding_model,
            similarity_type="dot_product"
        )
    
    return results

3.3 实现多步检索策略
def multi_step_retrieval(query, es_client, embedding_model):
    # 第一步:使用文本搜索获取初步结果
    text_query = {
        "query": {
            "match": {
                "content": query
            }
        },
        "size": 20
    }
    
    text_response = es_client.search(index="rag-knowledge-cosine", body=text_query)
    
    # 提取文档ID和内容
    candidate_docs = []
    for hit in text_response["hits"]["hits"]:
        candidate_docs.append({
            "id": hit["_id"],
            "content": hit["_source"]["content"]
        })
    
    # 如果文本搜索没有足够结果,添加向量搜索
    if len(candidate_docs) < 10:
        # 生成查询向量
        query_vector = embedding_model.embed_query(query)
        
        # 向量搜索
        vector_query = {
            "query": {
                "script_score": {
                    "query": {"match_all": {}},
                    "script": {
                        "source": "cosineSimilarity(params.query_vector, 'content_vector') + 1.0",
                        "params": {"query_vector": query_vector}
                    }
                }
            },
            "size": 20 - len(candidate_docs)
        }
        
        vector_response = es_client.search(index="rag-knowledge-cosine", body=vector_query)
        
        # 添加到候选文档
        for hit in vector_response["hits"]["hits"]:
            # 避免重复
            if not any(doc["id"] == hit["_id"] for doc in candidate_docs):
                candidate_docs.append({
                    "id": hit["_id"],
                    "content": hit["_source"]["content"]
                })
    
    # 第二步:对候选文档进行重新排序
    # 生成查询向量
    query_vector = embedding_model.embed_query(query)
    
    # 为每个候选文档计算相似度分数
    for doc in candidate_docs:
        # 生成文档向量
        doc_vector = embedding_model.embed_query(doc["content"])
        
        # 计算余弦相似度
        similarity = cosine_similarity(
            [query_vector],
            [doc_vector]
        )[0][0]
        
        doc["similarity_score"] = similarity
    
    # 按相似度排序
    ranked_docs = sorted(candidate_docs, key=lambda x: x["similarity_score"], reverse=True)
    
    # 返回前5个最相关文档
    return ranked_docs[:5]

3.4 实现查询扩展和改写
def query_expansion(original_query, llm):
    # 使用大模型扩展查询
    expansion_prompt = f"""
    请基于以下原始查询,生成3个不同的扩展查询,以便更全面地搜索相关信息。
    每个扩展查询应该保留原始查询的核心意图,但可以添加相关术语、同义词或上下文。
    
    原始查询: {original_query}
    
    扩展查询(每行一个):
    """
    
    # 获取扩展查询
    response = llm.invoke(expansion_prompt)
    
    # 解析响应
    expanded_queries = [q.strip() for q in response.split("\n") if q.strip()]
    
    # 过滤掉可能的空行或无关行
    expanded_queries = [q for q in expanded_queries if len(q) > 5]
    
    # 确保至少有一个查询(原始查询)
    if not expanded_queries:
        expanded_queries = [original_query]
    else:
        # 添加原始查询
        expanded_queries.append(original_query)
    
    return expanded_queries

def multi_query_retrieval(original_query, es_client, embedding_model, llm):
    # 扩展查询
    expanded_queries = query_expansion(original_query, llm)
    
    # 对每个扩展查询执行搜索
    all_results = []
    for query in expanded_queries:
        results = enhanced_hybrid_search(
            query, 
            es_client, 
            "rag-knowledge-cosine", 
            embedding_model
        )
        all_results.extend(results)
    
    # 去重
    unique_results = []
    seen_contents = set()
    for result in all_results:
        if result["content"] not in seen_contents:
            unique_results.append(result)
            seen_contents.add(result["content"])
    
    # 重新排序(可以基于原始查询的相似度)
    query_vector = embedding_model.embed_query(original_query)
    
    # 为每个结果计算与原始查询的相似度
    for result in unique_results:
        # 假设我们可以从ES获取内容的向量
        content_vector_query = {
            "query": {
                "term": {
                    "_id": result["id"]
                }
            },
            "_source": ["content_vector"]
        }
        
        vector_response = es_client.search(index="rag-knowledge-cosine", body=content_vector_query)
        content_vector = vector_response["hits"]["hits"][0]["_source"]["content_vector"]
        
        # 计算相似度
        similarity = cosine_similarity(
            [query_vector],
            [content_vector]
        )[0][0]
        
        result["final_score"] = similarity
    
    # 按最终分数排序
    ranked_results = sorted(unique_results, key=lambda x: x["final_score"], reverse=True)
    
    # 返回前5个最相关结果
    return ranked_results[:5]

六、总结与结论

1. Elasticsearch 作为 RAG 向量数据库的优势总结

  1. 1. 全栈解决方案:Elasticsearch 提供了从数据索引到检索的完整解决方案,无需集成多个系统。

  2. 2. 混合检索能力:结合传统文本搜索和向量搜索的能力是其最大优势,能够显著提高检索质量。

  3. 3. 成熟的生态系统:作为成熟的搜索引擎,拥有丰富的文档、工具和社区支持。

  4. 4. 多模态支持:能够存储和检索文本、图像、音频和视频的向量表示,支持多模态 RAG 应用。

  5. 5. 灵活的相似度计算:支持多种相似度计算方法(余弦、点积、L2 范数),适应不同应用场景。

2. 实施建议

  1. 1. 选择合适的相似度计算方法:

    • 文本语义搜索:优先使用余弦相似度
    • 推荐系统:考虑使用点积
    • 图像或地理位置搜索:考虑使用 L2 范数
  2. 2. 优化索引配置:

    • 根据数据量和查询频率调整分片数量
    • 为频繁查询的索引配置足够的副本
    • 合理设置 HNSW 参数,平衡搜索精度和性能
  3. 3. 实现混合检索策略:

    • 结合向量搜索和关键词搜索
    • 根据查询特征动态调整搜索策略
    • 考虑使用查询扩展和多步检索提高召回率
  4. 4. 性能优化:

    • 对于大规模部署,考虑使用专用节点
    • 监控和调整 JVM 堆大小
    • 定期优化索引

3. 未来发展方向

  1. 1. 与专用向量数据库的集成:

    • 考虑 Elasticsearch 与专用向量数据库(如 FAISS、Milvus)的混合架构
    • 利用 ES 的文本处理和混合查询能力,结合专用向量数据库的高性能向量搜索
  2. 2. 知识图谱增强:

    • 将向量搜索与知识图谱结合,实现 GraphRAG
    • 利用实体关系增强检索结果的相关性和可解释性
  3. 3. 多模态 RAG 应用:

    • 扩展到图像、音频和视频内容的检索增强生成
    • 实现跨模态检索,如以文搜图、以图搜文
  4. 4. 自适应检索策略:

    • 开发能够根据查询特征和上下文自动选择最佳检索策略的系统
    • 利用强化学习优化检索参数

4. 结论

Elasticsearch 作为 RAG 系统的向量数据库具有显著优势,特别是在需要混合检索、多模态支持和成熟生态系统的场景下。虽然在纯向量搜索性能上可能不如专用向量数据库,但其全面的功能和灵活性使其成为构建企业级 RAG 系统的有力选择。

通过合理配置和优化,结合本报告提供的实现方案,可以充分发挥 Elasticsearch 在 RAG 系统中的潜力,为大模型应用提供高质量的知识检索支持。

对于需要构建 RAG 系统的团队,建议根据具体需求和现有技术栈选择合适的向量存储解决方案,Elasticsearch 特别适合已有 ES 基础设施、需要混合检索能力以及处理多模态数据的场景。

 


【大模型介绍电子书】

快速揭秘DeepSeek背后的AI工作原理

要获取本书全文PDF内容,请在VX后台留言:“AI大模型基础” 或者 “大模型基础” 就会获得电子书的PDF。

原文地址:https://blog.csdn.net/heiyeshuwu/article/details/146329660
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/593528.html

相关文章:

  • 【VUE】day05-ref引用
  • 代码随想录第55期训练营第七天|LeetCode454.四数相加II、383.赎金信、15.三数之和、18.四数之和
  • 火山引擎(豆包大模型)(抖音平台)之火山方舟的Prompt的使用测试
  • RabbitMQ消息可靠性问题
  • 前沿技术一览科技改变生活新趋势
  • 用gemini画流程图
  • Python实战(2)-数据库支持
  • 【AWS入门】Amazon EC2简介
  • Docker进阶篇1:什么是Docker数据卷?为什么需要Docker数据卷?Docker数据卷3种类型介绍
  • Excel Script Lab学习笔记
  • Vlan初级实验
  • 一次Milvus迁移的记录
  • 【32】单片机编程核心技巧:Switch驱动按键控制跑马灯方向
  • RabbitMQ 基本原理详解
  • Java 并发集合:ConcurrentHashMap 深入解析
  • html5基于Canvas的经典打砖块游戏开发实践
  • 齿轮热处理学习笔记分享
  • 本地部署deepseek-r1建立向量知识库和知识库检索实践【代码】
  • 多模态大模型:将音频向量化
  • 再学:合约继承 、抽象合约 solidity接口、库、事件 合约重入攻击