milvus实战-基于Ollama+bge-large-zh搭建嵌入模型,fastAPI提供http服务将PDF文件写入milvus向量库
0 环境准备
- ollama已部署嵌入模型quentinz/bge-large-zh-v1.5:latest
- 已安装miniconda环境
- 具备科学上网条件(docker安装milvus时需要)
1 milvus安装
1.1 启动Docker Desktop
windows环境下,docker拉取镜像需要先启动Docker Desktop。否则拉取镜像时会报错,无法拉取镜像。
1.2 下载milvus的docker-compose.yml
在powershell输入以下命令
Invoke-WebRequest https://github.com/milvus-io/milvus/releases/download/v2.4.15/milvus-standalone-docker-compose.yml -OutFile docker-compose.yml
1.3 启动milvus
docker compose up -d
2 开发环境准备
2.1 创建python环境
通过conda命令创建python环境,保持买个项目的python环境独立,防止项目之间包冲突,方便管理项目依赖。
conda create -n LangchainDemo python=3.10
2.2 pycharm创建项目
- 解释器类型:选择自定义环境
- 环境:选择现有
- 类型:选择conda
- 环境:选择上一步创建的环境
2.3 激活python环境
conda activate LangchainDemo
2.4 安装项目依赖包
安装项目必要的依赖,包含fastapi、milvus、pdfplumber、ollama等。pdfpy解析可能存在乱码,选用pdfplumber效果更佳。
pip install fastapi uvicorn pymilvus python-multipart pdfplumber ollama
3 程序实现
3.1 导入依赖包
import os
import uuid
import asyncio
import pdfplumber
from fastapi import FastAPI, UploadFile, File
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from pymilvus.orm import utility
from tenacity import retry, stop_after_attempt
import ollama
3.2 定义FastAPI
app = FastAPI()
3.3 定义文本切割逻辑
使用pdfplumber打开pdf文件,按自然段落分割文本,设置默认500字符一个分块,且有100个字符重叠。
def extract_text_with_pdfnumber(pdf_path):
"""使用pdfplumber提取PDF文本(保留段落结构)[2]()[5]()"""
with pdfplumber.open(pdf_path) as pdf:
text = []
for page in pdf.pages:
# 按自然段落分割文本
paragraphs = page.extract_text()
text.append(paragraphs.replace('\n', ''))
return '\n\n'.join(text)
def chunk_with_overlap(text, chunk_size=500, overlap=100):
"""带重叠的分块策略[1]()"""
chunks = []
words = text.split()
start_idx = 0
while start_idx < len(words):
end_idx = start_idx + chunk_size
chunk = ' '.join(words[start_idx:end_idx])
chunks.append(chunk)
start_idx = end_idx - overlap # 设置重叠部分
# 处理末尾不足的情况
if end_idx < len(words):
break
return chunks
3.4 构建嵌入模型
连接ollama部署的嵌入模型,bge-large-zh对中文字符处理较好。设置调用嵌入模型失败时可重试3次。
@retry(stop=stop_after_attempt(3))
async def generate_embeddings(text):
"""使用Ollama生成文本嵌入"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None,
lambda: ollama.Client(host='http://localhost:11434').embeddings(
model="quentinz/bge-large-zh-v1.5:latest", prompt=text)['embedding']
3.5 连接milvus
connections.connect("default", host="localhost", port="19530")
collection_name = "pdf_documents"
3.6 构建milvus collection
定义pdf文本存储的collection的schema,对应数据库的表和字段。
if not utility.has_collection(collection_name):
# 创建集合
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=64),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=20000),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256)
]
schema = CollectionSchema(fields=fields, description="pdf_documents")
collection = Collection(name=collection_name, schema=schema)
# 创建索引
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128}
}
collection.create_index("vector", index_params)
else:
collection = Collection(collection_name)
注意:bge-large-zh只能支持到1024维度,500字符对应content的大于需要20000的长度。如果前面修改了嵌入模型或者分块大小,此处也需要调整。
3.7 定义http上传pdf文件处理流程
@app.post("/upload_pdf")
async def upload_pdf(file: UploadFile = File(...),
chunk_size=500,
overlap=100):
print(f"开始上传文件《{file.filename}》")
"""上传PDF文件"""
try:
# 临时保存文件
temp_path = f"temp_{uuid.uuid4()}.pdf"
with open(temp_path, "wb") as f:
# 流式写入文件
while chunk := await file.read(1024):
f.write(chunk)
# 解析PDF
text = extract_text_with_pdfnumber(temp_path)
os.remove(temp_path)
# 分块处理
chunks = chunk_with_overlap(text, chunk_size, overlap)
# 批量生成嵌入
embeddings = []
for chunk in chunks:
embeddings.append(await generate_embeddings(chunk))
# 构建插入数据
entities = [
{
"id": str(uuid.uuid4()),
"content": chunk,
"vector": emb,
"source": file.filename
} for chunk, emb in zip(chunks, embeddings)
]
batch_size = 100
for i in range(0, len(entities), batch_size):
insert_result = collection.insert(entities[i:i+batch_size])
collection.flush()
return {"status": "success", "chunks_processed": len(chunks)}
except Exception as e:
return {"error": str(e)}, 500
3.8 实现查询milvus逻辑
@app.get("/search")
async def semantic_search(query: str, top_k=5):
query_embedding = await generate_embeddings(query)
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
# 加载集合到内存中
collection.load()
results = collection.search(
data=[query_embedding],
anns_field="vector",
param=search_params,
limit=top_k,
output_fields=["content", "source"]
)
return [{"score": hit.score, "metadata": hit.entity.to_dict()} for hit in results[0]]
3.9 启动http服务
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8321)
4 测试
建议使用apifox请求http接口。
4.1 测试上传PDF文件解析入库
用post请求,在body标签页选择form-data填写file参数,参数类型选择file,然后上传文件。上传成功后返回success。
4.2 查询milvus测试
用get请求,在param中填写query字段,并填写需要查询的内容,如下图:
附录一:完整代码示例
import os
import uuid
import asyncio
import pdfplumber
from fastapi import FastAPI, UploadFile, File
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from pymilvus.orm import utility
from tenacity import retry, stop_after_attempt
import ollama
app = FastAPI()
def extract_text_with_pdfnumber(pdf_path):
"""使用pdfplumber提取PDF文本(保留段落结构)[2]()[5]()"""
with pdfplumber.open(pdf_path) as pdf:
text = []
for page in pdf.pages:
# 按自然段落分割文本
paragraphs = page.extract_text()
text.append(paragraphs.replace('\n', ''))
return '\n\n'.join(text)
def chunk_with_overlap(text, chunk_size=500, overlap=100):
"""带重叠的分块策略[1]()"""
chunks = []
words = text.split()
start_idx = 0
while start_idx < len(words):
end_idx = start_idx + chunk_size
chunk = ' '.join(words[start_idx:end_idx])
chunks.append(chunk)
start_idx = end_idx - overlap # 设置重叠部分
# 处理末尾不足的情况
if end_idx < len(words):
break
return chunks
@retry(stop=stop_after_attempt(3))
async def generate_embeddings(text):
"""使用Ollama生成文本嵌入"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None,
lambda: ollama.Client(host='http://localhost:11434').embeddings(
model="quentinz/bge-large-zh-v1.5:latest", prompt=text)['embedding']
)
connections.connect("default", host="localhost", port="19530")
collection_name = "pdf_documents"
# 检查集合是否存在,如果存在则删除
# if utility.has_collection(collection_name):
# collection = Collection(collection_name)
# collection.drop()
if not utility.has_collection(collection_name):
# 创建集合
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=64),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=20000),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256)
]
schema = CollectionSchema(fields=fields, description="pdf_documents")
collection = Collection(name=collection_name, schema=schema)
# 创建索引
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128}
}
collection.create_index("vector", index_params)
else:
collection = Collection(collection_name)
@app.post("/upload_pdf")
async def upload_pdf(file: UploadFile = File(...),
chunk_size=500,
overlap=100):
print(f"开始上传文件《{file.filename}》")
"""上传PDF文件"""
try:
# 临时保存文件
temp_path = f"temp_{uuid.uuid4()}.pdf"
with open(temp_path, "wb") as f:
# 流式写入文件
while chunk := await file.read(1024):
f.write(chunk)
# 解析PDF
text = extract_text_with_pdfnumber(temp_path)
os.remove(temp_path)
# 分块处理
chunks = chunk_with_overlap(text, chunk_size, overlap)
# 批量生成嵌入
embeddings = []
for chunk in chunks:
embeddings.append(await generate_embeddings(chunk))
# 构建插入数据
entities = [
{
"id": str(uuid.uuid4()),
"content": chunk,
"vector": emb,
"source": file.filename
} for chunk, emb in zip(chunks, embeddings)
]
batch_size = 100
for i in range(0, len(entities), batch_size):
insert_result = collection.insert(entities[i:i+batch_size])
collection.flush()
return {"status": "success", "chunks_processed": len(chunks)}
except Exception as e:
return {"error": str(e)}, 500
@app.get("/search")
async def semantic_search(query: str, top_k=5):
query_embedding = await generate_embeddings(query)
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
# 加载集合到内存中
collection.load()
results = collection.search(
data=[query_embedding],
anns_field="vector",
param=search_params,
limit=top_k,
output_fields=["content", "source"]
)
return [{"score": hit.score, "metadata": hit.entity.to_dict()} for hit in results[0]]
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8321)
附录二:错误处理
apifox请求http接口填写接口地址时,如果代码路径search或者upload_pdf后面有"/"斜杠则在apifox请求时也要加"/"斜杠,否则会报错307 Temporary Redirect,Expected boundary character 45, got 8 at index 2,如下:
此时错误的请求,url最后没有斜杠"/"
此时正确的请求url应该最后有斜杠"/",如下: