当前位置: 首页 > article >正文

mysql到doris的DDL整库转换工具

一、简介

适用于常规的mysql-ddl到doris-ddl的整库转换,其他需求可以修改代码实现。
需要链接到mysql,实际上我做过对比,解析DDL VS 直接查Mysql,解析DDL可能会有解析失败的情况,查询Mysql元数据的方式比较稳定可靠。

整体思路:
1.获取mysql元数据,
2.转换成doris的格式
3.套doris ddl模板拼起来

二、获取Mysql元数据

python获取Mysql元数据依赖pymysql包,因此要提前准备好包,这里不赘述

2.1 mysql连接器

# mysql连接
def mysql_connect(config):
    content = ''
    # MySQL连接
    return pymysql.connect(
        host=config['mysql']['host'],
        user=config['mysql']['user'],
        port=config['mysql']['port'],
        password=config['mysql']['password'],
        database=config['mysql']['database'],
    )

2.2获取所有表

有了连接器,就可以执行具体的sql
先用 show tables 快速的获取库里所有的表

# 获取库里所有的表
def mysql_table_list(mysql_conn):
    with mysql_conn.cursor() as cursor:
        cursor.execute("SHOW TABLES")
        return [row[0] for row in cursor.fetchall()]

2.3表元数据

接着遍历所有的表,依次获取每个表的元数据。
这里常用的sql有:

desc table_name;
SELECT *  FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'database_name' AND TABLE_NAME = 'table_name';
show full columns from table_name  ;

这里三个sql语句我做了简单的对比
第一个:内容太少了,会丢失字段注释
第二个:内容很全
第三个:东西比较多,满足使用

这里使用第三种,本质是后面两个都可以

# 获取表字段的所有信息
def describe_table(mysql_conn,table_name):
    with mysql_conn.cursor() as cursor:
        cursor.execute(f"show full columns from {table_name}")
        return cursor.fetchall()

2.4 表注释

到此一张表的所有字段信息都有了,不过还缺一个 表注释
从 information_schema.TABLES 里获取表的注释

# 表注释
def mysql_table_comment(mysql_conn,table_name,database_name):
    with mysql_conn.cursor() as cursor:
        cursor.execute(f"SELECT table_name , table_comment  FROM information_schema.TABLES WHERE table_schema='{database_name}' and table_name = '{table_name}'")
        # return cursor.fetchone()
        return [row[1] for row in cursor.fetchall()]

到此,一张表的所有信息都有了,就要开始转成适用于doris类型

三、转换适用于doris

3.1 类型转换

mysql -> doris 有一些类型需要做映射,下面只是列举了一切常用的

# 将MySQL类型转换为doris类型
def convert_type(mysql_type):
    mysql_type = mysql_type.lower()
    if 'bigint' in mysql_type:
        return 'BIGINT'
    elif 'int' in mysql_type:
        return 'INT'
    elif  mysql_type.startswith('varchar'):
        length = mysql_type.split('(')[1].split(')')[0]
        return f'VARCHAR({int(length) * 3})'  # 考虑UTF-8编码
    elif 'datetime' in mysql_type:
        return 'DATETIME'
    elif mysql_type.startswith('char'):
        length = mysql_type.split('(')[1].split(')')[0]
        return f'CHAR({int(length) * 3})'
    elif 'date' in mysql_type:
        return 'DATE'
    elif 'timestamp' in mysql_type:
        return 'datetime'
    elif 'text' in mysql_type:
        return 'STRING'  # TEXT类型最大长度
    elif 'tinyint' in mysql_type:
        return 'TINYINT'
    else:
        return mysql_type.upper()  # 其他类型直接转换为大写

这里有个坑,顺序会造成类型匹配时穿透,bigint 需要放在 int类型前面,否则会匹配到int里。

3.2 key字段提取

我这里默认都使用doris的 UNIQUE模型,在转换时常用mysql的primary key来替代。 就需要做一下转换。

def uqune_key(fileds):
    unqune_key = []
    for filed in fileds:
        if filed[4] == 'PRI':
            unqune_key.append('`'+ filed[0]+'`')
    # 如无主键 默认第一个字段作为主键
    if len(unqune_key) == 0:
        unqune_key.append('`'+fileds[0][0]+'`')
    return unqune_key

这里的fileds 就来自上面的 describe_table 方法 。

3.3 拼DDL

到这里就可以把整体的建表语句拼接出来了

doris_table_name = conf['doris']['table_prefix'] + table_name + conf['doris']['table_suffix']
    uqune_key = ','.join(uqune_key_list)
    ddl = 'CREATE TABLE ' + doris_table_name + ' ( \n' + doris_columns_str + ') \n'+  'UNIQUE KEY('+uqune_key + ') \n'
    if table_comment is not None:
        ddl += 'COMMENT \'' + table_comment + '\' \n'
    ddl += 'DISTRIBUTED BY HASH(' + uqune_key_list[0] + ') BUCKETS 10' +  ' \n'
    ddl += 'PROPERTIES (\n' + '"replication_num" = "1",\n' + '"in_memory" = "false",\n' + '"storage_format" = "DEFAULT"\n' + '); \n\n'
    return ddl

我这里模版使用的是单副本 。

四、整体代码

以下是整体代码

import json
import pymysql
import re


# 读取JSON文件
def read_and_format_json(file_path):
    # 读取文件内容
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    # 将内容转换为Python字典
    return json.loads(content)

def mysql_connect(config):
    content = ''
    # MySQL连接
    return pymysql.connect(
        host=config['mysql']['host'],
        user=config['mysql']['user'],
        port=config['mysql']['port'],
        password=config['mysql']['password'],
        database=config['mysql']['database'],
    )
def mysql_table_list(mysql_conn):
    with mysql_conn.cursor() as cursor:
        cursor.execute("SHOW TABLES")
        return [row[0] for row in cursor.fetchall()]


def describe_table(mysql_conn,table_name):
    with mysql_conn.cursor() as cursor:
        cursor.execute(f"show full columns from {table_name}")
        return cursor.fetchall()



# 表注释
def mysql_table_comment(mysql_conn,table_name,database_name):
    with mysql_conn.cursor() as cursor:
        cursor.execute(f"SELECT table_name , table_comment  FROM information_schema.TABLES WHERE table_schema='{database_name}' and table_name = '{table_name}'")
        return [row[1] for row in cursor.fetchall()]




# 将MySQL类型转换为doris类型
def convert_type(mysql_type):
    mysql_type = mysql_type.lower()
    if 'bigint' in mysql_type:
        return 'BIGINT'
    elif 'int' in mysql_type:
        return 'INT'
    elif  mysql_type.startswith('varchar'):
        length = mysql_type.split('(')[1].split(')')[0]
        return f'VARCHAR({int(length) * 3})'  # 考虑UTF-8编码
    elif 'datetime' in mysql_type:
        return 'DATETIME'
    elif mysql_type.startswith('char'):
        length = mysql_type.split('(')[1].split(')')[0]
        return f'CHAR({int(length) * 3})'
    elif 'date' in mysql_type:
        return 'DATE'
    elif 'timestamp' in mysql_type:
        return 'datetime'
    elif 'text' in mysql_type:
        return 'STRING'  # TEXT类型最大长度
    elif 'tinyint' in mysql_type:
        return 'TINYINT'
    else:
        return mysql_type.upper()  # 其他类型直接转换为大写

def column_default_value(default_value):
    if default_value == 'NULL':
        return 'DEFALUT NULL'
    elif default_value == 'CURRENT_TIMESTAMP':
        return 'CURRENT_TIMESTAMP'
    else:
        return default_value



def covert_ddl_column(fileds):

    doris_columns_list = []
    for field in fileds:
        # 0    1     2          3      4       5            6     7           8
        # name type  collation is_null is_key default_vaule extra privileges comment
        # 字段名 类型  非字符集  是否允许空 是否主键  默认值         额外信息     权限 注释
        column_type = convert_type(field[1])
        column = '`'+ field[0] + '`  ' + column_type
        if field[3] == 'YES':
            column += ' NULL '
        else:column+=' NOT NULL '
        if field[5] is not None and column_type != 'DATETIME' and column_type != 'DATE':
            if field[5] == 'NULL':
                column += ' DEFAULT NULL '
            else: column += ' DEFAULT \'' + field[5] + '\''
        if field[8] is not None:
            column += ' COMMENT \'' + field[8] + '\''

        # column += ' ,'
        doris_columns_list.append(column)
    doris_columns_str = ', \n '.join(doris_columns_list)
    return doris_columns_str
def uqune_key(fileds):
    unqune_key = []
    for filed in fileds:
        if filed[4] == 'PRI':
            unqune_key.append('`'+ filed[0]+'`')
    # 如无主键 默认第一个字段作为主键
    if len(unqune_key) == 0:
        unqune_key.append('`'+fileds[0][0]+'`')
    return unqune_key


def concat_ddl(conf,table_name,doris_columns_str,uqune_key_list,table_comment):
    doris_table_name = conf['doris']['table_prefix'] + table_name + conf['doris']['table_suffix']
    uqune_key = ','.join(uqune_key_list)
    ddl = 'CREATE TABLE ' + doris_table_name + ' ( \n' + doris_columns_str + ') \n'+  'UNIQUE KEY('+uqune_key + ') \n'
    if table_comment is not None:
        ddl += 'COMMENT \'' + table_comment + '\' \n'
    ddl += 'DISTRIBUTED BY HASH(' + uqune_key_list[0] + ') BUCKETS 10' +  ' \n'
    ddl += 'PROPERTIES (\n' + '"replication_num" = "1",\n' + '"in_memory" = "false",\n' + '"storage_format" = "DEFAULT"\n' + '); \n\n'
    return ddl




if __name__ == '__main__':
    json_data = read_and_format_json('conf.json')
    mysql_conn = mysql_connect(json_data)
    tables = mysql_table_list(mysql_conn)
    with open('doris_ddl.sql', 'w', encoding='utf-8') as f:
        for table_name in tables:
            print(table_name)
            fileds = describe_table(mysql_conn, table_name)
            doris_columns_str = covert_ddl_column(fileds)
            uqune_key_list = uqune_key(fileds)
            table_comment = mysql_table_comment(mysql_conn,table_name,json_data['mysql']['database'])
            doris_ddl = concat_ddl(json_data,table_name,doris_columns_str,uqune_key_list,table_comment[0])
            f.write(doris_ddl)
            


最后补一个conf.json的格式

{
    "mysql": {
        "host": "",
        "port": ,
        "user": "",
        "password": "",
        "database": ""
    },
    "doris": {
        "database": "ods_project_develop",
        "table_prefix": "ods_project_develop_",
        "table_suffix": "_i_daily"
    }

http://www.kler.cn/a/380360.html

相关文章:

  • 2025生物发酵展(济南)为生物制造产业注入新活力共谱行业新篇章
  • 通过不当变更导致 PostgreSQL 翻车的案例分析与防范
  • 解析JSON字符串的多种方式
  • 如何在Word的表格中一次性插入多行?
  • 考研要求掌握的C语言程度(插入排序)
  • Vue3版本的uniapp项目运行至鸿蒙系统
  • Nop平台与APIJSON的功能对比
  • 国际化教育品牌的人力资源管理利器
  • CVE-2024-51567 CyberPanel upgrademysqlstatus 远程命令执行
  • JavaEE初阶-----servlet-api,Maven创建项目,部署,打包,测试全过程
  • 分类模型onnx推理,并生成混淆矩阵
  • 如何在本地Linux服务器搭建WordPress网站结合内网穿透随时随地可访问
  • 使用 Python 中的 pydub实现 M4A 转 MP3 转换器
  • element-plus按需引入报错IconsResolver is not a function
  • 经纬恒润车载TSN网络测试仪TestBase-ATT全新上线!
  • C#、C和C++的主要区别
  • Python | Leetcode Python题解之第530题二叉搜索树的最小绝对差
  • 将Notepad++添加到右键菜单【一招实现】
  • Rust 力扣 - 1297. 子串的最大出现次数
  • 使用python爬取某新闻网并进行数据分析
  • 【论文阅读笔记】Wavelet Convolutions for Large Receptive Fields
  • 论文阅读(一种基于球面投影和特征提取的岩石点云快速配准算法)
  • [ DOS 命令基础 4 ] DOS 命令命令详解-端口进程相关命令
  • 【ROS2】hbm_img_msgs/msg/HbmMsg1080P 转 opencv cv::Mat
  • 江协科技STM32学习- P32 MPU6050
  • PHP不良事件上报系统源码,医院安全不良事件管理系统,基于 vue2+element+ laravel框架开发