当前位置: 首页 > article >正文

跨数据库定时数据推送实战

跨数据库定时数据推送实战指南(Python实现)

摘要

本文介绍如何通过Python实现跨服务器数据库定时数据推送,支持MySQL和达梦数据库,无需对方编写接收代码,直接通过数据库凭证完成数据同步。

目录

  1. 环境准备
  2. MySQL数据推送实现
  3. 达梦数据库数据推送实现
  4. 关键注意事项
  5. 总结

1. 环境准备

1.1 安装依赖库

# 安装MySQL驱动和定时任务库
pip install mysql-connector-python schedule

# 安装达梦数据库驱动和定时任务库
pip install dmPython schedule

2. MySQL数据推送实现

2.1 代码实现

import mysql.connector
import schedule
import time

# 我方多个数据库配置列表
source_db_configs = [
    {
        'user': '我方数据库用户名1',
        'password': '我方数据库密码1',
        'host': '我方数据库服务器 IP 地址1',
        'port': 3306,
        'database': '我方数据库名1'
    },
    {
        'user': '我方数据库用户名2',
        'password': '我方数据库密码2',
        'host': '我方数据库服务器 IP 地址2',
        'port': 3306,
        'database': '我方数据库名2'
    }
]

# 每个数据库对应的要推送的表名列表
table_names_dict = {
    '我方数据库名1': ['表名1', '表名2'],
    '我方数据库名2': ['表名3', '表名4']
}

# 对方数据库配置
target_db_config = {
    'user': '对方数据库用户名',
    'password': '对方数据库密码',
    'host': '对方数据库服务器 IP 地址',
    'port': 3306,
    'database': '对方数据库名'
}


def push_data():
    try:
        # 连接到对方数据库
        target_conn = mysql.connector.connect(**target_db_config)
        target_cursor = target_conn.cursor()

        for source_db_config in source_db_configs:
            source_db_name = source_db_config['database']
            # 连接到我方数据库
            source_conn = mysql.connector.connect(**source_db_config)
            source_cursor = source_conn.cursor()

            table_names = table_names_dict.get(source_db_name, [])
            for table_name in table_names:
                # 获取源表的创建语句
                source_cursor.execute(f"SHOW CREATE TABLE {table_name}")
                create_table_statement = source_cursor.fetchone()[1]

                # 在目标数据库创建表
                try:
                    target_cursor.execute(create_table_statement)
                    target_conn.commit()
                except mysql.connector.Error as err:
                    if err.errno == 1050:  # 表已存在
                        print(f"表 {table_name}{target_db_config['database']} 中已存在,跳过创建")
                    else:
                        print(f"在 {target_db_config['database']} 中创建表 {table_name} 失败: {err}")
                        continue

                # 从我方数据库查询数据
                source_cursor.execute(f"SELECT * FROM {table_name}")
                columns = [desc[0] for desc in source_cursor.description]
                rows = source_cursor.fetchall()

                # 构建插入 SQL 语句
                columns_str = ', '.join(columns)
                placeholders = ', '.join(['%s'] * len(columns))
                insert_query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"

                # 向目标数据库插入数据
                try:
                    for row in rows:
                        target_cursor.execute(insert_query, row)
                    target_conn.commit()
                    print(f"表 {table_name} 的数据已成功推送到 {target_db_config['database']}")
                except mysql.connector.Error as err:
                    print(f"向 {target_db_config['database']} 中插入表 {table_name} 的数据失败: {err}")
                    target_conn.rollback()

            source_cursor.close()
            source_conn.close()

        target_cursor.close()
        target_conn.close()
        print("所有表结构和数据推送完成")

    except mysql.connector.Error as err:
        print(f"数据推送过程中出现错误: {err}")


# 每天 2:00 执行数据推送任务
schedule.every().day.at("02:00").do(push_data)

while True:
    schedule.run_pending()
    time.sleep(1)
    

2.2 代码说明

  1. 数据库配置:填写双方数据库的连接信息
  2. 数据同步逻辑
    • 使用executemany实现批量插入
    • 通过事务保证数据一致性
  3. 定时任务:使用schedule库实现简单定时调度

3. 达梦数据库数据推送实现

3.1 代码实现

import dmPython
import schedule
import time

# 配置源数据库(我方数据库)
source_db = {
    'user': 'source_user',
    'password': 'source_pwd',
    'server': 'source_ip',
    'port': 5236,
    'database': 'source_db'
}

# 配置目标数据库(对方数据库)
target_db = {
    'user': 'target_user',
    'password': 'target_pwd',
    'server': 'target_ip',
    'port': 5236,
    'database': 'target_db'
}

def sync_data():
    try:
        # 连接源数据库
        source_conn = dmPython.connect(**source_db)
        source_cursor = source_conn.cursor()
        
        # 连接目标数据库
        target_conn = dmPython.connect(**target_db)
        target_cursor = target_conn.cursor()
        
        # 同步指定表数据
        table = "your_table"
        source_cursor.execute(f"SELECT * FROM {table}")
        columns = [desc[0] for desc in source_cursor.description]
        rows = source_cursor.fetchall()
        
        # 批量插入数据
        insert_sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({', '.join(['%s']*len(columns))})"
        target_cursor.executemany(insert_sql, rows)
        target_conn.commit()
        
        print(f"成功同步{len(rows)}条数据")
        
    except dmPython.Error as e:
        print(f"同步失败:{str(e)}")
    finally:
        source_cursor.close()
        source_conn.close()
        target_cursor.close()
        target_conn.close()

# 定时任务配置(每天凌晨2点执行)
schedule.every().day.at("02:00").do(sync_data)

# 保持程序运行
while True:
    schedule.run_pending()
    time.sleep(1)

4. 关键注意事项

  1. 表结构一致性

    • 确保双方数据库表结构完全一致(字段名、类型、顺序)
    • 建议使用SHOW CREATE TABLE命令对比表结构
  2. 权限管理

    • 源数据库用户需具备SELECT权限
    • 目标数据库用户需具备INSERT权限
    -- 示例授权语句
    GRANT SELECT ON source_db.table TO 'source_user';
    GRANT INSERT ON target_db.table TO 'target_user';
    
  3. 网络安全

    • 使用VPN或专线保障数据传输安全
    • 避免在代码中明文存储密码(推荐使用配置文件+加密)
  4. 性能优化

    • 大数据量建议分批次提交(如每1000条提交一次)
    • 使用LOAD DATA INFILE等高效导入方式
  5. 错误处理

    • 添加详细日志记录
    • 实现重试机制(如tenacity库)

5. 总结

本文通过Python实现了跨数据库定时数据推送,核心步骤包括:

  1. 环境准备与依赖安装
  2. 数据库连接配置
  3. 数据同步逻辑实现
  4. 定时任务调度

实际应用中需根据具体场景调整:

  • 调整schedule定时表达式(支持cron语法)
  • 添加数据校验和去重逻辑
  • 实现数据增量同步(通过时间戳或版本号)

您可以根据实际需要调整代码示例中的变量名和格式,建议补充实际应用中的错误处理和日志记录代码。

原文地址:https://blog.csdn.net/Kun_112114/article/details/146467625
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/599364.html

相关文章:

  • 我的世界1.20.1forge模组进阶开发教程——Alex‘s mob的深入研究
  • 蓝桥杯 - 简单 - 布局切换
  • 【时时三省】(C语言基础)选择结构和条件判断
  • 用免费的github的key调用gpt实现一个简单的rag自动打分评测系统,不用任何框架
  • Docker 数据卷管理
  • 模型搭建与复现
  • 同旺科技USB to SPI 适配器 ---- 指令循环发送功能
  • 从指令集鸿沟到硬件抽象:AI 如何重塑手机与电脑编程语言差异——PanLang 原型全栈设计方案与实验性探索1
  • 基于SpringBoot的“社区居民诊疗健康管理系统”的设计与实现(源码+数据库+文档+PPT)
  • tcl语法中的命令
  • word中指定页面开始添加页码
  • 深度拆解:AI Agent发展演练·数字挑战
  • xss-labs
  • C++STL(四):stack和queue的模拟实现
  • python如何提取html中所有的图片链接
  • qt介绍自定义插件 三
  • 为什么后端接口返回数字类型1.00前端会取到1?
  • RAG优化:python从零实现自适应检索增强Adaptive Retrieval
  • Excel中如何自动计算累计销量,当具体销量为空时公式自动不计算
  • NVIDIA V100显卡支持Tensor Core技术,而Granite-3.1-8B模型在适当的条件下可以利用Tensor Core来加速数据处理