当前位置: 首页 > article >正文

《Python实战进阶》No17: 数据库连接与 ORM(SQLAlchemy 实战)

No17: 数据库连接与 ORM(SQLAlchemy 实战)


摘要
本文深入探讨SQLAlchemy在复杂场景下的高级应用,涵盖四大核心主题:

  1. 会话生命周期管理:通过事件钩子实现事务监控与审计追踪
  2. 混合继承映射:结合单表/连接表继承优势实现多态模型
  3. 高性能操作:批量插入与核心SQL构造优化大数据处理
  4. 异步支持:基于asyncmy的MySQL异步引擎实践

实战案例包含电商系统的分库分表实现策略与基于ORM的审计日志系统设计。扩展部分解析多租户架构的三种隔离方案及SQL注入防御的ORM层最佳实践。配套代码演示了从分片路由到异步操作的完整电商系统实现,帮助开发者掌握构建高并发、高安全性的企业级数据库应用能力。

在这里插入图片描述

核心概念

1. 会话生命周期管理(Session事件钩子)

通过事件钩子实现精细化的会话控制,典型场景包括事务边界管理、变更追踪和审计日志。

from sqlalchemy import event
from sqlalchemy.orm import sessionmaker

Session = sessionmaker()

# 事务提交前事件
@event.listens_for(Session, 'before_commit')
def before_commit(session):
    print("即将提交事务,当前待处理变更:", session.dirty)

# 事务回滚后事件
@event.listens_for(Session, 'after_rollback')
def after_rollback(session):
    print("事务已回滚,清理临时状态")

2. 混合继承映射策略

结合单表继承和连接表继承的优势,实现灵活的多态查询:

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()

class Product(Base):
    __tablename__ = 'product'
    id = Column(Integer, primary_key=True)
    type = Column(String(20))  # 单表继承标识
    __mapper_args__ = {'polymorphic_on': type}

class Book(Product):
    __tablename__ = 'book'
    id = Column(Integer, ForeignKey('product.id'), primary_key=True)
    isbn = Column(String(13))
    __mapper_args__ = {'polymorphic_identity': 'book'}

class Electronic(Product):
    __tablename__ = 'electronic'
    id = Column(Integer, ForeignKey('product.id'), primary_key=True)
    warranty = Column(Integer)
    __mapper_args__ = {'polymorphic_identity': 'electronic'}

3. 批量操作与核心SQL构造

高效处理大数据量操作的两种方式:

# ORM批量插入
session.bulk_insert_mappings(User, [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25}
])

# 核心SQL构造
from sqlalchemy import select, table, column
users = table('users', 
    column('id'), column('name'), column('age')
)
stmt = users.insert().values([
    {'name': 'Charlie', 'age': 35},
    {'name': 'David', 'age': 40}
])
connection.execute(stmt)

4. 异步引擎与greenlet整合

使用asyncmy驱动实现MySQL异步操作:

from sqlalchemy.ext.asyncio import create_async_engine
import asyncio

async_engine = create_async_engine(
    "mysql+asyncmy://user:pass@localhost/db",
    pool_size=5, max_overflow=10
)

async def fetch_users():
    async with async_engine.connect() as conn:
        result = await conn.execute("SELECT * FROM users")
        return result.fetchall()

asyncio.run(fetch_users())

实战案例

1. 电商系统的分表分库实现

使用ShardedSession实现用户表水平分片:

from sqlalchemy.ext.horizontal_shard import ShardedSession

shards = {
    'shard1': create_engine('sqlite:///./shard1.db'),
    'shard2': create_engine('sqlite:///./shard2.db')
}

def shard_chooser(mapper, instance, clause=None):
    if instance:
        return "shard%d" % (instance.id % 2 + 1)
    else:
        return 'shard1'

session = ShardedSession(
    shards=shards,
    shard_chooser=shard_chooser,
    id_chooser=lambda *args: list(shards.keys())
)

# 自动路由到对应分片
user = User(id=101, name="Alice")
session.add(user)  # 自动路由到shard2

2. 版本控制与审计日志实现

通过事件监听实现变更追踪:

from sqlalchemy import inspect

audit_logs = []

@event.listens_for(Session, 'before_flush')
def track_changes(session, context, instances):
    for obj in session.dirty:
        state = inspect(obj)
        for attr in state.attrs:
            hist = attr.load_history()
            if hist.has_changes():
                audit_logs.append({
                    'object': obj,
                    'attribute': attr.key,
                    'old': hist.deleted,
                    'new': hist.added
                })

扩展思考

1. 多租户架构的数据库隔离方案

三种常见实现方式:

# 方案1:schema隔离
class TenantSpecificQuery(Query):
    def get(self, ident):
        return super().filter_by(tenant_id=current_tenant.id).get(ident)

# 方案2:行级隔离
class BaseModel(Base):
    __abstract__ = True
    tenant_id = Column(Integer, nullable=False)

# 方案3:动态schema切换
@event.listens_for(Pool, 'connect')
def connect(dbapi_con, record):
    dbapi_con.execute(f"SET search_path TO tenant_{current_tenant.id}")

2. SQL注入防御的ORM层实践

安全操作示例:

# 错误方式(存在注入风险)
query = User.query.filter(f"username = '{input_username}'")

# 正确方式
query = User.query.filter(User.username == input_username)

# 动态查询安全构造
from sqlalchemy import text
stmt = text("SELECT * FROM users WHERE username = :username")
session.execute(stmt, {"username": input_username})

完整代码示例

包含分表分库、审计日志、异步操作的完整电商系统示例:

# 安装依赖
# pip install sqlalchemy aiosqlite

from sqlalchemy import Column, Integer, String, Float, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import asyncio

# 定义模型
Base = declarative_base()

class Product(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    price = Column(Float)
    shard_id = Column(Integer, nullable=False)

# 创建异步引擎
engines = {
    'shard1': create_async_engine('sqlite+aiosqlite:///./shard1.db', echo=True),
    'shard2': create_async_engine('sqlite+aiosqlite:///./shard2.db', echo=True)
}

# 创建异步会话工厂
async_sessions = {
    shard_id: sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False
    )
    for shard_id, engine in engines.items()
}

# 根据 shard_id 选择分片
def get_shard_id(shard_id):
    return f'shard{shard_id % 2 + 1}'

# 异步操作示例
async def main():
    # 创建表
    for engine in engines.values():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    
    # 插入数据
    product = Product(name="Laptop", price=999.99, shard_id=101)
    shard_id = get_shard_id(product.shard_id)
    
    async with async_sessions[shard_id]() as session:
        session.add(product)
        await session.commit()
        print(f"产品已添加到 {shard_id}")


    # 插入数据
    product = Product(name="手机", price=10000, shard_id=102)
    shard_id = get_shard_id(product.shard_id)
    
    async with async_sessions[shard_id]() as session:
        session.add(product)
        await session.commit()
        print(f"产品已添加到 {shard_id}")


    
    # 查询所有分片的数据
    all_products = []
    for shard_id, session_factory in async_sessions.items():
        async with session_factory() as session:
            result = await session.execute(select(Product))
            products = result.scalars().all()
            all_products.extend(products)
            print(f"从 {shard_id} 查询到 {len(products)} 个产品")
    
    # 显示所有产品
    print("\n所有产品:")
    for product in all_products:
        print(f"产品: {product.name}, 价格: {product.price}, 分片ID: {product.shard_id}")

# 运行异步主函数
if __name__ == "__main__":
    asyncio.run(main())

输出:

d:\python_projects\jupyter_demo\sqlalchemy_demo.py:11: MovedIn20Warning: The ``declarative_base()`` function is now available as sqlalchemy.orm.declarative_base(). (deprecated since: 2.0) (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  Base = declarative_base()
2025-03-09 19:50:49,625 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,626 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("products")    
2025-03-09 19:50:49,626 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,627 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("products")    
2025-03-09 19:50:49,627 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,628 INFO sqlalchemy.engine.Engine
CREATE TABLE products (
        id INTEGER NOT NULL,
        name VARCHAR(50),
        price FLOAT,
        shard_id INTEGER NOT NULL,
        PRIMARY KEY (id)
)
2025-03-09 19:50:49,629 INFO sqlalchemy.engine.Engine [no key 0.00072s] ()
2025-03-09 19:50:49,636 INFO sqlalchemy.engine.Engine COMMIT
2025-03-09 19:50:49,638 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,639 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("products")    
2025-03-09 19:50:49,639 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,640 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("products")
2025-03-09 19:50:49,640 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,641 INFO sqlalchemy.engine.Engine
CREATE TABLE products (
        id INTEGER NOT NULL,
        name VARCHAR(50),
        price FLOAT,
        shard_id INTEGER NOT NULL,
        PRIMARY KEY (id)
)
2025-03-09 19:50:49,642 INFO sqlalchemy.engine.Engine [no key 0.00067s] ()
2025-03-09 19:50:49,647 INFO sqlalchemy.engine.Engine COMMIT
2025-03-09 19:50:49,649 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,650 INFO sqlalchemy.engine.Engine INSERT INTO products (name, price, shard_id) VALUES (?, ?, ?)
2025-03-09 19:50:49,650 INFO sqlalchemy.engine.Engine [generated in 0.00035s] ('Laptop', 999.99, 101)
2025-03-09 19:50:49,652 INFO sqlalchemy.engine.Engine COMMIT
产品已添加到 shard2
2025-03-09 19:50:49,657 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,658 INFO sqlalchemy.engine.Engine INSERT INTO products (name, price, shard_id) VALUES (?, ?, ?)
2025-03-09 19:50:49,659 INFO sqlalchemy.engine.Engine [generated in 0.00077s] ('手机', 10000.0, 102)
2025-03-09 19:50:49,661 INFO sqlalchemy.engine.Engine COMMIT
产品已添加到 shard1
2025-03-09 19:50:49,665 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,665 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
ducts.price, products.shard_id
FROM products
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
FROM products
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, pro从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
ducts.price, products.shard_id
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
所有产品:
产品: 手机, 价格: 10000.0, 分片ID: 102
所有产品:
产品: 手机, 价格: 10000.0, 分片ID: 102
产品: Laptop, 价格: 999.99, 分片ID: 101
产品: 手机, 价格: 10000.0, 分片ID: 102
产品: Laptop, 价格: 999.99, 分片ID: 101
产品: Laptop, 价格: 999.99, 分片ID: 101

通过本章学习,您应该能够:

  1. 熟练使用SQLAlchemy事件系统管理会话生命周期
  2. 实现复杂的继承映射策略
  3. 处理大规模数据操作的性能优化
  4. 构建高并发的异步数据库应用
  5. 设计企业级的数据库架构方案

http://www.kler.cn/a/579503.html

相关文章:

  • 从零开始的 Kafka 学习(二)| 集群启动
  • vue+element|el-tree树设置懒加载和设置默认勾选
  • Go_zero学习笔记
  • 关于更新字段为空值——MybatisPlus框架
  • Linux | Vim 鼠标不能右键粘贴、跨系统复制粘贴
  • 为企业级AI交互系统OpenWebUI集成LDAP用户权限认证(1)
  • Python爬虫实战:爬取财金网实时财经信息
  • 如何高效准备PostgreSQL认证考试?
  • 基于LabVIEW的脚本化子VI动态生成
  • 如何构建一个 Docker 镜像?
  • 汇编点亮LED
  • 假设检验与置信区间在机器学习中的应用
  • 【Linux】信号处理以及补充知识
  • c++: 容器vector
  • 对WebSocket做一点简单的理解
  • 小程序 wxml 语法 —— 39 简单双向数据绑定
  • navicat导出postgresql的数据库结构、字段名、备注等等
  • SpringBoot项目的五种搭建方式
  • Docker 运行 GPUStack 的详细教程
  • 微软程序的打包格式MSIX