当前位置: 首页 > article >正文

本地化部署 私有化大语言模型

本地化部署 私有化大语言模型

  • 本地化部署 私有化大语言模型
    • Anaconda 环境
      • 搭建运行
    • 代码概述
      • 环境配置
        • 安装依赖
        • CUDA 环境配置
      • 系统设计与实现
        • 文件处理与加载
        • 文档索引构建
        • 模型加载与推理
        • 文件上传与索引更新
        • 实时对话与文档检索
        • Gradio 前端设计
      • 主要功能
      • 完整代码
      • 功能说明
        • 运行示例
        • 文件上传
        • 资料库对话
        • 模型切换
        • 模型对话
      • 项目文件介绍
      • 执行命令合集

本地化部署 私有化大语言模型

本文介绍了如何使用 Gradio 构建一个基于深度学习的文件上传与知识库查询系统。通过结合 FAISS 和 Sentence-Transformers,系统可以高效地处理和检索大量文档,并基于用户的问题生成回答。通过 Gradio 提供了一个简单直观的用户界面,便于用户上传文件和与智能助理进行对话。

在这里插入图片描述

Anaconda 环境

搭建运行

创建 Anaconda 新环境

请添加图片描述

打开 Terminal(终端)

请添加图片描述

打开工程位置:cd D:\Unity\ChatGLM\PrivateAgent\Private-Agent (这是我的位置)

请添加图片描述

下载环境依赖:pip install -r requirements.txt
下面是 requirements.txt 完整代码
# basic requirements

protobuf>=3.20.0
transformers>=4.41.0
tokenizers>=0.19.1
cpm_kernels>=1.0.11
torch>=2.5.1
gradio>=5.6.0
sentencepiece>=0.2.0
sentence_transformers>=3.3.1
accelerate>=1.1.1
streamlit>=1.40.1
fastapi>=0.115.5
loguru~=0.7.2
mdtex2html>=1.3.0
latex2mathml>=3.77.0
jupyter_client>=8.6.3
faiss-cpu>=1.9.0
peft>=0.13.2
pandas>=2.2.3
python-docx>=1.1.2
python-pptx>=1.0.2
openpyxl>=3.1.5
pymupdf>=1.24.14
frontend>=0.0.3
tools>=0.1.9

在这里插入图片描述

这是下载完的样子

在这里插入图片描述

下载 CUDA:
方法 1 :pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
方法 2 :conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia

在这里插入图片描述

这是 CUDA 下载好的样子

在这里插入图片描述

运行试试:python web_demo_Agent.py
可用性为 True 就是加载上了,我的显卡是 RTX 4080 16G显存
我这里加载的模型是:ChatGlm3-6B和ChatGlm3-6B-32K

在这里插入图片描述

到这里就可以访问语言模型了

在这里插入图片描述

代码概述

本系统基于以下几项关键技术:

Gradio:构建交互式Web界面的Python库。
深度学习模型:使用 transformers 和 peft 库加载和应用预训练的自然语言处理模型,如ChatGLM-6B。
FAISS:用于高效地进行相似度检索和文档索引。
Sentence-Transformers:将文档转换为向量形式,以便于在FAISS中进行高效检索。
多种文件格式支持:支持 .txt, .pdf, .docx, .pptx, .xlsx 等多种文件格式的上传与处理。

环境配置

安装依赖
确保安装所需的库:
gradio、torch、sentence-transformers、faiss-cpu、pandas、python-docx 
python-pptx、openpyxl、PyMuPDF
大家如果创建完 requirements.txt 文件的话,直接执行 pip install -r requirements.txt 就行。
CUDA 环境配置
如果你使用GPU加速,确保CUDA环境已经配置好。可以通过以下代码检查CUDA的可用性:

import torch
print("CUDA 可用性:", torch.cuda.is_available())
print("当前设备数量:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("设备名称:", torch.cuda.get_device_name(0))

系统设计与实现

文件处理与加载
系统支持从多种文件格式中加载文本数据,包括 .txt, .pdf, .docx, .pptx, .xlsx 等。
每种文件格式的加载都通过不同的库来解析:

.txt 文件使用标准的 Python 文件操作读取。
.pdf 使用 PyMuPDF 解析文本。
.docx 使用 python-docx 解析段落。
.pptx 使用 python-pptx 解析幻灯片内容。
.xlsx 使用 pandas 读取 Excel 文件内容。
这些文件的文本内容随后会进行清洗,去除无用空白和换行,并添加到文档集合中。
文档索引构建
为了在大量文档中快速进行检索,系统使用 FAISS 来构建文档的向量索引。
通过 Sentence-Transformers 模型将文档转化为向量,并将这些向量添加到FAISS索引中。

from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

embedder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
# 向量索引添加
def build_index(docs):
    embeddings = embedder.encode(docs)
    dimension = embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(np.array(embeddings))
    return index
模型加载与推理
本系统支持加载多个深度学习模型,使用 transformers 库来加载预训练的 Causal LM 模型
(例如 ChatGLM-6B)。在用户提出问题时,系统会根据当前选择的模型生成回复。

from transformers import AutoTokenizer, AutoModelForCausalLM
# 模型加载方法
def load_model_and_tokenizer(model_dir):
    model = AutoModelForCausalLM.from_pretrained(model_dir)
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    return model, tokenizer
文件上传与索引更新
用户上传文件后,系统会将其内容提取并增量更新文档索引,确保检索到最新的资料。

import shutil
# 文件更新方法
def upload_file(file_path):
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"文件 {file_path} 不存在。")
    shutil.copy(file_path, './reference_docs/')
    new_documents = load_documents_from_folder('./reference_docs')
    incrementally_update_index(new_documents)
实时对话与文档检索
用户在聊天过程中输入问题,系统会根据问题生成上下文,并从已加载的文档中检索相关内容。
然后,将检索到的文档与问题一起传递给语言模型生成回答。

# 资料库问答
def search_docs(query, top_k=3):
    query_vector = embedder.encode([query])
    distances, indices = index.search(query_vector, top_k)
    return [documents[i] for i in indices[0]]
Gradio 前端设计
前端使用 Gradio 来实现交互界面,用户可以通过文本框输入问题,上传文件,选择不同的模型
调整生成文本的参数(如温度、最大生成长度等)。

import gradio as gr

# Gradio UI 构建
with gr.Blocks() as demo:
    user_input = gr.Textbox(show_label=False, placeholder="请输入您的问题...")
    submit_btn = gr.Button("发送")
    chatbot = gr.Chatbot(value=[{"role": "assistant", "content": "您好,我是您的智能助理!"}])
    
    submit_btn.click(predict, inputs=[user_input], outputs=[chatbot])

主要功能

1. 上传文件:用户可以上传 .txt, .pdf, .docx, .pptx, .xlsx 文件系统会自动处理并更新知识库索引。
2. 文档检索:用户提出问题时,系统会从文档库中检索相关资料,并结合用户问题生成合理的回答。
3. 模型切换:支持动态切换不同的深度学习模型,用户可以根据需求选择不同的推理模型。
4. 实时对话:系统会根据历史对话生成新的回答,并提供流式响应。

完整代码

import os
import gradio as gr
import torch
from threading import Thread
from typing import Union, Tuple
from pathlib import Path
from peft import AutoPeftModelForCausalLM, PeftModelForCausalLM
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    PreTrainedModel,
    PreTrainedTokenizer,
    PreTrainedTokenizerFast,
    StoppingCriteria,
    StoppingCriteriaList,
    TextIteratorStreamer
)
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import pandas as pd
import docx
import pptx
import fitz  # PyMuPDF for PDF reading
from openpyxl import load_workbook
import logging
import shutil 
import asyncio
import gc
from transformers import AutoModelForCausalLM, AutoTokenizer


ModelType = Union[PreTrainedModel, PeftModelForCausalLM]
TokenizerType = Union[PreTrainedTokenizer, PreTrainedTokenizerFast]

# 默认模型路径列表,用户可以根据需要添加更多模型路径
MODEL_PATHS = {
    'ChatGLM3-6B': 'model/ChatGlm3-6B',
    'ChatGlm3-6B-32K': 'model/ChatGlm3-6B-32K'
}

TOKENIZER_PATH = os.environ.get("TOKENIZER_PATH", '')

# 固定的资料路径
REFERENCE_FOLDER = './reference_docs'  # 指定参考资料文件夹路径

# 默认的招呼语
DEFAULT_GREETING = "您好,我是您的智能助理,有什么可以帮助您的吗?"

# 加载身份描述文件
IDENTITY_FILE = './reference_docs/identity.txt'

# 检查 CUDA 是否可用
print("CUDA 可用性:", torch.cuda.is_available())
print("当前设备数量:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("设备名称:", torch.cuda.get_device_name(0))

# 加载 SentenceTransformer 嵌入模型
embedder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

# 初始化全局变量
documents = []  # 存储所有加载的文档内容
index = None  # FAISS 索引对象

# 处理和规范化文件路径
def _resolve_path(path: Union[str, Path]) -> Path:
    return Path(path).expanduser().resolve()

# 模型加载函数
def load_model_and_tokenizer(model_dir: Union[str, Path], trust_remote_code: bool = True) -> Tuple[AutoModelForCausalLM, AutoTokenizer]:
    model_dir = Path(model_dir).resolve()  # 解析路径
    if (model_dir / 'adapter_config.json').exists():
        model = AutoPeftModelForCausalLM.from_pretrained(
            model_dir, trust_remote_code=trust_remote_code, device_map='auto'
        )
        tokenizer_dir = model.peft_config['default'].base_model_name_or_path
    else:
        model = AutoModelForCausalLM.from_pretrained(
            model_dir, trust_remote_code=trust_remote_code, device_map='auto'
        )
        tokenizer_dir = model_dir

    # 加载 tokenizer
    tokenizer = AutoTokenizer.from_pretrained(
        tokenizer_dir, trust_remote_code=trust_remote_code
    )
    return model, tokenizer

# 初始加载默认模型
current_model_name = 'ChatGLM3-6B'

# model token 加载
model, tokenizer = load_model_and_tokenizer(MODEL_PATHS[current_model_name], trust_remote_code=True)

# 将模型量化到 INT8(仅在模型支持量化时有效)
#model = torch.quantization.quantize_dynamic(
#    model,  # 量化的模型
#    {torch.nn.Linear},  # 量化的层类型,通常是 Linear 层
#    dtype=torch.qint8  # 量化为 INT8
#)
# 转换为半精度(FP16),如果需要
# model = model.half()
# 评估模式
model = model.cuda().eval()

# 量化为 INT8、转为 FP16、将模型转移到 GPU、设置为评估模式
# model = model.quantize(8).half().cuda().eval()

# Token 停止
class StopOnTokens(StoppingCriteria):
    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        stop_ids = [0, 2]
        for stop_id in stop_ids:
            if input_ids[0][-1] == stop_id:
                return True
        return False

# 文本解析函数
def parse_text(text):
    lines = text.split("\n")
    lines = [line for line in lines if line != ""]
    count = 0
    for i, line in enumerate(lines):
        if "```" in line:
            count += 1
            items = line.split('`')
            if count % 2 == 1:
                lines[i] = f'<pre><code class="language-{items[-1]}">'
            else:
                lines[i] = f'<br></code></pre>'
        else:
            if i > 0:
                if count % 2 == 1:
                    line = line.replace("`", "\`")
                    line = line.replace("<", "&lt;")
                    line = line.replace(">", "&gt;")
                    line = line.replace(" ", "&nbsp;")
                    line = line.replace("*", "&ast;")
                    line = line.replace("_", "&lowbar;")
                    line = line.replace("-", "&#45;")
                    line = line.replace(".", "&#46;")
                    line = line.replace("!", "&#33;")
                    line = line.replace("(", "&#40;")
                    line = line.replace(")", "&#41;")
                    line = line.replace("$", "&#36;")
                lines[i] = "<br>" + line
    text = "".join(lines)
    return text


# 数据清理函数
def clean_text(text):
    """清洗文本,去除多余的空格和换行"""
    if not text or not isinstance(text, str):
        print("无效文本,跳过处理")
        return None
    cleaned = " ".join(text.split()).strip()
    # print(f"清理前: {text[:50]}... 清理后: {cleaned[:50]}...")
    return cleaned if len(cleaned) > 5 else None  # 最小长度为5

# 索引构建
def build_index(docs):
    global index
    if not docs or len(docs) == 0:
        print("没有文档可以索引。")
        return
    embeddings = embedder.encode(docs)
    dimension = embeddings.shape[1]
    if index is None:
        index = faiss.IndexFlatL2(dimension)
    else:
        index.reset()
    index.add(np.array(embeddings))
    print(f"索引已构建,共包含 {index.ntotal} 个文档。")

# 资料加载
def load_documents_from_folder(folder_path):
    """从文件夹加载所有文档内容,支持递归子文件夹"""
    contents = []
    for root, _, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            ext = os.path.splitext(file_path)[-1].lower()
            try:
                if ext == ".txt":
                    with open(file_path, "r", encoding="utf-8") as f:
                        content = clean_text(f.read())
                        if content:
                            contents.append(content)
                            print(f"成功加载TXT文件: {file_path}")
                        else:
                            print(f"TXT文件内容为空或被过滤: {file_path}")
                elif ext == ".pdf":
                    with fitz.Document(file_path) as doc:
                        for page in doc:
                            content = clean_text(page.get_text())
                            if content:
                                contents.append(content)
                                print(f"成功加载PDF文件: {file_path}")
                elif ext == ".docx":
                    doc_file = docx.Document(file_path)
                    for para in doc_file.paragraphs:
                        content = clean_text(para.text)
                        if content:
                            contents.append(content)
                            print(f"成功加载DOCX文件: {file_path}")
                elif ext == ".pptx":
                    prs = pptx.Presentation(file_path)
                    for slide in prs.slides:
                        for shape in slide.shapes:
                            if hasattr(shape, "text"):
                                content = clean_text(shape.text)
                                if content:
                                    contents.append(content)
                                    print(f"成功加载PPTX文件: {file_path}")
                elif ext in [".xls", ".xlsx"]:
                    xls = pd.ExcelFile(file_path)
                    for sheet_name in xls.sheet_names:
                        df = xls.parse(sheet_name)
                        for col in df.columns:
                            content = clean_text(df[col].dropna().astype(str).str.cat(sep='\n'))
                            if content:
                                contents.append(content)
                                print(f"成功加载EXCEL文件: {file_path}")
            except Exception as e:
                logging.error(f"无法加载文件 {file_path},错误信息:{e}", exc_info=True)
    print(f"加载文档数量: {len(contents)}")
    return contents

# 资料初始化
def initialize_reference_documents():
    """初始化加载参考资料文件夹内的所有文档"""
    global documents, index
    
    if not os.path.exists(REFERENCE_FOLDER):
        os.makedirs(REFERENCE_FOLDER)
    
    # 加载文档
    documents = load_documents_from_folder(REFERENCE_FOLDER)
    
    # 如果文档加载成功,构建索引
    if documents:
        build_index(documents)
        print(f"初始化完成,共加载了 {len(documents)} 个文档。")
    else:
        print("未找到任何文档,请检查参考文件夹。")
    
    return len(documents)

# 文件刷新
def refresh_documents(chat_history):
    """刷新文件夹内容"""
    global documents, index
    try:
        documents = load_documents_from_folder(REFERENCE_FOLDER)
        build_index(documents)
        message = f"文档已刷新,共索引了 {len(documents)} 个文档。"
        chat_history.append({"role": "assistant", "content": message})
        return chat_history
    except Exception as e:
        chat_history.append({"role": "assistant", "content": f"刷新资料库失败:{e}"})
        return chat_history

# 增量更新索引的函数
def incrementally_update_index(new_documents):
    global documents, index
    # 仅将新文档的嵌入向量添加到索引
    new_embeddings = [embedder.encode([doc]) for doc in new_documents]  # 假设你有一个 `embedder`
    index.add(np.array(new_embeddings))
    print(f"成功更新索引,添加了 {len(new_documents)} 个文档的嵌入向量。")

# 修改文件上传函数,使用增量索引更新
def upload_file(file_path, chat_history):
    global documents, index

    print(f"开始上传文件:{file_path}")

    if not file_path:
        chat_history.append({"role": "assistant", "content": "没有选择文件,请重新上传!"})
        return chat_history

    if not os.path.exists(file_path):
        chat_history.append({"role": "assistant", "content": "文件不存在,请检查文件路径!"})
        return chat_history

    try:
        file_name = os.path.basename(file_path)
        save_path = os.path.join(REFERENCE_FOLDER, file_name)

        if not os.path.exists(REFERENCE_FOLDER):
            os.makedirs(REFERENCE_FOLDER)

        if os.path.exists(save_path):
            chat_history.append({"role": "assistant", "content": f"文件 {file_name} 已存在,正在重命名..."})
            base_name, ext = os.path.splitext(file_name)
            counter = 1
            while os.path.exists(save_path):
                new_file_name = f"{base_name}_{counter}{ext}"
                save_path = os.path.join(REFERENCE_FOLDER, new_file_name)
                counter += 1

        shutil.copy(file_path, save_path)
        print(f"文件已复制到 {save_path},大小为 {os.path.getsize(save_path)} 字节")

        # 加载新文件并增量更新索引
        new_documents = load_documents_from_folder(REFERENCE_FOLDER)
        incrementally_update_index(new_documents)

        chat_history.append({"role": "assistant", "content": f"文件 {file_name} 上传成功并已保存!"})
        return chat_history

    except FileNotFoundError:
        chat_history.append({"role": "assistant", "content": "文件没有找到,请检查文件路径!"})
    except PermissionError:
        chat_history.append({"role": "assistant", "content": "没有权限访问文件,请检查文件权限!"})
    except Exception as e:
        chat_history.append({"role": "assistant", "content": f"上传过程中发生错误:{e}"})

# 资料库检索
def search_docs(query, top_k=3, distance_threshold=15.0):
    """从资料库检索相关文档"""
    if not index or not documents:
        print("索引或文档未初始化。")
        return ["还没有加载文件。请初始化或刷新参考文件夹。"]
    
    query_vector = embedder.encode([query])
    distances, indices = index.search(query_vector, top_k)
    print(f"检索结果索引: {indices}, 距离: {distances}")
    
    results = []
    for i, dist in zip(indices[0], distances[0]):
        if i < len(documents):
            results.append((documents[i], dist))
    
    if not results:
        print("未找到任何相关内容。")
        return ["未找到与查询相关的内容,请确保资料库中有相关内容。"]
    
    results = sorted(results, key=lambda x: x[1])
    filtered_results = [doc for doc, dist in results if dist < distance_threshold]
    
    if not filtered_results:
        print("距离超过阈值,返回最近的文档。")
        return [doc for doc, _ in results[:top_k]]
    
    return filtered_results

# 读取 identity.txt 的内容
def load_identity(file_path):
    if not os.path.exists(file_path):
        print(f"身份文件未找到:{file_path}")
        return "我是您的智能助理,很高兴为您服务。"
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            identity_content = f.read()
        return identity_content.strip()
    except Exception as e:
        print(f"无法读取身份文件:{e}")
        return "我是您的智能助理,很高兴为您服务。"



# 身份文件 读取
identity_content = load_identity(IDENTITY_FILE)



# 模型预测
def predict(chat_history, max_length, top_p, temperature):
    """处理用户输入,生成回答"""
    stop = StopOnTokens()
    global identity_content

    if not chat_history:
        # 如果 chat_history 为空,提示用户输入
        chat_history = [{"role": "assistant", "content": "请先输入您的问题。"}]
        yield chat_history, chat_history
        return

    # 获取用户的最新输入
    user_input = chat_history[-1]['content']

    # 检索相关文档
    related_docs = search_docs(user_input, top_k=3, distance_threshold=15.0)

    # 确保 related_docs 格式正确
    if not related_docs:
        related_docs = ["未找到相关资料"]

    if "未找到" in related_docs[0]:
        # 更新 chat_history,添加助手的回复
        chat_history.append({"role": "assistant", "content": related_docs[0]})
        yield chat_history, chat_history
        return

    # 构建上下文并限制长度
    context = "\n".join(related_docs[:3])  # 仅使用前3个相关文档
    if len(context) > 1000:
        context = context[:1000] + "..."

    # 将身份描述添加到上下文
    if identity_content:
        identity_prompt = f"助手的自我介绍:\n{identity_content}\n\n"
    else:
        identity_prompt = ""

    # 构造完整输入
    full_input = f"{identity_prompt}以下是相关资料内容:\n{context}\n\n问题:{user_input}\n回答:"
    messages = chat_history[:-1]  # 不包含最后一个用户消息
    messages.append({"role": "user", "content": full_input})

    # 添加一个空的助手回复到 chat_history
    chat_history.append({"role": "assistant", "content": ""})

    print("\n\n====conversation====\n", messages)

    # 生成模型输入
    model_inputs = tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=True,
        return_tensors="pt"
    ).to(next(model.parameters()).device)

    streamer = TextIteratorStreamer(tokenizer, timeout=60, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = {
        "input_ids": model_inputs,
        "streamer": streamer,
        "max_new_tokens": max_length,
        "do_sample": True,
        "top_p": top_p,
        "temperature": temperature,
        "stopping_criteria": StoppingCriteriaList([stop]),
        "repetition_penalty": 1.2,
    }

    # 启动生成线程
    from concurrent.futures import ThreadPoolExecutor
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(model.generate, **generate_kwargs)

    # 实时返回生成的内容
    partial_response = ""
    for new_token in streamer:
        try:
            if new_token != '':
                partial_response += new_token
                chat_history[-1]['content'] = partial_response
                yield chat_history, chat_history
        except Exception as e:
            print(f"流式生成时出错:{e}")
            break



# 模型切换
def update_model( model_name):
    global model, tokenizer, current_model_name
    current_model_name = model_name  

    return [{"role": "assistant", "content": f"模型切换中:{current_model_name}"}] 

# 清空缓存并卸载模型
def clear_previous_model():
    global model, tokenizer
    if model:
        del model
    if tokenizer:
        del tokenizer
    torch.cuda.empty_cache()  # 清理 GPU 显存
    import gc
    gc.collect()  # 强制垃圾回收

# 同步模型切换
def update_modelswit(model_name):
    clear_previous_model()  # 清空上一个模型
    global model, tokenizer
    current_model_name = model_name
    
    # 加载新模型
    model, tokenizer = load_model_and_tokenizer(MODEL_PATHS[model_name], trust_remote_code=True)
    
    model = model.to('cuda')  # 将模型移动到 GPU
    model = model.eval()  # 设置为推理模式
 
    # 初始化参考资料
    initialize_reference_documents()
    
    current_model_name = model_name

    # 返回模型切换后的状态
    return [{"role": "assistant", "content": f"模型已切换为:{current_model_name}"}]


# 清空对话
def reset_state():
    return [{"role": "assistant", "content": DEFAULT_GREETING}], [{"role": "assistant", "content": DEFAULT_GREETING}]

# 初始化参考资料
initialize_reference_documents()
 

# 构建 UI
with gr.Blocks() as demo:
    
    # 自定义按钮的样式  #FF7617
    gr.HTML("""
        <style>
            .my-btn {
                background-color: #F26D14 !important;
                color: white !important;
                border-radius: 5px !important;
                font-size: 16px !important;
                padding: 10px 20px !important;
            }
            .my-btn:hover {
                background-color: #E15B10 !important;
            }
            .my-btn:active {
                background-color: #D14A00 !important;
            }

            .submitBtn {
                background-color: #F26D14 !important;
                color: white !important;
                border-radius: 5px !important;
                font-size: 16px !important;
                padding: 10px 20px !important;
            }
            .submitBtn:hover {
                background-color: #E15B10 !important;
            }
            .submitBtn:active {
                background-color: #D14A00 !important;
            }
        </style>
    """)

    gr.HTML(f"""<h1 align="center">Maddie 私有化智能体</h1>
                <h3 align="center">欢迎使用智能知识库,请上传文档并提出问题!</h3>""")

    chatbot = gr.Chatbot(value=[{"role": "assistant", "content": DEFAULT_GREETING}], type="messages")

    with gr.Row():
        with gr.Column(scale=4):  # 保持原比例,输入框和对话历史不变
            # 输入框
            user_input = gr.Textbox(
                show_label=False, 
                placeholder="输入您的问题...", 
                lines=5,  # 保持原来行数
                container=False,
                elem_id="user_input_box"
            )            

            # 发送按钮和上传按钮保持原大小
            submitBtn = gr.Button("发送",elem_classes=["submitBtn"], elem_id="submit_btn")  
          
        with gr.Column(scale=1): 
            # 文件上传组件,减小宽度
            file_upload = gr.File(
                label="上传文件", 
                file_types=['.txt', '.pdf', '.docx', '.xlsx', '.pptx'], 
                type="filepath", 
                elem_id="file_upload", 
                scale=0.7  # 缩小文件上传组件的宽度
            )

            upload_button = gr.Button("上传文件",elem_classes=["my-btn"])
            # refresh_button = gr.Button("刷新资料库")

        with gr.Column(scale=1):  # 右侧的控件部分保持较小比例
            
            model_selector = gr.Dropdown(
                list(MODEL_PATHS.keys()), 
                label="选择模型", 
                # value=current_model_name,
                scale=0.8  # 窄一点的下拉框
            )

            modelswitch = gr.Button("模型切换",elem_classes=["my-btn"], size="sm")

            max_length = gr.Slider(0, 32768, value=8192, step=1.0, label="最大长度", interactive=True)
            top_p = gr.Slider(0, 1, value=0.8, step=0.01, label="Top P", interactive=True)
            temperature = gr.Slider(0.01, 1, value=0.6, step=0.01, label="温度", interactive=True)

            emptyBtn = gr.Button("清空对话",elem_classes=["my-btn"])

    # 初始对话历史
    history = gr.State([{"role": "assistant", "content": DEFAULT_GREETING}])

    # 示例消息解析逻辑
    def parse_text(text):
        return text.strip()  # 简单解析,可以按需扩展

    # 发送消息逻辑
    def send_message(input_text, chat_history):
        chat_history.append({"role": "user", "content": parse_text(input_text)})
        return "", chat_history

    user_input.submit(
        send_message,
        inputs=[user_input, history],
        outputs=[user_input, chatbot]
    )

    # 点击按钮发送消息
    submitBtn.click(
        send_message,
        inputs=[user_input, history],
        outputs=[user_input, history]
    ).then(
        predict,
        inputs=[history, max_length, top_p, temperature],
        outputs=[chatbot, history]
    )

    # 清空对话
    emptyBtn.click(reset_state, outputs=[chatbot, history], queue=False)

    # 模型选择
    model_selector.change(update_model, inputs=[model_selector], outputs=[chatbot])

    # 模型切换
    modelswitch.click(update_modelswit, inputs=[model_selector], outputs=[chatbot])
    

    # 上传文件并刷新资料库
    upload_button.click(
        lambda file_path, chat_history: (
            upload_file(file_path, chat_history)  # 上传文件
        ) if file_path else (
            chat_history  # 如果没有选择文件,返回当前对话
        ),
        inputs=[file_upload, chatbot],
        outputs=[chatbot]
    ).then(
        refresh_documents,  # 上传成功后刷新资料库
        inputs=[chatbot],
        outputs=[chatbot]
    )

    # 监听回车事件 (Enter)
    gr.HTML("""
    <script>
        window.addEventListener('load', function() {
            // 获取用户输入框和按钮元素
            const inputBox = document.querySelector("#user_input_box");  // 通过 ID 获取输入框
            const submitButton = document.querySelector("#submit_btn");  // 通过 ID 获取按钮
            
            // 监听回车键事件
            inputBox.addEventListener("keydown", function(event) {
                // 检查是否按下了回车键
                if (event.key === "Enter") {
                    event.preventDefault();  // 阻止默认的换行行为
                    submitButton.click();  // 模拟点击提交按钮
                }
            });
        });
    </script>
    """)

    # 刷新资料库
    # refresh_button.click(refresh_documents, inputs=[chatbot], outputs=[chatbot])


# 启动服务
demo.queue()
demo.launch(server_name="127.0.0.1", server_port=7870, inbrowser=True, share=True)

功能说明

运行示例
运行成功后在浏览器键入:127.0.0.1:7870 可以打开测试网页,当然也会自动打开。

在这里插入图片描述

文件上传
点击上传文件可以选择你要添加进资料库中的文件。

在这里插入图片描述

点击上传文件可以更新资料库以及索引

在这里插入图片描述

如果成功就会显示这样的提示。

在这里插入图片描述

资料库对话
正常问答就可以访问资料库。

在这里插入图片描述

模型切换
点击模型切换下拉菜单 可以查看模型列表。目前是手动添加。
如需更改注意环境兼容性以及在代码中重新配置模型路径。

# 默认模型路径列表,用户可以根据需要添加更多模型路径
MODEL_PATHS = {
    'ChatGLM3-6B': 'model/ChatGlm3-6B',
    'ChatGlm3-6B-32K': 'model/ChatGlm3-6B-32K'
}

在这里插入图片描述

选择好模型之后,对话框会提示正在切换的模型。

在这里插入图片描述

点击模型切换按钮后台会显示加载模型进度。

在这里插入图片描述

模型加载完毕也会有相关提示。

在这里插入图片描述

模型对话
目前切换完模型之后,资料库索引有一点问题。Gradio 对话显示时好时坏的,暂时还没找到原因。
后期有时间再补充吧,如果哪位大佬修改好了可以踢我就一脚,我去观摩观摩。

在这里插入图片描述

项目文件介绍

model:存放模型文件的文件夹。
reference_docs:资料库文件夹,所有参考资料都在这里。
static:用户生成的文档或数据比如结果文件、日志文件、分析报告等,基本上用不到。

在这里插入图片描述

PrivateAgent.py:就是执行文件。
requirements.txt:依赖加载文件。
requirements02.txt:依赖加载文件备份。
执行命令.txt:放了一些常用的命令。

在这里插入图片描述

执行命令合集

//打开项目盘
D:

//打开根目录
cd D:\Unity\ChatGLM\PrivateAgent\Private-Agent

//依赖安装
pip install -r requirements.txt

//卸载当前版本的 PyTorch:
pip uninstall torch torchvision torchaudio -y

//安装 CUDA
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia

//运行程序
python PrivateAgent.py

//查看 Python 版本
python --version

//显卡 CUDA 版本查询
nvidia-smi

//查看当前平台支持的版本
pip debug --verbose

//查看 **依赖 版本 例如:pip show pytorch
pip show  **

暂时先这样吧,如果实在看不明白就留言,看到我会回复的。希望这个教程对您有帮助!
路漫漫其修远兮,与君共勉。


http://www.kler.cn/a/413765.html

相关文章:

  • 【Linux】剧幕中的灵魂更迭:探索Shell下的程序替换
  • python爬虫案例——猫眼电影数据抓取之字体解密,多套字体文件解密方法(20)
  • YOLOv11融合PIDNet中的PagFM模块及相关改进思路
  • CTF之密码学(凯撒加密)
  • Android按键点击事件三种实现方法
  • shell完结
  • ABAP OOALV模板
  • Android中ByteBuffer内存池设计示例
  • 23种设计模式之外观模式
  • linux添加附加磁盘
  • CFD 在生物反应器放大过程中的作用
  • 拍立淘按图搜索实战化,拍立淘API接口参数说明
  • 在 Ubuntu 上部署 MediaWiki 开源维基平台
  • Jetpack业务架构(ViewModel)
  • Linux系统之iotop命令的基本使用
  • 【EI会议征稿通知 | 往届均已见刊检索】第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)
  • 分类预测 | Matlab实现GA-XGBoost分类预测
  • 使用Eureka实现服务注册与发现的具体案例详解
  • go语言怎么实现bash cmd里的mv功能?
  • 【GPT】力量训练是什么,必要吗,有可以替代的方式吗
  • 11.25 Scala案例
  • web3.js + Ganache 模拟以太坊账户间转账
  • Ubuntu 硬盘分区并挂载
  • 【项目日记】仿mudou的高并发服务器 --- 实现基础高并发服务器基础模块
  • 【LeetCode面试150】——56合并区间
  • RabbitMQ5:Fanout交换机、Direct交换机、Topic交换机