Ubuntu部署ktransformers
准备工作
一台服务器
内存(CPU RAM):500G
GPU:48G(NVIDIA4090)
系统:Ubuntu20.04(github的文档好像用的是22.04)
第一步:下载权重文件
1.下载hfd
wget https://hf-mirror.com/hfd/hfd.sh
chmod a+x hfd.sh
2.设置环境变量
export HF_ENDPOINT=https://hf-mirror.com
3.下载模型(需要梯子,需要带上huggingface的token)
./hfd.sh gpt2
4.下载数据集(需要梯子,需要带上huggingface的token)
./hfd.sh wikitext --dataset
5.下载大文件(需要梯子,文件很大,大约四五百G)
./hfd.sh unsloth/DeepSeek-R1-GGUF --include "DeepSeek-R1-Q4_K_M/*"
第二步:拉代码,编译代码
1.使用Anaconda3安装Python3.11
conda create --name ktransformers python=3.11
conda activate ktransformers
conda install -c conda-forge libstdcxx-ng
2.安装其他依赖
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip3 install packaging ninja cpufeature numpy
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt-get install --only-upgrade libstdc++6
pip install flash-attn --no-build-isolation
3.查看显卡版本及cuda版本
以下两条指令显示的CUDA版本需要一致,若不一致,系统会以nvcc --version的为准
nvcc --version
nvidia-smi
4.拉代码
git clone https://github.com/kvcache-ai/ktransformers.git
cd ktransformers
git submodule init
git submodule update
5.编译
export USE_NUMA=1
make dev_install
第三步:运行
python ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 50 --cache_lens 1536 --max_new_tokens 8192
# --model_path:模型位置,不需要修改
# --gguf_path:前面下载的大文件,模型文件位置,按照实际情况而定
# --cpu_infer:用于推理的CPU核心(线程)数量,不是百分比;如果服务器不是DDR5双路CPU,可以适量调低此数值
其他启动参数
python -m ktransformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536
python ./ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/shadeform/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536 --optimize_config_path ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin.yaml
python -m ktransformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodl-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 128 --cache_lens 1536 --max_new_tokens 8192 --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml
ktransformers --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodl-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 65 --cache_lens 1536 --max_new_tokens 8192 --port 6006 --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml
curl -X 'POST' \
  'http://localhost:6006/v1/chat/completions' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "messages": [
    {
      "content": "tell a joke",
      "role": "user"
    }
  ],
  "model": "ktranformers-model",
  "stream": true
}'
外传
1. 使用API方式调用
新建文件:chat_openai.py
import argparse
import uvicorn
from typing import List, Dict, Optional, Any
from fastapi import FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import os
import sys
import time
from fastapi import Request
from fastapi.responses import StreamingResponse, JSONResponse
import json
import logging# 设置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Put the parent of this file's directory at the front of sys.path so the
# package imports further down resolve against the local checkout.
project_dir = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, project_dir)
import torch
from transformers import (
AutoTokenizer,
AutoConfig,
AutoModelForCausalLM,
GenerationConfig,
TextStreamer,
)
from ktransformers.optimize.optimize import optimize_and_load_gguf
from ktransformers.models.modeling_deepseek import DeepseekV2ForCausalLM
from ktransformers.models.modeling_qwen2_moe import Qwen2MoeForCausalLM
from ktransformers.models.modeling_deepseek_v3 import DeepseekV3ForCausalLM
from ktransformers.models.modeling_llama import LlamaForCausalLM
from ktransformers.models.modeling_mixtral import MixtralForCausalLM
from ktransformers.util.utils import prefill_and_generate
from ktransformers.server.config.config import Config

# Architecture name (as listed in the HF config's `architectures`) -> the
# ktransformers model class used to instantiate it.
custom_models = {
    "DeepseekV2ForCausalLM": DeepseekV2ForCausalLM,
    "DeepseekV3ForCausalLM": DeepseekV3ForCausalLM,
    "Qwen2MoeForCausalLM": Qwen2MoeForCausalLM,
    "LlamaForCausalLM": LlamaForCausalLM,
    "MixtralForCausalLM": MixtralForCausalLM,
}

# Rule files are looked up in `optimize/optimize_rules` next to this file.
ktransformer_rules_dir = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "optimize", "optimize_rules"
)

# Architecture name -> default optimize-rule YAML used when the caller does
# not supply one.
default_optimize_rules = {
    "DeepseekV2ForCausalLM": os.path.join(ktransformer_rules_dir, "DeepSeek-V2-Chat.yaml"),
    "DeepseekV3ForCausalLM": os.path.join(ktransformer_rules_dir, "DeepSeek-V3-Chat.yaml"),
    "Qwen2MoeForCausalLM": os.path.join(ktransformer_rules_dir, "Qwen2-57B-A14B-Instruct.yaml"),
    "LlamaForCausalLM": os.path.join(ktransformer_rules_dir, "Internlm2_5-7b-Chat-1m.yaml"),
    "MixtralForCausalLM": os.path.join(ktransformer_rules_dir, "Mixtral.yaml"),
}

# Global variable that stores the model once it has been initialised
chat_model = None


class OpenAIChat:
    """Load a GGUF-backed model once and answer chat-completion requests.

    The constructor loads the tokenizer and config from ``model_path``,
    builds the model on the ``meta`` device, and then calls
    ``optimize_and_load_gguf`` with the rule file and ``gguf_path``.
    """

    def __init__(
        self,
        model_path: str,
        optimize_rule_path: str = None,
        gguf_path: str = None,
        cpu_infer: int = Config().cpu_infer,
        use_cuda_graph: bool = True,
        mode: str = "normal",
    ):
        """Load tokenizer, config and weights.

        Args:
            model_path: Passed to ``AutoTokenizer`` / ``AutoConfig`` /
                ``GenerationConfig.from_pretrained``.
            optimize_rule_path: Optimize-rule file. When ``None`` it is looked
                up in ``default_optimize_rules`` by architecture name.
            gguf_path: Forwarded to ``optimize_and_load_gguf``.
            cpu_infer: Written to ``Config().cpu_infer``. The default is read
                from ``Config()`` once, when the class is defined.
            use_cuda_graph: Forwarded to ``prefill_and_generate``.
            mode: Forwarded to ``prefill_and_generate``. ``'long_context'`` is
                only accepted for ``LlamaForCausalLM``.
        """
        torch.set_grad_enabled(False)
        Config().cpu_infer = cpu_infer

        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        arch = config.architectures[0]
        # NOTE(review): the streamer is only created when cpu_infer is falsy and
        # nothing in this file reads it afterwards - confirm it is still needed.
        self.streamer = TextStreamer(self.tokenizer, skip_prompt=True) if not Config().cpu_infer else None

        if mode == 'long_context':
            assert arch == "LlamaForCausalLM", "Only LlamaForCausalLM supports long_context mode"
            torch.set_default_dtype(torch.float16)
        else:
            torch.set_default_dtype(config.torch_dtype)

        # Build the module on the meta device; optimize_and_load_gguf is called
        # on the result below.
        with torch.device("meta"):
            if arch in custom_models:
                if "Qwen2Moe" in arch:
                    config._attn_implementation = "flash_attention_2"
                if "Llama" in arch:
                    config._attn_implementation = "eager"
                if "Mixtral" in arch:
                    config._attn_implementation = "flash_attention_2"
                model = custom_models[arch](config)
            else:
                model = AutoModelForCausalLM.from_config(
                    config, trust_remote_code=True, attn_implementation="flash_attention_2"
                )

        if optimize_rule_path is None:
            if arch in default_optimize_rules:
                optimize_rule_path = default_optimize_rules[arch]

        optimize_and_load_gguf(model, optimize_rule_path, gguf_path, config)

        try:
            model.generation_config = GenerationConfig.from_pretrained(model_path)
        except Exception:
            # Was a bare `except:`, which also swallowed KeyboardInterrupt and
            # SystemExit. Fall back to a fixed sampling configuration.
            logger.warning("Could not load generation config from %s; using defaults", model_path)
            model.generation_config = GenerationConfig(
                max_length=128,
                temperature=0.7,
                top_p=0.9,
                do_sample=True
            )
        if model.generation_config.pad_token_id is None:
            model.generation_config.pad_token_id = model.generation_config.eos_token_id

        model.eval()
        self.model = model
        self.use_cuda_graph = use_cuda_graph
        self.mode = mode
        logger.info("Model loaded successfully!")

    def create_chat_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 1000,
        top_p: float = 0.9,
        force_think: bool = False,
    ) -> Dict:
        """Generate one assistant reply for ``messages``.

        Args:
            messages: Chat history as ``{"role": ..., "content": ...}`` dicts,
                passed to the tokenizer's chat template.
            temperature: Sampling temperature (see the review note below).
            max_tokens: Passed to ``prefill_and_generate`` as the generation
                limit.
            top_p: Nucleus-sampling value (see the review note below).
            force_think: When true, the tokens for ``"<think>\\n"`` are
                appended to the prompt and the flag is forwarded to
                ``prefill_and_generate``.

        Returns:
            A dict with ``"choices"`` (a single assistant message) and
            ``"usage"`` (prompt / completion / total token counts).
        """
        input_tensor = self.tokenizer.apply_chat_template(
            messages, add_generation_prompt=True, return_tensors="pt"
        )
        if force_think:
            token_thinks = torch.tensor(
                [self.tokenizer.encode("<think>\\n", add_special_tokens=False)],
                device=input_tensor.device,
            )
            input_tensor = torch.cat([input_tensor, token_thinks], dim=1)

        # NOTE(review): this config is built but never handed to
        # prefill_and_generate, so temperature/top_p from the request do not
        # appear to influence sampling here - confirm against the
        # prefill_and_generate signature before wiring it in.
        generation_config = GenerationConfig(
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_tokens,
            do_sample=True  # Ensure do_sample is True if using temperature or top_p
        )

        generated = prefill_and_generate(
            self.model,
            self.tokenizer,
            input_tensor.cuda(),
            max_tokens,
            self.use_cuda_graph,
            self.mode,
            force_think
        )

        # Convert token IDs to text
        generated_text = self.tokenizer.decode(generated, skip_special_tokens=True)

        prompt_tokens = input_tensor.shape[1]
        completion_tokens = len(generated)
        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": generated_text
                }
            }],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens
            }
        }


class ChatMessage(BaseModel):
    """One chat turn: a role and its text content."""

    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    """Request body accepted by ``POST /v1/chat/completions``."""

    messages: List[ChatMessage]  # Validated as a list of ChatMessage instances
    model: str = "default-model"
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 1000
    # Accepted for compatibility; the handler below does not read it.
    stream: Optional[bool] = False
    force_think: Optional[bool] = True


class ChatCompletionResponse(BaseModel):
    """Response schema for ``POST /v1/chat/completions``."""

    id: str = "chatcmpl-default"
    object: str = "chat.completion"
    created: int = 0
    model: str = "default-model"
    choices: List[Dict[str, Any]]
    usage: Dict[str, int]


app = FastAPI(title="KVCache.AI API Server")


@app.get("/health")
async def health_check():
    """Liveness probe; always reports healthy."""
    return {"status": "healthy"}


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Attach the wall-clock handling time as an ``X-Process-Time`` header."""
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = f"{process_time:.4f}s"
    return response


app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completion(request: ChatCompletionRequest):
    """Run one non-streaming chat completion against the global model.

    Raises:
        HTTPException: status 500 wrapping any exception raised while
            generating.
    """
    try:
        # messages is a list of Pydantic model instances; convert to plain dicts
        messages = [m.model_dump() for m in request.messages]
        response = chat_model.create_chat_completion(
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            top_p=request.top_p,
            force_think=request.force_think
        )

        return {
            "id": f"chatcmpl-{int(time.time())}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response['choices'][0]['message']['content']
                },
                "finish_reason": "stop"
            }],
            "usage": response['usage']
        }
    except Exception as e:
        logger.error(f"API Error: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Internal server error: {str(e)}"
        ) from e


def create_app(model_path: str, gguf_path: str, cpu_infer: int, optimize_rule_path: Optional[str] = None):
    """Initialise the global ``chat_model`` and return the FastAPI ``app``."""
    global chat_model
    chat_model = OpenAIChat(
        model_path=model_path,
        gguf_path=gguf_path,
        optimize_rule_path=optimize_rule_path,
        cpu_infer=cpu_infer
    )
    return app


def main():
    """Parse command-line arguments, load the model, and serve with uvicorn."""
    parser = argparse.ArgumentParser(description="KVCache.AI API Server")
    parser.add_argument("--model_path", type=str, required=True, help="HuggingFace模型路径")
    parser.add_argument("--gguf_path", type=str, required=True, help="GGUF模型文件路径")
    parser.add_argument("--optimize_rule_path", type=str, help="优化规则文件路径")
    parser.add_argument("--port", type=int, default=8000, help="服务端口号")
    parser.add_argument("--cpu_infer", type=int, default=10, help="使用cpu数量")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="绑定地址")
    args = parser.parse_args()

    create_app(
        model_path=args.model_path,
        gguf_path=args.gguf_path,
        optimize_rule_path=args.optimize_rule_path,
        cpu_infer=args.cpu_infer
    )

    uvicorn.run(
        app,
        host=args.host,
        port=args.port,
        loop="uvloop",
        http="httptools",
        timeout_keep_alive=300,
        log_level="info",
        access_log=False
    )


if __name__ == "__main__":
    main()
文件放置位置:ktransformers/chat_openai.py(即放在ktransformers源码仓库的ktransformers目录下,与local_chat.py同级)
安装依赖:
pip install protobuf uvicorn httptools
pip install uvloop
启动:
python ktransformers/chat_openai.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/
2.使用open-WEBUI进行可视化对接
# 使用Pip下载OPEN-WEBUI
pip install open-webui
# 下载完成后开启服务
open-webui serve





import os
import json
import requests
from pydantic import BaseModel, Field
from typing import List, Union, Iterator# Set DEBUG to True to enable detailed logging
DEBUG = False


class Pipe:
    """Pipe that forwards chat requests to a local FastAPI completion server."""

    class Valves(BaseModel):
        openai_API_KEY: str = Field(default="none")  # Optional API key if needed
        DEFAULT_MODEL: str = Field(default="DeepSeek-R1")  # Default model identifier

    def __init__(self):
        """Read valve values from the environment and set the server endpoint."""
        self.id = "DeepSeek-R1"
        self.type = "manifold"
        self.name = "KT: "
        self.valves = self.Valves(
            **{
                "openai_API_KEY": os.getenv("openai_API_KEY", "none"),
                "DEFAULT_MODEL": os.getenv("openai_DEFAULT_MODEL", "DeepSeek-R1"),
            }
        )
        # Self-hosted FastAPI server details
        self.api_url = (
            "http://localhost:8000/v1/chat/completions"  # FastAPI server endpoint
        )
        self.headers = {"Content-Type": "application/json"}

    def get_openai_models(self):
        """Return available models - a fixed single-entry list."""
        return [{"id": "KT", "name": "DeepSeek-R1"}]

    def pipes(self) -> List[dict]:
        """Return the model list from :meth:`get_openai_models`."""
        return self.get_openai_models()

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """Forward ``body`` to the local server and return its reply.

        Args:
            body: Request dict. Reads ``"messages"`` (required) and the
                optional ``"temperature"``, ``"top_p"``, ``"max_tokens"``,
                ``"stream"`` and ``"stop"`` keys.

        Returns:
            For a streaming request, a generator yielding each non-empty
            response line decoded as UTF-8. Otherwise the assistant message
            text, or an ``"Error: ..."`` string on failure.
        """
        try:
            # Use the configured default model ID; there is a single endpoint
            model_id = self.valves.DEFAULT_MODEL
            messages = []

            # Process messages including system, user, and assistant messages
            for message in body["messages"]:
                if isinstance(message.get("content"), list):
                    # Join multiple content parts into a single text
                    text_parts = []
                    for content in message["content"]:
                        if content["type"] == "text":
                            text_parts.append(content["text"])
                        elif content["type"] == "image_url":
                            # Image inputs are replaced by a textual note
                            text_parts.append(f"[Image: {content['image_url']['url']}]")
                    messages.append(
                        {"role": message["role"], "content": "".join(text_parts)}
                    )
                else:
                    # Handle simple text messages
                    messages.append(
                        {"role": message["role"], "content": message["content"]}
                    )

            if DEBUG:
                print("FastAPI API request:")
                print(" Model:", model_id)
                print(" Messages:", json.dumps(messages, indent=2))

            # Resolve the streaming flag once so the payload sent to the server
            # and the response handling below always agree. Previously the
            # payload defaulted to True while the branch defaulted to False.
            stream = body.get("stream", False)

            # Prepare the API call parameters
            payload = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "top_p": body.get("top_p", 0.9),
                "max_tokens": body.get("max_tokens", 8192),
                "stream": stream,
            }

            # Add stop sequences if provided
            if body.get("stop"):
                payload["stop"] = body["stop"]

            # Sending request to local FastAPI server
            if stream:
                # Streaming response
                def stream_generator():
                    try:
                        # The context manager releases the connection even if
                        # the consumer stops iterating early.
                        with requests.post(
                            self.api_url,
                            json=payload,
                            headers=self.headers,
                            stream=True,
                        ) as response:
                            for line in response.iter_lines():
                                if line:
                                    yield line.decode("utf-8")
                    except Exception as e:
                        if DEBUG:
                            print(f"Streaming error: {e}")
                        yield f"Error during streaming: {str(e)}"

                return stream_generator()
            else:
                # Regular response
                response = requests.post(
                    self.api_url, json=payload, headers=self.headers
                )
                if response.status_code == 200:
                    generated_content = (
                        response.json()
                        .get("choices", [{}])[0]
                        .get("message", {})
                        .get("content", "")
                    )
                    return generated_content
                else:
                    return f"Error: {response.status_code}, {response.text}"
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def health_check(self) -> bool:
        """Check if the local FastAPI service is accessible.

        Sends a small completion request and reports whether the server
        answered with HTTP 200.
        """
        try:
            # Simple health check with a basic prompt
            response = requests.post(
                self.api_url,
                json={
                    "model": self.valves.DEFAULT_MODEL,
                    "messages": [{"role": "user", "content": "Hello"}],
                    "max_tokens": 5,
                },
                headers=self.headers,
            )
            return response.status_code == 200
        except Exception as e:
            if DEBUG:
                print(f"Health check failed: {e}")
            return False

