数据库表结构和部分数据
SET NAMES utf8mb4;
-- Foreign-key checks are switched off while the table is dropped and rebuilt.
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
-- Rows are soft-deleted through `is_deleted` (0 = live, 1 = deleted).
-- `create_time` is filled in by the server on INSERT and `update_time` is
-- refreshed by the server on every UPDATE, so writers that omit these columns
-- still get meaningful timestamps.
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `name` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户名',
    `password` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '密码',
    `avatar` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '头像',
    `sex` smallint(6) NULL DEFAULT 0 COMMENT '性别: 0 未知 1 男 2 女',
    `phone` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '手机号',
    `is_deleted` smallint(6) NULL DEFAULT 0 COMMENT '是否删除: 0 未删除 1 已删除',
    `create_time` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 31 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of user
-- ----------------------------
-- Columns are listed explicitly so the seed data keeps loading correctly if
-- columns are later added to, or reordered in, the table definition.
-- NOTE(review): ids 3-5 share the name 'guest'; the table has no UNIQUE
-- constraint on `name`, so this loads, but confirm the duplicates are intended.
INSERT INTO `user`
    (`id`, `name`, `password`, `avatar`, `sex`, `phone`, `is_deleted`, `create_time`, `update_time`)
VALUES
    (1, 'admin', '30780cc6f2e56945aaf9c9578c932e22', '1', 0, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58'),
    (2, 'user', '30780cc6f2e56945aaf9c9578c932e22', '', 1, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58'),
    (3, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58'),
    (4, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58'),
    (5, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');

SET FOREIGN_KEY_CHECKS = 1;
Python 代码
import logging
import os
from datetime import datetime
from typing import List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from pydantic import create_model
from sqlalchemy import create_engine, Column, Integer, String, SmallInteger, DateTime, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
# Configure SQLAlchemy logging so that emitted SQL statements are printed.
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# Database connection string. It can be overridden through the DATABASE_URL
# environment variable so credentials do not have to live in source control;
# the fallback keeps the original local-development value.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "mysql://root:wonderful2021@127.0.0.1:3306/demo",
)

# `echo=True` is intentionally not passed: the logger configured above already
# prints every statement, and enabling both produces each log line twice.
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class User(Base):
    """ORM mapping for the `user` table."""

    __tablename__ = "user"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(60), nullable=False)
    password = Column(String(64), nullable=False)
    avatar = Column(String(60), nullable=True)
    # Per the table's column comment: 0 unknown, 1 male, 2 female.
    sex = Column(SmallInteger, default=0)
    phone = Column(String(30), nullable=True)
    # Soft-delete flag: 0 = not deleted, 1 = deleted.
    is_deleted = Column(SmallInteger, default=0)
    # NOTE(review): these Python-side defaults are UTC, while a database-side
    # CURRENT_TIMESTAMP default uses the server's session time zone; confirm
    # which one the application expects.
    create_time = Column(DateTime, default=datetime.utcnow)
    # `onupdate` makes ORM-issued UPDATE statements refresh the timestamp;
    # without it the column kept its insert-time value forever.
    update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class UserResponse(BaseModel):
    """Response schema for the full-field user endpoints.

    `password` is not declared here, so it is not part of the response.
    Fields backed by nullable columns are Optional so that a row holding NULL
    serializes as null instead of failing response validation.
    """

    id: int
    name: str
    avatar: Optional[str] = None
    sex: Optional[int] = 0
    phone: Optional[str] = None
    is_deleted: Optional[int] = 0
    create_time: Optional[datetime] = None
    update_time: Optional[datetime] = None

    class Config:
        # Allow the model to be populated from ORM objects via attribute access.
        orm_mode = True
        json_encoders = {
            # Custom date-time output format.
            datetime: lambda v: v.strftime('%Y-%m-%d %H:%M:%S')
        }
app = FastAPI()

# Fixed set of response fields, mapped to the type each one is serialized as.
# The two timestamps are declared as `str` because the handlers format them
# into strings before returning.
field_type_mapping = dict(
    id=int,
    name=str,
    phone=str,
    avatar=str,
    create_time=str,
    update_time=str,
)

# Build the Pydantic response model from the mapping; every field is required.
DynamicUserResponse = create_model(
    'DynamicUserResponse',
    **{
        field_name: (field_type, ...)
        for field_name, field_type in field_type_mapping.items()
    }
)
def get_selected_fields(model, fields):
    """Return the attributes of ``model`` named in ``fields``, in order.

    Each name has surrounding whitespace stripped before the lookup.
    A name that ``model`` does not have raises ``AttributeError``.
    """
    selected = []
    for raw_name in fields:
        selected.append(getattr(model, raw_name.strip()))
    return selected
def handle_none_values(data: dict) -> dict:
    """Return a copy of ``data`` with every ``None`` value replaced by ``""``.

    Only ``None`` is replaced; other falsy values such as ``0`` are kept.
    """
    cleaned = {}
    for key, value in data.items():
        cleaned[key] = "" if value is None else value
    return cleaned
def format_datetime(dt: Optional[datetime]) -> str:
    """Render ``dt`` as ``YYYY-MM-DD HH:MM:SS``; a falsy ``dt`` gives ``""``."""
    if not dt:
        return ""
    return dt.strftime('%Y-%m-%d %H:%M:%S')
@app.get("/users/", response_model=List[DynamicUserResponse])
async def read_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1),
    name: Optional[str] = None,
    sex: Optional[int] = None,
    phone: Optional[str] = None
):
    """List non-deleted users, returning only the fixed set of fields.

    Args:
        skip: Number of matching rows to skip (>= 0).
        limit: Maximum number of rows to return (>= 1).
        name: Optional substring to match against the user name.
        sex: Optional exact match on the sex code.
        phone: Optional substring to match against the phone number.

    Returns:
        A list of dicts keyed by ``field_type_mapping``; ``None`` values are
        replaced with ``""`` and both timestamps are formatted as strings.
    """
    # NOTE(review): this is an `async def` handler running a synchronous
    # Session, so the query blocks the event loop while it executes.
    with SessionLocal() as session:
        filters = [User.is_deleted == 0]  # Default condition: not deleted.
        if name:
            # Substring match; `%` and `_` inside `name` act as LIKE wildcards.
            filters.append(User.name.like(f"%{name}%"))
        if sex is not None:
            filters.append(User.sex == sex)  # Exact match.
        if phone:
            filters.append(User.phone.like(f"%{phone}%"))  # Substring match.

        # Fixed set of returned columns.
        selected_fields = get_selected_fields(User, field_type_mapping.keys())
        # Order by primary key: OFFSET/LIMIT without ORDER BY has no
        # guaranteed row order, so pages could overlap or skip rows.
        stmt = (
            select(*selected_fields)
            .where(and_(*filters))
            .order_by(User.id)
            .offset(skip)
            .limit(limit)
        )
        result = session.execute(stmt)
        users = result.all()

        # Build one dict per row, blanking None values and formatting dates.
        return [
            handle_none_values({
                **dict(zip(field_type_mapping.keys(), user)),
                'create_time': format_datetime(user.create_time),
                'update_time': format_datetime(user.update_time)
            }) for user in users
        ]
@app.get("/users/{user_id}", response_model=DynamicUserResponse)
async def read_user(
    user_id: int
):
    """Fetch one non-deleted user by id, returning only the fixed fields.

    Raises:
        HTTPException: 404 when no non-deleted user has this id.
    """
    with SessionLocal() as session:
        # Restrict the SELECT to the fixed set of columns.
        field_names = field_type_mapping.keys()
        columns = get_selected_fields(User, field_names)
        query = select(*columns).where(User.id == user_id, User.is_deleted == 0)
        row = session.execute(query).first()

        if row is None:
            raise HTTPException(status_code=404, detail="User not found")

        # Pair column names with values, then overwrite both timestamps with
        # their formatted string form before blanking out None values.
        payload = dict(zip(field_names, row))
        payload['create_time'] = format_datetime(row.create_time)
        payload['update_time'] = format_datetime(row.update_time)
        return handle_none_values(payload)
@app.get("/usersallfiled/", response_model=List[UserResponse])
async def read_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1),
    name: Optional[str] = None,
    sex: Optional[int] = None,
    phone: Optional[str] = None
):
    """List non-deleted users as full ``UserResponse`` records.

    Args:
        skip: Number of matching rows to skip (>= 0).
        limit: Maximum number of rows to return (>= 1).
        name: Optional substring to match against the user name.
        sex: Optional exact match on the sex code.
        phone: Optional substring to match against the phone number.

    Returns:
        The matching ``User`` ORM objects, serialized through ``UserResponse``.
    """
    # NOTE(review): this function reuses the name `read_users` of the
    # fixed-field handler defined earlier in the file; both routes stay
    # registered, but the module-level name refers to this one only.
    with SessionLocal() as session:
        filters = [User.is_deleted == 0]  # Default condition: not deleted.
        if name:
            # Substring match; `%` and `_` inside `name` act as LIKE wildcards.
            filters.append(User.name.like(f"%{name}%"))
        if sex is not None:
            filters.append(User.sex == sex)  # Exact match.
        if phone:
            filters.append(User.phone.like(f"%{phone}%"))  # Substring match.

        # Order by primary key: OFFSET/LIMIT without ORDER BY has no
        # guaranteed row order, so pages could overlap or skip rows.
        stmt = (
            select(User)
            .where(and_(*filters))
            .order_by(User.id)
            .offset(skip)
            .limit(limit)
        )
        result = session.execute(stmt)
        users = result.scalars().all()
        return users
@app.get("/usersallfiled/{user_id}", response_model=UserResponse)
async def read_user(
    user_id: int
):
    """Fetch one non-deleted user by id as a full ``UserResponse`` record.

    Raises:
        HTTPException: 404 when no non-deleted user has this id.
    """
    with SessionLocal() as session:
        query = select(User).where(User.id == user_id, User.is_deleted == 0)
        found = session.execute(query).scalars().first()
        if found is not None:
            return found
        raise HTTPException(status_code=404, detail="User not found")
if __name__ == "__main__":
    # Run the app with uvicorn on the loopback interface when this file is
    # executed directly.
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )
调用方式
GET 请求:http://127.0.0.1:8000/usersallfiled/?name=admin&skip=0&limit=10
返回结果:[{"id":1,"name":"admin","avatar":"1","sex":0,"phone":null,"is_deleted":0,"create_time":"2023-03-29 02:34:58","update_time":"2023-03-29 02:34:58"}]