当前位置: 首页 > article >正文

sql server导入mysql,使用python多线程

概述

在上一篇文章中,链接:https://www.cnblogs.com/xiao987334176/p/18377915

使用工具SQLyog进行导入,传输过程是单进程的,一个表一个表的传,一条条数据插入,所以传输速度会比较慢。

如果sql server mdf文件在200m左右,传输需要花费30分钟左右。

如果来了一个10GB左右的mdf的文件,需要25个小时,时间太漫长了。

mysql表结构重构

如果使用python多进程导入,那么导入顺序是错乱的。如果表结构包含外键关联,例如:

CREATE TABLE `DimAccount` (
  `AccountKey` int NOT NULL AUTO_INCREMENT,
  `ParentAccountKey` int DEFAULT NULL,
  `AccountCodeAlternateKey` int DEFAULT NULL,
  `ParentAccountCodeAlternateKey` int DEFAULT NULL,
  `AccountDescription` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `AccountType` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `Operator` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `CustomMembers` text COLLATE utf8mb4_unicode_ci,
  `ValueType` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `CustomMemberOptions` varchar(200) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`AccountKey`),
  KEY `FK_DimAccount_DimAccount` (`ParentAccountKey`),
  CONSTRAINT `DimAccount_ibfk_1` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_10` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_11` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_2` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_3` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_4` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_5` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_6` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_7` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_8` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_9` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT
) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

那么就需要将外键关联全部删除掉,改造结果如下:

CREATE TABLE `DimAccount` (
  `AccountKey` int NOT NULL AUTO_INCREMENT,
  `ParentAccountKey` int DEFAULT NULL,
  `AccountCodeAlternateKey` int DEFAULT NULL,
  `ParentAccountCodeAlternateKey` int DEFAULT NULL,
  `AccountDescription` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `AccountType` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `Operator` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `CustomMembers` text COLLATE utf8mb4_unicode_ci,
  `ValueType` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `CustomMemberOptions` varchar(200) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`AccountKey`)
) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

由于只是数据导入,我并不需要外键关联。外键关联特别麻烦,插入一条数据,还需要校验父表中的关联id,如果不匹配,就会导致插入数据失败。

上面只是举例了其中一个表,其他表需要一个个检查,由外键关键的,就全部删掉。

python多线程导入

这里使用python 3.x版本

安装模块

pip install pymssql
pip install pymysql

test.py

import pymssql
import pymysql
import json
from datetime import datetime
import multiprocessing
import time


def write_table_data(table_name):
    try:
        # 数据库名
        database_name='AdventureWorksDW2014'

        # 连接数据库
        mssql_conn = pymssql.connect(server='192.168.20.131', user='sa', password='Y.saabcd@1234', database=database_name)
        
        # 连接到MySQL数据库
        mysql_conn = pymysql.connect(
            host='192.168.20.131',  # 替换为你的数据库主机名
            user='root',  # 替换为你的数据库用户名
            password='root',  # 替换为你的数据库密码
            database=database_name  # 替换为你的数据库名
        )

        # 创建cursor对象
        mssql_cursor = mssql_conn.cursor()
        # 创建一个cursor对象
        mysql_cursor = mysql_conn.cursor()

        # 清空表
        print("清空表%s"%table_name)
        mysql_cursor.execute('TRUNCATE TABLE %s;'%table_name)
        mysql_conn.commit()

        # 执行SQL查询
        # mssql_cursor.execute("SELECT top 1 * FROM %s"%table_name)
        mssql_cursor.execute("SELECT * FROM %s"%table_name)

        mssql_rows = mssql_cursor.fetchall()
        values = ', '.join(['%s'] * len(mssql_rows[0]))
        insert_query = f'INSERT INTO {database_name}.{table_name} VALUES ({values})'
        # print(insert_query)
        #批量插入数据
        mysql_cursor.executemany(insert_query, mssql_rows)
            
        # 提交更改,在所有插入操作完成后只调用一次,减少与数据库的交互次数,提高整体性能
        mysql_conn.commit()
        print("%s 表数据导入完成"%table_name)
        
        # 关闭cursor和连接
        mssql_cursor.close()
        mssql_conn.close()

        mysql_cursor.close()
        mysql_conn.close()

    except Exception as e:
        print("程序异常",e)

if __name__ == "__main__":
    # 记录程序开始执行时间
    start_time = time.time()

    # 指定进程数
    num_processes = 30

    table_list=['FactFinance','DimAccount','DatabaseLog', 'AdventureWorksDWBuildVersion', 'DimCurrency', 'DimCustomer', 'DimDate', 'DimDepartmentGroup', 'DimEmployee', 'DimGeography', 'DimOrganization', 'DimProduct', 'DimProductCategory', 'DimProductSubcategory', 'DimPromotion', 'DimReseller', 'DimSalesReason', 'DimSalesTerritory', 'DimScenario', 'FactAdditionalInternationalProductDescription', 'FactCallCenter', 'FactCurrencyRate', 'FactInternetSales', 'FactInternetSalesReason', 'FactProductInventory', 'FactResellerSales', 'FactSalesQuota', 'FactSurveyResponse', 'NewFactCurrencyRate', 'ProspectiveBuyer']
    # table_list=['DatabaseLog']
    
    # 创建进程池,指定最大进程数
    with multiprocessing.Pool(processes=num_processes) as pool:
        # 使用pool.apply_async异步执行函数
        for table_name in table_list:
            pool.apply_async(write_table_data, args=(table_name,))

        # 等待所有异步操作完成
        pool.close()
        pool.join()


    # 记录程序执行结束的时间
    end_time = time.time()

    # 计算程序执行所需的时间
    execution_time = end_time - start_time

    print(f"程序执行时间为:{execution_time}秒")

执行python文件

python test.py

输出结果:

清空表DimScenario清空表DimSalesReason
清空表DimSalesTerritory

清空表DimDate
清空表DimCurrency
清空表DatabaseLog
清空表DimEmployee
清空表FactCurrencyRate
清空表DimProduct
清空表FactCallCenter
清空表AdventureWorksDWBuildVersion
清空表FactFinance
清空表FactProductInventory
清空表DimGeography
清空表DimOrganization
清空表DimDepartmentGroup
清空表DimProductSubcategory
清空表DimReseller
清空表FactAdditionalInternationalProductDescription
清空表DimAccount
清空表ProspectiveBuyer
清空表FactInternetSalesReason
清空表FactSalesQuota
清空表FactInternetSales
清空表NewFactCurrencyRate
清空表DimCustomer
清空表DimProductCategory
清空表FactSurveyResponse
清空表DimPromotion
DimScenario 表数据导入完成
清空表FactResellerSales
DimSalesReason 表数据导入完成
DimCurrency 表数据导入完成
DatabaseLog 表数据导入完成
DimSalesTerritory 表数据导入完成
AdventureWorksDWBuildVersion 表数据导入完成
DimDepartmentGroup 表数据导入完成
DimAccount 表数据导入完成
FactCallCenter 表数据导入完成
DimProductSubcategory 表数据导入完成
DimDate 表数据导入完成
DimOrganization 表数据导入完成
DimPromotion 表数据导入完成
DimProductCategory 表数据导入完成
FactSalesQuota 表数据导入完成
DimGeography 表数据导入完成
FactSurveyResponse 表数据导入完成
NewFactCurrencyRate 表数据导入完成
DimReseller 表数据导入完成
ProspectiveBuyer 表数据导入完成
FactCurrencyRate 表数据导入完成
FactAdditionalInternationalProductDescription 表数据导入完成
DimEmployee 表数据导入完成
DimProduct 表数据导入完成
FactFinance 表数据导入完成
FactInternetSalesReason 表数据导入完成
DimCustomer 表数据导入完成
FactResellerSales 表数据导入完成
FactInternetSales 表数据导入完成
FactProductInventory 表数据导入完成
程序执行时间为:37.30717396736145秒

从结果上来看,运行花费了37秒。

比用工具SQLyog,花了30分钟,快了48倍左右


http://www.kler.cn/news/283629.html

相关文章:

  • 从blob 下载zip文件到本地并解压
  • 罗德与施瓦茨RS、UPV 音频分析仪 250KHZ 双通道分析仪UPL
  • 【面试经验】字节产品经理二面面经
  • MySQL空间函数ST_Distance_Sphere()的使用
  • Mysql-redo logs,binlog以及undo logs的作用及区别
  • 对mozjpeg中的函数名进行替换
  • 详解Spring AOP
  • 简单的Tcp服务器
  • 【香橙派系列教程】(十六) 语音模块与阿里云结合
  • Kafka日志及常见问题
  • x-cmd mod | x scoop - Windows 开源包管理工具
  • Java、python、php版 美发美甲预约服务平台 美容院管理系统(源码、调试、LW、开题、PPT)
  • 安卓15发布日期确定,安卓15 谷歌GMS认证截止日期有重大变化!安卓版本GMS认证截止时间更新,谷歌GMS认证之MADA/EDLA设备认证截止时间介绍
  • CSS 的文字平滑属性font-smooth
  • C++研发笔记1——github注册文档
  • C++类和对象(5)——运算符重载(以日期类为例)
  • 数据库,SQL和 MySql的三者关系
  • 智能听诊器:开启宠物健康管理新维度
  • 【网络安全】打开这份“开学礼” 谨防骗子“冲业绩”
  • 【Spring Boot 3】【Web】同时启用 HTTP 和 HTTPS
  • vue3+ts+vite项目代码检查报错(vue-tsc)
  • 解决Nginx负载均衡中的慢启动问题:策略与实践
  • k8s-pod 实战八 (gRPC 探测详细分析)
  • Cpp学习手册-基础学习
  • Python 处理 PDF 文件(PyPDF2, ReportLab)
  • 云轴科技ZStack与鼎甲科技共创数据保护新篇章
  • 显示中文字体问题解决:ImportError: The _imagingft C module is not installed
  • 最简洁!四步完成C#——opencv环境配置
  • 大模型企业应用落地系列八》基于大模型的对话式推荐系统》用户交互层
  • Python编码系列—Python CI/CD 实战:构建高效的自动化流程