跨数据库定时数据推送实战
跨数据库定时数据推送实战指南(Python实现)
摘要
本文介绍如何通过Python实现跨服务器数据库定时数据推送,支持MySQL和达梦数据库,无需对方编写接收代码,直接通过数据库凭证完成数据同步。
目录
- 环境准备
- MySQL数据推送实现
- 达梦数据库数据推送实现
- 关键注意事项
- 总结
1. 环境准备
1.1 安装依赖库
# 安装MySQL驱动和定时任务库
pip install mysql-connector-python schedule
# 安装达梦数据库驱动和定时任务库
pip install dmPython schedule
2. MySQL数据推送实现
2.1 代码实现
import mysql.connector
import schedule
import time
# 我方多个数据库配置列表
source_db_configs = [
{
'user': '我方数据库用户名1',
'password': '我方数据库密码1',
'host': '我方数据库服务器 IP 地址1',
'port': 3306,
'database': '我方数据库名1'
},
{
'user': '我方数据库用户名2',
'password': '我方数据库密码2',
'host': '我方数据库服务器 IP 地址2',
'port': 3306,
'database': '我方数据库名2'
}
]
# 每个数据库对应的要推送的表名列表
table_names_dict = {
'我方数据库名1': ['表名1', '表名2'],
'我方数据库名2': ['表名3', '表名4']
}
# 对方数据库配置
target_db_config = {
'user': '对方数据库用户名',
'password': '对方数据库密码',
'host': '对方数据库服务器 IP 地址',
'port': 3306,
'database': '对方数据库名'
}
def push_data():
try:
# 连接到对方数据库
target_conn = mysql.connector.connect(**target_db_config)
target_cursor = target_conn.cursor()
for source_db_config in source_db_configs:
source_db_name = source_db_config['database']
# 连接到我方数据库
source_conn = mysql.connector.connect(**source_db_config)
source_cursor = source_conn.cursor()
table_names = table_names_dict.get(source_db_name, [])
for table_name in table_names:
# 获取源表的创建语句
source_cursor.execute(f"SHOW CREATE TABLE {table_name}")
create_table_statement = source_cursor.fetchone()[1]
# 在目标数据库创建表
try:
target_cursor.execute(create_table_statement)
target_conn.commit()
except mysql.connector.Error as err:
if err.errno == 1050: # 表已存在
print(f"表 {table_name} 在 {target_db_config['database']} 中已存在,跳过创建")
else:
print(f"在 {target_db_config['database']} 中创建表 {table_name} 失败: {err}")
continue
# 从我方数据库查询数据
source_cursor.execute(f"SELECT * FROM {table_name}")
columns = [desc[0] for desc in source_cursor.description]
rows = source_cursor.fetchall()
# 构建插入 SQL 语句
columns_str = ', '.join(columns)
placeholders = ', '.join(['%s'] * len(columns))
insert_query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"
# 向目标数据库插入数据
try:
for row in rows:
target_cursor.execute(insert_query, row)
target_conn.commit()
print(f"表 {table_name} 的数据已成功推送到 {target_db_config['database']}")
except mysql.connector.Error as err:
print(f"向 {target_db_config['database']} 中插入表 {table_name} 的数据失败: {err}")
target_conn.rollback()
source_cursor.close()
source_conn.close()
target_cursor.close()
target_conn.close()
print("所有表结构和数据推送完成")
except mysql.connector.Error as err:
print(f"数据推送过程中出现错误: {err}")
# 每天 2:00 执行数据推送任务
schedule.every().day.at("02:00").do(push_data)
while True:
schedule.run_pending()
time.sleep(1)
2.2 代码说明
- 数据库配置:填写双方数据库的连接信息
- 数据同步逻辑:
- 使用
executemany
实现批量插入 - 通过事务保证数据一致性
- 使用
- 定时任务:使用
schedule
库实现简单定时调度
3. 达梦数据库数据推送实现
3.1 代码实现
import dmPython
import schedule
import time
# 配置源数据库(我方数据库)
source_db = {
'user': 'source_user',
'password': 'source_pwd',
'server': 'source_ip',
'port': 5236,
'database': 'source_db'
}
# 配置目标数据库(对方数据库)
target_db = {
'user': 'target_user',
'password': 'target_pwd',
'server': 'target_ip',
'port': 5236,
'database': 'target_db'
}
def sync_data():
try:
# 连接源数据库
source_conn = dmPython.connect(**source_db)
source_cursor = source_conn.cursor()
# 连接目标数据库
target_conn = dmPython.connect(**target_db)
target_cursor = target_conn.cursor()
# 同步指定表数据
table = "your_table"
source_cursor.execute(f"SELECT * FROM {table}")
columns = [desc[0] for desc in source_cursor.description]
rows = source_cursor.fetchall()
# 批量插入数据
insert_sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({', '.join(['%s']*len(columns))})"
target_cursor.executemany(insert_sql, rows)
target_conn.commit()
print(f"成功同步{len(rows)}条数据")
except dmPython.Error as e:
print(f"同步失败:{str(e)}")
finally:
source_cursor.close()
source_conn.close()
target_cursor.close()
target_conn.close()
# 定时任务配置(每天凌晨2点执行)
schedule.every().day.at("02:00").do(sync_data)
# 保持程序运行
while True:
schedule.run_pending()
time.sleep(1)
4. 关键注意事项
-
表结构一致性
- 确保双方数据库表结构完全一致(字段名、类型、顺序)
- 建议使用
SHOW CREATE TABLE
命令对比表结构
-
权限管理
- 源数据库用户需具备
SELECT
权限 - 目标数据库用户需具备
INSERT
权限
-- 示例授权语句 GRANT SELECT ON source_db.table TO 'source_user'; GRANT INSERT ON target_db.table TO 'target_user';
- 源数据库用户需具备
-
网络安全
- 使用VPN或专线保障数据传输安全
- 避免在代码中明文存储密码(推荐使用配置文件+加密)
-
性能优化
- 大数据量建议分批次提交(如每1000条提交一次)
- 使用
LOAD DATA INFILE
等高效导入方式
-
错误处理
- 添加详细日志记录
- 实现重试机制(如
tenacity
库)
5. 总结
本文通过Python实现了跨数据库定时数据推送,核心步骤包括:
- 环境准备与依赖安装
- 数据库连接配置
- 数据同步逻辑实现
- 定时任务调度
实际应用中需根据具体场景调整:
- 调整
schedule
定时表达式(支持cron语法) - 添加数据校验和去重逻辑
- 实现数据增量同步(通过时间戳或版本号)
您可以根据实际需要调整代码示例中的变量名和格式,建议补充实际应用中的错误处理和日志记录代码。
原文地址:https://blog.csdn.net/Kun_112114/article/details/146467625
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/599364.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/599364.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!