Python连接Mysql、Postgre、ClickHouse、Redis常用库及封装方法
博主在这里分享一些常见的python连接数据库或中间件的库和封装方案,希望对大家有用。
Mysql封装
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import pymysql
from settings import MYSQL_DB, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, ENV
# 这个自己在配置文件写好,就不暴露出来了
class MysqlClient(object):
def __init__(self, host, port, user, password, db):
"""
初始化连接
"""
self.conn = None
self.cursor = None
self.host = host
self.port = port
self.user = user
self.password = password
self.db = db
self.conn = pymysql.Connect(host=self.host, user=self.user, password=self.password,
database=self.db, port=self.port)
self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
self.cur = self.conn.cursor() # 区别是上面那个返回的记录是字典,这个是元组
def __del__(self):
if self.cursor is not None:
self.cursor.close()
if self.cur is not None:
self.cur.close()
if self.conn is not None:
self.conn.close()
if ENV != 'dev' and self.server is not None:
self.server.close()
def query_record(self, query_sql=None):
"""
默认查询接口, 获取所有数据
:param query_sql:
:return: List
"""
self.cursor.execute(query_sql)
self.conn.commit()
return self.cursor.fetchall()
def insert_record(self, insert_sql, res):
"""
默认写入所有数据
:param insert_sql:
:param res:
:return:
"""
if ENV != 'dev':
print("不允许执行", ENV)
sys.exit(-1)
self.cursor.executemany(insert_sql, res)
self.conn.commit()
def delete_record(self, delete_sql):
"""
删除数据
:param delete_sql:
:return:
"""
if ENV != 'dev':
print("不允许执行", ENV)
sys.exit(-1)
self.cursor.execute(delete_sql)
self.conn.commit()
def update_record(self, update_sql, res_tuple):
"""
更新数据
:param update_sql:
:param res_tuple:
:return:
"""
if ENV != 'dev':
print("不允许执行", ENV)
sys.exit(-1)
self.cursor.execute(update_sql % res_tuple)
self.conn.commit()
# 查询一条数据,并返回表头
def search_one_with_header(self, sql):
self.cursor.execute(sql)
result = self.cursor.fetchone()
self.conn.commit()
return result
def search_all_with_header(self, sql):
# 暂不使用self.cursor,因为它返回的是字典格式
self.cur.execute(sql)
result = self.cur.fetchall()
self.conn.commit()
data_dict = []
for field in self.cur.description:
data_dict.append(field[0])
return result, data_dict
if __name__ == "__main__":
sql = 'select * from user_info limit 2'
mysql_client = MysqlClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB)
res = mysql_client.query_record(sql)
for r in res:
print(r)
Postgre封装
import psycopg2
import sys
from psycopg2 import extras
from settings import PG_DB, PG_PORT, PG_USER, PG_PASSWORD, PG_HOST, ENV
class PgClient(object):
def __init__(self, host, port, user, password, db):
"""
初始化连接
"""
self.conn = None
self.host = host
self.port = port
self.user = user
self.password = password
self.db = db
self.conn = psycopg2.connect(host=self.host, user=self.user, password=self.password,
database=self.db, port=self.port)
self.cursor = self.conn.cursor(cursor_factory=extras.DictCursor) # 字典形式
self.cur = self.conn.cursor() # 元组格式
def __del__(self):
if self.cursor is not None:
self.cursor.close()
if self.cur is not None:
self.cur.close()
if self.conn is not None:
self.conn.close()
if ENV != 'dev' and self.server is not None:
try:
self.server.close()
except Exception as e:
pass
def query_record(self, query_sql=None):
"""
默认查询接口, 获取所有数据
:param query_sql:
:return: List
"""
self.cursor.execute(query_sql)
self.conn.commit() # 防止轮询时,查到的结果一直未更新
return self.cursor.fetchall()
def insert_record(self, insert_sql, res):
"""
默认写入所有数据
:param insert_sql:
:param res:
:return:
"""
if ENV != 'dev':
print("不允许执行", ENV)
sys.exit(-1)
self.cursor.executemany(insert_sql, res)
self.conn.commit()
def delete_record(self, delete_sql):
"""
删除数据
:param delete_sql:
:return:
"""
if ENV != 'dev':
print("不允许执行", ENV)
sys.exit(-1)
self.cursor.execute(delete_sql)
self.conn.commit()
def update_record(self, update_sql, res_tuple):
"""
更新数据
:param update_sql:
:param res_tuple:
:return:
"""
if ENV != 'dev':
print("不允许执行", ENV)
sys.exit(-1)
self.cursor.execute(update_sql % res_tuple)
self.conn.commit()
pg_ofclient = PgClient(PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB)
sql = "select * from user_info limit 2"
rows = pg_ofclient.query_record(sql)
for row in rows:
print(row)
ClickHouse封装
#!/usr/bin/python
# -*- coding: utf-8 -*-
from clickhouse_driver import connect
from settings import CK_HOST, CK_PORT, CK_DB, CK_USER, CK_PW, ENV
class CkClient(object):
def __init__(self, host, port, user, password, db):
"""
初始化连接
"""
self.conn = None
self.host = host
self.port = port
self.user = user
self.password = password
self.db = db
self.conn = connect(host=self.host, user=self.user, password=self.password,
database=self.db, port=self.port)
self.cur = self.conn.cursor()
def __del__(self):
if self.cur is not None:
self.cur.close()
if self.conn is not None:
self.conn.close()
if ENV != 'dev' and self.server is not None:
try:
self.server.close()
except Exception as e:
pass
def query_record(self, query_sql=None):
"""
默认查询接口, 获取所有数据
:param query_sql:
:return: List
"""
self.cur.execute(query_sql)
columns = [desc[0] for desc in self.cur.description]
result = [dict(zip(columns, row)) for row in self.cur.fetchall()]
self.conn.commit()
return result
def search_all_with_header(self, sql):
self.cur.execute(sql)
result = self.cur.fetchall()
self.conn.commit()
data_dict = []
for field in self.cur.description:
data_dict.append(field[0])
return result, data_dict
if __name__ == "__main__":
ck_client = CkClient(CK_HOST, CK_PORT, CK_USER, CK_PW, CK_DB)
sql = "select * from user_info limit 2"
rows = ck_client.query_record(sql)
print(rows)
Redis封装
#!/usr/bin/python
# -*- coding: utf-8 -*-
import redis
import sys
import json
import time
from settings import redis_host, redis_port, redis_pw
class RedisClient:
def __init__(self, host=redis_host, port=redis_port, password=redis_pw, db=5):
self.host = host
self.port = port
self.password = password
self.db = db
self.connection = redis.Redis(host=self.host, port=self.port, password=self.password, db=self.db)
def get(self, key):
return self.connection.get(key)
def hget_all(self, hash_name):
return self.connection.hgetall(hash_name)
def delete(self, key):
if ENV != 'dev':
print("不允许执行", ENV)
sys.exit(-1)
self.connection.delete(key)
def set(self, key, value, expiration=None):
if ENV != 'dev':
print("不允许执行", ENV)
sys.exit(-1)
self.connection.set(key, value, ex=expiration)
def set_json(self, key, value, expiration=None):
json_value = json.dumps(value) # 将 JSON 对象转换为字符串
self.connection.set(key, json_value, ex=expiration)
def get_json(self, key):
json_value = self.connection.get(key)
if json_value is not None:
value = json.loads(json_value) # 将字符串转换为 JSON 对象
return value
return None
if __name__ == '__main__':
# 使用示例
redis_client = RedisClient()
key = 'XXXX:XX:XX'
while True:
d = redis_client.get(key=key)
print(d)
time.sleep(0.1)