当前位置: 首页 > article >正文

SQLAlchemy

SQLAlchemy 系统性开发文档(扩展版)

目录

  1. 简介
  2. 核心概念
    • Engine
    • Session
    • Models (ORM 映射)
  3. SQLAlchemy 的工作流程
  4. 使用 SQLAlchemy 进行查询
    • 基础查询
    • 过滤和条件
    • 排序和分页
    • 连接查询
  5. 结合 FastAPI 和 SQLAlchemy
    • 集成 SQLAlchemy 与 FastAPI
    • 依赖注入
  6. 使用 SQLModel
  7. 实践示例
    • 定义模型
    • 创建数据库会话
    • 执行查询
    • 详细分析 get_chatmessage 函数
  8. 最佳实践
  9. 资源与学习材料

简介

SQLAlchemy 是一个功能强大的 Python SQL 工具包和对象关系映射(ORM)库。它提供了全功能的 SQL 语句构建器和一个 ORM,使得 Python 开发者可以更方便地与数据库交互,而无需编写原生的 SQL 语句。

SQLModel 是一个基于 SQLAlchemy 和 Pydantic 的新兴库,旨在简化数据模型的定义和验证,特别适用于 FastAPI 项目。SQLModel 结合了 SQLAlchemy 的 ORM 功能和 Pydantic 的数据验证能力。

核心概念

Engine
  • 定义Engine 是 SQLAlchemy 连接数据库的核心对象。它负责管理与数据库的连接池,并执行 SQL 语句。

  • 创建

    from sqlalchemy import create_engine
    
    DATABASE_URL = "postgresql://user:password@localhost/mydatabase"
    engine = create_engine(DATABASE_URL)
    
Session
  • 定义Session 是用于与数据库进行交互的高级接口。它管理 ORM 对象的生命周期,并处理事务。

  • 创建:

    from sqlalchemy.orm import sessionmaker
    
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
  • 使用:

    session = SessionLocal()
    try:
        # 执行数据库操作
        pass
    finally:
        session.close()
    
Models (ORM 映射)
  • 定义:模型是 Python 类,它们映射到数据库中的表。每个模型类代表一个表,每个类属性代表一个列。

  • 创建:

    from sqlalchemy import Column, Integer, String
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    
    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True, index=True)
        name = Column(String, index=True)
        email = Column(String, unique=True, index=True)
    

SQLAlchemy 的工作流程

  1. 定义模型:创建 Python 类并映射到数据库表。
  2. 创建 Engine 和 Session:配置数据库连接并创建会话工厂。
  3. 执行数据库操作:使用 Session 进行增删改查操作。
  4. 管理事务:通过 Session 控制事务的提交与回滚。

使用 SQLAlchemy 进行查询

基础查询
  • 获取所有记录

    users = session.query(User).all()
    
  • 获取单条记录

    user = session.query(User).first()
    
过滤和条件
  • 使用 filter 方法

    users = session.query(User).filter(User.name == "Alice").all()
    
  • 使用多个条件

    users = session.query(User).filter(User.name == "Alice", User.email.like("%@example.com")).all()
    
排序和分页
  • 排序

    users = session.query(User).order_by(User.name.asc()).all()
    
  • 分页

    page = 2
    page_size = 10
    users = session.query(User).offset((page - 1) * page_size).limit(page_size).all()
    
连接查询
  • 一对多关系: 假设 UserAddress 模型有一对多关系:

    class Address(Base):
        __tablename__ = 'addresses'
        id = Column(Integer, primary_key=True, index=True)
        email_address = Column(String, nullable=False)
        user_id = Column(Integer, ForeignKey('users.id'))
    
  • 查询

    from sqlalchemy.orm import relationship
    
    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True, index=True)
        name = Column(String, index=True)
        addresses = relationship("Address", back_populates="user")
    
    class Address(Base):
        __tablename__ = 'addresses'
        id = Column(Integer, primary_key=True, index=True)
        email_address = Column(String, nullable=False)
        user_id = Column(Integer, ForeignKey('users.id'))
        user = relationship("User", back_populates="addresses")
    
    # 获取用户及其地址
    users = session.query(User).join(Address).filter(Address.email_address.like("%@example.com")).all()
    

结合 FastAPI 和 SQLAlchemy

集成 SQLAlchemy 与 FastAPI
  1. 创建数据库会话依赖

    from fastapi import Depends
    from sqlalchemy.orm import Session
    
    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()
    
  2. 在路由中使用依赖

    from fastapi import APIRouter
    
    router = APIRouter()
    
    @router.get("/users/")
    def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
        users = db.query(User).offset(skip).limit(limit).all()
        return users
    
依赖注入
  • 依赖注入的概念:允许你在函数调用时动态地提供某些参数。FastAPI 使用 Depends 来处理依赖注入,特别适用于数据库会话、认证等场景。

  • 使用 Depends 获取数据库会话

    from fastapi import Depends
    
    @router.get("/users/")
    def read_users(db: Session = Depends(get_db)):
        return db.query(User).all()
    

使用 SQLModel

SQLModel 是一个建立在 SQLAlchemy 和 Pydantic 之上的库,旨在简化 ORM 和数据验证的使用,特别适用于 FastAPI。

安装
pip install sqlmodel
定义模型
from sqlmodel import SQLModel, Field

class User(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    name: str
    email: str = Field(index=True, nullable=False)
创建数据库会话
from sqlmodel import create_engine, Session

DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL)

def get_session():
    with Session(engine) as session:
        yield session
执行查询
from fastapi import FastAPI, Depends
from typing import List

app = FastAPI()

@app.get("/users/", response_model=List[User])
def read_users(session: Session = Depends(get_session)):
    users = session.exec(select(User)).all()
    return users

实践示例

假设我们要实现一个查询聊天记录的 API。

定义模型
from sqlmodel import SQLModel, Field
from datetime import datetime

class ChatMessage(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    flow_id: str
    chat_id: str
    user_id: int
    content: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
创建数据库会话
from sqlmodel import create_engine, Session

DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL, echo=True)

def get_session():
    with Session(engine) as session:
        yield session
定义 API 路由
from fastapi import FastAPI, Depends, HTTPException
from typing import List, Optional
from sqlmodel import select
import json

app = FastAPI()

# 定义响应模型
class UnifiedResponseModel(BaseModel):
    code: int
    message: str
    data: Any

class ChatMessageRead(BaseModel):
    id: int
    flow_id: str
    chat_id: str
    user_id: int
    content: str
    timestamp: datetime

def resp_200(data):
    return UnifiedResponseModel(code=200, message="Success", data=data)

# 假设 UserPayload 和 get_login_user 已定义

@router.get('/chat/history',
            response_model=UnifiedResponseModel[List[ChatMessageRead]],
            status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
                    chat_id: str,
                    flow_id: str,
                    id: Optional[str] = None,
                    page_size: Optional[int] = 20,
                    login_user: UserPayload = Depends(get_login_user),
                    session: Session = Depends(get_session)):
    # 参数校验
    if not chat_id or not flow_id:
        raise HTTPException(status_code=500, detail='chat_id 和 flow_id 必传参数')
    
    # 构建查询条件
    where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                      ChatMessage.chat_id == chat_id)
    if id:
        where = where.where(ChatMessage.id < int(id))
    
    # 执行查询
    db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
    
    # 转换为响应模型
    chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
    return resp_200(chat_messages)
运行示例
  1. 创建数据库表

    SQLModel.metadata.create_all(engine)
    
  2. 添加示例数据

    with Session(engine) as session:
        message = ChatMessage(flow_id="flow1", chat_id="chat1", user_id=1, content="Hello World")
        session.add(message)
        session.commit()
    
  3. 启动 FastAPI 应用

    uvicorn main:app --reload
    
  4. 访问 API

    访问 http://127.0.0.1:8000/chat/history?chat_id=chat1&flow_id=flow1 以获取聊天历史记录。

详细分析 get_chatmessage 函数

@router.get('/chat/history',
            response_model=UnifiedResponseModel[List[ChatMessageRead]],
            status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
                    chat_id: str,
                    flow_id: str,
                    id: Optional[str] = None,
                    page_size: Optional[int] = 20,
                    login_user: UserPayload = Depends(get_login_user)):
    # 参数校验
    if not chat_id or not flow_id:
        return {'code': 500, 'message': 'chat_id 和 flow_id 必传参数'}
    # 
    where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                      ChatMessage.chat_id == chat_id)
    if id:
        where = where.where(ChatMessage.id < int(id))
    with session_getter() as session:
        db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
    return resp_200(db_message)
1. 装饰器和路由定义
@router.get('/chat/history',
            response_model=UnifiedResponseModel[List[ChatMessageRead]],
            status_code=200)
  • @router.get('/chat/history'):定义一个 GET 请求的路由,当客户端访问 /chat/history 时,会调用 get_chatmessage 函数。
  • response_model=UnifiedResponseModel[List[ChatMessageRead]]:指定返回的数据模型为 UnifiedResponseModel,其中包含一个 List 类型的 ChatMessageRead 列表。这有助于 FastAPI 自动生成文档和进行数据验证。
  • status_code=200:指定成功响应的 HTTP 状态码为 200。
2. 函数定义和参数说明
def get_chatmessage(*,
                    chat_id: str,
                    flow_id: str,
                    id: Optional[str] = None,
                    page_size: Optional[int] = 20,
                    login_user: UserPayload = Depends(get_login_user)):
  • \*:表示后续参数必须作为关键字参数传入,不能通过位置传递。
  • chat_id: str:必传参数,表示聊天记录所属的聊天 ID。
  • flow_id: str:必传参数,表示聊天记录所属的流程 ID。
  • id: Optional[str] = None:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。
  • page_size: Optional[int] = 20:可选参数,表示每次查询返回的记录数量,默认为 20 条。
  • login_user: UserPayload = Depends(get_login_user):依赖注入参数,通过 Depends 调用 get_login_user 函数获取当前登录用户的 UserPayload 对象。
3. 参数校验
if not chat_id or not flow_id:
    return {'code': 500, 'message': 'chat_id 和 flow_id 必传参数'}
  • 功能:检查是否提供了必传参数 chat_idflow_id

  • 逻辑:如果任一参数缺失,返回一个错误响应,提示用户必须提供这两个参数。

  • 改进建议:使用 HTTPException 抛出异常,而不是直接返回字典,以便 FastAPI 能够正确处理响应格式。

    from fastapi import HTTPException
    
    if not chat_id or not flow_id:
        raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
    
4. 构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                  ChatMessage.chat_id == chat_id)
if id:
    where = where.where(ChatMessage.id < int(id))
  • select(ChatMessage):使用 SQLAlchemy 的 select 函数构建一个查询,选择 ChatMessage 表中的记录。
  • .where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id):添加查询条件,筛选出 flow_idchat_id 匹配的记录。
  • if id::如果提供了 id 参数,则添加一个额外的条件,筛选出 id 小于指定值的记录。这通常用于实现分页查询,获取指定消息之前的记录。
5. 执行查询
with session_getter() as session:
    db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
  • session_getter():一个上下文管理器,用于获取数据库会话。通常,这个函数会创建一个新的数据库会话,并在退出上下文时关闭会话。
  • session.exec(where):执行之前构建的查询语句 where
  • .order_by(ChatMessage.id.desc()):按 id 降序排序,确保获取的是最新的聊天记录。
  • .limit(page_size):限制返回的记录数,最多返回 page_size 条记录。
  • .all():将查询结果转换为一个列表,包含所有符合条件的聊天记录。
6. 返回响应
return resp_200(db_message)
  • resp_200

    :一个自定义的响应函数,用于封装标准化的响应格式。它可能类似于:

    def resp_200(data):
        return UnifiedResponseModel(code=200, message="Success", data=data)
    
  • 功能:将查询结果 db_message 封装为 UnifiedResponseModel,并返回给客户端。

详细步骤解析

1. 请求触发

当客户端向 /chat/history 端点发送 GET 请求时,get_chatmessage 函数被调用。请求需要包含 chat_idflow_id 作为查询参数。可选参数 idpage_size 可以用于分页查询。

2. 参数校验

函数首先检查 chat_idflow_id 是否存在。如果缺少任一参数,返回一个错误响应,提示客户端提供这两个参数。

3. 构建查询

使用 SQLAlchemy 的 select 构建查询条件,筛选出 flow_idchat_id 匹配的 ChatMessage 记录。如果提供了 id,则进一步筛选出 id 小于指定值的记录,以支持分页查询。

4. 执行查询

通过数据库会话执行查询,按照 id 降序排序,并限制返回的记录数为 page_size。最终将查询结果转换为列表 db_message

5. 返回响应

将查询结果封装为 UnifiedResponseModel,并返回给客户端。

SQLAlchemy 相关知识点详细讲解

为了更好地理解 get_chatmessage 函数中使用的 SQLAlchemy 操作,我们将深入讲解相关的 SQLAlchemy 知识点。

1. 定义模型

在 SQLAlchemy 中,模型类(ORM 映射)用于表示数据库表。每个类对应一个表,每个属性对应一个列。例如:

from sqlmodel import SQLModel, Field
from datetime import datetime

class ChatMessage(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    flow_id: str
    chat_id: str
    user_id: int
    content: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
  • SQLModel:继承自 SQLAlchemy 和 Pydantic,支持 ORM 和数据验证。

  • table=True:指示该类映射到一个数据库表。

  • 字段定义

    • id: 主键。
    • flow_idchat_iduser_idcontenttimestamp: 表中的其他列。
2. 创建数据库会话

创建数据库连接和会话工厂:

from sqlmodel import create_engine, Session

DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL, echo=True)

def get_session():
    with Session(engine) as session:
        yield session
  • create_engine:创建一个数据库引擎,用于与数据库交互。
  • Session:管理数据库会话,用于执行查询和事务。
  • get_session:一个生成器函数,作为 FastAPI 的依赖注入,确保每个请求使用一个独立的数据库会话,并在请求结束后关闭会话。
3. 构建和执行查询

使用 SQLAlchemy 的查询构建功能,可以灵活地构建复杂的查询条件:

from sqlmodel import select

# 基础查询
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id)

# 添加分页条件
if id:
    where = where.where(ChatMessage.id < int(id))

# 执行查询并获取结果
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
  • select:构建一个 SELECT 查询,选择 ChatMessage 表中的记录。
  • .where:添加过滤条件,筛选出满足 flow_idchat_id 的记录。
  • .order_by:指定排序方式,这里是按 id 降序排列。
  • .limit:限制返回的记录数。
  • .exec.all():执行查询并获取所有符合条件的记录。
4. 使用 Pydantic 模型进行响应

为了确保返回的数据格式符合预期,使用 Pydantic 模型进行数据验证和转换:

from pydantic import BaseModel
from typing import List
from datetime import datetime

class ChatMessageRead(BaseModel):
    id: int
    flow_id: str
    chat_id: str
    user_id: int
    content: str
    timestamp: datetime
  • ChatMessageRead:定义一个 Pydantic 模型,描述返回的聊天记录格式。
  • response_model=UnifiedResponseModel[List[ChatMessageRead]]:在 FastAPI 路由中指定响应模型,确保返回的数据结构符合 UnifiedResponseModel

实践示例:详细分析 get_chatmessage 函数

函数定义
@router.get('/chat/history',
            response_model=UnifiedResponseModel[List[ChatMessageRead]],
            status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
                    chat_id: str,
                    flow_id: str,
                    id: Optional[str] = None,
                    page_size: Optional[int] = 20,
                    login_user: UserPayload = Depends(get_login_user),
                    session: Session = Depends(get_session)):
    # 参数校验
    if not chat_id or not flow_id:
        raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
    
    # 构建查询条件
    where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                      ChatMessage.chat_id == chat_id)
    if id:
        where = where.where(ChatMessage.id < int(id))
    
    # 执行查询
    db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
    
    # 转换为响应模型
    chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
    return resp_200(chat_messages)
步骤详解
  1. 装饰器和路由定义

    @router.get('/chat/history',
                response_model=UnifiedResponseModel[List[ChatMessageRead]],
                status_code=200)
    
    • 定义一个 GET 请求路由 /chat/history
    • 指定返回的数据模型为 UnifiedResponseModel,其中包含一个 List 类型的 ChatMessageRead 列表。
    • 成功响应时返回 HTTP 状态码 200。
  2. 函数参数和依赖注入

    def get_chatmessage(*,
                        chat_id: str,
                        flow_id: str,
                        id: Optional[str] = None,
                        page_size: Optional[int] = 20,
                        login_user: UserPayload = Depends(get_login_user),
                        session: Session = Depends(get_session)):
    
    • 关键字参数:函数使用 * 关键字,要求后续参数必须以关键字形式传入,不能通过位置传递。
    • chat_idflow_id:必传参数,用于标识要查询的聊天记录所属的聊天和流程。
    • id:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。
    • page_size:可选参数,表示每次查询返回的记录数量,默认值为 20。
    • login_user:依赖注入参数,通过 Depends(get_login_user) 调用 get_login_user 函数,获取当前登录用户的 UserPayload 对象。
    • session:依赖注入参数,通过 Depends(get_session) 调用 get_session 函数,获取当前请求的数据库会话。
  3. 参数校验

    if not chat_id or not flow_id:
        raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
    
    • 检查 chat_idflow_id 是否存在。
    • 如果任一参数缺失,抛出一个 HTTP 400 错误(Bad Request),提示用户必须提供这两个参数。
  4. 构建查询条件

    where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                      ChatMessage.chat_id == chat_id)
    if id:
        where = where.where(ChatMessage.id < int(id))
    
    • 使用 SQLAlchemy 的 select 函数构建查询,选择 ChatMessage 表中的记录。
    • 添加 where 条件,筛选出 flow_idchat_id 匹配的记录。
    • 如果提供了 id 参数,进一步筛选出 id 小于指定值的记录,以支持分页查询。
  5. 执行查询

    db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
    
    • where.order_by(ChatMessage.id.desc()):按 id 降序排序,确保获取的是最新的聊天记录。
    • .limit(page_size):限制返回的记录数,最多返回 page_size 条记录。
    • session.exec(...):执行构建好的查询条件。
    • .all():将查询结果转换为列表,包含所有符合条件的聊天记录。
  6. 转换为响应模型

    chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
    return resp_200(chat_messages)
    
    • ChatMessageRead.from_orm(message):将 SQLAlchemy ORM 对象 message 转换为 Pydantic 模型 ChatMessageRead
    • resp_200(chat_messages):调用 resp_200 函数,将查询结果封装为 UnifiedResponseModel 并返回给客户端。

相关 SQLAlchemy 知识点

为了更好地理解 get_chatmessage 函数中的 SQLAlchemy 操作,我们需要详细讲解一些相关的 SQLAlchemy 知识点。

1. select 函数

select 是 SQLAlchemy 的核心查询构建函数,用于创建一个 SELECT 语句。

from sqlalchemy import select

# 基础查询
stmt = select(ChatMessage)
  • 功能:创建一个选择所有 ChatMessage 表中的记录的查询。
  • 进一步条件:通过 where 方法添加过滤条件。
2. where 方法

where 方法用于添加过滤条件到查询中。

stmt = select(ChatMessage).where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id)
  • 多个条件:通过传递多个参数,添加多个 WHERE 条件。
  • 逻辑关系:传递多个参数时,默认使用 AND 逻辑关系。
3. 排序和限制

通过 order_bylimit 方法,可以对查询结果进行排序和限制返回的记录数。

stmt = stmt.order_by(ChatMessage.id.desc()).limit(page_size)
  • order_by(ChatMessage.id.desc()):按 id 降序排列。
  • limit(page_size):限制返回的记录数,最多返回 page_size 条记录。
4. 执行查询

通过 session.exec(stmt) 执行查询,并使用 .all() 方法获取所有符合条件的记录。

db_message = session.exec(stmt).all()
  • session.exec(stmt):执行构建好的查询语句。
  • .all():获取所有结果,返回一个列表。
5. 使用 Pydantic 模型进行数据验证

通过 Pydantic 模型,可以将 ORM 对象转换为符合 API 规范的响应模型。

from pydantic import BaseModel
from typing import List
from datetime import datetime

class ChatMessageRead(BaseModel):
    id: int
    flow_id: str
    chat_id: str
    user_id: int
    content: str
    timestamp: datetime
  • ChatMessageRead:定义一个 Pydantic 模型,用于描述返回的聊天记录格式。

  • from_orm 方法:将 SQLAlchemy ORM 对象转换为 Pydantic 模型。

    chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
    
6. 依赖注入和会话管理

通过 FastAPI 的依赖注入机制,可以简化数据库会话的管理,确保每个请求使用独立的会话,并在请求结束后自动关闭会话。

from fastapi import Depends
from sqlmodel import Session

def get_session():
    with Session(engine) as session:
        yield session

@router.get("/users/")
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_session)):
    users = db.query(User).offset(skip).limit(limit).all()
    return users

深入讲解 get_chatmessage 函数中的 SQLAlchemy 操作

为了更深入地理解 get_chatmessage 函数中使用的 SQLAlchemy 操作,以下将逐行解析函数中的关键代码,并解释其背后的 SQLAlchemy 概念。

函数定义和装饰器
@router.get('/chat/history',
            response_model=UnifiedResponseModel[List[ChatMessageRead]],
            status_code=200)
def get_chatmessage(*,
                    chat_id: str,
                    flow_id: str,
                    id: Optional[str] = None,
                    page_size: Optional[int] = 20,
                    login_user: UserPayload = Depends(get_login_user),
                    session: Session = Depends(get_session)):
  • @router.get('/chat/history'):定义一个 GET 请求路由 /chat/history,当客户端访问此 URL 时,get_chatmessage 函数会被调用。
  • response_model=UnifiedResponseModel[List[ChatMessageRead]]:指定返回的数据模型,用于数据验证和自动生成文档。
  • status_code=200:指定成功响应的 HTTP 状态码为 200。
  • 参数列表:
    • chat_id: strflow_id: str:必传参数,用于标识要查询的聊天记录。
    • id: Optional[str] = None:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。
    • page_size: Optional[int] = 20:可选参数,表示每次查询返回的记录数,默认为 20 条。
    • login_user: UserPayload = Depends(get_login_user):依赖注入参数,通过 Depends 获取当前登录用户的信息。
    • session: Session = Depends(get_session):依赖注入参数,通过 Depends 获取当前请求的数据库会话。
参数校验
if not chat_id or not flow_id:
    raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
  • 功能:确保 chat_idflow_id 参数存在。
  • 逻辑:如果任一参数缺失,抛出一个 HTTP 400 错误,提示用户必须提供这两个参数。
构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                  ChatMessage.chat_id == chat_id)
if id:
    where = where.where(ChatMessage.id < int(id))
  • select(ChatMessage):创建一个 SELECT 查询,选择 ChatMessage 表中的记录。
  • .where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id):添加 WHERE 条件,筛选出 flow_idchat_id 匹配的记录。
  • if id::如果提供了 id 参数,进一步添加一个条件,筛选出 id 小于指定值的记录。这有助于实现分页查询,获取某个特定消息之前的记录。
执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
  • where.order_by(ChatMessage.id.desc()):在查询条件 where 上添加排序条件,按 id 降序排列。这确保返回的是最新的聊天记录。
  • .limit(page_size):限制返回的记录数为 page_size 条,避免一次性返回过多数据。
  • session.exec(...):执行构建好的查询语句。
  • .all():获取所有符合条件的记录,返回一个列表 db_message
转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
  • ChatMessageRead.from_orm(message):将 SQLAlchemy ORM 对象 message 转换为 Pydantic 模型 ChatMessageRead
  • resp_200(chat_messages):调用 resp_200 函数,将转换后的数据封装为 UnifiedResponseModel,并返回给客户端。

关键 SQLAlchemy 知识点复习

为了更好地理解上述函数中的 SQLAlchemy 操作,以下是一些关键知识点的复习和扩展。

1. select 函数

select 是 SQLAlchemy 中用于构建 SELECT 查询的函数。

  • 基本用法

    stmt = select(User)
    

    这将创建一个查询,选择 User 表中的所有记录。

  • 添加过滤条件

    stmt = select(User).where(User.name == "Alice")
    

    这将创建一个查询,选择 User 表中 name 为 “Alice” 的记录。

2. where 方法

where 方法用于在查询中添加过滤条件。

  • 多个条件

    stmt = select(User).where(User.name == "Alice", User.age > 30)
    

    这将创建一个查询,选择 User 表中 name 为 “Alice” 且 age 大于 30 的记录。

  • 动态条件

    if filter_condition:
        stmt = stmt.where(User.status == filter_condition)
    
3. 排序和限制

通过 order_bylimit 方法,可以对查询结果进行排序和限制返回的记录数。

  • 排序

    stmt = stmt.order_by(User.id.desc())
    

    这将按 id 降序排列查询结果。

  • 限制返回记录数

    stmt = stmt.limit(10)
    

    这将限制查询结果最多返回 10 条记录。

4. 执行查询

通过 session.exec(stmt) 执行查询,并使用 .all().first() 获取结果。

  • 获取所有结果

    results = session.exec(stmt).all()
    
  • 获取单个结果

    result = session.exec(stmt).first()
    
5. 使用 Pydantic 模型进行数据验证

通过 Pydantic 模型,可以将 ORM 对象转换为符合 API 规范的响应模型。

  • 定义 Pydantic 模型

    from pydantic import BaseModel
    from datetime import datetime
    
    class ChatMessageRead(BaseModel):
        id: int
        flow_id: str
        chat_id: str
        user_id: int
        content: str
        timestamp: datetime
    
  • 从 ORM 对象转换为 Pydantic 模型

    chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
    

结合 SQLAlchemy 和 FastAPI 的实践示例

为了更好地理解 SQLAlchemy 在 FastAPI 中的应用,我们将通过一个完整的示例来展示如何定义模型、创建数据库会话、执行查询以及返回响应。

1. 定义模型
from sqlmodel import SQLModel, Field
from datetime import datetime

class ChatMessage(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    flow_id: str
    chat_id: str
    user_id: int
    content: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
  • ChatMessage:一个表示聊天记录的模型类,映射到 ChatMessage 数据库表。
  • 字段说明:
    • id: 主键。
    • flow_idchat_iduser_idcontenttimestamp: 表中的其他列。
2. 创建数据库会话
from sqlmodel import create_engine, Session

DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL, echo=True)

def get_session():
    with Session(engine) as session:
        yield session
  • create_engine:创建一个数据库引擎,负责连接数据库。
  • Session:用于管理数据库会话。
  • get_session:一个生成器函数,作为 FastAPI 的依赖注入,确保每个请求使用一个独立的数据库会话,并在请求结束后自动关闭会话。
3. 定义 API 路由
from fastapi import FastAPI, Depends, HTTPException
from typing import List, Optional
from sqlmodel import select
from pydantic import BaseModel

app = FastAPI()

# 定义响应模型
class UnifiedResponseModel(BaseModel):
    code: int
    message: str
    data: Any

class ChatMessageRead(BaseModel):
    id: int
    flow_id: str
    chat_id: str
    user_id: int
    content: str
    timestamp: datetime

def resp_200(data):
    return UnifiedResponseModel(code=200, message="Success", data=data)

# 假设 UserPayload 和 get_login_user 已定义

@app.get('/chat/history',
         response_model=UnifiedResponseModel[List[ChatMessageRead]],
         status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
                   chat_id: str,
                   flow_id: str,
                   id: Optional[str] = None,
                   page_size: Optional[int] = 20,
                   login_user: UserPayload = Depends(get_login_user),
                   session: Session = Depends(get_session)):
    # 参数校验
    if not chat_id or not flow_id:
        raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
    
    # 构建查询条件
    where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                      ChatMessage.chat_id == chat_id)
    if id:
        where = where.where(ChatMessage.id < int(id))
    
    # 执行查询
    db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
    
    # 转换为响应模型
    chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
    return resp_200(chat_messages)
4. 运行示例
  1. 创建数据库表

    SQLModel.metadata.create_all(engine)
    

    这将基于 ChatMessage 模型在数据库中创建相应的表结构。

  2. 添加示例数据

    with Session(engine) as session:
        message = ChatMessage(flow_id="flow1", chat_id="chat1", user_id=1, content="Hello World")
        session.add(message)
        session.commit()
    

    这将向 ChatMessage 表中添加一条示例记录。

  3. 启动 FastAPI 应用

    uvicorn main:app --reload
    

    这将启动 FastAPI 应用,并监听默认的 8000 端口。

  4. 访问 API

    访问 http://127.0.0.1:8000/chat/history?chat_id=chat1&flow_id=flow1 以获取聊天历史记录。

最佳实践

  1. 使用环境变量管理配置
    • 将数据库 URL、密钥等敏感信息存储在环境变量中,而不是硬编码在代码中。
  2. 遵循 PEP 8 代码规范
    • 保持代码整洁,符合 PEP 8 标准,便于团队协作和维护。
  3. 使用 Alembic 进行数据库迁移
    • 管理数据库模式的变更,确保数据库结构与 ORM 模型保持一致。
  4. 处理数据库会话的生命周期
    • 确保每个请求使用独立的数据库会话,并在请求结束后关闭会话,避免资源泄露。
  5. 优化查询性能
    • 使用适当的索引,避免 N+1 查询问题,合理使用分页和缓存。
  6. 错误处理
    • 捕获并处理数据库操作中的异常,提供有意义的错误信息给前端。
  7. 安全性
    • 防止 SQL 注入,确保用户输入的参数被正确过滤和验证。
    • 使用 HTTPS 保护数据传输安全。

资源与学习材料

  • 官方文档
    • SQLAlchemy 官方文档
    • SQLModel 官方文档
    • FastAPI 官方文档
  • 教程与指南
    • SQLAlchemy 教程
    • FastAPI 与 SQLAlchemy 整合教程
  • 书籍
    • 《Essential SQLAlchemy》 by Jason Myers 和 Rick Copeland
    • 《Mastering SQLAlchemy》 by Rick Copeland
  • 社区与支持
    • Stack Overflow SQLAlchemy 标签
    • GitHub SQLAlchemy 仓库

详细分析 get_chatmessage 函数

为了更深入地理解 get_chatmessage 函数中使用的 SQLAlchemy 操作,以下将逐行解析函数中的关键代码,并解释其背后的 SQLAlchemy 概念。

函数定义和装饰器
@router.get('/chat/history',
            response_model=UnifiedResponseModel[List[ChatMessageRead]],
            status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
def get_chatmessage(*,
                    chat_id: str,
                    flow_id: str,
                    id: Optional[str] = None,
                    page_size: Optional[int] = 20,
                    login_user: UserPayload = Depends(get_login_user),
                    session: Session = Depends(get_session)):
  • @router.get('/chat/history'):定义一个 GET 请求路由 /chat/history,当客户端访问此 URL 时,get_chatmessage 函数会被调用。

  • response_model=UnifiedResponseModel[List[ChatMessageRead]]:指定返回的数据模型,用于数据验证和自动生成文档。

  • status_code=200:指定成功响应的 HTTP 状态码为 200。

  • 参数列表

    • \*:表示后续参数必须作为关键字参数传入,不能通过位置传递。
    • chat_id: strflow_id: str:必传参数,用于标识要查询的聊天记录所属的聊天和流程。
    • id: Optional[str] = None:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。
    • page_size: Optional[int] = 20:可选参数,表示每次查询返回的记录数量,默认为 20 条。
    • login_user: UserPayload = Depends(get_login_user):依赖注入参数,通过 Depends 调用 get_login_user 函数,获取当前登录用户的 UserPayload 对象。
    • session: Session = Depends(get_session):依赖注入参数,通过 Depends 调用 get_session 函数,获取当前请求的数据库会话。
参数校验
if not chat_id or not flow_id:
    raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
  • 功能:确保 chat_idflow_id 参数存在。
  • 逻辑:如果任一参数缺失,抛出一个 HTTP 400 错误,提示用户必须提供这两个参数。
构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                  ChatMessage.chat_id == chat_id)
if id:
    where = where.where(ChatMessage.id < int(id))
  • select(ChatMessage):使用 SQLAlchemy 的 select 函数构建一个查询,选择 ChatMessage 表中的记录。
  • .where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id):添加查询条件,筛选出 flow_idchat_id 匹配的记录。
  • if id::如果提供了 id 参数,则添加一个额外的条件,筛选出 id 小于指定值的记录。这通常用于实现分页查询,获取指定消息之前的记录。
执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
  • where.order_by(ChatMessage.id.desc()):在查询条件 where 上添加排序条件,按 id 降序排列。这确保返回的是最新的聊天记录。
  • .limit(page_size):限制返回的记录数为 page_size 条,避免一次性返回过多数据。
  • session.exec(...):执行构建好的查询语句。
  • .all():将查询结果转换为列表,包含所有符合条件的聊天记录。
转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
  • ChatMessageRead.from_orm(message):将 SQLAlchemy ORM 对象 message 转换为 Pydantic 模型 ChatMessageRead
  • resp_200(chat_messages):调用 resp_200 函数,将查询结果封装为 UnifiedResponseModel,并返回给客户端。

SQLAlchemy 相关知识点深入讲解

为了更好地理解 get_chatmessage 函数中使用的 SQLAlchemy 操作,以下是一些相关的 SQLAlchemy 知识点的详细讲解。

1. select 函数

select 是 SQLAlchemy 的核心查询构建函数,用于创建一个 SELECT 语句。

  • 基本用法

    stmt = select(User)
    

    这将创建一个查询,选择 User 表中的所有记录。

  • 添加过滤条件

    stmt = select(User).where(User.name == "Alice")
    

    这将创建一个查询,选择 User 表中 name 为 “Alice” 的记录。

  • 多个条件

    stmt = select(User).where(User.name == "Alice", User.age > 30)
    

    这将创建一个查询,选择 User 表中 name 为 “Alice” 且 age 大于 30 的记录。

2. where 方法

where 方法用于在查询中添加过滤条件。

  • 基本用法

    stmt = select(User).where(User.name == "Alice")
    
  • 多个条件

    stmt = select(User).where(User.name == "Alice", User.age > 30)
    
  • 动态条件

    if filter_condition:
        stmt = stmt.where(User.status == filter_condition)
    
3. 排序和限制

通过 order_bylimit 方法,可以对查询结果进行排序和限制返回的记录数。

  • 排序

    stmt = stmt.order_by(User.id.desc())
    

    这将按 id 降序排列查询结果。

  • 限制返回记录数

    stmt = stmt.limit(10)
    

    这将限制查询结果最多返回 10 条记录。

4. 执行查询

通过 session.exec(stmt) 执行查询,并使用 .all().first() 获取结果。

  • 获取所有结果

    results = session.exec(stmt).all()
    
  • 获取单个结果

    result = session.exec(stmt).first()
    
5. 使用 Pydantic 模型进行数据验证

通过 Pydantic 模型,可以将 ORM 对象转换为符合 API 规范的响应模型。

  • 定义 Pydantic 模型

    from pydantic import BaseModel
    from datetime import datetime
    
    class ChatMessageRead(BaseModel):
        id: int
        flow_id: str
        chat_id: str
        user_id: int
        content: str
        timestamp: datetime
    
  • 从 ORM 对象转换为 Pydantic 模型

    chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
    
6. 依赖注入和会话管理

通过 FastAPI 的依赖注入机制,可以简化数据库会话的管理,确保每个请求使用独立的会话,并在请求结束后自动关闭会话。

from fastapi import Depends
from sqlmodel import Session

def get_session():
    with Session(engine) as session:
        yield session

@router.get("/users/")
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_session)):
    users = db.query(User).offset(skip).limit(limit).all()
    return users

详细步骤解析

以下是对 get_chatmessage 函数中每一行代码的详细解析,结合 SQLAlchemy 和 FastAPI 的概念。

1. 装饰器和路由定义
@router.get('/chat/history',
            response_model=UnifiedResponseModel[List[ChatMessageRead]],
            status_code=200) # status_code=200代表请求成功时返回HTTP状态码200
  • 功能:定义一个 GET 请求路由 /chat/history,当客户端访问此 URL 时,get_chatmessage 函数会被调用。
  • 响应模型:指定返回的数据模型为 UnifiedResponseModel,其中包含一个 List 类型的 ChatMessageRead 列表。这有助于 FastAPI 自动生成文档和进行数据验证。
  • 状态码:成功响应时返回 HTTP 状态码 200。
2. 函数参数和依赖注入
def get_chatmessage(*,
                    chat_id: str,
                    flow_id: str,
                    id: Optional[str] = None,
                    page_size: Optional[int] = 20,
                    login_user: UserPayload = Depends(get_login_user),
                    session: Session = Depends(get_session)):
  • \*:表示后续参数必须作为关键字参数传入,不能通过位置传递。
  • chat_id: strflow_id: str:必传参数,用于标识要查询的聊天记录所属的聊天和流程。
  • id: Optional[str] = None:可选参数,用于分页查询,表示从某个特定消息 ID 开始查询。
  • page_size: Optional[int] = 20:可选参数,表示每次查询返回的记录数,默认为 20 条。
  • login_user: UserPayload = Depends(get_login_user):依赖注入参数,通过 Depends(get_login_user) 调用 get_login_user 函数,获取当前登录用户的 UserPayload 对象。
  • session: Session = Depends(get_session):依赖注入参数,通过 Depends(get_session) 调用 get_session 函数,获取当前请求的数据库会话。
3. 参数校验
if not chat_id or not flow_id:
    raise HTTPException(status_code=400, detail='chat_id 和 flow_id 必传参数')
  • 功能:确保 chat_idflow_id 参数存在。
  • 逻辑:如果任一参数缺失,抛出一个 HTTP 400 错误,提示用户必须提供这两个参数。
4. 构建查询条件
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
                                  ChatMessage.chat_id == chat_id)
if id:
    where = where.where(ChatMessage.id < int(id))
  • select(ChatMessage):使用 SQLAlchemy 的 select 函数构建一个查询,选择 ChatMessage 表中的记录。
  • .where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id):添加查询条件,筛选出 flow_idchat_id 匹配的记录。
  • if id::如果提供了 id 参数,则添加一个额外的条件,筛选出 id 小于指定值的记录。这通常用于实现分页查询,获取指定消息之前的记录。
5. 执行查询
db_message = session.exec(where.order_by(ChatMessage.id.desc()).limit(page_size)).all()
  • where.order_by(ChatMessage.id.desc()):在查询条件 where 上添加排序条件,按 id 降序排列。这确保返回的是最新的聊天记录。
  • .limit(page_size):限制返回的记录数为 page_size 条,避免一次性返回过多数据。
  • session.exec(...):执行构建好的查询语句。
  • .all():将查询结果转换为列表,包含所有符合条件的聊天记录。
6. 转换为响应模型
chat_messages = [ChatMessageRead.from_orm(message) for message in db_message]
return resp_200(chat_messages)
  • ChatMessageRead.from_orm(message):将 SQLAlchemy ORM 对象 message 转换为 Pydantic 模型 ChatMessageRead
  • resp_200(chat_messages):调用 resp_200 函数,将查询结果封装为 UnifiedResponseModel,并返回给客户端。

http://www.kler.cn/a/420360.html

相关文章:

  • L1-049 天梯赛座位分配
  • ultralytics-YOLOv11的目标检测解析
  • SpringBoot+MyBatis整合ClickHouse实践
  • Python 3 教程第33篇(MySQL - mysql-connector 驱动)
  • Spring AI 框架介绍
  • 网络安全(三):网路安全协议
  • Spring,SpringMVC,SpringBoot,SpringCloud有什么区别和联系?
  • 汽车操作系统详解
  • dhcpd服务器的配置与管理(超详细!!!)
  • 贝叶斯统计的核心思想与基础知识:中英双语
  • 含k个3的数
  • 产品转后端?
  • 使用 Docker 部署 Spring Boot 项目流程
  • STM32 ADC --- 多通道序列采样
  • 应对智能时代——读《人工智能时代的生存指南》
  • TP6 html生成ptf并加盖骑缝章
  • 运输层2——UDP协议
  • liteflow 架构详解
  • springboot370高校宣讲会管理系统(论文+源码)_kaic
  • 相较于传统的实体展厅,VR虚拟展厅有哪些优势?
  • vue3的项目目录和关键文件
  • Hive中分区与分桶的区别
  • windows C#-强制转换和类型转换
  • AI获客的成本与传统获客方式相比有何优势?
  • 【vue for beginner】ref和reactive
  • SQL面试题——日期交叉问题 合并日期重叠的活动