当前位置: 首页 > article >正文

postgresql|数据库|利用sqlparse和psycopg2库批量按顺序执行SQL语句(psyconpg2新优化版本)

一、

旧版批量执行SQL脚本的python文件缺点,优点,以及更新内容

书接上回,postgresql|数据库开发|python的psycopg2库按指定顺序批量执行SQL文件(可离线化部署)_python sql psycopg2-CSDN博客

这个python脚本写了很久了,最近开始实际使用,发现了很多问题,问题主要集中在以下几点:

1、

SQL语句解析不太标准,遇到;分号就也当SQL语句执行,导致很多不必要的错误,并且很多时候不能有效区分多行SQL,因此,本文计划使用sqlparse库来做更准确的SQL语句解析

2、

批量的SQL文件跑完后,并没有一个比较详细的总结报告,遇到问题不太好排查,因此,本文对此做了优化,当一个目录内的SQL文件都执行完毕后,给一个相对详细的报告

确保即使某些 SQL 文件执行失败,程序也会继续尝试执行其余的文件,并最终给出一个详细的执行报告,帮助用户了解哪些文件和语句被执行以及哪些文件遇到了问题。这样,用户不仅能知道哪些文件未能成功处理,还能清楚地看到哪些文件及其内部的语句已经成功应用到了数据库中。

3、

SQL文件排序使用正则表达式处理,以改善SQL文件排序有时候不正确的问题

4、

所有的指定目录下的SQL文件都执行,但,执行失败的SQL文件仍留在原文件夹,需要将失败的SQL文件拿出来,手动处理有问题的SQL文件

5、

SQL语句执行输出太多,成功的SQL语句不应该输出到控制台,导致脚本执行情况并不是一目了然,因此,对脚本输出进行优化,以方便SQL脚本的调试

优点:

1、

以SQL文件为单位,每一个SQL文件内的SQL语句要么全部执行成功,如果中间有任何错误就回滚,对于数据库的数据安全是有一定的保障的

2、

该工具执行迅速,效率非常高,但对于锁表的情况,现在暂时没有什么想法

二、

优化后的python脚本源码

import os
import re
import sys
import json
import shutil
import psycopg2
from psycopg2 import sql, OperationalError, ProgrammingError
import sqlparse

def print_colored_text(text, color_code):
    """打印带有颜色的文本"""
    print(f"\033[{color_code}m{text}\033[0m")

# 定义颜色代码
COLORS = {
    'BLACK': 30,
    'RED': 31,
    'GREEN': 32,
    'YELLOW': 33,
    'BLUE': 34,
    'MAGENTA': 35,
    'CYAN': 36,
    'WHITE': 37
}

def find_sql_files(path):
    """查找指定路径下的所有 .sql 文件,并按文件名中第一个出现的数字升序排序返回列表"""
    sql_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(path)
        for file in files
        if file.endswith('.sql')
    ]

    def extract_first_number(filename):
        """从文件名中提取第一个出现的数字序列并转换为整数,用于排序"""
        match = re.search(r'\d+', os.path.basename(filename))
        return int(match.group()) if match else float('inf')  # 如果没有匹配到数字,则排到最后

    # 使用 sorted 函数并指定 key 参数来实现升序排序,仅依据第一个数字
    sorted_sql_files = sorted(sql_files, key=extract_first_number)

    return sorted_sql_files
def execute_sql_file(db_config, sql_file_path, script_dir):
    """执行指定的 SQL 文件,并在遇到错误时移动文件"""
    conn = None
    cursor = None
    successful_statements = []  # 新增: 记录成功的SQL语句
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        with open(sql_file_path, 'r', encoding='utf-8') as file:
            parsed_statements = sqlparse.split(file.read())
            for raw_stmt in parsed_statements:
                stmt = raw_stmt.strip()
                if not stmt:  # 忽略空语句
                    continue
                
                statements = sqlparse.parse(stmt)
                for statement in statements:
                    if statement.tokens:  # 确保有非空的SQL语句
                        stmt_str = str(statement).strip(';').strip()
                        if stmt_str:  # 忽略空语句
                           # print(stmt_str)
                           # print_colored_text('<<<<<<<=======<<<<<++++++++<<<<<<上面这条语句将要执行了===----========*****', COLORS['CYAN'])
                            try:
                                cursor.execute(stmt_str)
                                successful_statements.append(stmt_str)  # 添加到成功语句列表
                                #print_colored_text('执行成功!', COLORS['GREEN'])
                            except (Exception, psycopg2.DatabaseError) as e:
                                print(f"执行失败的语句是:")
                                print(f"{stmt_str}")
                                print("=========这是第一个分隔符===================")
                                print("报错详细信息是:")
                                print(e)
                                print("===========这是第二个分隔符===================")
                                print(f"执行的文件名称是:")
                                print(f"{sql_file_path}")
                                conn.rollback()  # 回滚事务
                                move_failed_file(sql_file_path, script_dir)
                                return False, successful_statements
        conn.commit()
        print_colored_text(f"Executed SQL file successfully: {sql_file_path}", COLORS['RED'])
        return True, successful_statements
    except (OperationalError, ProgrammingError) as error:
        print(f"Database error occurred while executing SQL file {sql_file_path}: {error}")
        if conn:
            conn.rollback()
        return False, successful_statements
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

def move_failed_file(src, dst_dir):
    """将失败的SQL文件移动到指定的目标目录"""
    try:
        os.makedirs(dst_dir, exist_ok=True)
        dst = os.path.join(dst_dir, os.path.basename(src))
        print(f"Moving failed SQL file to {dst}")
        shutil.move(src, dst)
    except Exception as e:
        print(f"Failed to move the file {src}: {e}")

def main():
    if len(sys.argv) != 3:
        print("Usage: python script.py <user> <path_to_search>")
        sys.exit(1)

    user = sys.argv[1]
    search_path = sys.argv[2]
    script_dir = os.path.dirname(os.path.abspath(__file__))  # 获取脚本所在目录

    try:
        with open('test.json', 'r', encoding='utf-8') as f:
            params = json.load(f)
            db_config = params.get('db_config', {})
            required_keys = ['dbname', 'user', 'password', 'host', 'port']
            if not all(key in db_config for key in required_keys):
                print("Error: Missing required database configuration in test.json")
                sys.exit(1)
            db_config['user'] = user
    except FileNotFoundError:
        print("Error: test.json not found")
        sys.exit(1)
    except json.JSONDecodeError:
        print("Error: test.json is not a valid JSON file")
        sys.exit(1)

    sql_files = list(find_sql_files(search_path))
    print(f"Found {len(sql_files)} SQL files: {sql_files}")
    print_colored_text(f'|||||二十秒后开始执行{sql_files},以打印出来的顺序依次执行SQL文件|||||||||||', COLORS['MAGENTA'])

    failed_files = []
    executed_statements = []
    successful_files = []  # 新增: 记录成功的SQL文件

    if sql_files:
        for sql_file in sql_files:
            print_colored_text(f'这个文件将要执行: {sql_file}\n\n', COLORS['YELLOW'])
            success, statements = execute_sql_file(db_config, sql_file, script_dir)
            if not success:
                print(f"Failed to execute SQL file: {sql_file}. Continuing with the next file...")
                failed_files.append(sql_file)
            else:
                executed_statements.extend(statements)
                successful_files.append(sql_file)  # 添加到成功文件列表
        # 打印总结信息
        if failed_files:
            print_colored_text("\nThe following SQL files failed to execute:", COLORS['RED'])
            for failed_file in failed_files:
                print(f"- {failed_file}")
        else:
            print_colored_text("\nAll SQL files executed successfully.", COLORS['GREEN'])
#        print_colored_text("\nThe following SQL statements were executed successfully:", COLORS['GREEN'])
#        for stmt in executed_statements:
#            print(f"- {stmt}")
        print_colored_text("\nThe following SQL files were executed successfully:", COLORS['GREEN'])
        for successful_file in successful_files:
            print(f"- {successful_file}")

    else:
        print("No SQL files found in the specified path.")

if __name__ == "__main__":
    main()

该python脚本依赖于python3环境,libpq.so,pyscopg库,sqlparse库,这些库什么的都可以离线安装,相关文件都已放到百度网盘内了

通过网盘分享的文件:批量执行SQL语句项目
链接: https://pan.baidu.com/s/1zCAL78hp2-92NdjIHCjM0w?pwd=gkw1 提取码: gkw1 

 python3.zip 文件里都是rpm包,适用于centos7,解压后,怎么安装就不在这多说了
其中,里面的sqlpaser.tar.gz 需要解压到/usr/local/lib/python3.6目录下,如果没有python3.6目录,建立即可

psycopg2.gz这个文件里的内容解压到/usr/lib64/python3.6目录下

解压完毕后,需要执行python3 -V 命令,以激活sqlparse库

最终目录如下所示即可:

[root@centos7 ~]# ls /usr/lib64/python3.6/site-packages/
psycopg2  psycopg2-2.8.6-py3.6.egg-info  __pycache__  README.txt
[root@centos7 ~]# ls /usr/local/lib/python3.6/site-packages/
sqlparse  sqlparse-0.4.4.dist-info

三、

重点代码解析

1、

代码编程结构

本次代码编写仍然是延续上个版本,主要功能封装为方法,在main方法统一集中调用

主要是一个main主方法+4个功能方法,分别是控制台颜色渲染方法,SQL文件搜寻方法,SQL文件调用psyconpg2,sqlparse库逐行执行SQL语句方法,移动执行失败SQL文件到脚本当前目录方法,整体调用main方法

2、

控制台颜色渲染方法

该方法主要是使用了字典,形参调用形式,例如该方法的调用:

print_colored_text(f"Executed SQL file successfully: {sql_file_path}", COLORS['RED'])

没什么好说的,主要就是字典的应用是难点 

3、

SQL文件搜寻方法

def find_sql_files(path):
    """查找指定路径下的所有 .sql 文件,并按文件名中第一个出现的数字升序排序返回列表"""
    sql_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(path)
        for file in files
        if file.endswith('.sql')
    ]

    def extract_first_number(filename):
        """从文件名中提取第一个出现的数字序列并转换为整数,用于排序"""
        match = re.search(r'\d+', os.path.basename(filename))
        return int(match.group()) if match else float('inf')  # 如果没有匹配到数字,则排到最后

    # 使用 sorted 函数并指定 key 参数来实现升序排序,仅依据第一个数字
    sorted_sql_files = sorted(sql_files, key=extract_first_number)

    return sorted_sql_files

这一段代码难点主要在方法嵌套,首先迭代寻找指定的路径下所有的SQL文件,然后通过子方法,调用正则表达式重新排序,主要是以数字开始的文件

这里需要注意,搜寻到的SQL文件是返回一个列表

4、

SQL文件调用psyconpg2,sqlparse库逐行执行SQL语句方法

该方法主要是业务逻辑实现,主要逻辑是以单个SQL文件为整体事务,如果某个SQL文件有错误,事务回滚,并调用移动执行失败SQL文件到脚本当前目录方法;如果该SQL文件顺利执行完成,则提交事务

这样做的目的是保护数据库,避免不必要的数据混乱

无论如何,当一个SQL文件执行完毕,都会将数据库连接关闭,游标关闭

编写的时候考虑了一下,还是需要捕获错误并将详细错误信息打印,并收集失败的SQL文件和成功的SQL文件,在main方法内将要调用这两个列表,list

5、

移动执行失败SQL文件到脚本当前目录方法

该方法没什么特别的,主要是调用此方法的形式,在main方法内,这里比较绕,当时编写的时候考虑了很久

6、

main方法

整体逻辑封装都在此方法内

    user = sys.argv[1]
    search_path = sys.argv[2]
    script_dir = os.path.dirname(os.path.abspath(__file__))  # 获取脚本所在目录

__file__ 是一个非常有用的内置变量,它帮助你在编写脚本时能够动态地确定文件的位置,而不需要硬编码路径。这对于提高代码的可移植性和维护性非常有帮助。如果你在开发过程中遇到需要根据脚本位置来定位其他文件的情况,说实话这个变量我是差点给忘记掉的

那么,为什么要有user这个变量呢?考虑到很多时候并不是有高权限的数据库账号

其它的就没什么好说的了

四、

数据库信息文件json

有需要实践的同学按实际情况填写此json文件

{
    "db_config": {
        "dbname": "postgres",
        "user": "postgres",
        "password": "xxxxx",
        "host": "192.168.xxx.xx",
        "port": "1543"
    }
}

四、

执行方式和执行结果示例

[root@centos7 ~]# python3 test7.py postgres ./
Found 2 SQL files: ['./12323.sql', './33333.sql']
|||||二十秒后开始执行['./12323.sql', './33333.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./12323.sql
Moving failed SQL file to /root/12323.sql
Failed to execute SQL file: ./12323.sql. Continuing with the next file...
这个文件将要执行: ./33333.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./33333.sql
Moving failed SQL file to /root/33333.sql
Failed to execute SQL file: ./33333.sql. Continuing with the next file...

The following SQL files failed to execute:
- ./12323.sql
- ./33333.sql

The following SQL files were executed successfully:

全失败的:

有失败有成功的情况:

[root@centos7 ~]# python3 test7.py postgres ./
Found 2 SQL files: ['./12323.sql', './33333.sql']
|||||二十秒后开始执行['./12323.sql', './33333.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


Executed SQL file successfully: ./12323.sql
这个文件将要执行: ./33333.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./33333.sql
Moving failed SQL file to /root/33333.sql
Failed to execute SQL file: ./33333.sql. Continuing with the next file...

The following SQL files failed to execute:
- ./33333.sql

The following SQL files were executed successfully:
- ./12323.sql

全部成功的情况:

[root@centos7 ~]# python3 test7.py postgres ./
Found 1 SQL files: ['./12323.sql']
|||||二十秒后开始执行['./12323.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


Executed SQL file successfully: ./12323.sql

All SQL files executed successfully.

The following SQL files were executed successfully:
- ./12323.sql

 

🆗,这个SQL文件批量执行的小工具就介绍到这了,欢迎各位大拿指正错误!!!!!


http://www.kler.cn/a/499846.html

相关文章:

  • python类和对象
  • 请求方式(基于注解实现)
  • vue.js 使用router-link替代a标签实现高亮
  • Windows11环境下设置MySQL8字符集utf8mb4_unicode_ci
  • UE材质控制UV
  • axios的替代方案onion-middleware
  • Windows 下安装 PyTorch 的常见问题及解决方法
  • 在php中,Fiber、Swoole、Swow这3个协程都是如何并行运行的?
  • 【HTML+CSS+JS+VUE】web前端教程-3-标题标签
  • Seata搭建
  • 解锁 C# 与 LiteDB 嵌入式 NoSQL 数据库
  • JavaScript的输出
  • 浅谈容灾技术方案详解
  • 2025年实训总结
  • 大语言模型的前沿探索:从理论到实践的深度剖析
  • 深入理解计算机系统——优化程序性能(一)
  • 类加载器和双亲委派
  • 深度学习与机器学习的关系和差别?
  • CMD批处理命令入门(4)——ping,ipconfig,arp,start,shutdown,taskkill
  • 【Unity3D】利用IJob、Burst优化处理切割物体
  • Redis 多路复用(Multiplexing)