Peewee+Postgresql+PooledPostgresqlDatabase重连机制
需求:
Postgresql数据库服务重启后,需要业务代码正常读写数据库
方案:
- 通过继承playhouse.shortcuts.ReconnectMixin和playhouse.pool.PooledPostgresqlDatabase来创建一个新的ReconnectPooledPostgresqlDatabase类
- 修改reconnect_errors属性来适配Postgresql的错误类型
- 使用ReconnectPooledPostgresqlDatabase来获取数据库连接
测试
- 启动程序 -->重起数据库服务–>读写数据库操作正常
- 启动应用程序–>关闭数据库服务–>读写数据库失败–>启动数据库服务–>读写数据库操作正常
示例:
from playhouse.pool import PooledPostgresqlDatabase
from peewee import OperationalError, InterfaceError
from playhouse.shortcuts import ReconnectMixin
class ReconnectPooledPostgresqlDatabase(ReconnectMixin, PooledPostgresqlDatabase):
"""
支持重连机制的数据库连接池类,可以通过扩展reconnect_errors来支持重连场景
"""
reconnect_errors = (
# Postgres error examples:
(OperationalError, 'terminat'),
(InterfaceError, 'connection already closed')
)
class PostgresqlManager(object):
"""
PG数据库管理类
"""
__instance_lock = threading.Lock()
__database = None
@classmethod
def get_database(cls, **kwargs):
if cls.__database is None:
with cls.__instance_lock:
cls.__database = PooledPostgresqlDatabase(
database=kwargs.get("database"),
host=kwargs.get("host", "127.0.0.1"),
port=int(kwargs.get("port", 5432)),
user=kwargs.get("user", "postgres"),
password=kwargs.get("password", "postgres"),
max_connections=int(kwargs.get("max_connections", 50)),
stale_timeout=int(kwargs.get("stale_timeout", 600))
)
return cls.__database