当前位置: 首页 > article >正文

Python连接Mysql、Postgre、ClickHouse、Redis常用库及封装方法

博主在这里分享一些常见的python连接数据库或中间件的库和封装方案,希望对大家有用。

Mysql封装

#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import pymysql
from settings import MYSQL_DB, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, ENV
# 这个自己在配置文件写好,就不暴露出来了

class MysqlClient(object):
    def __init__(self, host, port, user, password, db):
        """
        初始化连接
        """
        self.conn = None
        self.cursor = None
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db
        
        self.conn = pymysql.Connect(host=self.host, user=self.user, password=self.password,
                                        database=self.db, port=self.port)
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
        self.cur = self.conn.cursor()  # 区别是上面那个返回的记录是字典,这个是元组

    def __del__(self):
        if self.cursor is not None:
            self.cursor.close()
        if self.cur is not None:
            self.cur.close()
        if self.conn is not None:
            self.conn.close()
        if ENV != 'dev' and self.server is not None:
            self.server.close()

    def query_record(self, query_sql=None):
        """
        默认查询接口, 获取所有数据
        :param query_sql:
        :return: List
        """
        self.cursor.execute(query_sql)
        self.conn.commit()
        return self.cursor.fetchall()

    def insert_record(self, insert_sql, res):
        """
        默认写入所有数据
        :param insert_sql:
        :param res:
        :return:
        """
        if ENV != 'dev':
            print("不允许执行", ENV)
            sys.exit(-1)
        self.cursor.executemany(insert_sql, res)
        self.conn.commit()

    def delete_record(self, delete_sql):
        """
        删除数据
        :param delete_sql:
        :return:
        """
        if ENV != 'dev':
            print("不允许执行", ENV)
            sys.exit(-1)
        self.cursor.execute(delete_sql)
        self.conn.commit()

    def update_record(self, update_sql, res_tuple):
        """
        更新数据
        :param update_sql:
        :param res_tuple:
        :return:
        """
        if ENV != 'dev':
            print("不允许执行", ENV)
            sys.exit(-1)
        self.cursor.execute(update_sql % res_tuple)
        self.conn.commit()

    # 查询一条数据,并返回表头
    def search_one_with_header(self, sql):
        self.cursor.execute(sql)
        result = self.cursor.fetchone()
        self.conn.commit()
        return result

    def search_all_with_header(self, sql):
        # 暂不使用self.cursor,因为它返回的是字典格式
        self.cur.execute(sql)
        result = self.cur.fetchall()
        self.conn.commit() 

        data_dict = []
        for field in self.cur.description:
            data_dict.append(field[0])
        return result, data_dict


if __name__ == "__main__":
    sql = 'select * from user_info limit 2'
    mysql_client = MysqlClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB)
    res = mysql_client.query_record(sql)
    for r in res:
        print(r)

Postgre封装

import psycopg2
import sys
from psycopg2 import extras
from settings import PG_DB, PG_PORT, PG_USER, PG_PASSWORD, PG_HOST, ENV


class PgClient(object):
    def __init__(self, host, port, user, password, db):
        """
        初始化连接
        """
        self.conn = None
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db
    
        self.conn = psycopg2.connect(host=self.host, user=self.user, password=self.password,
                                     database=self.db, port=self.port)
        self.cursor = self.conn.cursor(cursor_factory=extras.DictCursor) # 字典形式
        self.cur = self.conn.cursor()  # 元组格式

    def __del__(self):
        if self.cursor is not None:
            self.cursor.close()
        if self.cur is not None:
            self.cur.close()
        if self.conn is not None:
            self.conn.close()
        if ENV != 'dev' and self.server is not None:
            try:
                self.server.close()
            except Exception as e:
                pass

    def query_record(self, query_sql=None):
        """
        默认查询接口, 获取所有数据
        :param query_sql:
        :return: List
        """
        self.cursor.execute(query_sql)
        self.conn.commit()  # 防止轮询时,查到的结果一直未更新
        return self.cursor.fetchall()

    def insert_record(self, insert_sql, res):
        """
        默认写入所有数据
        :param insert_sql:
        :param res:
        :return:
        """
        if ENV != 'dev':
            print("不允许执行", ENV)
            sys.exit(-1)
        self.cursor.executemany(insert_sql, res)
        self.conn.commit()

    def delete_record(self, delete_sql):
        """
        删除数据
        :param delete_sql:
        :return:
        """
        if ENV != 'dev':
            print("不允许执行", ENV)
            sys.exit(-1)
        self.cursor.execute(delete_sql)
        self.conn.commit()

    def update_record(self, update_sql, res_tuple):
        """
        更新数据
        :param update_sql:
        :param res_tuple:
        :return:
        """
        if ENV != 'dev':
            print("不允许执行", ENV)
            sys.exit(-1)
        self.cursor.execute(update_sql % res_tuple)
        self.conn.commit()

pg_ofclient = PgClient(PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB)
sql = "select * from user_info limit 2"
rows = pg_ofclient.query_record(sql)
for row in rows:
    print(row)

ClickHouse封装

#!/usr/bin/python
# -*- coding: utf-8 -*-
from clickhouse_driver import connect
from settings import CK_HOST, CK_PORT, CK_DB, CK_USER, CK_PW, ENV

class CkClient(object):
    def __init__(self, host, port, user, password, db):
        """
        初始化连接
        """
        self.conn = None
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db

        self.conn = connect(host=self.host, user=self.user, password=self.password,
                            database=self.db, port=self.port)
        self.cur = self.conn.cursor()  

    def __del__(self):
        if self.cur is not None:
            self.cur.close()
        if self.conn is not None:
            self.conn.close()
        if ENV != 'dev' and self.server is not None:
            try:
                self.server.close()
            except Exception as e:
                pass

    def query_record(self, query_sql=None):
        """
        默认查询接口, 获取所有数据
        :param query_sql:
        :return: List
        """
        self.cur.execute(query_sql)
        columns = [desc[0] for desc in self.cur.description]
        result = [dict(zip(columns, row)) for row in self.cur.fetchall()]
        self.conn.commit() 
        return result

    def search_all_with_header(self, sql):
        self.cur.execute(sql)
        result = self.cur.fetchall()
        self.conn.commit()

        data_dict = []
        for field in self.cur.description:
            data_dict.append(field[0])
        return result, data_dict


if __name__ == "__main__":
    ck_client = CkClient(CK_HOST, CK_PORT, CK_USER, CK_PW, CK_DB)
    sql = "select * from user_info limit 2"
    rows = ck_client.query_record(sql)
    print(rows)

Redis封装

#!/usr/bin/python
# -*- coding: utf-8 -*-
import redis
import sys
import json
import time
from settings import redis_host, redis_port, redis_pw


class RedisClient:
    def __init__(self, host=redis_host, port=redis_port, password=redis_pw, db=5):
        self.host = host
        self.port = port
        self.password = password
        self.db = db
        self.connection = redis.Redis(host=self.host, port=self.port, password=self.password, db=self.db)

    def get(self, key):
        return self.connection.get(key)

    def hget_all(self, hash_name):
        return self.connection.hgetall(hash_name)

    def delete(self, key):
        if ENV != 'dev':
            print("不允许执行", ENV)
            sys.exit(-1)
        self.connection.delete(key)

    def set(self, key, value, expiration=None):
        if ENV != 'dev':
            print("不允许执行", ENV)
            sys.exit(-1)
        self.connection.set(key, value, ex=expiration)

    def set_json(self, key, value, expiration=None):
        json_value = json.dumps(value)  # 将 JSON 对象转换为字符串
        self.connection.set(key, json_value, ex=expiration)

    def get_json(self, key):
        json_value = self.connection.get(key)
        if json_value is not None:
            value = json.loads(json_value)  # 将字符串转换为 JSON 对象
            return value
        return None
if __name__ == '__main__':
    # 使用示例
    redis_client = RedisClient()
    key = 'XXXX:XX:XX'
    while True:
        d = redis_client.get(key=key)
        print(d)
        time.sleep(0.1)

http://www.kler.cn/a/396213.html

相关文章:

  • Git如何简单使用
  • 【洛谷】T539823 202411D Phoenix
  • git上传文件到远程仓库
  • Qt文件目录操作
  • uniapp ios app以framwork形式接入sentry
  • ABAP开发学习——ST05 ABAP SQL跟踪工具
  • 嵌入式交叉编译:glib(未成功)
  • React状态管理之Redux
  • TVBox 网络接口
  • Go-RPC框架分层设计
  • AndroidStudio 获取 Git 提交次数和编译时间
  • ubuntu将firewall-config导出为.deb文件
  • [项目代码] YOLOv5 铁路工人安全帽安全背心识别 [目标检测]
  • 深度神经网络DNN反向传播BP算法公式推导
  • Flume1.9.0自定义Sink组件将数据发送至Mysql
  • 基于OpenCV的图片人脸检测研究
  • 【Java】设计模式——工厂模式
  • emulator总结
  • 26. 智能指针
  • Py2Neo 库将 Json 文件导入 Neo4J
  • 新能源汽车磁集成技术的机遇与瓶颈
  • 比特大陆/算能科技嵌入式面试题及参考答案
  • 商业智能BI如何零编码对接低代码数据模型?
  • 从依托指标字典到 NoETL 自动化指标平台,指标口径一致性管理的进阶
  • union介绍及使用
  • leetcode104:二叉树的最大深度