chat2db调用ollama实现数据库的操作。
只试了mysql的调用。
其它的我也不用。本来想直接充钱算了,最后一看计价单位是美元,就放弃了这份心,于是自己折腾了一下。
本地运行chat2db 及chat2db ui
https://gitee.com/ooooinfo/Chat2DB
clone 后先把 chat2db 的 Java 端运行起来。我现在还搞不清这个项目本身有没有连接到数据库里去。
在idea项目中运行
前端在:chat2db-client中。我的环境是 node 20 ,yarn , 注意可能需要 yarn add electron
直接运行:yarn start
安装 ollama 以及模型 qwen2.5。模型这里我也不懂,不知装哪一个好。
https://ollama.com/
在powershell下运行:ollama run qwen2.5 或 ollama pull qwen2.5
最终你要保证 ollama 已经启动并在运行。
仿一下openai的接口 调用ollama 提供给chat2db:
chat2db 中这样设置,所以需要我自己写一个 app.py 去代理对 ollama 的请求。不是我不想用自定义接口,主要是总不成功,不如直接仿 OpenAI 的接口。
app.py的部分代码。
我用的conda 创建的3.9的环境:
requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
httpx==0.25.1
tenacity==8.2.3
backoff
相关的app.py的代码:
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
import httpx
import uvicorn
import traceback
import sys
import time
import json
import asyncio
import re
import backoff # 确保已安装 backoff 库
# --- Adapter configuration ---------------------------------------------------
# Bearer token that clients must send in the Authorization header.
API_KEY = "sk-123456"
# Base URL of the Ollama server that requests are forwarded to.
OLLAMA_BASE_URL = "http://127.0.0.1:11434"
# Ollama model name used for chat requests.
DEFAULT_MODEL = "qwen2.5:latest"

# --- Application object ------------------------------------------------------
app = FastAPI(
    title="Ollama API Adapter",
    description="An adapter for Ollama API that mimics OpenAI API format",
    version="1.0.0",
)

# Permissive CORS: every origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def verify_api_key(request: Request):
    """Reject the request unless it carries ``Authorization: Bearer <API_KEY>``.

    Used as a FastAPI dependency on the routes of this module.

    Args:
        request: The incoming request whose headers are inspected.

    Raises:
        HTTPException: 401 when the header is absent or empty, when the scheme
            is not ``bearer`` (case-insensitive), or when the token differs
            from ``API_KEY``.
    """
    header_value = request.headers.get('authorization')
    if not header_value:
        raise HTTPException(status_code=401, detail="Authorization header is missing.")

    # "<scheme> <token>": split on the first space only.
    scheme, _, credential = header_value.partition(' ')
    is_bearer = scheme.lower() == 'bearer'
    if not (is_bearer and credential == API_KEY):
        raise HTTPException(status_code=401, detail="Unauthorized: API Key is invalid or missing.")
@app.get("/v1", dependencies=[Depends(verify_api_key)])
async def root():
    """Return static information about the adapter: version, status, routes."""
    endpoints = ["/v1/chat/completions", "/v1/models", "/health"]
    info = {"version": "1.0.0", "status": "ok"}
    info["endpoints"] = endpoints
    return info
@app.get("/v1/models", dependencies=[Depends(verify_api_key)])
async def list_models():
    """List the models known to Ollama in OpenAI ``/v1/models`` format.

    Queries Ollama's ``/api/tags`` endpoint and maps every entry of its
    ``models`` array to an OpenAI-style model object.

    Returns:
        A dict ``{"object": "list", "data": [...]}`` with one entry per model.

    Raises:
        HTTPException: 503 when Ollama answers with a non-200 status, cannot
            be reached, or returns a payload that cannot be processed.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{OLLAMA_BASE_URL}/api/tags")

        if response.status_code != 200:
            print(f"Ollama returned status {response.status_code} while listing models")
            raise HTTPException(status_code=503, detail="Ollama service unavailable")

        models = response.json().get("models", [])
        return {
            "object": "list",
            "data": [
                {
                    "id": model["name"],
                    "object": "model",
                    "created": 0,
                    "owned_by": "ollama",
                }
                for model in models
            ],
        }
    except HTTPException:
        # Deliberate HTTP errors must reach the client unchanged; without this
        # clause the generic handler below would re-wrap them and lose the
        # intended detail message.
        raise
    except Exception as e:
        print(f"Error listing models: {str(e)}")
        raise HTTPException(status_code=503, detail=str(e))
@app.get("/health", dependencies=[Depends(verify_api_key)])
async def health_check():
    """Health-check endpoint.

    Probes Ollama's ``/api/tags`` endpoint with a 5 second timeout.

    Returns:
        ``{"status": "healthy", ...}`` when Ollama answers with status 200.

    Raises:
        HTTPException: 503 when Ollama answers with any other status, 500
            when the request itself fails, or the upstream status code when
            an ``httpx.HTTPStatusError`` is raised.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            probe = await client.get(f"{OLLAMA_BASE_URL}/api/tags")
            # Guard clause: anything but 200 counts as unhealthy.
            if probe.status_code != 200:
                print("Ollama service is not healthy.")
                raise HTTPException(status_code=503, detail="Ollama 服务不可用")
            print("Ollama service is healthy.")
            return {"status": "healthy", "message": "服务运行正常"}
    except httpx.RequestError as exc:
        print(f"An error occurred while requesting {exc.request.url!r}.")
        raise HTTPException(status_code=500, detail=str(exc))
    except httpx.HTTPStatusError as exc:
        print(f"HTTP error occurred: {exc.response.status_code}")
        raise HTTPException(status_code=exc.response.status_code, detail=str(exc))
# Matches the first ```sql fenced block. Tolerates CRLF line endings, trailing
# blanks after the label and an upper-case "SQL" label.
_SQL_FENCE_RE = re.compile(
    r'```sql[ \t]*\r?\n(.*?)\r?\n[ \t]*```',
    re.DOTALL | re.IGNORECASE,
)


async def generate_sse_response(content, model="gpt-3.5-turbo"):
    """Yield an OpenAI-style SSE stream carrying ``content`` as one chunk.

    If ``content`` contains a fenced ``sql`` code block, only the SQL inside
    the first such block is sent; otherwise the whole text is sent. A trailing
    newline is appended in both cases.

    Args:
        content: Full text produced by the model. ``None`` is treated as an
            empty string.
        model: Model name reported in every chunk. Defaults to
            ``"gpt-3.5-turbo"``.

    Yields:
        Three SSE frames, in order: a ``chat.completion.chunk`` with the
        content, a ``chat.completion.chunk`` with ``finish_reason`` set to
        ``"stop"``, and the ``[DONE]`` terminator.
    """
    text = content or ""
    fence = _SQL_FENCE_RE.search(text)
    sql = fence.group(1).strip() if fence else text
    formatted_content = f"{sql}\n"

    # Compute the identifiers once so that every chunk of this stream carries
    # the same id and timestamp, even if a second boundary is crossed between
    # the yields.
    created = int(time.time())
    completion_id = f"chatcmpl-{created}"

    def _frame(delta, finish_reason):
        # Serialize one chat.completion.chunk as an SSE "data:" frame.
        payload = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{
                "delta": delta,
                "finish_reason": finish_reason,
                "index": 0,
            }],
        }
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    # Main content chunk.
    yield _frame({"content": formatted_content}, None)
    # Terminating chunk followed by the end-of-stream marker.
    yield _frame({}, "stop")
    yield "data: [DONE]\n\n"
# Retry policy: read timeouts are retried with exponential backoff, at most
# 5 attempts and 300 seconds in total.
@backoff.on_exception(backoff.expo, httpx.ReadTimeout, max_tries=5, max_time=300)
async def send_request(ollama_request):
    """POST a chat request to Ollama's ``/api/chat`` endpoint.

    Args:
        ollama_request: JSON-serializable request body forwarded to Ollama.

    Returns:
        The ``httpx.Response`` received from Ollama, whatever its status code.

    Raises:
        httpx.ReadTimeout: When reading the response times out. It is
            re-raised unchanged so that the ``backoff`` decorator can retry;
            it reaches the caller once the retries are exhausted.
        HTTPException: 500 for any other ``httpx.RequestError``.
    """
    # 10 s default for every phase, raised to 120 s for reading the response.
    timeout_config = httpx.Timeout(10.0, read=120.0)
    async with httpx.AsyncClient(timeout=timeout_config) as client:
        try:
            response = await client.post(
                f"{OLLAMA_BASE_URL}/api/chat",
                json=ollama_request,
            )
        except httpx.ReadTimeout:
            # ReadTimeout is a subclass of RequestError. It must be handled
            # first and propagated as-is; converting it to an HTTPException
            # here would hide it from the retry decorator.
            print("Read timeout while waiting for Ollama; the request may be retried.")
            raise
        except httpx.RequestError as exc:
            print(f"An error occurred while requesting {exc.request.url!r}.")
            raise HTTPException(status_code=500, detail=str(exc)) from exc
        print(f"Response received with status {response.status_code}")
        return response
@app.post("/v1/chat/completions", dependencies=[Depends(verify_api_key)])
@app.post("/chat/completions", dependencies=[Depends(verify_api_key)])
@app.post("/", dependencies=[Depends(verify_api_key)])
async def chat_completions(request: Request):
    """OpenAI-compatible chat completion endpoint backed by Ollama.

    Reads ``messages`` and ``stream`` from the JSON body, forwards the
    messages to Ollama as a non-streaming request for ``DEFAULT_MODEL``, and
    returns the answer either as a single ``chat.completion`` object
    (``stream`` false) or as an SSE stream (``stream`` true, the default when
    the field is missing).

    Args:
        request: Incoming request carrying the JSON body.

    Returns:
        A dict for non-streaming requests, a ``StreamingResponse`` for
        streaming requests, or a ``JSONResponse`` with status 400 (request
        body is not a JSON object) or 500 (unexpected failure).

    Raises:
        HTTPException: Propagated unchanged, e.g. 400 when Ollama answers
            with a non-200 status or the error raised by ``send_request``.
    """
    try:
        # Parse the client's body in its own try so that only a malformed
        # *request* is reported as 400; a malformed reply from Ollama is a
        # server-side failure handled by the generic handler below.
        try:
            body = await request.json()
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            print(f"JSON decoding error: {str(e)}")
            return JSONResponse(status_code=400, content={"message": "Invalid JSON data"})
        if not isinstance(body, dict):
            print(f"Request body is not a JSON object: {type(body).__name__}")
            return JSONResponse(status_code=400, content={"message": "Invalid JSON data"})

        messages = body.get("messages", [])
        stream = body.get("stream", True)
        print(f"Received request with body: {body}")  # log the request body

        # Always ask Ollama for the complete answer; streaming towards the
        # client is emulated afterwards from the full text.
        ollama_request = {
            "model": DEFAULT_MODEL,
            "messages": messages,
            "stream": False,
        }
        response = await send_request(ollama_request)
        print(f"Received response: {response.text}")  # log the raw reply

        if response.status_code != 200:
            print(f"Failed to get response from model, status code: {response.status_code}")
            raise HTTPException(status_code=400, detail="Failed to get response from model")

        ollama_response = response.json()
        content = ollama_response.get("message", {}).get("content", "")
        print(f"Processed content: {content}")  # log the extracted content

        if not stream:
            result = {
                "id": f"chatcmpl-{int(time.time())}",
                "object": "chat.completion",
                "created": int(time.time()),
                "model": DEFAULT_MODEL,
                "choices": [{
                    "message": {
                        # OpenAI clients expect the reply role to be
                        # "assistant".
                        "role": "assistant",
                        "content": content,
                    },
                    "finish_reason": "stop",
                    "index": 0,
                }],
            }
            print(f"Returning non-stream response: {result}")  # log the result
            return result

        headers = {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
        return StreamingResponse(
            generate_sse_response(content),
            media_type="text/event-stream",
            headers=headers,
        )
    except HTTPException:
        # Let deliberate HTTP errors keep their status code and detail
        # instead of being flattened into a generic 500 below.
        raise
    except Exception as e:
        print(f"Error during chat completions: {str(e)}")
        print(traceback.format_exc())  # log the stack trace
        return JSONResponse(
            status_code=500,
            content={"message": "Internal server error"},
        )
if __name__ == "__main__":
    # Script entry point: serve the adapter on all interfaces, port 8080.
    host, port = "0.0.0.0", 8080
    print(f"Starting server on {host}:{port}")
    uvicorn.run(app, host=host, port=port)
上面代码把 key 及 model 都写死了,所以你一定要先把相关的模型下载下来。
python app.py
再注意以下配置:
在chat2db做好链接,再输入你的提示词。见下面效果:
响应速度大约几秒钟,当然也要看自己电脑的性能了。都不花钱了,就不要什么自行车了。