import pymysql
def get_table_structure(connection, cursor, database, table):
# 获取表结构
query = f"SHOW COLUMNS FROM {database}.{table}"
cursor.execute(query)
return cursor.fetchall()
def generate_create_table(cursor, table_structure, database, table):
# 生成创建表的SQL语句,包含字段约束、默认值和主键信息
columns = []
for column in table_structure:
column_name = column[0]
column_type = column[1]
is_nullable = "NOT NULL" if column[2] == "NO" else ""
default_value = f"DEFAULT {column[4]}" if column[4] is not None else ""
columns.append(f"`{column_name}` {column_type} {is_nullable} {default_value}")
# 查找主键信息
cursor.execute(f"SHOW KEYS FROM {table} WHERE Key_name = 'PRIMARY'")
primary_key = cursor.fetchone()
if primary_key:
columns.append(f"PRIMARY KEY (`{primary_key[4]}`)")
return f"CREATE TABLE `{database}`.`{table}` ({', '.join(columns)});"
def generate_add_or_modify_column(connection, cursor, database, table, column_structure):
# 生成添加字段或修改字段的SQL语句
alter_statements = []
for column_info in column_structure:
# column_name, column_type, is_nullable, default_value = column_info
# default_clause = f"DEFAULT {default_value}" if default_value is not None else ""
column_name = column_info[0]
column_type = column_info[1]
is_nullable = "NOT NULL" if column_info[2] == "NO" else ""
default_value = f"DEFAULT {column_info[4]}" if (column_info[4] is not None and column_info[4]) else "DEFAULT NULL"
# 检查字段是否已经存在
if is_column_exists(cursor, database, table, column_name):
# 字段存在,生成修改字段的SQL语句
alter_statements.append(
f"ALTER TABLE `{database}`.`{table}` MODIFY COLUMN `{column_name}` {column_type} {is_nullable} {default_value} ;")
else:
# 字段不存在,生成添加字段的SQL语句
alter_statements.append(
f"ALTER TABLE `{database}`.`{table}` ADD COLUMN `{column_name}` {column_type} {is_nullable} {default_value} ;")
return alter_statements
def is_column_exists(cursor, database, table, column_name):
# 检查字段是否存在于目标表中
query = f"SHOW COLUMNS FROM `{database}`.`{table}` LIKE '{column_name}'"
cursor.execute(query)
return cursor.fetchone() is not None
if __name__ == "__main__":
dba ="test";
dbb = "test1";
# 连接数据库A
db_a = pymysql.connect(host="127.0.0.1", port=3307, user="root", password="123456", database=dba)
cursor_a = db_a.cursor()
# 连接数据库B
db_b = pymysql.connect(host="127.0.0.1", port=3307, user="root", password="123456", database=dbb)
cursor_b = db_b.cursor()
# 获取数据库A中的表
cursor_a.execute("SHOW TABLES")
database_a_tables = [table[0] for table in cursor_a.fetchall()]
# 获取数据库B中的表
cursor_b.execute("SHOW TABLES")
database_b_tables = [table[0] for table in cursor_b.fetchall()]
# 比较两个数据库的表结构并生成SQL语句
for table in database_a_tables:
if table not in database_b_tables:
# 表在库B中缺失,生成创建表语句
table_structure = get_table_structure(db_a, cursor_a,dba, table)
create_table_sql = generate_create_table(cursor_a, table_structure, dbb, table)
print(create_table_sql)
else:
# 表在库B中存在,比较字段
columns_a = get_table_structure(db_a, cursor_a, dba, table)
columns_b = get_table_structure(db_b, cursor_b,dbb, table)
# 字段差异
column_diff = [col for col in columns_a if col not in columns_b]
# 生成添加字段或修改字段的SQL语句
alter_column_sqls = generate_add_or_modify_column(db_b, cursor_b, dbb, table, column_diff)
for alter_column_sql in alter_column_sqls:
print(alter_column_sql)
# 关闭数据库连接
db_a.close()
db_b.close()