当前位置: 首页 > article >正文

Python后端入门

1

目录

  1. 引言
  2. 使用 Pydantic 定义数据模型
  3. 创建 FastAPI 应用
    • 路由定义
    • 接口定义与实现
  4. 使用 Uvicorn 运行程序
  5. 日志记录
  6. 异步编程
  7. 错误处理
  8. WebSocket 接口
  9. 使用 Postman 测试接口
  10. 总结

1. 引言

欢迎进入 Python 后端开发的世界!本教材旨在帮助有 Python 脚本编写经验但缺乏后端开发经验的开发者,快速掌握使用 FastAPI 进行后端开发的核心概念和实践。

FastAPI 是一个现代、快速(高性能)的 Web 框架,基于 Python 3.6+ 标准类型提示。它提供了高效的开发体验,并且性能媲美 Node.js 和 Go。


2. 使用 Pydantic 定义数据模型

Pydantic 是一个用于数据验证和设置的库,基于 Python 类型提示。它与 FastAPI 紧密集成,用于定义请求体和响应的数据模型。

安装 Pydantic

pip install pydantic

定义数据模型

from pydantic import BaseModel

class Item(BaseModel):
    name: str
    price: float
    is_offer: bool = None
解释
  • BaseModel:所有 Pydantic 模型都需要继承的基类。
  • name: str:定义了一个字符串类型的必需字段 name
  • price: float:定义了一个浮点数类型的必需字段 price
  • is_offer: bool = None:定义了一个布尔类型的可选字段 is_offer,默认值为 None

3. 创建 FastAPI 应用

安装 FastAPI

pip install fastapi

3.1 初始化应用

from fastapi import FastAPI

app = FastAPI()

3.2 路由定义

路由用于将 URL 路径与特定的函数(处理程序)关联。

@app.get("/")
async def read_root():
    return {"message": "Hello World"}
  • @app.get("/"):装饰器,指定该函数处理对根路径的 GET 请求。
  • async def read_root():定义一个异步函数作为请求处理程序。

3.3 接口定义与实现

获取指定 ID 的物品
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}
  • {item_id}:路径参数,接收请求路径中的变量部分。
创建新物品
@app.post("/items/")
async def create_item(item: Item):
    return {"item_name": item.name, "item_price": item.price}
  • item: Item:请求体参数,FastAPI 会根据 Pydantic 模型自动解析和验证请求数据。

4. 使用 Uvicorn 运行程序

Uvicorn 是一个快速的 ASGI 服务器,用于运行 FastAPI 应用。

安装 Uvicorn

pip install uvicorn[standard]

运行应用

uvicorn main:app --reload
  • main:app:表示在 main.py 文件中创建的 app 对象。
  • --reload:代码更改时自动重启服务器,方便开发调试。

5. 日志记录

日志对于调试和监控应用非常重要。

配置日志

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

使用日志

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    logger.info(f"Fetching item with ID: {item_id}")
    return {"item_id": item_id}
  • logging.getLogger(__name__):获取以当前模块命名的日志记录器。
  • logger.info():记录信息级别的日志。

6. 异步编程

FastAPI 基于 Python 的异步特性,支持高性能的并发请求处理。

异步函数示例

import asyncio

@app.get("/async-example")
async def async_example():
    await asyncio.sleep(1)
    return {"message": "This was async!"}
  • async def:定义异步函数。
  • await:等待异步操作完成。

7. 错误处理

使用 HTTPException

from fastapi import HTTPException

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    if item_id not in fake_items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"item": fake_items_db[item_id]}
  • HTTPException:用于返回特定的 HTTP 错误状态码和信息。

全局异常处理

from fastapi import Request
from fastapi.responses import JSONResponse

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
        content={"message": "Internal server error"},
    )
  • 捕获未处理的异常并返回统一的错误响应。

8. WebSocket 接口

实现 WebSocket 端点

from fastapi import WebSocket

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Echo: {data}")
  • @app.websocket("/ws"):定义 WebSocket 路径。
  • await websocket.accept():接受 WebSocket 连接。
  • await websocket.receive_text():接收来自客户端的消息。
  • await websocket.send_text():发送消息到客户端。

9. 使用 Postman 测试接口

下载并安装 Postman

从 Postman 官网 下载适用于你操作系统的版本并安装。

测试步骤

  1. 创建请求:点击 “New” 按钮,选择 “HTTP Request”。
  2. 设置请求方法和 URL:选择 GET、POST 等方法,输入请求的 URL,例如 http://localhost:8000/items/1
  3. 添加请求头(可选):如果需要特定的请求头,可以在 “Headers” 选项卡中添加。
  4. 添加请求体(对于 POST/PUT 请求):在 “Body” 选项卡中,选择 “raw” 和 “JSON”,输入 JSON 格式的请求数据。
  5. 发送请求:点击 “Send” 按钮。
  6. 查看响应:在下方的响应区域查看服务器返回的数据。

示例:测试创建物品接口

  • URLhttp://localhost:8000/items/
  • 方法:POST
  • 请求体
{
    "name": "Test Item",
    "price": 25.5,
    "is_offer": true
}
  • 预期响应
{
    "item_name": "Test Item",
    "item_price": 25.5
}

10. 总结

通过本教材,你已经学习了:

  • 使用 Pydantic 定义数据模型:了解如何使用类型提示和 Pydantic 进行数据验证。
  • 创建 FastAPI 应用:掌握了路由、接口定义与实现的方法。
  • 使用 Uvicorn 运行程序:学会了如何启动和运行你的应用。
  • 日志记录:理解了如何配置和使用日志。
  • 异步编程:了解了异步函数的定义和使用。
  • 错误处理:学会了如何处理异常并返回适当的错误响应。
  • WebSocket 接口:掌握了如何创建实时通信的 WebSocket 端点。
  • 使用 Postman 测试接口:学会了如何使用 Postman 进行 API 测试。

2

  1. 使用Pydantic定义数据模型
  2. 创建FastAPI应用并包含路由
  3. 接口定义和接口实现
  4. 使用Uvicorn运行程序
  5. 日志记录
  6. 异步编程
  7. 错误处理
  8. WebSocket接口
  9. 使用Postman测试接口

每一章节将包括理论讲解、代码示例和实践练习,确保你能够全面掌握Python后端开发的各个方面。


目录

  1. 前言
  2. 环境搭建
  3. 使用Pydantic定义数据模型
  4. 创建FastAPI应用并包含路由
  5. 接口定义和接口实现
  6. 使用Uvicorn运行程序
  7. 日志记录
  8. 异步编程
  9. 错误处理
  10. WebSocket接口
  11. 使用Postman测试接口
  12. 综合实例
  13. 总结与进一步学习

前言

Python因其简洁和高效的特性,已成为后端开发的热门语言之一。FastAPI作为现代、快速(高性能)的Web框架,结合Pydantic的数据验证能力,为开发者提供了极大的便利。本教材将带你逐步掌握使用FastAPI进行后端开发的关键技术。


环境搭建

在开始之前,我们需要准备开发环境。

1. 安装Python

确保你的系统安装了Python 3.7及以上版本。你可以通过以下命令检查Python版本:

python --version

如果未安装,请访问Python官网下载安装。

2. 创建虚拟环境

使用虚拟环境可以隔离项目依赖,避免冲突。

# 创建虚拟环境
python -m venv env

# 激活虚拟环境
# Windows
env\Scripts\activate

# macOS/Linux
source env/bin/activate

3. 安装必要的库

安装FastAPI、Uvicorn、Pydantic等必要库。

pip install fastapi uvicorn pydantic

使用Pydantic定义数据模型

Pydantic是用于数据验证和设置管理的库,FastAPI大量依赖Pydantic进行请求和响应的数据验证。

1. Pydantic基础

Pydantic使用Python的类型提示(type hints)来验证和解析数据。

2. 定义数据模型

让我们定义一个简单的用户数据模型。

from pydantic import BaseModel, EmailStr
from typing import Optional

class User(BaseModel):
    id: int
    name: str
    email: EmailStr
    age: Optional[int] = None

3. 模型解释

  • BaseModel: 所有Pydantic模型都应继承自BaseModel
  • 字段类型: id为整数,name为字符串,email为有效的电子邮件地址,age为可选整数。
  • 默认值: age有一个默认值None,表示该字段是可选的。

4. 数据验证示例

try:
    user = User(id=1, name='Alice', email='alice@example.com', age=30)
    print(user)
except ValidationError as e:
    print(e.json())

如果提供的数据不符合模型定义,Pydantic会抛出ValidationError,并提供详细的错误信息。

5. 实践练习

任务: 定义一个Product模型,包含以下字段:

  • id: 整数
  • name: 字符串
  • price: 浮点数,必须大于0
  • description: 可选字符串

参考答案:

from pydantic import BaseModel, condecimal
from typing import Optional

class Product(BaseModel):
    id: int
    name: str
    price: condecimal(gt=0)
    description: Optional[str] = None

创建FastAPI应用并包含路由

FastAPI框架允许你快速创建高性能的API。下面我们将创建一个简单的FastAPI应用,并定义一些基本路由。

1. 创建FastAPI应用

from fastapi import FastAPI

app = FastAPI()

2. 定义路由

路由用于处理不同的HTTP请求(如GET、POST等)。

@app.get("/")
def read_root():
    return {"message": "Hello, FastAPI!"}

3. 启动应用

在终端中运行以下命令:

uvicorn main:app --reload
  • main: Python文件名(main.py
  • app: FastAPI实例
  • --reload: 自动重启服务器,适用于开发环境

4. 访问路由

打开浏览器,访问 http://127.0.0.1:8000,你将看到:

{
  "message": "Hello, FastAPI!"
}

5. 定义更多路由

@app.get("/items/{item_id}")
def read_item(item_id: int, q: Optional[str] = None):
    return {"item_id": item_id, "q": q}
  • /{item_id}: 路径参数
  • q: 查询参数

6. 实践练习

任务: 创建一个POST路由/users/,接收一个User模型,并返回该用户数据。

参考答案:

from fastapi import FastAPI
from pydantic import BaseModel, EmailStr
from typing import Optional

app = FastAPI()

class User(BaseModel):
    id: int
    name: str
    email: EmailStr
    age: Optional[int] = None

@app.post("/users/")
def create_user(user: User):
    return user

接口定义和接口实现

在后端开发中,接口(API)定义和实现是核心部分。FastAPI通过路由和依赖注入机制,使接口定义和实现更加清晰和简洁。

1. 接口定义

接口定义涉及确定API的端点、请求方法、请求参数和响应格式。通常包括以下内容:

  • 端点(Endpoint): URL路径,例如/users/
  • 请求方法: GET, POST, PUT, DELETE等
  • 请求参数: 路径参数、查询参数、请求体
  • 响应格式: JSON, XML等

2. 接口实现

使用FastAPI,你可以直接在路由函数中实现接口逻辑。让我们通过一个简单的示例来理解。

示例: 用户管理API
  • GET /users/{user_id}: 获取用户信息
  • POST /users/: 创建新用户
  • PUT /users/{user_id}: 更新用户信息
  • DELETE /users/{user_id}: 删除用户
代码实现
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
from typing import Optional, Dict

app = FastAPI()

# 数据模型
class User(BaseModel):
    id: int
    name: str
    email: EmailStr
    age: Optional[int] = None

# 模拟数据库
fake_db: Dict[int, User] = {}

# 创建用户
@app.post("/users/", response_model=User)
def create_user(user: User):
    if user.id in fake_db:
        raise HTTPException(status_code=400, detail="User already exists")
    fake_db[user.id] = user
    return user

# 获取用户
@app.get("/users/{user_id}", response_model=User)
def get_user(user_id: int):
    user = fake_db.get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# 更新用户
@app.put("/users/{user_id}", response_model=User)
def update_user(user_id: int, updated_user: User):
    if user_id not in fake_db:
        raise HTTPException(status_code=404, detail="User not found")
    fake_db[user_id] = updated_user
    return updated_user

# 删除用户
@app.delete("/users/{user_id}")
def delete_user(user_id: int):
    if user_id not in fake_db:
        raise HTTPException(status_code=404, detail="User not found")
    del fake_db[user_id]
    return {"detail": "User deleted"}

3. 代码解释

  • 数据模型: 定义了User模型,用于请求和响应的数据验证。
  • 模拟数据库: 使用字典fake_db模拟简单的数据库操作。
  • 路由函数: 每个路由函数对应一个API端点,包含逻辑处理和错误处理。

4. 实践练习

任务: 扩展用户管理API,添加一个GET路由/users/,返回所有用户列表。

参考答案:

@app.get("/users/", response_model=List[User])
def get_all_users():
    return list(fake_db.values())

使用Uvicorn运行程序

Uvicorn是一个高性能的ASGI服务器,用于运行FastAPI应用。

1. 安装Uvicorn

如果尚未安装,请使用pip安装:

pip install uvicorn

2. 启动FastAPI应用

假设你的应用在main.py文件中,FastAPI实例名为app,使用以下命令启动:

uvicorn main:app --reload
  • main: Python文件名(不包括.py
  • app: FastAPI实例
  • --reload: 自动重启服务器,当代码修改时非常有用,适用于开发环境

3. 部署环境

在生产环境中,你可能希望不使用--reload,并结合其他工具如Gunicorn进行部署。例如:

pip install gunicorn
gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
  • -w 4: 启动4个工作进程
  • -k uvicorn.workers.UvicornWorker: 使用Uvicorn的工作类

4. 配置文件

你可以创建配置文件来管理启动参数,简化命令。例如,创建run.sh脚本:

#!/bin/bash
uvicorn main:app --host 0.0.0.0 --port 8000 --reload

给脚本执行权限:

chmod +x run.sh

运行脚本:

./run.sh

5. 实践练习

任务: 使用Uvicorn启动你的用户管理API,并确保能够通过浏览器访问/docs获取自动生成的API文档。

参考答案:

  1. 确保main.py包含FastAPI实例和路由。

  2. 启动Uvicorn:

    uvicorn main:app --reload
    
  3. 打开浏览器,访问 http://127.0.0.1:8000/docs,你将看到交互式的API文档。


日志记录

日志记录对于调试和监控后端应用至关重要。Python的内置logging模块与FastAPI结合,可以实现高效的日志记录。

1. Python的logging模块

logging模块提供了灵活的日志记录系统,支持多种日志级别和输出方式。

2. 配置日志

main.py中配置日志:

import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

3. 在路由中使用日志

@app.get("/")
def read_root():
    logger.info("Root endpoint called")
    return {"message": "Hello, FastAPI!"}

4. 日志级别

常用的日志级别:

  • DEBUG: 详细的信息,通常仅在诊断问题时使用
  • INFO: 确认程序按预期运行
  • WARNING: 指出有潜在问题的情况
  • ERROR: 由于更严重的问题,程序无法执行某些功能
  • CRITICAL: 严重错误,程序可能无法继续运行

5. 自定义日志

你可以为不同的模块或功能创建不同的日志记录器。

user_logger = logging.getLogger("user")
order_logger = logging.getLogger("order")

6. 实践练习

任务: 在用户管理API中,为每个CRUD操作添加相应的日志记录。

参考答案:

@app.post("/users/", response_model=User)
def create_user(user: User):
    if user.id in fake_db:
        logger.warning(f"Attempt to create user with existing id: {user.id}")
        raise HTTPException(status_code=400, detail="User already exists")
    fake_db[user.id] = user
    logger.info(f"User created: {user}")
    return user

@app.get("/users/{user_id}", response_model=User)
def get_user(user_id: int):
    user = fake_db.get(user_id)
    if not user:
        logger.error(f"User not found: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    logger.info(f"User retrieved: {user}")
    return user

@app.put("/users/{user_id}", response_model=User)
def update_user(user_id: int, updated_user: User):
    if user_id not in fake_db:
        logger.error(f"Attempt to update non-existent user: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    fake_db[user_id] = updated_user
    logger.info(f"User updated: {updated_user}")
    return updated_user

@app.delete("/users/{user_id}")
def delete_user(user_id: int):
    if user_id not in fake_db:
        logger.error(f"Attempt to delete non-existent user: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    del fake_db[user_id]
    logger.info(f"User deleted: {user_id}")
    return {"detail": "User deleted"}

异步编程

异步编程可以提高应用的性能,特别是在处理I/O密集型任务时。FastAPI支持异步(async)和同步(def)的路由函数。

1. 异步基础

Python的asyncio库支持异步编程,允许在单线程中处理多个任务。

2. 定义异步路由

使用async def定义异步路由函数。

@app.get("/async")
async def async_route():
    await some_async_function()
    return {"message": "This is an async route"}

3. 异步数据库操作

假设你使用异步数据库驱动,如asyncpg,可以在路由中执行异步数据库操作。

4. 异步与同步的选择

  • 异步: 适用于I/O密集型任务,如数据库查询、网络请求。
  • 同步: 适用于CPU密集型任务,如复杂的计算。

5. 示例: 异步路由

import asyncio

@app.get("/wait")
async def wait_route():
    await asyncio.sleep(1)
    return {"message": "Waited for 1 second"}

6. 实践练习

任务: 将用户创建路由改为异步,并模拟一个异步数据库写入操作。

参考答案:

@app.post("/users/", response_model=User)
async def create_user(user: User):
    if user.id in fake_db:
        logger.warning(f"Attempt to create user with existing id: {user.id}")
        raise HTTPException(status_code=400, detail="User already exists")
    # 模拟异步数据库写入
    await asyncio.sleep(0.1)
    fake_db[user.id] = user
    logger.info(f"User created: {user}")
    return user

错误处理

有效的错误处理可以提升API的健壮性和用户体验。FastAPI提供了多种方式来处理错误,包括使用HTTPException和自定义异常处理器。

1. 使用HTTPException

HTTPException是FastAPI内置的异常类,用于返回HTTP错误响应。

from fastapi import HTTPException

@app.get("/items/{item_id}")
def read_item(item_id: int):
    if item_id not in fake_db:
        raise HTTPException(status_code=404, detail="Item not found")
    return fake_db[item_id]

2. 自定义异常处理器

有时需要针对特定的异常类型定义自定义处理逻辑。

示例: 处理验证错误
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi import Request

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    logger.error(f"Validation error: {exc}")
    return JSONResponse(
        status_code=422,
        content={"detail": exc.errors(), "body": exc.body},
    )

3. 自定义异常类

你可以创建自己的异常类,并定义相应的处理器。

示例: 自定义UserNotFoundException
class UserNotFoundException(Exception):
    def __init__(self, user_id: int):
        self.user_id = user_id

@app.exception_handler(UserNotFoundException)
async def user_not_found_exception_handler(request: Request, exc: UserNotFoundException):
    logger.error(f"User not found: {exc.user_id}")
    return JSONResponse(
        status_code=404,
        content={"message": f"User with id {exc.user_id} not found"},
    )

# 在路由中使用自定义异常
@app.get("/users/{user_id}", response_model=User)
def get_user(user_id: int):
    user = fake_db.get(user_id)
    if not user:
        raise UserNotFoundException(user_id=user_id)
    return user

4. 实践练习

任务: 为用户创建路由添加异常处理,当用户数据不符合要求时,返回详细的错误信息。

参考答案:

Pydantic已经在请求体验证失败时自动返回详细错误信息。因此,你无需额外添加异常处理。但是,如果你有自定义逻辑,可以结合前面的内容使用HTTPException或自定义异常。


WebSocket接口

WebSocket是一种持久化的双向通信协议,适用于实时应用,如聊天系统、实时通知等。FastAPI对WebSocket有原生支持。

1. 什么是WebSocket?

WebSocket允许客户端和服务器之间进行实时、双向通信,而无需每次都建立新的HTTP连接。

2. FastAPI中的WebSocket

在FastAPI中,可以使用WebSocket类来处理WebSocket连接。

3. 示例: 简单的WebSocket聊天

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

# 存储活跃连接
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # 广播消息给所有连接的客户端
            for connection in active_connections:
                await connection.send_text(f"Message: {data}")
    except WebSocketDisconnect:
        active_connections.remove(websocket)
        logger.info("WebSocket disconnected")

4. 客户端示例

你可以使用JavaScript在浏览器中连接WebSocket:

<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Test</title>
</head>
<body>
    <h1>WebSocket Test</h1>
    <input id="messageText" type="text" placeholder="Type a message">
    <button onclick="sendMessage()">Send</button>
    <ul id="messages"></ul>

    <script>
        const ws = new WebSocket("ws://localhost:8000/ws");
        ws.onmessage = function(event) {
            const messages = document.getElementById('messages');
            const message = document.createElement('li');
            message.textContent = event.data;
            messages.appendChild(message);
        };

        function sendMessage() {
            const input = document.getElementById("messageText");
            ws.send(input.value);
            input.value = '';
        }
    </script>
</body>
</html>

5. 实践练习

任务: 扩展WebSocket聊天,使得每个消息都包含发送者的用户名。

参考答案:

  1. 修改客户端,添加用户名输入。
<input id="username" type="text" placeholder="Username">
<input id="messageText" type="text" placeholder="Type a message">
<button onclick="sendMessage()">Send</button>
  1. 修改客户端发送消息格式为JSON。
function sendMessage() {
    const username = document.getElementById("username").value;
    const input = document.getElementById("messageText");
    const message = {
        username: username,
        message: input.value
    };
    ws.send(JSON.stringify(message));
    input.value = '';
}
  1. 修改服务器端处理接收的JSON消息。
import json

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            username = message.get("username")
            text = message.get("message")
            full_message = f"{username}: {text}"
            for connection in active_connections:
                await connection.send_text(full_message)
    except WebSocketDisconnect:
        active_connections.remove(websocket)
        logger.info("WebSocket disconnected")

使用Postman测试接口

Postman是一款流行的API开发工具,允许你发送各种类型的HTTP请求,测试和调试API。

1. 安装Postman

访问Postman官网下载并安装适合你操作系统的版本。

2. 基本操作

发送GET请求
  1. 打开Postman,点击“New”创建一个新的请求。
  2. 选择“GET”方法。
  3. 输入URL,例如 http://127.0.0.1:8000/users/1。
  4. 点击“Send”发送请求,查看响应。
发送POST请求
  1. 选择“POST”方法。

  2. 输入URL,例如 http://127.0.0.1:8000/users/

  3. 切换到“Body”选项卡,选择“raw”并选择“JSON”格式。

  4. 输入JSON数据,例如:

    {
        "id": 1,
        "name": "Alice",
        "email": "alice@example.com",
        "age": 30
    }
    
  5. 点击“Send”发送请求,查看响应。

3. 使用环境变量

Postman允许你创建环境变量,简化请求中的重复数据。

  1. 点击右上角的“Environment”下拉菜单,选择“Manage Environments”。
  2. 创建一个新的环境,例如“Localhost”,添加变量:
    • base_url: http://127.0.0.1:8000
  3. 在请求URL中使用变量,例如 {{base_url}}/users/1

4. 编写测试脚本

Postman支持在请求后编写测试脚本,以自动验证响应。

示例: 验证响应状态码为200
  1. 发送一个GET请求。

  2. 切换到“Tests”选项卡。

  3. 输入以下脚本:

    pm.test("Status code is 200", function () {
        pm.response.to.have.status(200);
    });
    
  4. 发送请求,查看测试结果。

5. 实践练习

任务: 使用Postman测试用户管理API的所有CRUD操作,并编写相应的测试脚本验证响应数据。

参考答案:

  1. 创建用户:

    • 方法: POST

    • URL: {{base_url}}/users/

    • Body:

      {
          "id": 2,
          "name": "Bob",
          "email": "bob@example.com",
          "age": 25
      }
      
    • 测试脚本:

      pm.test("Status code is 200", function () {
          pm.response.to.have.status(200);
      });
      
      pm.test("Response has user data", function () {
          var jsonData = pm.response.json();
          pm.expect(jsonData).to.have.property("id", 2);
          pm.expect(jsonData).to.have.property("name", "Bob");
          pm.expect(jsonData).to.have.property("email", "bob@example.com");
          pm.expect(jsonData).to.have.property("age", 25);
      });
      
  2. 获取用户:

    • 方法: GET

    • URL: {{base_url}}/users/2

    • 测试脚本:

      pm.test("Status code is 200", function () {
          pm.response.to.have.status(200);
      });
      
      pm.test("Response has correct user data", function () {
          var jsonData = pm.response.json();
          pm.expect(jsonData).to.have.property("id", 2);
          pm.expect(jsonData).to.have.property("name", "Bob");
          pm.expect(jsonData).to.have.property("email", "bob@example.com");
          pm.expect(jsonData).to.have.property("age", 25);
      });
      
  3. 更新用户:

    • 方法: PUT

    • URL: {{base_url}}/users/2

    • Body:

      {
          "id": 2,
          "name": "Bob Updated",
          "email": "bob.updated@example.com",
          "age": 26
      }
      
    • 测试脚本:

      pm.test("Status code is 200", function () {
          pm.response.to.have.status(200);
      });
      
      pm.test("User is updated", function () {
          var jsonData = pm.response.json();
          pm.expect(jsonData).to.have.property("name", "Bob Updated");
          pm.expect(jsonData).to.have.property("email", "bob.updated@example.com");
          pm.expect(jsonData).to.have.property("age", 26);
      });
      
  4. 删除用户:

    • 方法: DELETE

    • URL: {{base_url}}/users/2

    • 测试脚本:

      pm.test("Status code is 200", function () {
          pm.response.to.have.status(200);
      });
      
      pm.test("User is deleted", function () {
          var jsonData = pm.response.json();
          pm.expect(jsonData).to.have.property("detail", "User deleted");
      });
      

综合实例

为了将所学内容整合起来,我们将构建一个完整的用户管理系统,包含REST API和WebSocket功能。

1. 项目结构

user_management/
├── main.py
├── models.py
├── routers/
│   └── users.py
├── websocket.py
└── logs/
    └── app.log

2. models.py - 数据模型

from pydantic import BaseModel, EmailStr
from typing import Optional

class User(BaseModel):
    id: int
    name: str
    email: EmailStr
    age: Optional[int] = None

3. routers/users.py - 用户路由

from fastapi import APIRouter, HTTPException
from typing import List, Dict
from models import User
import asyncio
import logging

router = APIRouter()
logger = logging.getLogger("users")

fake_db: Dict[int, User] = {}

@router.post("/", response_model=User)
async def create_user(user: User):
    if user.id in fake_db:
        logger.warning(f"Attempt to create user with existing id: {user.id}")
        raise HTTPException(status_code=400, detail="User already exists")
    # 模拟异步数据库操作
    await asyncio.sleep(0.1)
    fake_db[user.id] = user
    logger.info(f"User created: {user}")
    return user

@router.get("/{user_id}", response_model=User)
async def get_user(user_id: int):
    user = fake_db.get(user_id)
    if not user:
        logger.error(f"User not found: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    logger.info(f"User retrieved: {user}")
    return user

@router.get("/", response_model=List[User])
async def get_all_users():
    logger.info("Retrieving all users")
    return list(fake_db.values())

@router.put("/{user_id}", response_model=User)
async def update_user(user_id: int, updated_user: User):
    if user_id not in fake_db:
        logger.error(f"Attempt to update non-existent user: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    # 模拟异步数据库操作
    await asyncio.sleep(0.1)
    fake_db[user_id] = updated_user
    logger.info(f"User updated: {updated_user}")
    return updated_user

@router.delete("/{user_id}")
async def delete_user(user_id: int):
    if user_id not in fake_db:
        logger.error(f"Attempt to delete non-existent user: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    # 模拟异步数据库操作
    await asyncio.sleep(0.1)
    del fake_db[user_id]
    logger.info(f"User deleted: {user_id}")
    return {"detail": "User deleted"}

4. websocket.py - WebSocket处理

from fastapi import WebSocket, WebSocketDisconnect
from typing import List
import json
import logging

logger = logging.getLogger("websocket")

active_connections: List[WebSocket] = []

async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    logger.info("WebSocket connection established")
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            username = message.get("username")
            text = message.get("message")
            full_message = f"{username}: {text}"
            logger.info(f"Broadcasting message: {full_message}")
            for connection in active_connections:
                await connection.send_text(full_message)
    except WebSocketDisconnect:
        active_connections.remove(websocket)
        logger.info("WebSocket disconnected")

5. main.py - 应用入口

from fastapi import FastAPI
from routers import users
from websocket import websocket_endpoint
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("logs/app.log"),
        logging.StreamHandler()
    ]
)

app = FastAPI()

# 包含用户路由
app.include_router(users.router, prefix="/users", tags=["Users"])

# WebSocket路由
app.add_api_websocket_route("/ws", websocket_endpoint)

@app.get("/")
def read_root():
    return {"message": "Welcome to the User Management API"}

6. 启动应用

确保所有文件在同一目录下,运行以下命令启动应用:

uvicorn main:app --reload

7. 测试API和WebSocket

  • 使用Postman测试用户管理API。
  • 使用浏览器打开之前的WebSocket客户端示例,与服务器进行实时通信。

总结与进一步学习

恭喜你完成了Python后端开发的基础学习!你已经掌握了以下关键技术:

  • 使用Pydantic定义数据模型
  • 创建和配置FastAPI应用
  • 定义和实现REST API接口
  • 使用Uvicorn运行和部署应用
  • 实现日志记录
  • 应用异步编程提升性能
  • 处理错误和异常
  • 集成WebSocket实现实时通信
  • 使用Postman测试和调试API

下一步建议

  1. 深入学习数据库: 学习如何与数据库(如PostgreSQL、MongoDB)集成,使用ORM(如SQLAlchemy)进行数据操作。
  2. 用户认证和授权: 实现安全机制,如JWT认证、OAuth。
  3. 部署和运维: 学习如何将应用部署到云平台(如AWS、Heroku),使用Docker容器化应用。
  4. 性能优化: 学习缓存机制(如Redis),优化数据库查询,使用负载均衡。
  5. 测试驱动开发(TDD): 学习如何编写单元测试和集成测试,确保代码质量。
  6. 前端集成: 学习如何与前端框架(如React、Vue)集成,构建完整的全栈应用。

Python 后端开发实战项目:用户管理与实时聊天系统

欢迎来到本实战项目!本项目旨在整合前面学习的各个Python后端开发要素,帮助你通过实际操作深入理解和掌握它们。我们将构建一个用户管理系统,同时集成实时聊天功能,涵盖以下内容:

  1. 使用Pydantic定义数据模型
  2. 创建FastAPI应用并包含路由
  3. 接口定义和接口实现
  4. 使用Uvicorn运行程序
  5. 日志记录
  6. 异步编程
  7. 错误处理
  8. WebSocket接口
  9. 使用Postman测试接口

项目概述

本项目将实现以下功能:

  • 用户管理API
    • 创建用户
    • 获取单个用户
    • 获取所有用户
    • 更新用户信息
    • 删除用户
  • 实时聊天功能
    • 用户可以通过WebSocket连接进行实时聊天
    • 消息广播给所有连接的客户端
  • 日志记录
    • 记录API请求和WebSocket活动日志
  • 错误处理
    • 处理常见错误,如用户不存在、数据验证失败等

项目结构

项目结构如下:

user_chat_app/
├── main.py
├── models.py
├── routers/
│   └── users.py
├── websocket.py
├── logs/
│   └── app.log
├── requirements.txt
└── client/
    └── websocket_client.html
  • main.py: 应用入口,包含FastAPI实例、路由和WebSocket集成。
  • models.py: Pydantic数据模型定义。
  • routers/users.py: 用户管理相关的路由和逻辑。
  • websocket.py: WebSocket连接和消息处理逻辑。
  • logs/app.log: 日志文件,记录应用运行日志。
  • requirements.txt: 项目依赖列表。
  • client/websocket_client.html: 简单的WebSocket客户端,用于测试实时聊天功能。

环境搭建

1. 克隆或创建项目目录

首先,创建一个项目目录并导航到该目录:

mkdir user_chat_app
cd user_chat_app

2. 创建虚拟环境

使用Python的venv模块创建并激活虚拟环境:

# 创建虚拟环境
python -m venv env

# 激活虚拟环境
# Windows
env\Scripts\activate

# macOS/Linux
source env/bin/activate

3. 创建requirements.txt

在项目根目录下创建一个requirements.txt文件,列出所需的依赖:

fastapi
uvicorn
pydantic

4. 安装依赖

使用pip安装项目依赖:

pip install -r requirements.txt

代码实现

1. 定义数据模型 (models.py)

在项目根目录下创建models.py,定义用户数据模型:

# models.py

from pydantic import BaseModel, EmailStr, Field
from typing import Optional

class User(BaseModel):
    id: int = Field(..., example=1)
    name: str = Field(..., example="Alice")
    email: EmailStr = Field(..., example="alice@example.com")
    age: Optional[int] = Field(None, ge=0, example=30)

解释:

  • BaseModel: 所有Pydantic模型应继承自BaseModel
  • Field: 用于提供字段的元数据,如示例值和验证约束。
  • Optional[int]: age字段是可选的,且必须是非负整数。

2. 创建用户路由 (routers/users.py)

在项目目录下创建routers文件夹,并在其中创建users.py

# routers/users.py

from fastapi import APIRouter, HTTPException, status
from typing import List, Dict
from models import User
import asyncio
import logging

router = APIRouter()
logger = logging.getLogger("users")

# 模拟数据库
fake_db: Dict[int, User] = {}

@router.post("/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: User):
    if user.id in fake_db:
        logger.warning(f"Attempt to create user with existing id: {user.id}")
        raise HTTPException(status_code=400, detail="User already exists")
    # 模拟异步数据库操作
    await asyncio.sleep(0.1)
    fake_db[user.id] = user
    logger.info(f"User created: {user}")
    return user

@router.get("/{user_id}", response_model=User)
async def get_user(user_id: int):
    user = fake_db.get(user_id)
    if not user:
        logger.error(f"User not found: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    logger.info(f"User retrieved: {user}")
    return user

@router.get("/", response_model=List[User])
async def get_all_users():
    logger.info("Retrieving all users")
    return list(fake_db.values())

@router.put("/{user_id}", response_model=User)
async def update_user(user_id: int, updated_user: User):
    if user_id != updated_user.id:
        logger.warning(f"User ID mismatch: {user_id} != {updated_user.id}")
        raise HTTPException(status_code=400, detail="User ID mismatch")
    if user_id not in fake_db:
        logger.error(f"Attempt to update non-existent user: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    # 模拟异步数据库操作
    await asyncio.sleep(0.1)
    fake_db[user_id] = updated_user
    logger.info(f"User updated: {updated_user}")
    return updated_user

@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int):
    if user_id not in fake_db:
        logger.error(f"Attempt to delete non-existent user: {user_id}")
        raise HTTPException(status_code=404, detail="User not found")
    # 模拟异步数据库操作
    await asyncio.sleep(0.1)
    del fake_db[user_id]
    logger.info(f"User deleted: {user_id}")
    return

解释:

  • APIRouter: 用于模块化路由。
  • HTTPException: 用于抛出HTTP错误。
  • fake_db: 使用字典模拟数据库。
  • 异步函数: 使用async defawait模拟异步数据库操作。

3. 实现WebSocket逻辑 (websocket.py)

创建websocket.py,处理WebSocket连接和消息广播:

# websocket.py

from fastapi import WebSocket, WebSocketDisconnect
from typing import List
import json
import logging

logger = logging.getLogger("websocket")

active_connections: List[WebSocket] = []

async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    logger.info("WebSocket connection established")
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            username = message.get("username", "Anonymous")
            text = message.get("message", "")
            full_message = f"{username}: {text}"
            logger.info(f"Broadcasting message: {full_message}")
            # 广播消息给所有连接的客户端
            for connection in active_connections:
                await connection.send_text(full_message)
    except WebSocketDisconnect:
        active_connections.remove(websocket)
        logger.info("WebSocket disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        active_connections.remove(websocket)

解释:

  • active_connections: 存储所有活跃的WebSocket连接。
  • websocket_endpoint: 处理WebSocket连接,接收消息并广播。

4. 应用入口 (main.py)

创建main.py,整合所有组件:

# main.py

from fastapi import FastAPI
from routers import users
from websocket import websocket_endpoint
import logging
import os

app = FastAPI(
    title="User Management and Chat API",
    description="A simple user management system with real-time chat functionality.",
    version="1.0.0"
)

# 配置日志
if not os.path.exists("logs"):
    os.makedirs("logs")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("logs/app.log"),
        logging.StreamHandler()
    ]
)

# 引入用户路由
app.include_router(users.router, prefix="/users", tags=["Users"])

# WebSocket路由
@app.websocket("/ws")
async def websocket_route(websocket):
    await websocket_endpoint(websocket)

# 根路由
@app.get("/")
def read_root():
    return {"message": "Welcome to the User Management and Chat API"}

解释:

  • FastAPI实例: 配置应用标题、描述和版本。
  • 日志配置: 配置日志记录到文件和控制台。
  • 包含路由: 使用include_router整合用户路由。
  • WebSocket路由: 将/ws路径指向websocket_endpoint函数。
  • 根路由: 提供欢迎消息。

5. 创建WebSocket客户端 (client/websocket_client.html)

为了测试实时聊天功能,我们创建一个简单的WebSocket客户端:

<!-- client/websocket_client.html -->

<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Chat Client</title>
    <style>
        body { font-family: Arial, sans-serif; }
        #chat { border: 1px solid #ccc; height: 300px; overflow-y: scroll; padding: 10px; }
        #messageForm { margin-top: 10px; }
    </style>
</head>
<body>
    <h1>WebSocket Chat Client</h1>
    <div>
        <label for="username">Username:</label>
        <input type="text" id="username" placeholder="Enter your username">
    </div>
    <div id="chat"></div>
    <form id="messageForm">
        <input type="text" id="messageText" placeholder="Type a message" autocomplete="off" required>
        <button type="submit">Send</button>
    </form>

    <script>
        const chat = document.getElementById('chat');
        const messageForm = document.getElementById('messageForm');
        const messageText = document.getElementById('messageText');
        const usernameInput = document.getElementById('username');

        const ws = new WebSocket("ws://localhost:8000/ws");

        ws.onopen = () => {
            appendMessage("Connected to the chat server.");
        };

        ws.onmessage = (event) => {
            appendMessage(event.data);
        };

        ws.onclose = () => {
            appendMessage("Disconnected from the chat server.");
        };

        messageForm.addEventListener('submit', function(e) {
            e.preventDefault();
            const username = usernameInput.value.trim() || "Anonymous";
            const message = messageText.value.trim();
            if (message === "") return;
            const payload = JSON.stringify({ username: username, message: message });
            ws.send(payload);
            messageText.value = '';
        });

        function appendMessage(message) {
            const messageElement = document.createElement('div');
            messageElement.textContent = message;
            chat.appendChild(messageElement);
            chat.scrollTop = chat.scrollHeight;
        }
    </script>
</body>
</html>

解释:

  • 用户名输入: 用户可以输入自己的用户名。
  • 聊天窗口: 显示所有聊天消息。
  • 消息表单: 发送消息。
  • WebSocket逻辑: 连接服务器,发送和接收消息。

6. 创建日志目录

确保日志目录存在:

mkdir logs

7. 项目依赖 (requirements.txt)

确认requirements.txt内容如下:

fastapi
uvicorn
pydantic

运行项目

1. 启动应用

在项目根目录下运行以下命令启动FastAPI应用:

uvicorn main:app --reload
  • main: 指向main.py文件。
  • app: FastAPI实例。
  • --reload: 代码修改后自动重启服务器(仅用于开发环境)。

输出示例:

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [28716] using watchgod
INFO:     Started server process [28718]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

2. 访问API文档

FastAPI自动生成交互式API文档,访问以下地址查看:

  • Swagger UI: http://127.0.0.1:8000/docs
  • ReDoc: http://127.0.0.1:8000/redoc

3. 运行WebSocket客户端

打开client/websocket_client.html文件(双击或通过浏览器打开),输入用户名并发送消息。

使用Postman测试接口

Postman是一款强大的API测试工具,可以帮助你发送HTTP请求并查看响应。以下是如何使用Postman测试本项目的API。

1. 安装Postman

如果尚未安装Postman,请访问Postman官网下载并安装适合你操作系统的版本。

2. 创建Postman请求

a. 创建用户(POST /users/
  • 方法: POST

  • URL: http://127.0.0.1:8000/users/

  • Headers: Content-Type: application/json

  • Body: 选择rawJSON,输入以下JSON数据:

    {
        "id": 1,
        "name": "Alice",
        "email": "alice@example.com",
        "age": 30
    }
    
  • 发送请求: 点击Send

  • 预期响应:

    {
        "id": 1,
        "name": "Alice",
        "email": "alice@example.com",
        "age": 30
    }
    
b. 获取单个用户(GET /users/1
  • 方法: GET

  • URL: http://127.0.0.1:8000/users/1

  • 发送请求: 点击Send

  • 预期响应:

    {
        "id": 1,
        "name": "Alice",
        "email": "alice@example.com",
        "age": 30
    }
    
c. 获取所有用户(GET /users/
  • 方法: GET

  • URL: http://127.0.0.1:8000/users/

  • 发送请求: 点击Send

  • 预期响应:

    [
        {
            "id": 1,
            "name": "Alice",
            "email": "alice@example.com",
            "age": 30
        }
    ]
    
d. 更新用户(PUT /users/1
  • 方法: PUT

  • URL: http://127.0.0.1:8000/users/1

  • Headers: Content-Type: application/json

  • Body: 选择rawJSON,输入更新后的用户数据:

    {
        "id": 1,
        "name": "Alice Smith",
        "email": "alice.smith@example.com",
        "age": 31
    }
    
  • 发送请求: 点击Send

  • 预期响应:

    {
        "id": 1,
        "name": "Alice Smith",
        "email": "alice.smith@example.com",
        "age": 31
    }
    
e. 删除用户(DELETE /users/1
  • 方法: DELETE
  • URL: http://127.0.0.1:8000/users/1
  • 发送请求: 点击Send
  • 预期响应: 无内容(状态码204)

3. 使用Postman集合

为了方便,你可以将上述请求整理为Postman集合。以下是手动创建步骤:

  1. 创建集合:

    • 点击左侧栏的Collections
    • 点击New Collection,命名为User Management API
  2. 添加请求:

    • User Management API集合下,点击Add Request
    • 分别创建上述五个请求(创建、获取单个、获取所有、更新、删除),并保存到集合中。
  3. 编写测试脚本:

    • 在每个请求的Tests标签下,添加如下脚本以验证响应:

    示例: 创建用户请求的测试脚本

    pm.test("Status code is 201", function () {
        pm.response.to.have.status(201);
    });
    
    pm.test("Response has user data", function () {
        var jsonData = pm.response.json();
        pm.expect(jsonData).to.have.property("id", 1);
        pm.expect(jsonData).to.have.property("name", "Alice");
        pm.expect(jsonData).to.have.property("email", "alice@example.com");
        pm.expect(jsonData).to.have.property("age", 30);
    });
    

    注意: 根据每个请求的预期响应,调整测试脚本。

  4. 运行集合:

    • 点击集合右侧的Run按钮,选择要运行的请求,点击Run进行批量测试。

日志记录

日志记录对于监控和调试应用非常重要。在本项目中,日志记录已集成在main.pyrouters/users.pywebsocket.py中。

查看日志

日志文件位于logs/app.log,你可以通过以下命令查看日志内容:

# 实时查看日志(macOS/Linux)
tail -f logs/app.log

# Windows (使用 PowerShell)
Get-Content logs/app.log -Wait

示例日志内容:

2024-04-27 10:00:00,000 - users - INFO - User created: id=1 name='Alice' email='alice@example.com' age=30
2024-04-27 10:01:00,000 - websocket - INFO - WebSocket connection established
2024-04-27 10:01:05,000 - websocket - INFO - Broadcasting message: Alice: Hello, everyone!

错误处理

本项目通过使用HTTPException和自定义异常处理器来处理错误。以下是一些常见的错误处理示例:

1. 创建已存在用户

请求: POST /users/,用户ID已存在。

预期响应:

  • 状态码: 400

  • 响应内容:

    {
        "detail": "User already exists"
    }
    

2. 获取不存在的用户

请求: GET /users/999

预期响应:

  • 状态码: 404

  • 响应内容:

    {
        "detail": "User not found"
    }
    

3. 数据验证失败

请求: POST /users/,缺少必填字段或字段格式错误。

预期响应:

  • 状态码: 422

  • 响应内容:

    {
        "detail": [
            {
                "loc": ["body", "email"],
                "msg": "field required",
                "type": "value_error.missing"
            }
        ]
    }
    

解释:

  • FastAPI和Pydantic自动处理数据验证错误,并返回详细的错误信息。

异步编程

本项目中的API路由和WebSocket连接均使用异步编程,提升了应用的性能和响应能力。通过使用async defawait,应用能够在处理I/O密集型任务时高效地管理并发请求。

示例: 用户创建路由使用异步函数和await asyncio.sleep(0.1)模拟异步数据库操作。

@router.post("/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: User):
    if user.id in fake_db:
        logger.warning(f"Attempt to create user with existing id: {user.id}")
        raise HTTPException(status_code=400, detail="User already exists")
    # 模拟异步数据库操作
    await asyncio.sleep(0.1)
    fake_db[user.id] = user
    logger.info(f"User created: {user}")
    return user

解释:

  • 异步函数: 使用async def定义。
  • await: 在需要等待的操作前使用await,如异步数据库操作。

部署与运行

1. 本地开发

在本地开发时,使用uvicorn启动应用,并使用--reload参数实现自动重载。

uvicorn main:app --reload

2. 生产环境部署

在生产环境中,建议使用gunicornuvicorn的工作器配合部署应用。

a. 安装Gunicorn

在虚拟环境中安装gunicorn

pip install gunicorn
b. 启动应用

使用以下命令启动应用:

gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
  • -w 4: 启动4个工作进程。
  • -k uvicorn.workers.UvicornWorker: 使用Uvicorn的工作器类。

3. 使用Docker容器化应用(可选)

为了方便部署和环境一致性,可以将应用容器化。以下是一个简单的Dockerfile示例:

# Dockerfile

FROM python:3.10-slim

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动应用
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

构建Docker镜像:

docker build -t user_chat_app .

运行Docker容器:

docker run -d --name user_chat_app -p 8000:8000 user_chat_app

综合测试

1. 测试用户管理API

使用Postman按照前面的步骤创建、获取、更新和删除用户,确保API正常工作。

2. 测试实时聊天功能

  1. 启动应用后,打开多个浏览器窗口,加载client/websocket_client.html文件。
  2. 在每个客户端输入不同的用户名。
  3. 发送消息,确保所有连接的客户端都能实时接收到消息。

示例:

  • 客户端1:
    • 用户名: Alice
    • 消息: Hello, everyone!
  • 客户端2:
    • 用户名: Bob
    • 消息: Hi Alice!

预期结果:

  • 客户端1客户端2都显示以下消息:

    Alice: Hello, everyone!
    Bob: Hi Alice!
    

3. 查看日志

通过查看logs/app.log文件,确认所有操作和WebSocket活动都有详细记录。

总结

通过本项目,你已经实践并整合了以下关键Python后端开发要素:

  • 数据模型定义: 使用Pydantic定义用户数据模型。
  • FastAPI应用创建: 创建FastAPI实例,配置路由和WebSocket。
  • 路由和接口实现: 实现用户管理的CRUD接口。
  • 异步编程: 使用异步函数和await提升性能。
  • 日志记录: 集成Python的logging模块,记录应用日志。
  • 错误处理: 使用HTTPException处理常见错误。
  • WebSocket接口: 实现实时聊天功能。
  • API测试: 使用Postman测试和验证API。

下一步建议

  1. 集成数据库: 使用数据库(如PostgreSQL)替代fake_db,并使用ORM(如SQLAlchemy)进行数据操作。
  2. 用户认证与授权: 实现JWT认证,保护API端点。
  3. 前端框架集成: 使用React或Vue构建更复杂的前端界面,集成API和WebSocket。
  4. 容器编排与部署: 使用Docker Compose或Kubernetes部署多容器应用。
  5. 单元测试与集成测试: 使用pytest编写测试,确保代码质量和稳定性。
  6. 性能优化: 引入缓存机制(如Redis),优化数据库查询,提高应用性能。

Python 操作和处理 MySQL 数据库详细教程

欢迎来到本教程!本教程旨在帮助你从Python脚本开发过渡到后端开发,重点学习如何使用Python与MySQL数据库进行交互。无论你是初学者还是有一定经验的开发者,这个教程都将为你提供全面的指导,涵盖以下内容:

  1. 环境搭建
  2. 安装必要的库
  3. 连接到MySQL数据库
  4. 创建和管理数据库和表
  5. 执行CRUD操作(创建、读取、更新、删除)
  6. 使用ORM(SQLAlchemy)进行数据库操作
  7. 事务管理
  8. 连接池管理
  9. 错误处理
  10. 综合实例
  11. 最佳实践与安全性
  12. 总结与进一步学习

目录

  1. 环境搭建
  2. 安装必要的库
  3. 连接到MySQL数据库
  4. 创建和管理数据库和表
  5. 执行CRUD操作
    • 创建(Create)
    • 读取(Read)
    • 更新(Update)
    • 删除(Delete)
  6. 使用ORM(SQLAlchemy)进行数据库操作
    • 安装SQLAlchemy
    • 定义模型
    • 创建会话
    • 执行CRUD操作
  7. 事务管理
  8. 连接池管理
  9. 错误处理
  10. 综合实例
  11. 最佳实践与安全性
  12. 总结与进一步学习

前言

在现代应用开发中,数据库是不可或缺的组成部分。MySQL作为流行的关系型数据库管理系统,广泛应用于各种应用场景。Python提供了多种方式与MySQL进行交互,包括使用原生的MySQL驱动程序或通过ORM(对象关系映射)工具如SQLAlchemy简化数据库操作。

本教程将从基础开始,逐步深入,帮助你掌握使用Python操作MySQL数据库的各种方法和最佳实践。


环境搭建

在开始之前,我们需要准备开发环境:

  1. Python 安装: 确保你的系统安装了Python 3.7及以上版本。你可以通过以下命令检查Python版本:

    python --version
    

    如果未安装,请访问Python官网下载安装。

  2. MySQL 安装: 安装并配置MySQL服务器。你可以从MySQL官网下载适合你操作系统的版本,并按照安装向导进行安装。

  3. 创建数据库用户: 为了安全性,建议为你的应用创建一个专用的数据库用户。

    -- 登录到MySQL命令行
    mysql -u root -p
    
    -- 创建数据库
    CREATE DATABASE python_mysql_demo;
    
    -- 创建用户并授予权限
    CREATE USER 'python_user'@'localhost' IDENTIFIED BY 'your_password';
    GRANT ALL PRIVILEGES ON python_mysql_demo.* TO 'python_user'@'localhost';
    FLUSH PRIVILEGES;
    

    注意: 替换 'your_password' 为一个安全的密码。


安装必要的库

Python与MySQL的交互主要依赖于第三方库。以下是常用的库:

  1. mysql-connector-python: 官方提供的MySQL驱动程序,支持Python 3。
  2. PyMySQL: 一个纯Python编写的MySQL客户端。
  3. SQLAlchemy: 强大的ORM工具,支持多种数据库,包括MySQL。

根据需求,你可以选择适合你的库。以下将主要介绍使用mysql-connector-pythonSQLAlchemy

安装库

首先,建议使用虚拟环境来管理项目依赖。使用以下命令创建并激活虚拟环境:

# 创建虚拟环境
python -m venv env

# 激活虚拟环境
# Windows
env\Scripts\activate

# macOS/Linux
source env/bin/activate

然后,安装必要的库:

pip install mysql-connector-python
pip install SQLAlchemy
pip install pymysql  # 如果选择使用PyMySQL作为MySQL驱动

连接到MySQL数据库

连接到MySQL数据库是进行任何数据库操作的第一步。以下将介绍如何使用mysql-connector-pythonPyMySQL进行连接。

使用 mysql-connector-python 连接

# mysql_connector_demo.py

import mysql.connector
from mysql.connector import Error

def create_connection(host_name, user_name, user_password, db_name):
    """创建与MySQL数据库的连接"""
    connection = None
    try:
        connection = mysql.connector.connect(
            host=host_name,
            user=user_name,
            passwd=user_password,
            database=db_name
        )
        print("连接到MySQL数据库成功")
    except Error as e:
        print(f"连接失败: {e}")
    return connection

if __name__ == "__main__":
    connection = create_connection(
        host_name="localhost",
        user_name="python_user",
        user_password="your_password",
        db_name="python_mysql_demo"
    )

解释:

  • mysql.connector.connect: 用于创建与MySQL数据库的连接。
  • Error: 捕获连接过程中可能发生的错误。

使用 PyMySQL 连接

# pymysql_demo.py

import pymysql

def create_connection(host, user, password, db):
    """创建与MySQL数据库的连接"""
    connection = None
    try:
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=db,
            cursorclass=pymysql.cursors.DictCursor  # 返回字典类型的结果
        )
        print("连接到MySQL数据库成功")
    except Exception as e:
        print(f"连接失败: {e}")
    return connection

if __name__ == "__main__":
    connection = create_connection(
        host="localhost",
        user="python_user",
        password="your_password",
        db="python_mysql_demo"
    )

解释:

  • pymysql.connect: 用于创建与MySQL数据库的连接。
  • cursorclass: 设置游标类型,DictCursor返回字典类型的结果,便于使用。

创建和管理数据库和表

在进行数据操作之前,需要确保数据库和表的存在。以下将介绍如何使用Python创建数据库和表。

使用 mysql-connector-python 创建表

# create_table_mysql_connector.py

import mysql.connector
from mysql.connector import Error

def create_table(connection, query):
    """创建表"""
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print("表创建成功")
    except Error as e:
        print(f"创建表失败: {e}")
    finally:
        cursor.close()

if __name__ == "__main__":
    connection = mysql.connector.connect(
        host="localhost",
        user="python_user",
        passwd="your_password",
        database="python_mysql_demo"
    )

    create_user_table = """
    CREATE TABLE IF NOT EXISTS users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(100) NOT NULL UNIQUE,
        age INT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """

    create_table(connection, create_user_table)
    connection.close()

使用 PyMySQL 创建表

# create_table_pymysql.py

import pymysql

def create_table(connection, query):
    """创建表"""
    with connection.cursor() as cursor:
        cursor.execute(query)
        connection.commit()
        print("表创建成功")

if __name__ == "__main__":
    connection = pymysql.connect(
        host="localhost",
        user="python_user",
        password="your_password",
        database="python_mysql_demo",
        cursorclass=pymysql.cursors.DictCursor
    )

    create_user_table = """
    CREATE TABLE IF NOT EXISTS users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(100) NOT NULL UNIQUE,
        age INT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """

    create_table(connection, create_user_table)
    connection.close()

解释:

  • CREATE TABLE IF NOT EXISTS: 创建表,如果表已存在则忽略。
  • 字段说明:
    • id: 主键,自增。
    • name: 用户名,非空。
    • email: 用户邮箱,非空且唯一。
    • age: 用户年龄。
    • created_at: 记录创建时间,默认为当前时间。

执行CRUD操作

CRUD代表创建(Create)、读取(Read)、更新(Update)、删除(Delete)操作,是数据库操作的基础。以下将分别介绍如何使用mysql-connector-pythonPyMySQL执行这些操作。

使用 mysql-connector-python 执行CRUD操作

1. 创建(Create)
# crud_create_mysql_connector.py

import mysql.connector
from mysql.connector import Error

def create_user(connection, name, email, age):
    """创建用户"""
    query = """
    INSERT INTO users (name, email, age)
    VALUES (%s, %s, %s)
    """
    cursor = connection.cursor()
    try:
        cursor.execute(query, (name, email, age))
        connection.commit()
        print(f"用户 '{name}' 创建成功,ID: {cursor.lastrowid}")
    except Error as e:
        print(f"创建用户失败: {e}")
    finally:
        cursor.close()

if __name__ == "__main__":
    connection = mysql.connector.connect(
        host="localhost",
        user="python_user",
        passwd="your_password",
        database="python_mysql_demo"
    )

    create_user(connection, "Alice", "alice@example.com", 30)
    create_user(connection, "Bob", "bob@example.com", 25)

    connection.close()
2. 读取(Read)
# crud_read_mysql_connector.py

import mysql.connector
from mysql.connector import Error

def get_user_by_id(connection, user_id):
    """根据ID获取用户"""
    query = "SELECT * FROM users WHERE id = %s"
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute(query, (user_id,))
        result = cursor.fetchone()
        if result:
            print(f"用户信息: {result}")
        else:
            print(f"用户ID {user_id} 不存在")
    except Error as e:
        print(f"获取用户失败: {e}")
    finally:
        cursor.close()

def get_all_users(connection):
    """获取所有用户"""
    query = "SELECT * FROM users"
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute(query)
        results = cursor.fetchall()
        for user in results:
            print(user)
    except Error as e:
        print(f"获取用户列表失败: {e}")
    finally:
        cursor.close()

if __name__ == "__main__":
    connection = mysql.connector.connect(
        host="localhost",
        user="python_user",
        passwd="your_password",
        database="python_mysql_demo"
    )

    get_user_by_id(connection, 1)
    get_all_users(connection)

    connection.close()
3. 更新(Update)
# crud_update_mysql_connector.py

import mysql.connector
from mysql.connector import Error

def update_user(connection, user_id, name=None, email=None, age=None):
    """更新用户信息"""
    fields = []
    values = []

    if name:
        fields.append("name = %s")
        values.append(name)
    if email:
        fields.append("email = %s")
        values.append(email)
    if age is not None:
        fields.append("age = %s")
        values.append(age)

    if not fields:
        print("没有提供需要更新的字段")
        return

    values.append(user_id)
    query = f"UPDATE users SET {', '.join(fields)} WHERE id = %s"

    cursor = connection.cursor()
    try:
        cursor.execute(query, tuple(values))
        connection.commit()
        if cursor.rowcount:
            print(f"用户ID {user_id} 更新成功")
        else:
            print(f"用户ID {user_id} 不存在")
    except Error as e:
        print(f"更新用户失败: {e}")
    finally:
        cursor.close()

if __name__ == "__main__":
    connection = mysql.connector.connect(
        host="localhost",
        user="python_user",
        passwd="your_password",
        database="python_mysql_demo"
    )

    update_user(connection, 1, name="Alice Smith", age=31)
    update_user(connection, 3, name="Charlie")  # 假设ID 3 不存在

    connection.close()
4. 删除(Delete)
# crud_delete_mysql_connector.py

import mysql.connector
from mysql.connector import Error

def delete_user(connection, user_id):
    """删除用户"""
    query = "DELETE FROM users WHERE id = %s"
    cursor = connection.cursor()
    try:
        cursor.execute(query, (user_id,))
        connection.commit()
        if cursor.rowcount:
            print(f"用户ID {user_id} 删除成功")
        else:
            print(f"用户ID {user_id} 不存在")
    except Error as e:
        print(f"删除用户失败: {e}")
    finally:
        cursor.close()

if __name__ == "__main__":
    connection = mysql.connector.connect(
        host="localhost",
        user="python_user",
        passwd="your_password",
        database="python_mysql_demo"
    )

    delete_user(connection, 2)
    delete_user(connection, 3)  # 假设ID 3 不存在

    connection.close()

使用 PyMySQL 执行CRUD操作

以下示例展示如何使用PyMySQL执行相同的CRUD操作。

1. 创建(Create)
# crud_create_pymysql.py

import pymysql

def create_user(connection, name, email, age):
    """创建用户"""
    query = """
    INSERT INTO users (name, email, age)
    VALUES (%s, %s, %s)
    """
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (name, email, age))
        connection.commit()
        print(f"用户 '{name}' 创建成功,ID: {cursor.lastrowid}")
    except Exception as e:
        print(f"创建用户失败: {e}")

if __name__ == "__main__":
    connection = pymysql.connect(
        host="localhost",
        user="python_user",
        password="your_password",
        db="python_mysql_demo",
        cursorclass=pymysql.cursors.DictCursor
    )

    create_user(connection, "Alice", "alice@example.com", 30)
    create_user(connection, "Bob", "bob@example.com", 25)

    connection.close()
2. 读取(Read)
# crud_read_pymysql.py

import pymysql

def get_user_by_id(connection, user_id):
    """根据ID获取用户"""
    query = "SELECT * FROM users WHERE id = %s"
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (user_id,))
            result = cursor.fetchone()
            if result:
                print(f"用户信息: {result}")
            else:
                print(f"用户ID {user_id} 不存在")
    except Exception as e:
        print(f"获取用户失败: {e}")

def get_all_users(connection):
    """获取所有用户"""
    query = "SELECT * FROM users"
    try:
        with connection.cursor() as cursor:
            cursor.execute(query)
            results = cursor.fetchall()
            for user in results:
                print(user)
    except Exception as e:
        print(f"获取用户列表失败: {e}")

if __name__ == "__main__":
    connection = pymysql.connect(
        host="localhost",
        user="python_user",
        password="your_password",
        db="python_mysql_demo",
        cursorclass=pymysql.cursors.DictCursor
    )

    get_user_by_id(connection, 1)
    get_all_users(connection)

    connection.close()
3. 更新(Update)
# crud_update_pymysql.py

import pymysql

def update_user(connection, user_id, name=None, email=None, age=None):
    """更新用户信息"""
    fields = []
    values = []

    if name:
        fields.append("name = %s")
        values.append(name)
    if email:
        fields.append("email = %s")
        values.append(email)
    if age is not None:
        fields.append("age = %s")
        values.append(age)

    if not fields:
        print("没有提供需要更新的字段")
        return

    values.append(user_id)
    query = f"UPDATE users SET {', '.join(fields)} WHERE id = %s"

    try:
        with connection.cursor() as cursor:
            cursor.execute(query, tuple(values))
        connection.commit()
        if cursor.rowcount:
            print(f"用户ID {user_id} 更新成功")
        else:
            print(f"用户ID {user_id} 不存在")
    except Exception as e:
        print(f"更新用户失败: {e}")

if __name__ == "__main__":
    connection = pymysql.connect(
        host="localhost",
        user="python_user",
        password="your_password",
        db="python_mysql_demo",
        cursorclass=pymysql.cursors.DictCursor
    )

    update_user(connection, 1, name="Alice Smith", age=31)
    update_user(connection, 3, name="Charlie")  # 假设ID 3 不存在

    connection.close()
4. 删除(Delete)
# crud_delete_pymysql.py

import pymysql

def delete_user(connection, user_id):
    """删除用户"""
    query = "DELETE FROM users WHERE id = %s"
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (user_id,))
        connection.commit()
        if cursor.rowcount:
            print(f"用户ID {user_id} 删除成功")
        else:
            print(f"用户ID {user_id} 不存在")
    except Exception as e:
        print(f"删除用户失败: {e}")

if __name__ == "__main__":
    connection = pymysql.connect(
        host="localhost",
        user="python_user",
        password="your_password",
        db="python_mysql_demo",
        cursorclass=pymysql.cursors.DictCursor
    )

    delete_user(connection, 2)
    delete_user(connection, 3)  # 假设ID 3 不存在

    connection.close()

使用ORM(SQLAlchemy)进行数据库操作

ORM(对象关系映射)工具允许开发者使用面向对象的方式与数据库交互,而无需编写大量的SQL语句。SQLAlchemy是Python中最流行的ORM工具之一,功能强大且灵活。

安装SQLAlchemy

如果尚未安装,可以使用以下命令安装:

pip install SQLAlchemy
pip install pymysql  # 作为SQLAlchemy的MySQL驱动

定义模型

使用SQLAlchemy定义数据库模型(表)。以下是如何定义一个User模型。

# sqlalchemy_models.py

from sqlalchemy import Column, Integer, String, TIMESTAMP, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    age = Column(Integer)
    created_at = Column(TIMESTAMP, server_default=func.now())

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}', age={self.age})>"

解释:

  • declarative_base(): 创建一个基类,所有模型类应继承自此基类。
  • Column: 定义表中的列。
  • *tablename*: 指定数据库中的表名。

创建会话

SQLAlchemy使用会话(Session)来管理数据库操作。以下是如何设置数据库引擎和会话。

# sqlalchemy_setup.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import os

DATABASE_USER = 'python_user'
DATABASE_PASSWORD = 'your_password'
DATABASE_HOST = 'localhost'
DATABASE_NAME = 'python_mysql_demo'

DATABASE_URL = f"mysql+pymysql://{DATABASE_USER}:{DATABASE_PASSWORD}@{DATABASE_HOST}/{DATABASE_NAME}"

# 创建数据库引擎
engine = create_engine(
    DATABASE_URL,
    echo=True,  # 打印SQL语句
    pool_pre_ping=True  # 检查连接是否有效
)

# 创建会话类
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

创建数据库和表

# create_database_sqlalchemy.py

from sqlalchemy_setup import engine
from sqlalchemy_models import Base

def create_tables():
    """创建所有表"""
    Base.metadata.create_all(bind=engine)
    print("所有表创建成功")

if __name__ == "__main__":
    create_tables()

执行CRUD操作

以下示例展示如何使用SQLAlchemy执行CRUD操作。

1. 创建(Create)
# sqlalchemy_crud_create.py

from sqlalchemy_setup import SessionLocal, engine
from sqlalchemy_models import User
from sqlalchemy.orm import sessionmaker

def create_user(name, email, age):
    """创建新用户"""
    session = SessionLocal()
    try:
        new_user = User(name=name, email=email, age=age)
        session.add(new_user)
        session.commit()
        session.refresh(new_user)  # 更新实例以反映数据库中的值(如ID)
        print(f"用户 '{new_user.name}' 创建成功,ID: {new_user.id}")
    except Exception as e:
        session.rollback()
        print(f"创建用户失败: {e}")
    finally:
        session.close()

if __name__ == "__main__":
    create_user("Alice", "alice@example.com", 30)
    create_user("Bob", "bob@example.com", 25)
2. 读取(Read)
# sqlalchemy_crud_read.py

from sqlalchemy_setup import SessionLocal, engine
from sqlalchemy_models import User

def get_user_by_id(user_id):
    """根据ID获取用户"""
    session = SessionLocal()
    try:
        user = session.query(User).filter(User.id == user_id).first()
        if user:
            print(f"用户信息: {user}")
        else:
            print(f"用户ID {user_id} 不存在")
    except Exception as e:
        print(f"获取用户失败: {e}")
    finally:
        session.close()

def get_all_users():
    """获取所有用户"""
    session = SessionLocal()
    try:
        users = session.query(User).all()
        for user in users:
            print(user)
    except Exception as e:
        print(f"获取用户列表失败: {e}")
    finally:
        session.close()

if __name__ == "__main__":
    get_user_by_id(1)
    get_all_users()
3. 更新(Update)
# sqlalchemy_crud_update.py

from sqlalchemy_setup import SessionLocal, engine
from sqlalchemy_models import User

def update_user(user_id, name=None, email=None, age=None):
    """更新用户信息"""
    session = SessionLocal()
    try:
        user = session.query(User).filter(User.id == user_id).first()
        if not user:
            print(f"用户ID {user_id} 不存在")
            return
        if name:
            user.name = name
        if email:
            user.email = email
        if age is not None:
            user.age = age
        session.commit()
        print(f"用户ID {user_id} 更新成功")
    except Exception as e:
        session.rollback()
        print(f"更新用户失败: {e}")
    finally:
        session.close()

if __name__ == "__main__":
    update_user(1, name="Alice Smith", age=31)
    update_user(3, name="Charlie")  # 假设ID 3 不存在
4. 删除(Delete)
# sqlalchemy_crud_delete.py

from sqlalchemy_setup import SessionLocal, engine
from sqlalchemy_models import User

def delete_user(user_id):
    """删除用户"""
    session = SessionLocal()
    try:
        user = session.query(User).filter(User.id == user_id).first()
        if not user:
            print(f"用户ID {user_id} 不存在")
            return
        session.delete(user)
        session.commit()
        print(f"用户ID {user_id} 删除成功")
    except Exception as e:
        session.rollback()
        print(f"删除用户失败: {e}")
    finally:
        session.close()

if __name__ == "__main__":
    delete_user(2)
    delete_user(3)  # 假设ID 3 不存在

解释:

  • SessionLocal: 创建一个会话实例,用于与数据库进行交互。
  • session.add(): 添加新对象到会话。
  • session.commit(): 提交事务,将更改保存到数据库。
  • session.refresh(): 更新对象以反映数据库中的最新状态。
  • session.query(): 查询数据库。
  • session.rollback(): 回滚事务,在发生错误时撤销更改。
  • session.close(): 关闭会话,释放资源。

事务管理

事务是确保一系列数据库操作要么全部成功,要么全部失败的机制。在复杂的应用中,事务管理对于数据一致性至关重要。

mysql-connector-python 中管理事务

默认情况下,mysql-connector-python 的连接是自动提交的。你可以通过设置autocommit=False来手动管理事务。

# transaction_mysql_connector.py

import mysql.connector
from mysql.connector import Error

def transaction_demo(connection):
    """演示事务管理"""
    cursor = connection.cursor()
    try:
        # 开始事务
        connection.start_transaction()

        # 插入用户1
        cursor.execute("""
            INSERT INTO users (name, email, age)
            VALUES (%s, %s, %s)
        """, ("Charlie", "charlie@example.com", 28))

        # 插入用户2(故意使用重复邮箱,导致错误)
        cursor.execute("""
            INSERT INTO users (name, email, age)
            VALUES (%s, %s, %s)
        """, ("Dave", "charlie@example.com", 35))  # 重复邮箱

        # 提交事务
        connection.commit()
        print("事务提交成功")
    except Error as e:
        # 回滚事务
        connection.rollback()
        print(f"事务回滚: {e}")
    finally:
        cursor.close()

if __name__ == "__main__":
    connection = mysql.connector.connect(
        host="localhost",
        user="python_user",
        passwd="your_password",
        database="python_mysql_demo",
        autocommit=False  # 关闭自动提交
    )

    transaction_demo(connection)
    connection.close()

输出:

事务回滚: 1062 (23000): Duplicate entry 'charlie@example.com' for key 'users.email'

解释:

  • connection.start_transaction(): 显式开始一个事务。
  • connection.commit(): 提交事务,将所有更改保存到数据库。
  • connection.rollback(): 回滚事务,撤销所有更改。

在上面的示例中,由于尝试插入具有重复邮箱的用户,导致违反唯一约束,触发错误并回滚整个事务,确保数据库保持一致性。

SQLAlchemy 中管理事务

SQLAlchemy提供了更高级的事务管理功能,支持上下文管理器,简化事务的处理。

# transaction_sqlalchemy.py

from sqlalchemy_setup import SessionLocal, engine
from sqlalchemy_models import User
from sqlalchemy.exc import IntegrityError

def transaction_demo():
    """演示SQLAlchemy事务管理"""
    session = SessionLocal()
    try:
        # 创建新用户
        user1 = User(name="Eve", email="eve@example.com", age=29)
        session.add(user1)
        session.flush()  # 获取user1.id

        # 创建另一个用户,故意使用重复邮箱
        user2 = User(name="Frank", email="eve@example.com", age=40)  # 重复邮箱
        session.add(user2)
        session.commit()
        print("事务提交成功")
    except IntegrityError as e:
        session.rollback()
        print(f"事务回滚: {e.orig}")
    finally:
        session.close()

if __name__ == "__main__":
    transaction_demo()

输出:

事务回滚: (1062, "Duplicate entry 'eve@example.com' for key 'users.email'")

解释:

  • session.add(): 将对象添加到会话。
  • session.flush(): 强制会话将更改同步到数据库,但不提交事务。
  • session.commit(): 提交事务。
  • IntegrityError: 捕获数据库完整性错误,如唯一约束违规。
  • session.rollback(): 回滚事务,撤销所有更改。

SQLAlchemy的事务管理更加灵活,支持嵌套事务和更复杂的操作。


连接池管理

在高并发环境中,频繁创建和销毁数据库连接会导致性能问题。连接池通过维护一组可复用的连接,提升应用性能并减少资源消耗。

使用 mysql-connector-python 管理连接池

mysql-connector-python 提供了内置的连接池支持。

# connection_pool_mysql_connector.py

import mysql.connector
from mysql.connector import pooling, Error

def create_connection_pool():
    """创建连接池"""
    try:
        pool = pooling.MySQLConnectionPool(
            pool_name="mypool",
            pool_size=5,
            pool_reset_session=True,
            host="localhost",
            user="python_user",
            password="your_password",
            database="python_mysql_demo"
        )
        print("连接池创建成功")
        return pool
    except Error as e:
        print(f"创建连接池失败: {e}")
        return None

def get_connection(pool):
    """从连接池获取连接"""
    try:
        connection = pool.get_connection()
        if connection.is_connected():
            print("从连接池获取连接成功")
            return connection
    except Error as e:
        print(f"获取连接失败: {e}")
        return None

if __name__ == "__main__":
    pool = create_connection_pool()
    if pool:
        conn = get_connection(pool)
        if conn:
            conn.close()

解释:

  • pooling.MySQLConnectionPool: 创建一个连接池实例。
  • pool.get_connection(): 从连接池获取一个连接。

使用 SQLAlchemy 管理连接池

SQLAlchemy默认使用连接池,你可以通过create_engine的参数进行配置。

# connection_pool_sqlalchemy.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DATABASE_USER = 'python_user'
DATABASE_PASSWORD = 'your_password'
DATABASE_HOST = 'localhost'
DATABASE_NAME = 'python_mysql_demo'

DATABASE_URL = f"mysql+pymysql://{DATABASE_USER}:{DATABASE_PASSWORD}@{DATABASE_HOST}/{DATABASE_NAME}"

# 创建引擎,配置连接池参数
engine = create_engine(
    DATABASE_URL,
    echo=True,
    pool_size=10,           # 连接池大小
    max_overflow=20,        # 超出连接池大小外最多可创建的连接数
    pool_timeout=30,        # 获取连接的超时时间(秒)
    pool_recycle=1800       # 连接回收时间(秒)
)

# 创建会话类
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

解释:

  • pool_size: 连接池的大小。
  • max_overflow: 超出连接池大小外,最多可创建的连接数。
  • pool_timeout: 获取连接的超时时间。
  • pool_recycle: 连接回收时间,防止长时间运行导致的连接问题。

使用连接池

使用连接池后,你无需手动管理连接的创建和销毁。SQLAlchemy会自动从连接池中获取和归还连接。

# sqlalchemy_connection_pool_demo.py

from sqlalchemy_setup import SessionLocal, engine
from sqlalchemy_models import User

def get_users():
    """使用连接池获取用户列表"""
    session = SessionLocal()
    try:
        users = session.query(User).all()
        for user in users:
            print(user)
    finally:
        session.close()

if __name__ == "__main__":
    get_users()

错误处理

在与数据库交互时,可能会遇到各种错误,如连接失败、数据验证错误、唯一约束违规等。有效的错误处理能够提升应用的稳定性和用户体验。

使用 mysql-connector-python 进行错误处理

# error_handling_mysql_connector.py

import mysql.connector
from mysql.connector import Error

def create_user(connection, name, email, age):
    """创建用户,包含错误处理"""
    query = """
    INSERT INTO users (name, email, age)
    VALUES (%s, %s, %s)
    """
    cursor = connection.cursor()
    try:
        cursor.execute(query, (name, email, age))
        connection.commit()
        print(f"用户 '{name}' 创建成功,ID: {cursor.lastrowid}")
    except mysql.connector.IntegrityError as ie:
        # 处理数据完整性错误,如唯一约束违规
        print(f"数据完整性错误: {ie}")
    except Error as e:
        print(f"创建用户失败: {e}")
    finally:
        cursor.close()

if __name__ == "__main__":
    connection = mysql.connector.connect(
        host="localhost",
        user="python_user",
        passwd="your_password",
        database="python_mysql_demo"
    )

    # 尝试创建具有重复邮箱的用户
    create_user(connection, "Charlie", "alice@example.com", 28)  # alice@example.com 已存在

    connection.close()

解释:

  • IntegrityError: 捕获数据完整性错误,如唯一约束违规。
  • Error: 捕获其他类型的数据库错误。

使用 SQLAlchemy 进行错误处理

# error_handling_sqlalchemy.py

from sqlalchemy_setup import SessionLocal, engine
from sqlalchemy_models import User
from sqlalchemy.exc import IntegrityError

def create_user(name, email, age):
    """创建用户,包含错误处理"""
    session = SessionLocal()
    try:
        new_user = User(name=name, email=email, age=age)
        session.add(new_user)
        session.commit()
        session.refresh(new_user)
        print(f"用户 '{new_user.name}' 创建成功,ID: {new_user.id}")
    except IntegrityError as ie:
        session.rollback()
        print(f"数据完整性错误: {ie.orig}")
    except Exception as e:
        session.rollback()
        print(f"创建用户失败: {e}")
    finally:
        session.close()

if __name__ == "__main__":
    create_user("Charlie", "alice@example.com", 28)  # alice@example.com 已存在

解释:

  • IntegrityError: 捕获数据完整性错误。
  • session.rollback(): 在发生错误时回滚事务,确保数据库保持一致性。

综合实例

为了将所学内容整合起来,我们将构建一个完整的Python应用,使用SQLAlchemy作为ORM工具,与MySQL数据库进行交互,执行CRUD操作,并管理事务和连接池。

项目结构

python_mysql_app/
├── main.py
├── models.py
├── database.py
├── crud.py
├── requirements.txt
└── README.md

1. 创建项目目录和文件

mkdir python_mysql_app
cd python_mysql_app
touch main.py models.py database.py crud.py requirements.txt README.md

2. requirements.txt

SQLAlchemy
pymysql

3. database.py - 数据库配置与会话管理

# database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import os

DATABASE_USER = 'python_user'
DATABASE_PASSWORD = 'your_password'
DATABASE_HOST = 'localhost'
DATABASE_NAME = 'python_mysql_demo'

DATABASE_URL = f"mysql+pymysql://{DATABASE_USER}:{DATABASE_PASSWORD}@{DATABASE_HOST}/{DATABASE_NAME}"

# 创建引擎,配置连接池参数
engine = create_engine(
    DATABASE_URL,
    echo=True,  # 打印SQL语句
    pool_size=10,           # 连接池大小
    max_overflow=20,        # 超出连接池大小外最多可创建的连接数
    pool_timeout=30,        # 获取连接的超时时间(秒)
    pool_recycle=1800       # 连接回收时间(秒)
)

# 创建会话类
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db():
    """生成数据库会话"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

4. models.py - 定义数据库模型

# models.py

from sqlalchemy import Column, Integer, String, TIMESTAMP, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    age = Column(Integer)
    created_at = Column(TIMESTAMP, server_default=func.now())

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}', age={self.age})>"

5. crud.py - 定义CRUD操作

# crud.py

from sqlalchemy.orm import Session
from models import User
from sqlalchemy.exc import IntegrityError

def create_user(db: Session, name: str, email: str, age: int):
    """创建新用户"""
    new_user = User(name=name, email=email, age=age)
    db.add(new_user)
    try:
        db.commit()
        db.refresh(new_user)
        print(f"用户 '{new_user.name}' 创建成功,ID: {new_user.id}")
        return new_user
    except IntegrityError as ie:
        db.rollback()
        print(f"数据完整性错误: {ie.orig}")
        return None
    except Exception as e:
        db.rollback()
        print(f"创建用户失败: {e}")
        return None

def get_user(db: Session, user_id: int):
    """根据ID获取用户"""
    user = db.query(User).filter(User.id == user_id).first()
    if user:
        print(f"用户信息: {user}")
    else:
        print(f"用户ID {user_id} 不存在")
    return user

def get_all_users(db: Session):
    """获取所有用户"""
    users = db.query(User).all()
    for user in users:
        print(user)
    return users

def update_user(db: Session, user_id: int, name: str = None, email: str = None, age: int = None):
    """更新用户信息"""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        print(f"用户ID {user_id} 不存在")
        return None
    if name:
        user.name = name
    if email:
        user.email = email
    if age is not None:
        user.age = age
    try:
        db.commit()
        db.refresh(user)
        print(f"用户ID {user_id} 更新成功")
        return user
    except IntegrityError as ie:
        db.rollback()
        print(f"数据完整性错误: {ie.orig}")
        return None
    except Exception as e:
        db.rollback()
        print(f"更新用户失败: {e}")
        return None

def delete_user(db: Session, user_id: int):
    """删除用户"""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        print(f"用户ID {user_id} 不存在")
        return False
    db.delete(user)
    try:
        db.commit()
        print(f"用户ID {user_id} 删除成功")
        return True
    except Exception as e:
        db.rollback()
        print(f"删除用户失败: {e}")
        return False

6. main.py - 应用入口

# main.py

from database import engine, get_db
from models import Base
from crud import create_user, get_user, get_all_users, update_user, delete_user

def initialize_database():
    """创建所有表"""
    Base.metadata.create_all(bind=engine)
    print("数据库初始化完成")

def main():
    # 初始化数据库
    initialize_database()

    # 创建会话
    db = next(get_db())

    # 创建用户
    create_user(db, "Grace", "grace@example.com", 27)
    create_user(db, "Heidi", "heidi@example.com", 32)
    create_user(db, "Ivan", "ivan@example.com", 45)

    # 获取单个用户
    get_user(db, 1)

    # 获取所有用户
    get_all_users(db)

    # 更新用户
    update_user(db, 1, name="Grace Hopper", age=28)

    # 获取更新后的用户
    get_user(db, 1)

    # 删除用户
    delete_user(db, 2)

    # 获取所有用户
    get_all_users(db)

    # 关闭会话
    db.close()

if __name__ == "__main__":
    main()

7. 运行项目

首先,确保所有文件已正确编写并保存。然后,在项目根目录下运行以下命令:

python main.py

预期输出:

所有表创建成功
用户 'Grace' 创建成功,ID: 1
用户 'Heidi' 创建成功,ID: 2
用户 'Ivan' 创建成功,ID: 3
用户信息: <User(id=1, name='Grace', email='grace@example.com', age=27)>
<User(id=1, name='Grace', email='grace@example.com', age=27)>
<User(id=2, name='Heidi', email='heidi@example.com', age=32)>
<User(id=3, name='Ivan', email='ivan@example.com', age=45)>
用户ID 1 更新成功
用户信息: <User(id=1, name='Grace Hopper', email='grace@example.com', age=28)>
用户ID 2 删除成功
<User(id=1, name='Grace Hopper', email='grace@example.com', age=28)>
<User(id=3, name='Ivan', email='ivan@example.com', age=45)>

解释:

  • 初始化数据库: 创建所有定义的表。
  • 创建用户: 添加新用户到数据库。
  • 获取用户: 读取单个用户和所有用户的信息。
  • 更新用户: 修改现有用户的信息。
  • 删除用户: 从数据库中删除用户。

最佳实践与安全性

在与数据库交互时,遵循最佳实践和安全性原则至关重要,以确保应用的稳定性和数据的安全。

1. 使用参数化查询

无论是使用原生SQL还是ORM,都应避免SQL注入攻击。使用参数化查询可以有效防止此类安全漏洞。

# 安全的参数化查询示例

# 使用mysql-connector-python
cursor.execute("SELECT * FROM users WHERE email = %s", (email,))

# 使用PyMySQL
cursor.execute("SELECT * FROM users WHERE email = %s", (email,))

# 使用SQLAlchemy ORM
user = session.query(User).filter(User.email == email).first()

2. 处理敏感信息

避免在代码中硬编码敏感信息(如数据库密码)。使用环境变量或配置文件来管理这些信息。

# 使用环境变量管理敏感信息

import os

DATABASE_USER = os.getenv("DB_USER", "default_user")
DATABASE_PASSWORD = os.getenv("DB_PASSWORD", "default_password")

在运行应用之前,设置环境变量:

# Linux/macOS
export DB_USER="python_user"
export DB_PASSWORD="your_password"

# Windows (CMD)
set DB_USER=python_user
set DB_PASSWORD=your_password

# Windows (PowerShell)
$env:DB_USER = "python_user"
$env:DB_PASSWORD = "your_password"

3. 使用迁移工具

在开发过程中,数据库模式可能会频繁变动。使用迁移工具如Alembic可以帮助管理数据库迁移。

安装 Alembic
pip install alembic
初始化 Alembic
alembic init alembic

这将在项目中创建一个alembic目录和一个alembic.ini配置文件。

配置 Alembic

编辑alembic.ini,设置数据库URL:

# alembic.ini

sqlalchemy.url = mysql+pymysql://python_user:your_password@localhost/python_mysql_demo

编辑alembic/env.py,导入模型以便 Alembic 能检测到变化:

# alembic/env.py

from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from models import Base  # 导入你的模型
from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
fileConfig(config.config_file_name)

# add your model's MetaData object here
target_metadata = Base.metadata

# other setup...

def run_migrations_offline():
    """Run migrations in 'offline' mode."""
    # implementation...

def run_migrations_online():
    """Run migrations in 'online' mode."""
    # implementation...

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
创建和应用迁移
# 生成迁移脚本
alembic revision --autogenerate -m "Create users table"

# 应用迁移
alembic upgrade head

解释:

  • Alembic: SQLAlchemy的迁移工具,用于管理数据库模式变更。

4. 关闭自动提交

在使用mysql-connector-python时,关闭自动提交,手动管理事务,以确保数据一致性。

# 关闭自动提交示例

connection = mysql.connector.connect(
    host="localhost",
    user="python_user",
    passwd="your_password",
    database="python_mysql_demo",
    autocommit=False
)

5. 使用环境隔离

在开发、测试和生产环境中使用不同的数据库和配置,避免数据泄漏和操作错误。


总结与进一步学习

恭喜你完成了Python与MySQL数据库交互的详细学习!在本教程中,你学到了:

  • 环境搭建: 配置Python和MySQL开发环境。
  • 连接数据库: 使用mysql-connector-pythonPyMySQL连接到MySQL数据库。
  • 创建和管理数据库和表: 使用Python脚本创建数据库和表。
  • 执行CRUD操作: 使用原生SQL和ORM执行创建、读取、更新和删除操作。
  • 使用ORM(SQLAlchemy): 定义模型、管理会话和执行CRUD操作。
  • 事务管理: 确保数据库操作的原子性和一致性。
  • 连接池管理: 提升应用性能,减少资源消耗。
  • 错误处理: 捕获和处理数据库操作中的各种错误。
  • 综合实例: 构建一个完整的Python应用,与MySQL数据库交互。
  • 最佳实践与安全性: 遵循安全原则,使用参数化查询,管理敏感信息,使用迁移工具。

下一步建议

  1. 深入学习SQLAlchemy: 探索更多高级功能,如关系映射、复杂查询、联合查询等。
  2. 集成Web框架: 将数据库操作集成到Web应用中,如使用Flask或FastAPI构建完整的后端服务。
  3. 测试与调试: 学习如何编写单元测试和集成测试,确保数据库操作的正确性。
  4. 优化性能: 学习数据库索引、查询优化和缓存机制,提升应用性能。
  5. 学习NoSQL数据库: 探索其他类型的数据库,如MongoDB、Redis,了解不同数据库的使用场景。
  6. 部署与运维: 学习如何在生产环境中部署数据库和应用,使用容器化技术如Docker,了解数据库备份与恢复策略。

http://www.kler.cn/a/405360.html

相关文章:

  • Matlab科研绘图:自定义内置多款配色函数
  • sql注入报错分享(mssql+mysql)
  • 数据结构(顺序栈——c语言实现)
  • 【FFmpeg】FFmpeg 内存结构 ③ ( AVPacket 函数简介 | av_packet_ref 函数 | av_packet_clone 函数 )
  • 【倍数问题——同余系】
  • HarmonyOS鸿蒙系统上File文件常用操作
  • 微信小程序技术架构图
  • 力扣--LCR 141.训练计划III
  • 二十:HTML Form表单提交时的协议格式
  • 银河麒麟v10 x86架构二进制方式kubeadm+docker+cri-docker搭建k8s集群(证书有效期100年) —— 筑梦之路
  • C++ 中的智能指针
  • 设计模式之 桥接模式
  • HarmonyOS中UIAbility和windowStage的区别
  • 【企业级分布式系统】Ceph集群
  • Hadoop 系列 MapReduce:Map、Shuffle、Reduce
  • uniapp接入高德地图
  • 【更新】中国省级产业集聚测算数据及协调集聚指数数据(2000-2022年)
  • Python+Selenium+Pytest+Allure+ Jenkins webUI自动化框架
  • 七次课掌握 Photoshop:绘画与修饰
  • librdns一个开源DNS解析库
  • 垂起固定翼无人机搭载高清三光(4K可见+红外+激光测距)吊舱图像采集技术详解
  • FileProvider高版本使用,跨进程传输文件
  • 主IP地址与从IP地址:深入解析与应用探讨
  • 锂电池学习笔记(一) 初识锂电池
  • 浅谈Python库之lxml
  • 24小时自动监控,自动录制直播蓝光视频!支持抖音等热门直播软件