当前位置: 首页 > article >正文

Ubuntu部署ktransformers

准备工作

一台服务器

CPU:500G

GPU:48G(NVIDIA4090)

系统:Ubuntu20.04(github的文档好像用的是22.04)

第一步:下载权重文件

1.下载hfd

wget https://hf-mirror.com/hfd/hfd.sh
chmod a+x hfd.sh

2.设置环境变量

export HF_ENDPOINT=https://hf-mirror.com

3.下载模型(需要梯子,需要带上huggingface的token)

./hfd.sh gpt2

4.下载数据集(需要梯子,需要带上huggingface的token)

./hfd.sh wikitext --dataset

5.下载大文件(需要梯子,文件很大,大约四五百G)

./hfd.sh unsloth/DeepSeek-R1-GGUF --include DeepSeek-R1-Q4_K_M/*

第二步:拉代码,编译代码

1.使用Anaconda3安装Python3.11

conda create --name ktransformers python=3.11
conda activate ktransformers 
conda install -c conda-forge libstdcxx-ng

2.安装其他依赖

pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip3 install packaging ninja cpufeature numpy
sudo add-apt-repository ppa: ubuntu-toolchain-r/test
sudo apt-get update
sudo apt-get install --only-upgrade libstdc++6
pip install flash-attn --no-build-isolation

3.查看显卡版本及cuda版本

以下两条指令显示的CUDA版本需要一致,若不一致,系统会以nvcc --version的为准

nvcc --version
nvidia-smi

4.拉代码

git clone https://github.com/kvcache-ai/ktransformers.git

cd ktransformers

git submodule init

git submodule update

5.编译

export USE_NUMA=1
make dev_install

第三部:运行

python ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 50 --cache_lens 1536 --max_new_tokens 8192

# --model_path:模型位置,不需要修改
# --gguf_path:前面下载的大文件,模型文件位置,按照实际情况而定
# --cpu_infer:CPU占用,单位百分比,如果服务器不死DDR5双路CPU,可以适量调低此占比

其他启动参数

python -m transformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536

python ./transformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/shadeform/DeepSeek-R1-GGUF/DeepSeek-R1-Q4 K M/ --cpu_infer 53 --cache_lens 1536 --optimize_config_path transformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin.yaml

python -m transformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodi-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4 K M/ --cpu_infer 128 --cache_lens 1536 --max_new_tokens 8192 --optimize_config_path ./transformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml

transformers --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodi-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4 K M/ --cpu_infer 65 --cache_lens 1536 --max_new_tokens 8192 --port 6006 --optimize_config_path /transformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml

curl -X 'POST"
    "http://localhost:6006/v1/chat/completions'\
    -H 'accept: application/json' \
    -H 'Content-Type: application/json' \
    -d'{
        "messages": [
        "content": "tell a joke",
        "role": "user"
    ],
    "model": "ktranformers-model",
    "stream": true
}'

外传

1. 使用API方式调用

新建文件:chat_openai.py

import argparse
import uvicorn
from typing import List, Dict, Optional, Any
from fastapi import FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import os
import sys
import time
from fastapi import Request
from fastapi.responses import StreamingResponse, JSONResponse
import json
import logging

# 设置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

project_dir = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, project_dir)
import torch
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForCausalLM,
    GenerationConfig,
    TextStreamer,
)
from ktransformers.optimize.optimize import optimize_and_load_gguf
from ktransformers.models.modeling_deepseek import DeepseekV2ForCausalLM
from ktransformers.models.modeling_qwen2_moe import Qwen2MoeForCausalLM
from ktransformers.models.modeling_deepseek_v3 import DeepseekV3ForCausalLM
from ktransformers.models.modeling_llama import LlamaForCausalLM
from ktransformers.models.modeling_mixtral import MixtralForCausalLM
from ktransformers.util.utils import prefill_and_generate
from ktransformers.server.config.config import Config

custom_models = {
    "DeepseekV2ForCausalLM": DeepseekV2ForCausalLM,
    "DeepseekV3ForCausalLM": DeepseekV3ForCausalLM,
    "Qwen2MoeForCausalLM": Qwen2MoeForCausalLM,
    "LlamaForCausalLM": LlamaForCausalLM,
    "MixtralForCausalLM": MixtralForCausalLM,
}

ktransformer_rules_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "optimize", "optimize_rules")
default_optimize_rules = {
    "DeepseekV2ForCausalLM": os.path.join(ktransformer_rules_dir, "DeepSeek-V2-Chat.yaml"),
    "DeepseekV3ForCausalLM": os.path.join(ktransformer_rules_dir, "DeepSeek-V3-Chat.yaml"),
    "Qwen2MoeForCausalLM": os.path.join(ktransformer_rules_dir, "Qwen2-57B-A14B-Instruct.yaml"),
    "LlamaForCausalLM": os.path.join(ktransformer_rules_dir, "Internlm2_5-7b-Chat-1m.yaml"),
    "MixtralForCausalLM": os.path.join(ktransformer_rules_dir, "Mixtral.yaml"),
}

# 全局变量,存储初始化后的模型
chat_model = None

class OpenAIChat:
    def __init__(
        self,
        model_path: str,
        optimize_rule_path: str = None,
        gguf_path: str = None,
        cpu_infer: int = Config().cpu_infer,
        use_cuda_graph: bool = True,
        mode: str = "normal",
    ):
        torch.set_grad_enabled(False)
        Config().cpu_infer = cpu_infer

        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        self.streamer = TextStreamer(self.tokenizer, skip_prompt=True) if not Config().cpu_infer else None
        if mode == 'long_context':
            assert config.architectures[0] == "LlamaForCausalLM", "Only LlamaForCausalLM supports long_context mode"
            torch.set_default_dtype(torch.float16)
        else:
            torch.set_default_dtype(config.torch_dtype)

        with torch.device("meta"):
            if config.architectures[0] in custom_models:
                if "Qwen2Moe" in config.architectures[0]:
                    config._attn_implementation = "flash_attention_2"
                if "Llama" in config.architectures[0]:
                    config._attn_implementation = "eager"
                if "Mixtral" in config.architectures[0]:
                    config._attn_implementation = "flash_attention_2"
                model = custom_models[config.architectures[0]](config)
            else:
                model = AutoModelForCausalLM.from_config(
                    config, trust_remote_code=True, attn_implementation="flash_attention_2"
                )

        if optimize_rule_path is None:
            if config.architectures[0] in default_optimize_rules:
                optimize_rule_path = default_optimize_rules[config.architectures[0]]

        optimize_and_load_gguf(model, optimize_rule_path, gguf_path, config)
        
        try:
            model.generation_config = GenerationConfig.from_pretrained(model_path)
        except:
            model.generation_config = GenerationConfig(
                max_length=128,
                temperature=0.7,
                top_p=0.9,
                do_sample=True
            )
        
        if model.generation_config.pad_token_id is None:
            model.generation_config.pad_token_id = model.generation_config.eos_token_id
        
        model.eval()
        self.model = model
        self.use_cuda_graph = use_cuda_graph
        self.mode = mode
        logger.info("Model loaded successfully!")

    def create_chat_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 1000,
        top_p: float = 0.9,
        force_think: bool = False,
    ) -> Dict:
        input_tensor = self.tokenizer.apply_chat_template(
            messages, add_generation_prompt=True, return_tensors="pt"
        )
        
        if force_think:
            token_thinks = torch.tensor([self.tokenizer.encode("<think>\\n", add_special_tokens=False)],
                                        device=input_tensor.device)
            input_tensor = torch.cat([input_tensor, token_thinks], dim=1)

        generation_config = GenerationConfig(
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_tokens,
            do_sample=True  # Ensure do_sample is True if using temperature or top_p
        )

        generated = prefill_and_generate(
            self.model,
            self.tokenizer,
            input_tensor.cuda(),
            max_tokens,
            self.use_cuda_graph,
            self.mode,
            force_think
        )

        # Convert token IDs to text
        generated_text = self.tokenizer.decode(generated, skip_special_tokens=True)

        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": generated_text
                }
            }],
            "usage": {
                "prompt_tokens": input_tensor.shape[1],
                "completion_tokens": len(generated),
                "total_tokens": input_tensor.shape[1] + len(generated)
            }
        }

class ChatMessage(BaseModel):
    role: str
    content: str

class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage]  # 确保 messages 是 Pydantic 模型实例的列表
    model: str = "default-model"
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 1000
    stream: Optional[bool] = False
    force_think: Optional[bool] = True

class ChatCompletionResponse(BaseModel):
    id: str = "chatcmpl-default"
    object: str = "chat.completion"
    created: int = 0
    model: str = "default-model"
    choices: List[Dict[str, Any]]
    usage: Dict[str, int]

app = FastAPI(title="KVCache.AI API Server")

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = f"{process_time:.4f}s"
    return response

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completion(request: ChatCompletionRequest):
    try:
        # 如果 messages 是 Pydantic 模型实例列表,使用 model_dump
        messages = [m.model_dump() for m in request.messages]
        response = chat_model.create_chat_completion(
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            top_p=request.top_p,
            force_think=request.force_think
        )

        return {
            "id": f"chatcmpl-{int(time.time())}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response['choices'][0]['message']['content']
                },
                "finish_reason": "stop"
            }],
            "usage": response['usage']
        }
    except Exception as e:
        logger.error(f"API Error: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Internal server error: {str(e)}"
        )

def create_app(model_path: str, gguf_path: str, cpu_infer:int, optimize_rule_path: Optional[str] = None):
    global chat_model
    chat_model = OpenAIChat(
        model_path=model_path,
        gguf_path=gguf_path,
        optimize_rule_path=optimize_rule_path,
        cpu_infer=cpu_infer
    )
    return app

def main():
    parser = argparse.ArgumentParser(description="KVCache.AI API Server")
    parser.add_argument("--model_path", type=str, required=True, help="HuggingFace模型路径")
    parser.add_argument("--gguf_path", type=str, required=True, help="GGUF模型文件路径")
    parser.add_argument("--optimize_rule_path", type=str, help="优化规则文件路径")
    parser.add_argument("--port", type=int, default=8000, help="服务端口号")
    parser.add_argument("--cpu_infer", type=int, default=10, help="使用cpu数量")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="绑定地址")
    args = parser.parse_args()

    create_app(
        model_path=args.model_path,
        gguf_path=args.gguf_path,
        optimize_rule_path=args.optimize_rule_path,
        cpu_infer=args.cpu_infer
    )

    uvicorn.run(
        app,
        host=args.host,
        port=args.port,
        loop="uvloop",
        http="httptools",
        timeout_keep_alive=300,
        log_level="info",
        access_log=False
    )

if __name__ == "__main__":
    main()

文件防止位置:

安装依赖:

pip install protobuf uvicorn httptools
pip install uvloop

启动:

python ktransformers/chat_openai.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/

2.使用open-WEBUI进行可视化对接

# 使用Pip下载OPEN-WEBUI

pip install open-webui
# 下载完成后开启服务
open-webui serve
#启动成功如下
在OPEN-WebUI

import os
import json
import requests
from pydantic import BaseModel, Field
from typing import List, Union, Iterator

# Set DEBUG to True to enable detailed logging
DEBUG = False


class Pipe:
    class Valves(BaseModel):
        openai_API_KEY: str = Field(default="none")  # Optional API key if needed
        DEFAULT_MODEL: str = Field(default="DeepSeek-R1")  # Default model identifier

    def __init__(self):
        self.id = "DeepSeek-R1"
        self.type = "manifold"
        self.name = "KT: "
        self.valves = self.Valves(
            **{
                "openai_API_KEY": os.getenv("openai_API_KEY", "none"),
                "DEFAULT_MODEL": os.getenv("openai_DEFAULT_MODEL", "DeepSeek-R1"),
            }
        )
        # Self-hosted FastAPI server details
        self.api_url = (
            "http://localhost:8000/v1/chat/completions"  # FastAPI server endpoint
        )
        self.headers = {"Content-Type": "application/json"}

    def get_openai_models(self):
        """Return available models - for openai we'll return a fixed list"""
        return [{"id": "KT", "name": "DeepSeek-R1"}]

    def pipes(self) -> List[dict]:
        return self.get_openai_models()

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        try:
            # Use default model ID since OpenAI has a single endpoint
            model_id = self.valves.DEFAULT_MODEL
            messages = []

            # Process messages including system, user, and assistant messages
            for message in body["messages"]:
                if isinstance(message.get("content"), list):
                    # For OpenAI, we'll join multiple content parts into a single text
                    text_parts = []
                    for content in message["content"]:
                        if content["type"] == "text":
                            text_parts.append(content["text"])
                        elif content["type"] == "image_url":
                            # OpenAI might not support image inputs - add a note about the image
                            text_parts.append(f"[Image: {content['image_url']['url']}]")
                    messages.append(
                        {"role": message["role"], "content": "".join(text_parts)}
                    )
                else:
                    # Handle simple text messages
                    messages.append(
                        {"role": message["role"], "content": message["content"]}
                    )

            if DEBUG:
                print("FastAPI API request:")
                print(" Model:", model_id)
                print(" Messages:", json.dumps(messages, indent=2))

            # Prepare the API call parameters
            payload = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "top_p": body.get("top_p", 0.9),
                "max_tokens": body.get("max_tokens", 8192),
                "stream": body.get("stream", True),
            }

            # Add stop sequences if provided
            if body.get("stop"):
                payload["stop"] = body["stop"]

            # Sending request to local FastAPI server
            if body.get("stream", False):
                # Streaming response
                def stream_generator():
                    try:
                        response = requests.post(
                            self.api_url,
                            json=payload,
                            headers=self.headers,
                            stream=True,
                        )
                        for line in response.iter_lines():
                            if line:
                                yield line.decode("utf-8")
                    except Exception as e:
                        if DEBUG:
                            print(f"Streaming error: {e}")
                        yield f"Error during streaming: {str(e)}"

                return stream_generator()
            else:
                # Regular response
                response = requests.post(
                    self.api_url, json=payload, headers=self.headers
                )
                if response.status_code == 200:
                    generated_content = (
                        response.json()
                        .get("choices", [{}])[0]
                        .get("message", {})
                        .get("content", "")
                    )
                    return generated_content
                else:
                    return f"Error: {response.status_code}, {response.text}"
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def health_check(self) -> bool:
        """Check if the OpenAI API (local FastAPI service) is accessible"""
        try:
            # Simple health check with a basic prompt
            response = requests.post(
                self.api_url,
                json={
                    "model": self.valves.DEFAULT_MODEL,
                    "messages": [{"role": "user", "content": "Hello"}],
                    "max_tokens": 5,
                },
                headers=self.headers,
            )
            return response.status_code == 200
        except Exception as e:
            if DEBUG:
                print(f"Health check failed: {e}")
            return False

完~
<script src="chrome-extension://bincmiainjofjnhchmcalkanjebghoen/aiscripts/script-main.js"></script>

http://www.kler.cn/a/555342.html

相关文章:

  • MySQL中 undolog和redolog区别
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_palloc_small 函数
  • 【Go并发编程】Channel进阶:实现高性能消息队列的5种模式
  • MySQL 视图入门
  • 向量的点乘的几何意义
  • Wireshark使用介绍
  • debezium专栏文章目录
  • Kubernetes 使用 Kube-Prometheus 构建指标监控 +飞书告警
  • 未来AI方向落地场景:小语言模型,super_private_agent
  • 深度学习之图像回归(一)
  • Linux-ubuntu系统移植之Uboot启动流程
  • 使用open-webui+deepseek构建本地AI知识库
  • 黑马Javascript基础02
  • 面向架构评估的质量属性
  • 精读解析:华为MPP营销计划流程培训课件
  • el-input无法输入0.0001的小数,自动转换为0在vue3中的bug
  • 机器学习数学基础:29.t检验
  • 面试编程题
  • 自然语言处理入门1——单词的表示和距离
  • 在 Visual Studio Code (VSCode) 中创建 React 项目