SQLAlchemy
SQLAlchemy 系统性开发文档(扩展版)
目录
- 简介
- 核心概念
- Engine
- Session
- Models (ORM 映射)
- SQLAlchemy 的工作流程
- 使用 SQLAlchemy 进行查询
- 基础查询
- 过滤和条件
- 排序和分页
- 连接查询
- 结合 FastAPI 和 SQLAlchemy
- 集成 SQLAlchemy 与 FastAPI
- 依赖注入
- 使用 SQLModel
- 实践示例
- 定义模型
- 创建数据库会话
- 执行查询
- 详细分析
get_chatmessage
函数
- 最佳实践
- 资源与学习材料
简介
SQLAlchemy 是一个功能强大的 Python SQL 工具包和对象关系映射(ORM)库。它提供了全功能的 SQL 语句构建器和一个 ORM,使得 Python 开发者可以更方便地与数据库交互,而无需编写原生的 SQL 语句。
SQLModel 是一个基于 SQLAlchemy 和 Pydantic 的新兴库,旨在简化数据模型的定义和验证,特别适用于 FastAPI 项目。SQLModel 结合了 SQLAlchemy 的 ORM 功能和 Pydantic 的数据验证能力。
核心概念
Engine
-
定义:
Engine
是 SQLAlchemy 连接数据库的核心对象。它负责管理与数据库的连接池,并执行 SQL 语句。 -
创建
:
from sqlalchemy import create_engine DATABASE_URL = "postgresql://user:password@localhost/mydatabase" engine = create_engine(DATABASE_URL)
Session
-
定义:
Session
是用于与数据库进行交互的高级接口。它管理 ORM 对象的生命周期,并处理事务。 -
创建:
from sqlalchemy.orm import sessionmaker SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-
使用:
session = SessionLocal() try: # 执行数据库操作 pass finally: session.close()
Models (ORM 映射)
-
定义:模型是 Python 类,它们映射到数据库中的表。每个模型类代表一个表,每个类属性代表一个列。
-
创建:
from sqlalchemy import Column, Integer, String from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True, index=True) name = Column(String, index=True) email = Column(String, unique=True, index=True)
SQLAlchemy 的工作流程
- 定义模型:创建 Python 类并映射到数据库表。
- 创建 Engine 和 Session:配置数据库连接并创建会话工厂。
- 执行数据库操作:使用 Session 进行增删改查操作。
- 管理事务:通过 Session 控制事务的提交与回滚。
使用 SQLAlchemy 进行查询
基础查询
-
获取所有记录:
users = session.query(User).all()
-
获取单条记录:
user = session.query(User).first()
过滤和条件
-
使用
filter
方法:users = session.query(User).filter(User.name == "Alice").all()
-
使用多个条件:
users = session.query(User).filter(User.name == "Alice", User.email.like("%@example.com")).all()
排序和分页
-
排序:
users = session.query(User).order_by(User.name.asc()).all()
-
分页:
page = 2 page_size = 10 users = session.query(User).offset((page - 1) * page_size).limit(page_size).all()
连接查询
-
一对多关系: 假设
User
和Address
模型有一对多关系:class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True, index=True) email_address = Column(String, nullable=False) user_id = Column(Integer, ForeignKey('users.id'))
-
查询:
from sqlalchemy.orm import relationship class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True, index=True) name = Column(String, index=True) addresses = relationship("Address", back_populates="user") class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True, index=True) email_address = Column(String, nullable=False) user_id = Column(Integer, ForeignKey('users.id')) user = relationship("User", back_populates="addresses") # 获取用户及其地址 users = session.query(User).join(Address).filter(Address.email_address.like("%@example.com")).all()
结合 FastAPI 和 SQLAlchemy
集成 SQLAlchemy 与 FastAPI
-
创建数据库会话依赖:
from fastapi import Depends from sqlalchemy.orm import Session def get_db(): db = SessionLocal() try: yield db finally: db.close()
-
在路由中使用依赖:
from fastapi import APIRouter router = APIRouter() @router.get("/users/") def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)): users = db.query(User).offset(skip).limit(limit).all() return users
依赖注入
-
依赖注入的概念:允许你在函数调用时动态地提供某些参数。FastAPI 使用
Depends
来处理依赖注入,特别适用于数据库会话、认证等场景。 -
使用
Depends
获取数据库会话:
from fastapi import Depends @router.get("/users/") def read_users(db: Session = Depends(get_db)): return db.query(User).all()
使用 SQLModel
SQLModel 是一个建立在 SQLAlchemy 和 Pydantic 之上的库,旨在简化 ORM 和数据验证的使用,特别适用于 FastAPI。
安装
pip install sqlmodel
定义模型
from sqlmodel import SQLModel, Field
class User(SQLModel, table=True):
id: int = Field(default=None, primary_key=True)
name: str
email: str = Field(index=True, nullable=False)
创建数据库会话
from sqlmodel import create_engine, Session
DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL)
def get_session():
with Session(engine) as session:
yield session
执行查询
from fastapi import FastAPI, Depends
from typing import List
app = FastAPI()
@app.get("/users/", response_model=List[User])
def read_users(session: Session = Depends(get_session)):
users = session.exec(select(User)).all()
return users
实践示例
假设我们要实现一个查询聊天记录的 API。
定义模型
from sqlmodel import SQLModel, Field
from datetime import datetime
class ChatMessage(SQLModel, table=True):
id: int = Field(default=None, primary_key=True)
flow_id: str
chat_id: str
user_id: int
content: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
创建数据库会话
from sqlmodel import create_engine, Session
DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL, echo=True)
def get_session():
with Session(engine) as session:
yield session
定义 API 路由
from fastapi import FastAPI, Depends, HTTPException
from typing import List, Optional
from sqlmodel import select
import json
app = FastAPI()
# 定义响应模型
class UnifiedResponseModel(BaseModel):
code: int
message: str
data: Any
class ChatMessageRead(BaseModel):
id: int
flow_id: str
chat_id: str
user_id: int
content: str
timestamp: datetime
def resp_200(data):
return UnifiedResponseModel(code=200, message="Success", data=data)
# 假设 UserPayload 和 get_login_user 已定义
@router.get('/chat/history',
response_model=UnifiedResponseModel[List[ChatMessageRead]],
status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
chat_id: str,
flow_id: str,
id: Optional[str] = None,
page_size: Optional[int] = 20,
login_user: UserPayload = Depends(get_login_user),
session: Session = Depends(get_session)):
# 参数校验
if not chat_id or not flow_id:
raise HTTPException(status_code=500, detail='chat_id 和 flow_id 必传参数')
# 构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id)
if id:
where = where.where(ChatMessage.id < int(id))
# 执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
# 转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
运行示例
-
创建数据库表:
SQLModel.metadata.create_all(engine)
-
添加示例数据:
with Session(engine) as session: message = ChatMessage(flow_id="flow1", chat_id="chat1", user_id=1, content="Hello World") session.add(message) session.commit()
-
启动 FastAPI 应用:
uvicorn main:app --reload
-
访问 API:
访问
http://127.0.0.1:8000/chat/history?chat_id=chat1&flow_id=flow1
以获取聊天历史记录。
详细分析 get_chatmessage
函数
@router.get('/chat/history',
response_model=UnifiedResponseModel[List[ChatMessageRead]],
status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
chat_id: str,
flow_id: str,
id: Optional[str] = None,
page_size: Optional[int] = 20,
login_user: UserPayload = Depends(get_login_user)):
# 参数校验
if not chat_id or not flow_id:
return {'code': 500, 'message': 'chat_id 和 flow_id 必传参数'}
#
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id)
if id:
where = where.where(ChatMessage.id < int(id))
with session_getter() as session:
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
return resp_200(db_message)
1. 装饰器和路由定义
@router.get('/chat/history',
response_model=UnifiedResponseModel[List[ChatMessageRead]],
status_code=200)
@router.get('/chat/history')
:定义一个 GET 请求的路由,当客户端访问/chat/history
时,会调用get_chatmessage
函数。response_model=UnifiedResponseModel[List[ChatMessageRead]]
:指定返回的数据模型为UnifiedResponseModel
,其中包含一个List
类型的ChatMessageRead
列表。这有助于 FastAPI 自动生成文档和进行数据验证。status_code=200
:指定成功响应的 HTTP 状态码为 200。
2. 函数定义和参数说明
def get_chatmessage(*,
chat_id: str,
flow_id: str,
id: Optional[str] = None,
page_size: Optional[int] = 20,
login_user: UserPayload = Depends(get_login_user)):
\*
:表示后续参数必须作为关键字参数传入,不能通过位置传递。chat_id: str
:必传参数,表示聊天记录所属的聊天 ID。flow_id: str
:必传参数,表示聊天记录所属的流程 ID。id: Optional[str] = None
:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。page_size: Optional[int] = 20
:可选参数,表示每次查询返回的记录数量,默认为 20 条。login_user: UserPayload = Depends(get_login_user)
:依赖注入参数,通过Depends
调用get_login_user
函数获取当前登录用户的UserPayload
对象。
3. 参数校验
if not chat_id or not flow_id:
return {'code': 500, 'message': 'chat_id 和 flow_id 必传参数'}
-
功能:检查是否提供了必传参数
chat_id
和flow_id
。 -
逻辑:如果任一参数缺失,返回一个错误响应,提示用户必须提供这两个参数。
-
改进建议:使用
HTTPException
抛出异常,而不是直接返回字典,以便 FastAPI 能够正确处理响应格式。from fastapi import HTTPException if not chat_id or not flow_id: raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
4. 构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id)
if id:
where = where.where(ChatMessage.id < int(id))
select(ChatMessage)
:使用 SQLAlchemy 的select
函数构建一个查询,选择ChatMessage
表中的记录。.where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id)
:添加查询条件,筛选出flow_id
和chat_id
匹配的记录。if id:
:如果提供了id
参数,则添加一个额外的条件,筛选出id
小于指定值的记录。这通常用于实现分页查询,获取指定消息之前的记录。
5. 执行查询
with session_getter() as session:
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
session_getter()
:一个上下文管理器,用于获取数据库会话。通常,这个函数会创建一个新的数据库会话,并在退出上下文时关闭会话。session.exec(where)
:执行之前构建的查询语句where
。.order_by(ChatMessage.id.desc())
:按id
降序排序,确保获取的是最新的聊天记录。.limit(page_size)
:限制返回的记录数,最多返回page_size
条记录。.all()
:将查询结果转换为一个列表,包含所有符合条件的聊天记录。
6. 返回响应
return resp_200(db_message)
-
resp_200
:一个自定义的响应函数,用于封装标准化的响应格式。它可能类似于:
def resp_200(data): return UnifiedResponseModel(code=200, message="Success", data=data)
-
功能:将查询结果
db_message
封装为UnifiedResponseModel
,并返回给客户端。
详细步骤解析
1. 请求触发
当客户端向 /chat/history
端点发送 GET 请求时,get_chatmessage
函数被调用。请求需要包含 chat_id
和 flow_id
作为查询参数。可选参数 id
和 page_size
可以用于分页查询。
2. 参数校验
函数首先检查 chat_id
和 flow_id
是否存在。如果缺少任一参数,返回一个错误响应,提示客户端提供这两个参数。
3. 构建查询
使用 SQLAlchemy 的 select
构建查询条件,筛选出 flow_id
和 chat_id
匹配的 ChatMessage
记录。如果提供了 id
,则进一步筛选出 id
小于指定值的记录,以支持分页查询。
4. 执行查询
通过数据库会话执行查询,按照 id
降序排序,并限制返回的记录数为 page_size
。最终将查询结果转换为列表 db_message
。
5. 返回响应
将查询结果封装为 UnifiedResponseModel
,并返回给客户端。
SQLAlchemy 相关知识点详细讲解
为了更好地理解 get_chatmessage
函数中使用的 SQLAlchemy 操作,我们将深入讲解相关的 SQLAlchemy 知识点。
1. 定义模型
在 SQLAlchemy 中,模型类(ORM 映射)用于表示数据库表。每个类对应一个表,每个属性对应一个列。例如:
from sqlmodel import SQLModel, Field
from datetime import datetime
class ChatMessage(SQLModel, table=True):
id: int = Field(default=None, primary_key=True)
flow_id: str
chat_id: str
user_id: int
content: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
-
SQLModel
:继承自 SQLAlchemy 和 Pydantic,支持 ORM 和数据验证。 -
table=True
:指示该类映射到一个数据库表。 -
字段定义
:
id
: 主键。flow_id
、chat_id
、user_id
、content
、timestamp
: 表中的其他列。
2. 创建数据库会话
创建数据库连接和会话工厂:
from sqlmodel import create_engine, Session
DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL, echo=True)
def get_session():
with Session(engine) as session:
yield session
create_engine
:创建一个数据库引擎,用于与数据库交互。Session
:管理数据库会话,用于执行查询和事务。get_session
:一个生成器函数,作为 FastAPI 的依赖注入,确保每个请求使用一个独立的数据库会话,并在请求结束后关闭会话。
3. 构建和执行查询
使用 SQLAlchemy 的查询构建功能,可以灵活地构建复杂的查询条件:
from sqlmodel import select
# 基础查询
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id)
# 添加分页条件
if id:
where = where.where(ChatMessage.id < int(id))
# 执行查询并获取结果
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
select
:构建一个SELECT
查询,选择ChatMessage
表中的记录。.where
:添加过滤条件,筛选出满足flow_id
和chat_id
的记录。.order_by
:指定排序方式,这里是按id
降序排列。.limit
:限制返回的记录数。.exec
和.all()
:执行查询并获取所有符合条件的记录。
4. 使用 Pydantic 模型进行响应
为了确保返回的数据格式符合预期,使用 Pydantic 模型进行数据验证和转换:
from pydantic import BaseModel
from typing import List
from datetime import datetime
class ChatMessageRead(BaseModel):
id: int
flow_id: str
chat_id: str
user_id: int
content: str
timestamp: datetime
ChatMessageRead
:定义一个 Pydantic 模型,描述返回的聊天记录格式。response_model=UnifiedResponseModel[List[ChatMessageRead]]
:在 FastAPI 路由中指定响应模型,确保返回的数据结构符合UnifiedResponseModel
。
实践示例:详细分析 get_chatmessage
函数
函数定义
@router.get('/chat/history',
response_model=UnifiedResponseModel[List[ChatMessageRead]],
status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
chat_id: str,
flow_id: str,
id: Optional[str] = None,
page_size: Optional[int] = 20,
login_user: UserPayload = Depends(get_login_user),
session: Session = Depends(get_session)):
# 参数校验
if not chat_id or not flow_id:
raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
# 构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id)
if id:
where = where.where(ChatMessage.id < int(id))
# 执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
# 转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
步骤详解
-
装饰器和路由定义
@router.get('/chat/history', response_model=UnifiedResponseModel[List[ChatMessageRead]], status_code=200)
- 定义一个 GET 请求路由
/chat/history
。 - 指定返回的数据模型为
UnifiedResponseModel
,其中包含一个List
类型的ChatMessageRead
列表。 - 成功响应时返回 HTTP 状态码 200。
- 定义一个 GET 请求路由
-
函数参数和依赖注入
def get_chatmessage(*, chat_id: str, flow_id: str, id: Optional[str] = None, page_size: Optional[int] = 20, login_user: UserPayload = Depends(get_login_user), session: Session = Depends(get_session)):
- 关键字参数:函数使用
*
关键字,要求后续参数必须以关键字形式传入,不能通过位置传递。 chat_id
和flow_id
:必传参数,用于标识要查询的聊天记录所属的聊天和流程。id
:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。page_size
:可选参数,表示每次查询返回的记录数量,默认值为 20。login_user
:依赖注入参数,通过Depends(get_login_user)
调用get_login_user
函数,获取当前登录用户的UserPayload
对象。session
:依赖注入参数,通过Depends(get_session)
调用get_session
函数,获取当前请求的数据库会话。
- 关键字参数:函数使用
-
参数校验
if not chat_id or not flow_id: raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
- 检查
chat_id
和flow_id
是否存在。 - 如果任一参数缺失,抛出一个 HTTP 400 错误(Bad Request),提示用户必须提供这两个参数。
- 检查
-
构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id) if id: where = where.where(ChatMessage.id < int(id))
- 使用 SQLAlchemy 的
select
函数构建查询,选择ChatMessage
表中的记录。 - 添加
where
条件,筛选出flow_id
和chat_id
匹配的记录。 - 如果提供了
id
参数,进一步筛选出id
小于指定值的记录,以支持分页查询。
- 使用 SQLAlchemy 的
-
执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
where.order_by(ChatMessage.id.desc())
:按id
降序排序,确保获取的是最新的聊天记录。.limit(page_size)
:限制返回的记录数,最多返回page_size
条记录。session.exec(...)
:执行构建好的查询条件。.all()
:将查询结果转换为列表,包含所有符合条件的聊天记录。
-
转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message] return resp_200(chat_messages)
ChatMessageRead.from_orm(message)
:将 SQLAlchemy ORM 对象message
转换为 Pydantic 模型ChatMessageRead
。resp_200(chat_messages)
:调用resp_200
函数,将查询结果封装为UnifiedResponseModel
并返回给客户端。
相关 SQLAlchemy 知识点
为了更好地理解 get_chatmessage
函数中的 SQLAlchemy 操作,我们需要详细讲解一些相关的 SQLAlchemy 知识点。
1. select
函数
select
是 SQLAlchemy 的核心查询构建函数,用于创建一个 SELECT
语句。
from sqlalchemy import select
# 基础查询
stmt = select(ChatMessage)
- 功能:创建一个选择所有
ChatMessage
表中的记录的查询。 - 进一步条件:通过
where
方法添加过滤条件。
2. where
方法
where
方法用于添加过滤条件到查询中。
stmt = select(ChatMessage).where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id)
- 多个条件:通过传递多个参数,添加多个
WHERE
条件。 - 逻辑关系:传递多个参数时,默认使用
AND
逻辑关系。
3. 排序和限制
通过 order_by
和 limit
方法,可以对查询结果进行排序和限制返回的记录数。
stmt = stmt.order_by(ChatMessage.id.desc()).limit(page_size)
order_by(ChatMessage.id.desc())
:按id
降序排列。limit(page_size)
:限制返回的记录数,最多返回page_size
条记录。
4. 执行查询
通过 session.exec(stmt)
执行查询,并使用 .all()
方法获取所有符合条件的记录。
db_message = session.exec(stmt).all()
session.exec(stmt)
:执行构建好的查询语句。.all()
:获取所有结果,返回一个列表。
5. 使用 Pydantic 模型进行数据验证
通过 Pydantic 模型,可以将 ORM 对象转换为符合 API 规范的响应模型。
from pydantic import BaseModel
from typing import List
from datetime import datetime
class ChatMessageRead(BaseModel):
id: int
flow_id: str
chat_id: str
user_id: int
content: str
timestamp: datetime
-
ChatMessageRead
:定义一个 Pydantic 模型,用于描述返回的聊天记录格式。 -
from_orm
方法:将 SQLAlchemy ORM 对象转换为 Pydantic 模型。chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
6. 依赖注入和会话管理
通过 FastAPI 的依赖注入机制,可以简化数据库会话的管理,确保每个请求使用独立的会话,并在请求结束后自动关闭会话。
from fastapi import Depends
from sqlmodel import Session
def get_session():
with Session(engine) as session:
yield session
@router.get("/users/")
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_session)):
users = db.query(User).offset(skip).limit(limit).all()
return users
深入讲解 get_chatmessage
函数中的 SQLAlchemy 操作
为了更深入地理解 get_chatmessage
函数中使用的 SQLAlchemy 操作,以下将逐行解析函数中的关键代码,并解释其背后的 SQLAlchemy 概念。
函数定义和装饰器
@router.get('/chat/history',
response_model=UnifiedResponseModel[List[ChatMessageRead]],
status_code=200)
def get_chatmessage(*,
chat_id: str,
flow_id: str,
id: Optional[str] = None,
page_size: Optional[int] = 20,
login_user: UserPayload = Depends(get_login_user),
session: Session = Depends(get_session)):
@router.get('/chat/history')
:定义一个 GET 请求路由/chat/history
,当客户端访问此 URL 时,get_chatmessage
函数会被调用。response_model=UnifiedResponseModel[List[ChatMessageRead]]
:指定返回的数据模型,用于数据验证和自动生成文档。status_code=200
:指定成功响应的 HTTP 状态码为 200。- 参数列表:
chat_id: str
和flow_id: str
:必传参数,用于标识要查询的聊天记录。id: Optional[str] = None
:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。page_size: Optional[int] = 20
:可选参数,表示每次查询返回的记录数,默认为 20 条。login_user: UserPayload = Depends(get_login_user)
:依赖注入参数,通过Depends
获取当前登录用户的信息。session: Session = Depends(get_session)
:依赖注入参数,通过Depends
获取当前请求的数据库会话。
参数校验
if not chat_id or not flow_id:
raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
- 功能:确保
chat_id
和flow_id
参数存在。 - 逻辑:如果任一参数缺失,抛出一个 HTTP 400 错误,提示用户必须提供这两个参数。
构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id)
if id:
where = where.where(ChatMessage.id < int(id))
select(ChatMessage)
:创建一个SELECT
查询,选择ChatMessage
表中的记录。.where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id)
:添加WHERE
条件,筛选出flow_id
和chat_id
匹配的记录。if id:
:如果提供了id
参数,进一步添加一个条件,筛选出id
小于指定值的记录。这有助于实现分页查询,获取某个特定消息之前的记录。
执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
where.order_by(ChatMessage.id.desc())
:在查询条件where
上添加排序条件,按id
降序排列。这确保返回的是最新的聊天记录。.limit(page_size)
:限制返回的记录数为page_size
条,避免一次性返回过多数据。session.exec(...)
:执行构建好的查询语句。.all()
:获取所有符合条件的记录,返回一个列表db_message
。
转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
ChatMessageRead.from_orm(message)
:将 SQLAlchemy ORM 对象message
转换为 Pydantic 模型ChatMessageRead
。resp_200(chat_messages)
:调用resp_200
函数,将转换后的数据封装为UnifiedResponseModel
,并返回给客户端。
关键 SQLAlchemy 知识点复习
为了更好地理解上述函数中的 SQLAlchemy 操作,以下是一些关键知识点的复习和扩展。
1. select
函数
select
是 SQLAlchemy 中用于构建 SELECT
查询的函数。
-
基本用法:
stmt = select(User)
这将创建一个查询,选择
User
表中的所有记录。 -
添加过滤条件:
stmt = select(User).where(User.name == "Alice")
这将创建一个查询,选择
User
表中name
为 “Alice” 的记录。
2. where
方法
where
方法用于在查询中添加过滤条件。
-
多个条件:
stmt = select(User).where(User.name == "Alice", User.age > 30)
这将创建一个查询,选择
User
表中name
为 “Alice” 且age
大于 30 的记录。 -
动态条件:
if filter_condition: stmt = stmt.where(User.status == filter_condition)
3. 排序和限制
通过 order_by
和 limit
方法,可以对查询结果进行排序和限制返回的记录数。
-
排序:
stmt = stmt.order_by(User.id.desc())
这将按
id
降序排列查询结果。 -
限制返回记录数:
stmt = stmt.limit(10)
这将限制查询结果最多返回 10 条记录。
4. 执行查询
通过 session.exec(stmt)
执行查询,并使用 .all()
或 .first()
获取结果。
-
获取所有结果:
results = session.exec(stmt).all()
-
获取单个结果:
result = session.exec(stmt).first()
5. 使用 Pydantic 模型进行数据验证
通过 Pydantic 模型,可以将 ORM 对象转换为符合 API 规范的响应模型。
-
定义 Pydantic 模型:
from pydantic import BaseModel from datetime import datetime class ChatMessageRead(BaseModel): id: int flow_id: str chat_id: str user_id: int content: str timestamp: datetime
-
从 ORM 对象转换为 Pydantic 模型:
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
结合 SQLAlchemy 和 FastAPI 的实践示例
为了更好地理解 SQLAlchemy 在 FastAPI 中的应用,我们将通过一个完整的示例来展示如何定义模型、创建数据库会话、执行查询以及返回响应。
1. 定义模型
from sqlmodel import SQLModel, Field
from datetime import datetime
class ChatMessage(SQLModel, table=True):
id: int = Field(default=None, primary_key=True)
flow_id: str
chat_id: str
user_id: int
content: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
ChatMessage
:一个表示聊天记录的模型类,映射到ChatMessage
数据库表。- 字段说明:
id
: 主键。flow_id
、chat_id
、user_id
、content
、timestamp
: 表中的其他列。
2. 创建数据库会话
from sqlmodel import create_engine, Session
DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL, echo=True)
def get_session():
with Session(engine) as session:
yield session
create_engine
:创建一个数据库引擎,负责连接数据库。Session
:用于管理数据库会话。get_session
:一个生成器函数,作为 FastAPI 的依赖注入,确保每个请求使用一个独立的数据库会话,并在请求结束后自动关闭会话。
3. 定义 API 路由
from fastapi import FastAPI, Depends, HTTPException
from typing import List, Optional
from sqlmodel import select
from pydantic import BaseModel
app = FastAPI()
# 定义响应模型
class UnifiedResponseModel(BaseModel):
code: int
message: str
data: Any
class ChatMessageRead(BaseModel):
id: int
flow_id: str
chat_id: str
user_id: int
content: str
timestamp: datetime
def resp_200(data):
return UnifiedResponseModel(code=200, message="Success", data=data)
# 假设 UserPayload 和 get_login_user 已定义
@app.get('/chat/history',
response_model=UnifiedResponseModel[List[ChatMessageRead]],
status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
chat_id: str,
flow_id: str,
id: Optional[str] = None,
page_size: Optional[int] = 20,
login_user: UserPayload = Depends(get_login_user),
session: Session = Depends(get_session)):
# 参数校验
if not chat_id or not flow_id:
raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
# 构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id)
if id:
where = where.where(ChatMessage.id < int(id))
# 执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
# 转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
4. 运行示例
-
创建数据库表:
SQLModel.metadata.create_all(engine)
这将基于
ChatMessage
模型在数据库中创建相应的表结构。 -
添加示例数据:
with Session(engine) as session: message = ChatMessage(flow_id="flow1", chat_id="chat1", user_id=1, content="Hello World") session.add(message) session.commit()
这将向
ChatMessage
表中添加一条示例记录。 -
启动 FastAPI 应用:
uvicorn main:app --reload
这将启动 FastAPI 应用,并监听默认的
8000
端口。 -
访问 API:
访问
http://127.0.0.1:8000/chat/history?chat_id=chat1&flow_id=flow1
以获取聊天历史记录。
最佳实践
- 使用环境变量管理配置:
- 将数据库 URL、密钥等敏感信息存储在环境变量中,而不是硬编码在代码中。
- 遵循 PEP 8 代码规范:
- 保持代码整洁,符合 PEP 8 标准,便于团队协作和维护。
- 使用 Alembic 进行数据库迁移:
- 管理数据库模式的变更,确保数据库结构与 ORM 模型保持一致。
- 处理数据库会话的生命周期:
- 确保每个请求使用独立的数据库会话,并在请求结束后关闭会话,避免资源泄露。
- 优化查询性能:
- 使用适当的索引,避免 N+1 查询问题,合理使用分页和缓存。
- 错误处理:
- 捕获并处理数据库操作中的异常,提供有意义的错误信息给前端。
- 安全性:
- 防止 SQL 注入,确保用户输入的参数被正确过滤和验证。
- 使用 HTTPS 保护数据传输安全。
资源与学习材料
- 官方文档:
- SQLAlchemy 官方文档
- SQLModel 官方文档
- FastAPI 官方文档
- 教程与指南:
- SQLAlchemy 教程
- FastAPI 与 SQLAlchemy 整合教程
- 书籍:
- 《Essential SQLAlchemy》 by Jason Myers 和 Rick Copeland
- 《Mastering SQLAlchemy》 by Rick Copeland
- 社区与支持:
- Stack Overflow SQLAlchemy 标签
- GitHub SQLAlchemy 仓库
详细分析 get_chatmessage
函数
为了更深入地理解 get_chatmessage
函数中使用的 SQLAlchemy 操作,以下将逐行解析函数中的关键代码,并解释其背后的 SQLAlchemy 概念。
函数定义和装饰器
@router.get('/chat/history',
response_model=UnifiedResponseModel[List[ChatMessageRead]],
status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
chat_id: str,
flow_id: str,
id: Optional[str] = None,
page_size: Optional[int] = 20,
login_user: UserPayload = Depends(get_login_user),
session: Session = Depends(get_session)):
-
@router.get('/chat/history')
:定义一个 GET 请求路由/chat/history
,当客户端访问此 URL 时,get_chatmessage
函数会被调用。 -
response_model=UnifiedResponseModel[List[ChatMessageRead]]
:指定返回的数据模型,用于数据验证和自动生成文档。 -
status_code=200
:指定成功响应的 HTTP 状态码为 200。 -
参数列表
:
\*
:表示后续参数必须作为关键字参数传入,不能通过位置传递。chat_id: str
和flow_id: str
:必传参数,用于标识要查询的聊天记录所属的聊天和流程。id: Optional[str] = None
:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。page_size: Optional[int] = 20
:可选参数,表示每次查询返回的记录数量,默认为 20 条。login_user: UserPayload = Depends(get_login_user)
:依赖注入参数,通过Depends
调用get_login_user
函数,获取当前登录用户的UserPayload
对象。session: Session = Depends(get_session)
:依赖注入参数,通过Depends
调用get_session
函数,获取当前请求的数据库会话。
参数校验
if not chat_id or not flow_id:
raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
- 功能:确保
chat_id
和flow_id
参数存在。 - 逻辑:如果任一参数缺失,抛出一个 HTTP 400 错误,提示用户必须提供这两个参数。
构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id)
if id:
where = where.where(ChatMessage.id < int(id))
select(ChatMessage)
:使用 SQLAlchemy 的select
函数构建一个查询,选择ChatMessage
表中的记录。.where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id)
:添加查询条件,筛选出flow_id
和chat_id
匹配的记录。if id:
:如果提供了id
参数,则添加一个额外的条件,筛选出id
小于指定值的记录。这通常用于实现分页查询,获取指定消息之前的记录。
执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
where.order_by(ChatMessage.id.desc())
:在查询条件where
上添加排序条件,按id
降序排列。这确保返回的是最新的聊天记录。.limit(page_size)
:限制返回的记录数为page_size
条,避免一次性返回过多数据。session.exec(...)
:执行构建好的查询语句。.all()
:将查询结果转换为列表,包含所有符合条件的聊天记录。
转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
ChatMessageRead.from_orm(message)
:将 SQLAlchemy ORM 对象message
转换为 Pydantic 模型ChatMessageRead
。resp_200(chat_messages)
:调用resp_200
函数,将查询结果封装为UnifiedResponseModel
,并返回给客户端。
SQLAlchemy 相关知识点深入讲解
为了更好地理解 get_chatmessage
函数中使用的 SQLAlchemy 操作,以下是一些相关的 SQLAlchemy 知识点的详细讲解。
1. select
函数
select
是 SQLAlchemy 的核心查询构建函数,用于创建一个 SELECT
语句。
-
基本用法:
stmt = select(User)
这将创建一个查询,选择
User
表中的所有记录。 -
添加过滤条件:
stmt = select(User).where(User.name == "Alice")
这将创建一个查询,选择
User
表中name
为 “Alice” 的记录。 -
多个条件:
stmt = select(User).where(User.name == "Alice", User.age > 30)
这将创建一个查询,选择
User
表中name
为 “Alice” 且age
大于 30 的记录。
2. where
方法
where
方法用于在查询中添加过滤条件。
-
基本用法:
stmt = select(User).where(User.name == "Alice")
-
多个条件:
stmt = select(User).where(User.name == "Alice", User.age > 30)
-
动态条件:
if filter_condition: stmt = stmt.where(User.status == filter_condition)
3. 排序和限制
通过 order_by
和 limit
方法,可以对查询结果进行排序和限制返回的记录数。
-
排序:
stmt = stmt.order_by(User.id.desc())
这将按
id
降序排列查询结果。 -
限制返回记录数:
stmt = stmt.limit(10)
这将限制查询结果最多返回 10 条记录。
4. 执行查询
通过 session.exec(stmt)
执行查询,并使用 .all()
或 .first()
获取结果。
-
获取所有结果:
results = session.exec(stmt).all()
-
获取单个结果:
result = session.exec(stmt).first()
5. 使用 Pydantic 模型进行数据验证
通过 Pydantic 模型,可以将 ORM 对象转换为符合 API 规范的响应模型。
-
定义 Pydantic 模型:
from pydantic import BaseModel from datetime import datetime class ChatMessageRead(BaseModel): id: int flow_id: str chat_id: str user_id: int content: str timestamp: datetime
-
从 ORM 对象转换为 Pydantic 模型:
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
6. 依赖注入和会话管理
通过 FastAPI 的依赖注入机制,可以简化数据库会话的管理,确保每个请求使用独立的会话,并在请求结束后自动关闭会话。
from fastapi import Depends
from sqlmodel import Session
def get_session():
with Session(engine) as session:
yield session
@router.get("/users/")
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_session)):
users = db.query(User).offset(skip).limit(limit).all()
return users
详细步骤解析
以下是对 get_chatmessage
函数中每一行代码的详细解析,结合 SQLAlchemy 和 FastAPI 的概念。
1. 装饰器和路由定义
@router.get('/chat/history',
response_model=UnifiedResponseModel[List[ChatMessageRead]],
status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
- 功能:定义一个 GET 请求路由
/chat/history
,当客户端访问此 URL 时,get_chatmessage
函数会被调用。 - 响应模型:指定返回的数据模型为
UnifiedResponseModel
,其中包含一个List
类型的ChatMessageRead
列表。这有助于 FastAPI 自动生成文档和进行数据验证。 - 状态码:成功响应时返回 HTTP 状态码 200。
2. 函数参数和依赖注入
def get_chatmessage(*,
chat_id: str,
flow_id: str,
id: Optional[str] = None,
page_size: Optional[int] = 20,
login_user: UserPayload = Depends(get_login_user),
session: Session = Depends(get_session)):
\*
:表示后续参数必须作为关键字参数传入,不能通过位置传递。chat_id: str
和flow_id: str
:必传参数,用于标识要查询的聊天记录所属的聊天和流程。id: Optional[str] = None
:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。page_size: Optional[int] = 20
:可选参数,表示每次查询返回的记录数,默认为 20 条。login_user: UserPayload = Depends(get_login_user)
:依赖注入参数,通过Depends(get_login_user)
调用get_login_user
函数,获取当前登录用户的UserPayload
对象。session: Session = Depends(get_session)
:依赖注入参数,通过Depends(get_session)
调用get_session
函数,获取当前请求的数据库会话。
3. 参数校验
if not chat_id or not flow_id:
raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
- 功能:确保
chat_id
和flow_id
参数存在。 - 逻辑:如果任一参数缺失,抛出一个 HTTP 400 错误,提示用户必须提供这两个参数。
4. 构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id)
if id:
where = where.where(ChatMessage.id < int(id))
select(ChatMessage)
:使用 SQLAlchemy 的select
函数构建一个查询,选择ChatMessage
表中的记录。.where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id)
:添加查询条件,筛选出flow_id
和chat_id
匹配的记录。if id:
:如果提供了id
参数,则添加一个额外的条件,筛选出id
小于指定值的记录。这通常用于实现分页查询,获取指定消息之前的记录。
5. 执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
where.order_by(ChatMessage.id.desc())
:在查询条件where
上添加排序条件,按id
降序排列。这确保返回的是最新的聊天记录。.limit(page_size)
:限制返回的记录数为page_size
条,避免一次性返回过多数据。session.exec(...)
:执行构建好的查询语句。.all()
:将查询结果转换为列表,包含所有符合条件的聊天记录。
6. 转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
ChatMessageRead.from_orm(message)
:将 SQLAlchemy ORM 对象message
转换为 Pydantic 模型ChatMessageRead
。resp_200(chat_messages)
:调用resp_200
函数,将查询结果封装为UnifiedResponseModel
,并返回给客户端。