mysql到doris的DDL整库转换工具
一、简介
适用于常规的mysql-ddl到doris-ddl的整库转换,其他需求可以修改代码实现。
需要链接到mysql,实际上我做过对比,解析DDL VS 直接查Mysql,解析DDL可能会有解析失败的情况,查询Mysql元数据的方式比较稳定可靠。
整体思路:
1.获取mysql元数据,
2.转换成doris的格式
3.套doris ddl模板拼起来
二、获取Mysql元数据
python获取Mysql元数据依赖pymysql包,因此要提前准备好包,这里不赘述
2.1 mysql连接器
# mysql连接
def mysql_connect(config):
content = ''
# MySQL连接
return pymysql.connect(
host=config['mysql']['host'],
user=config['mysql']['user'],
port=config['mysql']['port'],
password=config['mysql']['password'],
database=config['mysql']['database'],
)
2.2获取所有表
有了连接器,就可以执行具体的sql
先用 show tables 快速的获取库里所有的表
# 获取库里所有的表
def mysql_table_list(mysql_conn):
with mysql_conn.cursor() as cursor:
cursor.execute("SHOW TABLES")
return [row[0] for row in cursor.fetchall()]
2.3表元数据
接着遍历所有的表,依次获取每个表的元数据。
这里常用的sql有:
desc table_name;
SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'database_name' AND TABLE_NAME = 'table_name';
show full columns from table_name ;
这里三个sql语句我做了简单的对比
第一个:内容太少了,会丢失字段注释
第二个:内容很全
第三个:东西比较多,满足使用
这里使用第三种,本质是后面两个都可以
# 获取表字段的所有信息
def describe_table(mysql_conn,table_name):
with mysql_conn.cursor() as cursor:
cursor.execute(f"show full columns from {table_name}")
return cursor.fetchall()
2.4 表注释
到此一张表的所有字段信息都有了,不过还缺一个 表注释
从 information_schema.TABLES 里获取表的注释
# 表注释
def mysql_table_comment(mysql_conn,table_name,database_name):
with mysql_conn.cursor() as cursor:
cursor.execute(f"SELECT table_name , table_comment FROM information_schema.TABLES WHERE table_schema='{database_name}' and table_name = '{table_name}'")
# return cursor.fetchone()
return [row[1] for row in cursor.fetchall()]
到此,一张表的所有信息都有了,就要开始转成适用于doris类型
三、转换适用于doris
3.1 类型转换
mysql -> doris 有一些类型需要做映射,下面只是列举了一切常用的
# 将MySQL类型转换为doris类型
def convert_type(mysql_type):
mysql_type = mysql_type.lower()
if 'bigint' in mysql_type:
return 'BIGINT'
elif 'int' in mysql_type:
return 'INT'
elif mysql_type.startswith('varchar'):
length = mysql_type.split('(')[1].split(')')[0]
return f'VARCHAR({int(length) * 3})' # 考虑UTF-8编码
elif 'datetime' in mysql_type:
return 'DATETIME'
elif mysql_type.startswith('char'):
length = mysql_type.split('(')[1].split(')')[0]
return f'CHAR({int(length) * 3})'
elif 'date' in mysql_type:
return 'DATE'
elif 'timestamp' in mysql_type:
return 'datetime'
elif 'text' in mysql_type:
return 'STRING' # TEXT类型最大长度
elif 'tinyint' in mysql_type:
return 'TINYINT'
else:
return mysql_type.upper() # 其他类型直接转换为大写
这里有个坑,顺序会造成类型匹配时穿透,bigint 需要放在 int类型前面,否则会匹配到int里。
3.2 key字段提取
我这里默认都使用doris的 UNIQUE模型,在转换时常用mysql的primary key来替代。 就需要做一下转换。
def uqune_key(fileds):
unqune_key = []
for filed in fileds:
if filed[4] == 'PRI':
unqune_key.append('`'+ filed[0]+'`')
# 如无主键 默认第一个字段作为主键
if len(unqune_key) == 0:
unqune_key.append('`'+fileds[0][0]+'`')
return unqune_key
这里的fileds 就来自上面的 describe_table 方法 。
3.3 拼DDL
到这里就可以把整体的建表语句拼接出来了
doris_table_name = conf['doris']['table_prefix'] + table_name + conf['doris']['table_suffix']
uqune_key = ','.join(uqune_key_list)
ddl = 'CREATE TABLE ' + doris_table_name + ' ( \n' + doris_columns_str + ') \n'+ 'UNIQUE KEY('+uqune_key + ') \n'
if table_comment is not None:
ddl += 'COMMENT \'' + table_comment + '\' \n'
ddl += 'DISTRIBUTED BY HASH(' + uqune_key_list[0] + ') BUCKETS 10' + ' \n'
ddl += 'PROPERTIES (\n' + '"replication_num" = "1",\n' + '"in_memory" = "false",\n' + '"storage_format" = "DEFAULT"\n' + '); \n\n'
return ddl
我这里模版使用的是单副本 。
四、整体代码
以下是整体代码
import json
import pymysql
import re
# 读取JSON文件
def read_and_format_json(file_path):
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
# 将内容转换为Python字典
return json.loads(content)
def mysql_connect(config):
content = ''
# MySQL连接
return pymysql.connect(
host=config['mysql']['host'],
user=config['mysql']['user'],
port=config['mysql']['port'],
password=config['mysql']['password'],
database=config['mysql']['database'],
)
def mysql_table_list(mysql_conn):
with mysql_conn.cursor() as cursor:
cursor.execute("SHOW TABLES")
return [row[0] for row in cursor.fetchall()]
def describe_table(mysql_conn,table_name):
with mysql_conn.cursor() as cursor:
cursor.execute(f"show full columns from {table_name}")
return cursor.fetchall()
# 表注释
def mysql_table_comment(mysql_conn,table_name,database_name):
with mysql_conn.cursor() as cursor:
cursor.execute(f"SELECT table_name , table_comment FROM information_schema.TABLES WHERE table_schema='{database_name}' and table_name = '{table_name}'")
return [row[1] for row in cursor.fetchall()]
# 将MySQL类型转换为doris类型
def convert_type(mysql_type):
mysql_type = mysql_type.lower()
if 'bigint' in mysql_type:
return 'BIGINT'
elif 'int' in mysql_type:
return 'INT'
elif mysql_type.startswith('varchar'):
length = mysql_type.split('(')[1].split(')')[0]
return f'VARCHAR({int(length) * 3})' # 考虑UTF-8编码
elif 'datetime' in mysql_type:
return 'DATETIME'
elif mysql_type.startswith('char'):
length = mysql_type.split('(')[1].split(')')[0]
return f'CHAR({int(length) * 3})'
elif 'date' in mysql_type:
return 'DATE'
elif 'timestamp' in mysql_type:
return 'datetime'
elif 'text' in mysql_type:
return 'STRING' # TEXT类型最大长度
elif 'tinyint' in mysql_type:
return 'TINYINT'
else:
return mysql_type.upper() # 其他类型直接转换为大写
def column_default_value(default_value):
if default_value == 'NULL':
return 'DEFALUT NULL'
elif default_value == 'CURRENT_TIMESTAMP':
return 'CURRENT_TIMESTAMP'
else:
return default_value
def covert_ddl_column(fileds):
doris_columns_list = []
for field in fileds:
# 0 1 2 3 4 5 6 7 8
# name type collation is_null is_key default_vaule extra privileges comment
# 字段名 类型 非字符集 是否允许空 是否主键 默认值 额外信息 权限 注释
column_type = convert_type(field[1])
column = '`'+ field[0] + '` ' + column_type
if field[3] == 'YES':
column += ' NULL '
else:column+=' NOT NULL '
if field[5] is not None and column_type != 'DATETIME' and column_type != 'DATE':
if field[5] == 'NULL':
column += ' DEFAULT NULL '
else: column += ' DEFAULT \'' + field[5] + '\''
if field[8] is not None:
column += ' COMMENT \'' + field[8] + '\''
# column += ' ,'
doris_columns_list.append(column)
doris_columns_str = ', \n '.join(doris_columns_list)
return doris_columns_str
def uqune_key(fileds):
unqune_key = []
for filed in fileds:
if filed[4] == 'PRI':
unqune_key.append('`'+ filed[0]+'`')
# 如无主键 默认第一个字段作为主键
if len(unqune_key) == 0:
unqune_key.append('`'+fileds[0][0]+'`')
return unqune_key
def concat_ddl(conf,table_name,doris_columns_str,uqune_key_list,table_comment):
doris_table_name = conf['doris']['table_prefix'] + table_name + conf['doris']['table_suffix']
uqune_key = ','.join(uqune_key_list)
ddl = 'CREATE TABLE ' + doris_table_name + ' ( \n' + doris_columns_str + ') \n'+ 'UNIQUE KEY('+uqune_key + ') \n'
if table_comment is not None:
ddl += 'COMMENT \'' + table_comment + '\' \n'
ddl += 'DISTRIBUTED BY HASH(' + uqune_key_list[0] + ') BUCKETS 10' + ' \n'
ddl += 'PROPERTIES (\n' + '"replication_num" = "1",\n' + '"in_memory" = "false",\n' + '"storage_format" = "DEFAULT"\n' + '); \n\n'
return ddl
if __name__ == '__main__':
json_data = read_and_format_json('conf.json')
mysql_conn = mysql_connect(json_data)
tables = mysql_table_list(mysql_conn)
with open('doris_ddl.sql', 'w', encoding='utf-8') as f:
for table_name in tables:
print(table_name)
fileds = describe_table(mysql_conn, table_name)
doris_columns_str = covert_ddl_column(fileds)
uqune_key_list = uqune_key(fileds)
table_comment = mysql_table_comment(mysql_conn,table_name,json_data['mysql']['database'])
doris_ddl = concat_ddl(json_data,table_name,doris_columns_str,uqune_key_list,table_comment[0])
f.write(doris_ddl)
最后补一个conf.json的格式
{
"mysql": {
"host": "",
"port": ,
"user": "",
"password": "",
"database": ""
},
"doris": {
"database": "ods_project_develop",
"table_prefix": "ods_project_develop_",
"table_suffix": "_i_daily"
}