不同数据库之间的全量同步与增量同步(以 SQL Server 与 MySQL 为例)
场景
最近在做的一个项目需要将远程服务器上 SQL Server 数据库中表的数据传输到本机的 MySQL 数据库中。远程 SQL Server 表中的数据会实时更新,大约每分钟产生 18 条数据。例如现在是 2023-12-4 15:09,那么 15:08 这一分钟内的 18 条数据需要先写入 SQL Server,再同步到 MySQL。如果每分钟的 18 条数据都能按时写入 SQL Server,同步就非常简单;但实际上在 15:08 时这 18 条数据可能只到了 11 条,剩下的 7 条会在 15:09 或更晚的时间陆续到达。我最初的想法是记录上次同步的最大时间戳,只查询比它更新的数据写入 MySQL。但是较早时间点的数据会晚到,它们的时间戳小于已记录的最大时间戳,所以这种做法会漏掉这些迟到的数据。因此我换了另一种方法:
- 首先使用 Python 编写一个程序,将 SQL Server 中的历史数据同步到 MySQL 数据库中
- 在SQL server中创建一个中间表。
- 在 SQL Server 中为要传输的表创建一个触发器:每当该表插入新数据时,触发器会把新插入的数据写入中间表中
- 在 Python 脚本中用一个循环定期检查中间表。我的 SQL Server 表由两个字段(dtime 和 stationID)共同唯一确定一条数据,所以中间表也用这两个字段来标识一条数据。历史数据量非常大(有几十万条),在导入历史数据期间源表又新增了很多数据。这些数据既会出现在中间表中,也可能已经随历史数据导入了 MySQL。因此需要先检查中间表中的每条数据在 MySQL 中是否已经存在:
- 存在则删除中间表中这条数据
- 不存在则先插入 MySQL,再从中间表删除这条数据
- 最后完成了入库程序,经过验证没有数据丢失
1.历史数据入库
历史数据入库部分使用 Python 编写,首先定义两个数据库的连接信息:
# Example connection settings.
# SQL Server side: keyword arguments that are unpacked into pyodbc.connect(...).
sql_server_conn_params = dict(
    driver='{SQL Server}',
    server='ip',
    database='数据库名',
    uid='jzyg',
    pwd='',
)
# MySQL side: keyword arguments that are unpacked into pymysql.connect(...).
mysql_conn_params = dict(
    host='localhost',
    user='root',
    password='123456',
    database='数据库名',
    charset='utf8mb4',
)
定义查询语句(下面以光照表 realData_Solar 为例;后面入库函数使用的风速表查询语句 queryWind 写法相同,见总体代码):
# Base SELECT over realData_Solar. "staionName" is kept exactly as written;
# presumably it is the real column name in the source schema, so do not "fix" it.
querySolar = (
    'SELECT dtime,stationID,staionName,electric,'
    'tiltSolar,levelSolar,scatterSolar,directSolar,'
    'tiltSolar_day,levelSolar_day,scatterSolar_day,directSolar_day,'
    'sunShine_day FROM realData_Solar'
)
定义入库历史数据函数
def transfer_wind_data(self):
    """Bulk-load historical wind rows from SQL Server into MySQL ``wind_monitor``.

    Selects the rows of ``self.queryWind`` whose ``dtime`` is at or after
    ``self.wind_last_dtime``, resolves each station's ``FARMID`` from MySQL
    ``com_farm``, inserts all rows with a single ``executemany`` and commits.
    ``self.wind_last_dtime`` is advanced to the newest ``dtime`` only after the
    commit succeeds, so a failed run can be retried without skipping rows.
    Adjust the INSERT column list to your own table structure.
    """
    from contextlib import closing

    # pyodbc's connection context manager commits on exit but does not close
    # the connection; closing() releases it as soon as the rows are fetched.
    with closing(pyodbc.connect(**self.sql_server_conn_params)) as sql_server_conn:
        sql_server_cursor = sql_server_conn.cursor()
        # Only select data at/after the last synchronised timestamp.
        modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"
        sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))
        rows = sql_server_cursor.fetchall()
    if not rows:
        return  # no new data

    farm_sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"
    farm_ids = {}  # farm name -> FARMID (or None); avoids one lookup query per row
    with pymysql.connect(**self.mysql_conn_params) as mysql_conn:
        with mysql_conn.cursor() as mysql_cursor:
            data_list = []
            for row in rows:
                observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")  # minute resolution
                fsz_id = row[1].replace('"', "").strip()
                station_name = row[2]
                farm_id = None
                # An empty name would turn into LIKE '%%' and match an arbitrary farm.
                if station_name:
                    # The farm name is the part of the station name before the first '-'.
                    farm_name = station_name.split("-")[0]
                    if farm_name not in farm_ids:
                        mysql_cursor.execute(farm_sql, ('%' + farm_name + '%',))
                        found = mysql_cursor.fetchone()
                        farm_ids[farm_name] = found[0] if found is not None else None
                    farm_id = farm_ids[farm_name]
                data_list.append((observe_time, fsz_id, station_name, farm_id,
                                  row[3], row[4], row[5], row[6]))
            result = mysql_cursor.executemany(
                'INSERT INTO wind_monitor'
                '(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) '
                'VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list)
        mysql_conn.commit()
    # Rows are ORDER BY dtime, so the last row holds the newest timestamp.
    # Advance the watermark only now that the insert has been committed.
    self.wind_last_dtime = rows[-1][0].strftime("%Y-%m-%d %H:%M:%S")
    print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')
2.创建中间表和触发器
创建中间表
-- Create an empty table with realData_Wind's column layout.
-- SQL Server has no CREATE TABLE ... AS SELECT; SELECT ... INTO with an
-- always-false predicate copies the columns without copying any rows.
-- Note: indexes, constraints and triggers are not copied. An index on
-- (dtime, stationID) may help the poller's DELETE lookups.
SELECT * INTO intermediateData_Wind FROM realData_Wind WHERE 1 = 0;
创建触发器
-- Copy every row newly inserted into realData_Wind into intermediateData_Wind,
-- so the polling script can forward it to MySQL even when it arrives late.
CREATE TRIGGER CopyToIntermediateTable
ON realData_Wind
AFTER INSERT
AS
BEGIN
    -- Suppress the extra "rows affected" message produced by the INSERT below.
    -- Otherwise the message is returned to the client that inserted into realData_Wind.
    SET NOCOUNT ON;

    -- `inserted` holds every row of the triggering statement, so multi-row
    -- inserts are copied in full.
    INSERT INTO intermediateData_Wind (dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min)
    SELECT dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min
    FROM inserted;
END;
3.创建轮询中间表代码
def transfer_insert_intermediateData_Wind(self):
    """Drain the SQL Server staging table ``intermediateData_Wind`` into MySQL.

    For every staged row, ``wind_monitor`` is checked for a row with the same
    ``observe_time``/``fsz_id``. If none exists, the row is inserted and
    committed. Either way the row is then deleted from the staging table.
    The MySQL insert is committed before the delete. A failure in between
    therefore only means the row is seen again on the next poll, where it is
    discarded as a duplicate.
    """
    from contextlib import closing

    check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"
    farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"
    insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
    delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"

    deleted_count = 0   # rows actually removed from the staging table
    inserted_count = 0  # rows newly written to wind_monitor
    farm_ids = {}       # farm name -> FARMID (or None); avoids repeated lookups

    # pyodbc's connection context manager does not close the connection.
    with closing(pyodbc.connect(**self.sql_server_conn_params)) as sql_server_conn:
        sql_server_cursor = sql_server_conn.cursor()
        # Fetch everything currently staged.
        sql_server_cursor.execute(self.queryIntermediateData)
        intermediate_rows = sql_server_cursor.fetchall()

        with pymysql.connect(**self.mysql_conn_params) as mysql_conn:
            with mysql_conn.cursor() as mysql_cursor:
                for row in intermediate_rows:
                    observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")
                    fsz_id = row[1].replace('"', "").strip()
                    # Is this observation already in wind_monitor?
                    mysql_cursor.execute(check_query, (observe_time, fsz_id))
                    if mysql_cursor.fetchone()[0] == 0:
                        station_name = row[2]
                        farm_id = None
                        if station_name:
                            # Farm name = station name up to the first '-'.
                            farm_name = station_name.split("-")[0]
                            if farm_name not in farm_ids:
                                mysql_cursor.execute(farm_query, ('%' + farm_name + '%',))
                                found = mysql_cursor.fetchone()
                                farm_ids[farm_name] = found[0] if found is not None else None
                            farm_id = farm_ids[farm_name]
                        insert_data = (observe_time, fsz_id, station_name, farm_id,
                                       row[3], row[4], row[5], row[6])
                        mysql_cursor.execute(insert_query, insert_data)
                        mysql_conn.commit()
                        inserted_count += 1
                    # Delete by the exact staged key. The seconds must be kept:
                    # a "%H:%M:00.000" string never matches rows whose dtime has
                    # non-zero seconds, so those rows would stay staged forever.
                    # The ISO 8601 form is also independent of the session's
                    # DATEFORMAT/LANGUAGE settings.
                    dtime_key = row[0].isoformat(timespec="milliseconds")
                    sql_server_cursor.execute(delete_query, (dtime_key, row[1]))
                    # rowcount may be -1 when the driver cannot report it.
                    deleted_count += max(sql_server_cursor.rowcount, 0)
                    sql_server_conn.commit()

    # Report what happened in this poll.
    print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")
    print(f"向wind_monitor表中插入了{inserted_count}条数据.")
4.总体代码
import os
import threading
import time
from contextlib import closing

import pymysql
import pyodbc
class DataTransfer:
    """Synchronise SQL Server ``realData_Wind`` into MySQL ``wind_monitor``.

    ``start()`` truncates ``wind_monitor`` and then runs a background thread.
    The thread loads the full history once (``transfer_wind_data``). After
    that it polls the trigger-fed staging table ``intermediateData_Wind``
    every ``interval`` seconds (``transfer_insert_intermediateData_Wind``).

    Source rows are expected in the column order of ``queryWind`` and
    ``queryIntermediateData``: dtime, stationID, staionName,
    windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min.
    """

    _FARM_ID_QUERY = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"
    _EXISTS_QUERY = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"
    _DELETE_STAGED_QUERY = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"
    # Adjust the column list to your own wind_monitor table structure.
    _INSERT_QUERY = (
        "INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, "
        "wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    )

    def __init__(self, sql_server_conn_params, mysql_conn_params, queryWind, queryIntermediateData, interval=1):
        """Store connection settings and queries.

        :param sql_server_conn_params: keyword arguments for ``pyodbc.connect``.
        :param mysql_conn_params: keyword arguments for ``pymysql.connect``.
        :param queryWind: base SELECT over the source table (a WHERE clause is appended).
        :param queryIntermediateData: SELECT over the staging table.
        :param interval: seconds to sleep between staging-table polls.
        """
        self.sql_server_conn_params = sql_server_conn_params
        self.mysql_conn_params = mysql_conn_params
        self.queryWind = queryWind
        self.queryIntermediateData = queryIntermediateData
        self.interval = interval
        # Watermark for the history load: rows with dtime >= this are fetched.
        self.wind_last_dtime = '1970-01-01 00:00:00'

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _observe_time(dtime):
        """Format a source ``dtime`` as the minute-resolution MySQL ``observe_time``."""
        return dtime.strftime("%Y-%m-%d %H:%M:00")

    @staticmethod
    def _clean_station_id(station_id):
        """Strip double quotes and surrounding whitespace from a source stationID."""
        return station_id.replace('"', "").strip()

    @staticmethod
    def _staged_dtime_key(dtime):
        """Render *dtime* to match the staging table's ``dtime`` in a DELETE.

        Seconds and milliseconds are kept. Truncating them leaves rows with
        non-zero seconds undeletable. The ISO 8601 form is parsed the same way
        regardless of the SQL Server session's DATEFORMAT/LANGUAGE settings.
        """
        return dtime.isoformat(timespec="milliseconds")

    def _lookup_farm_id(self, mysql_cursor, station_name, cache):
        """Return the ``com_farm.FARMID`` for *station_name*, or None.

        The farm name is the part of *station_name* before the first '-'. It
        is matched with ``FARMNAME LIKE '%name%'`` and the first row is used.
        Results, including misses, are memoised in *cache*. An empty or None
        station name returns None without querying, because it would
        otherwise match any farm via ``LIKE '%%'``.
        """
        if not station_name:
            return None
        farm_name = station_name.split("-")[0]
        if farm_name not in cache:
            mysql_cursor.execute(self._FARM_ID_QUERY, ('%' + farm_name + '%',))
            found = mysql_cursor.fetchone()
            cache[farm_name] = found[0] if found is not None else None
        return cache[farm_name]

    def _monitor_row(self, row, farm_id):
        """Map a source/staging row to the parameter tuple for ``_INSERT_QUERY``."""
        return (
            self._observe_time(row[0]),
            self._clean_station_id(row[1]),
            row[2],
            farm_id,
            row[3],
            row[4],
            row[5],
            row[6],
        )

    # --------------------------------------------------------------- operations

    def clear_mysql_tables(self):
        """Empty the MySQL ``wind_monitor`` table (errors are printed, not raised)."""
        try:
            with pymysql.connect(**self.mysql_conn_params) as mysql_conn:
                with mysql_conn.cursor() as cursor:
                    cursor.execute("TRUNCATE TABLE wind_monitor")
                mysql_conn.commit()
                print("已清空 wind_monitor 表的数据。")
        except Exception as e:
            print(f"清空表格时发生错误: {e}")

    def transfer_data(self):
        """Load the history once, then poll the staging table forever.

        Errors while polling are printed and the loop continues. An error in the
        history load propagates and ends the thread.
        """
        self.transfer_wind_data()
        while True:
            try:
                self.transfer_insert_intermediateData_Wind()
            except Exception as e:
                print(f"发生错误: {e}")
            # Wait before the next poll.
            time.sleep(self.interval)

    def transfer_insert_intermediateData_Wind(self):
        """Drain the SQL Server staging table ``intermediateData_Wind`` into MySQL.

        Each staged row is inserted into ``wind_monitor`` and committed, unless
        a row with the same ``observe_time``/``fsz_id`` already exists. Either
        way it is then deleted from the staging table. Because the MySQL commit
        comes first, a failure in between only means the row is seen again on
        the next poll and discarded as a duplicate.
        """
        deleted_count = 0   # rows actually removed from the staging table
        inserted_count = 0  # rows newly written to wind_monitor
        farm_ids = {}
        # pyodbc's connection context manager does not close the connection.
        with closing(pyodbc.connect(**self.sql_server_conn_params)) as sql_server_conn:
            sql_server_cursor = sql_server_conn.cursor()
            sql_server_cursor.execute(self.queryIntermediateData)
            intermediate_rows = sql_server_cursor.fetchall()
            with pymysql.connect(**self.mysql_conn_params) as mysql_conn:
                with mysql_conn.cursor() as mysql_cursor:
                    for row in intermediate_rows:
                        observe_time = self._observe_time(row[0])
                        fsz_id = self._clean_station_id(row[1])
                        mysql_cursor.execute(self._EXISTS_QUERY, (observe_time, fsz_id))
                        if mysql_cursor.fetchone()[0] == 0:
                            farm_id = self._lookup_farm_id(mysql_cursor, row[2], farm_ids)
                            mysql_cursor.execute(self._INSERT_QUERY, self._monitor_row(row, farm_id))
                            mysql_conn.commit()
                            inserted_count += 1
                        sql_server_cursor.execute(
                            self._DELETE_STAGED_QUERY, (self._staged_dtime_key(row[0]), row[1])
                        )
                        # rowcount may be -1 when the driver cannot report it.
                        deleted_count += max(sql_server_cursor.rowcount, 0)
                        sql_server_conn.commit()
        print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")
        print(f"向wind_monitor表中插入了{inserted_count}条数据.")

    def transfer_wind_data(self):
        """Bulk-load rows with ``dtime >= self.wind_last_dtime`` into ``wind_monitor``.

        All rows are inserted with one ``executemany`` and committed together.
        ``self.wind_last_dtime`` is advanced to the newest ``dtime`` only after
        that commit succeeds, so a failed run can be retried without skipping data.
        """
        with closing(pyodbc.connect(**self.sql_server_conn_params)) as sql_server_conn:
            sql_server_cursor = sql_server_conn.cursor()
            # Only select data at/after the last synchronised timestamp.
            modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"
            sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))
            rows = sql_server_cursor.fetchall()
        if not rows:
            return  # no new data

        farm_ids = {}
        with pymysql.connect(**self.mysql_conn_params) as mysql_conn:
            with mysql_conn.cursor() as mysql_cursor:
                data_list = [
                    self._monitor_row(row, self._lookup_farm_id(mysql_cursor, row[2], farm_ids))
                    for row in rows
                ]
                result = mysql_cursor.executemany(self._INSERT_QUERY, data_list)
            mysql_conn.commit()
        # Rows are ORDER BY dtime, so the last one carries the newest timestamp.
        self.wind_last_dtime = rows[-1][0].strftime("%Y-%m-%d %H:%M:%S")
        print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')

    def start(self):
        """Truncate ``wind_monitor``, then run ``transfer_data`` on a new thread."""
        self.clear_mysql_tables()
        thread = threading.Thread(target=self.transfer_data)
        thread.start()
# Connection settings. Passwords can be supplied via environment variables so
# they need not be hard-coded; the literal fallbacks are the previous values.
sql_server_conn_params = {
    'driver': '{SQL Server}',
    'server': '',
    'database': '',
    'uid': '',
    'pwd': os.environ.get('SQLSERVER_PWD', ''),
}
mysql_conn_params = {
    'host': 'localhost',
    'user': 'root',
    'password': os.environ.get('MYSQL_PASSWORD', '123456'),
    'database': '',
    'charset': 'utf8mb4',
}
# Column order must match what DataTransfer indexes: dtime, stationID,
# staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min.
queryIntermediateData = "SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM intermediateData_Wind"
queryWind = 'SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM realData_Wind'

if __name__ == "__main__":
    # Start syncing only when run as a script, so importing this module does
    # not truncate wind_monitor or spawn the endless polling thread.
    data_transfer = DataTransfer(sql_server_conn_params, mysql_conn_params, queryWind, queryIntermediateData)
    data_transfer.start()