Python一些可能用的到的函数系列132 ORM-sqlalchemy连clickhouse
说明
继续ORM的转换
通过ORM,可以:
- 1 用几乎一样的方式来操作不同的数据库
- 2 可以提供One的处理模式
内容
同步方式
这种方式更简单,适合处理小批量任务。这种操作严格来说,不是严格的One,而是MiniBatch,只是在某些时候,例如我自己的Interative Table,可以把这种方式视为One。真正的One还是要通过下面的异步方式来实现。
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetime
db_url = f"postgresql://USER:PASSWD@IP:PORT/postgres"
# from urllib.parse import quote_plus
# the_passed = quote_plus('!@#*')
# # 创建数据库引擎
pg_engine = create_engine(db_url)
# 创建基类
Base = declarative_base()
# 定义数据模型
class NewsContent(Base):
__tablename__ = 'some_table'
__table_args__ = {'schema': 'some_schema'} # 指定模式
id = Column(Integer, primary_key=True)
mid = Column(String)
content = Column(String)
created = Column(DateTime)
def dict(self):
data_dict = {}
data_dict['id'] = self.id
data_dict['mid'] = self.mid
data_dict['content'] = self.content
data_dict['created'] = self.created
return data_dict
# 创建表
Base.metadata.create_all(pg_engine)
# 创建会话
Session = sessionmaker(bind=pg_engine)
session = Session()
# 随机选取100条数据 order_by(func.random()) 数据集太大或者索引没建好可能会非常慢
# random_news = session.query(NewsContent).limit(100).all()
# >>>> 采用select in 的方式
# 查询最大id
max_id = session.query(func.max(NewsContent.id)).scalar()
print(f"The maximum id is: {max_id}")
import random
# 定义范围和选择的数量
start = max_id-10000000
end = max_id
num_samples = 200
# 从指定范围中随机选择
random_samples = random.sample(range(start, end + 1), num_samples)
print(random_samples)
# # 查询 ID 在集合中的记录
filtered_news = session.query(NewsContent).filter(NewsContent.id.in_(random_samples)).all()
filtered_news1 = [x.dict() for x in filtered_news ]
效果很好,速度很快。
异步方式
之后如果上生产了可以考虑这种方式
要使用 SQLAlchemy 异步连接 PostgreSQL,你可以结合 asyncio
和 SQLAlchemy 的异步支持。你使用的是 SQLAlchemy 2.x,而 SQLAlchemy 从 1.4 版本开始引入了对异步编程的支持,并在 2.x 版本中进一步强化和完善了这一功能。因此,SQLAlchemy 2.0.30 版本已经完全支持异步操作,特别是结合 asyncio 事件循环来异步连接数据库,如 PostgreSQL。
所以你当前使用的 SQLAlchemy 2.x 版本已经可以支持我之前提到的基于 asyncpg 的异步连接 PostgreSQL。这个版本的异步功能是通过 AsyncSession、create_async_engine 等接口来实现的。
以下是如何使用 SQLAlchemy 异步连接 PostgreSQL 的步骤:
1. 安装所需依赖
确保你已经安装了 asyncpg
和 SQLAlchemy 版本 1.4+ 及其他相关依赖。
pip install SQLAlchemy[asyncio] asyncpg
2. 配置异步连接
使用 async_engine
和 AsyncSession
来配置异步连接。
示例代码
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
# 定义模型的基础类
Base = declarative_base()
# 定义数据库模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String)
# 异步连接数据库
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)
# 创建异步会话工厂
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
# 异步函数:数据库交互
async def async_main():
async with async_session() as session:
async with session.begin():
# 创建新用户
new_user = User(name="Async User")
session.add(new_user)
# 查询数据
result = await session.execute("SELECT * FROM users")
users = result.fetchall()
print(users)
# 启动异步事件循环
asyncio.run(async_main())
主要步骤解析:
- 创建异步引擎:
create_async_engine()
使用postgresql+asyncpg
连接字符串。 - 异步会话:使用
sessionmaker()
创建异步会话工厂,结合AsyncSession
实现数据库的异步交互。 - 异步操作:在
async_main()
中进行异步的数据库操作,如查询和添加数据。
注意事项:
- 异步操作必须在
async
函数中执行,并通过await
语句异步地进行数据库操作。 - 数据库的连接字符串需要使用
postgresql+asyncpg
来指定asyncpg
驱动。
这种方式可以有效利用异步 I/O,提高数据库操作的性能。