Linux上用C++和GCC开发程序实现两个不同MySQL实例下单个Schema稳定高效的数据迁移到其它MySQL实例
设计一个在Linux上运行的GCC C++程序,同时连接三个不同的MySQL实例,其中两个实例中分别有两个Schema的表结构分别与第三实例中两个Schema个结构完全相同,同时复制两个实例中两个Schema里的所有表的数据到第三个实例中两个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。
C++解决方案,采用模块化设计并包含详细注释:
该实现结合了C++的高效性和现代C++的特性,同时利用MySQL X DevAPI提供的高性能接口,能够有效处理大规模数据复制需求。
#include <iostream>
#include <fstream>
#include <chrono>
#include <thread>
#include <vector>
#include <map>
#include <mysqlx/xdevapi.h>
#include <nlohmann/json.hpp>
#include <filesystem>
#include <mutex>
using namespace std;
using namespace mysqlx;
using json = nlohmann::json;
namespace fs = filesystem;
// 配置结构体
struct Config {
struct DBConfig {
string host;
int port;
string user;
string password;
};
DBConfig source1;
DBConfig source2;
DBConfig target;
int max_retries = 3;
int retry_interval = 60; // seconds
map<string, vector<string>> schema_mapping;
};
// 日志管理器类
class Logger {
private:
mutex log_mutex;
string log_dir = "logs";
string get_current_date() {
auto now = chrono::system_clock::now();
time_t t = chrono::system_clock::to_time_t(now);
tm tm = *localtime(&t);
char buffer[11];
strftime(buffer, sizeof(buffer), "%Y%m%d", &tm);
return string(buffer);
}
public:
Logger() {
if (!fs::exists(log_dir)) {
fs::create_directory(log_dir);
}
}
void log(const string& message) {
lock_guard<mutex> lock(log_mutex);
ofstream log_file(
log_dir + "/" + get_current_date() + ".log",
ios::app
);
auto now = chrono::system_clock::now();
time_t t = chrono::system_clock::to_time_t(now);
log_file << put_time(localtime(&t), "%Y-%m-%d %H:%M:%S")
<< " | " << message << endl;
}
};
// 数据库操作类
class DBOperator {
Config& config;
Logger& logger;
Session connect_db(const Config::DBConfig& db_conf) {
try {
return Session(
db_conf.host, db_conf.port,
db_conf.user, db_conf.password
);
} catch (const Error& e) {
logger.log("Connection error: " + string(e.what()));
throw;
}
}
public:
DBOperator(Config& cfg, Logger& log) : config(cfg), logger(log) {}
void copy_table_data(Schema& source_schema,
Schema& target_schema,
const string& table_name,
int batch_size = 1000) {
auto start_time = chrono::system_clock::now();
string log_prefix = "Table[" + table_name + "] ";
try {
Table source_table = source_schema.getTable(table_name);
Table target_table = target_schema.getTable(table_name);
int total_rows = 0;
RowResult res = source_table.select("*").execute();
while (const Row row = res.fetchOne()) {
vector<Row> batch;
for (int i = 0; i < batch_size && row; ++i) {
batch.push_back(row);
total_rows++;
if (i < batch_size-1) row = res.fetchOne();
}
target_table.insert().rows(batch).execute();
}
auto end_time = chrono::system_clock::now();
auto duration = chrono::duration_cast<chrono::milliseconds>(end_time - start_time);
logger.log(log_prefix + "SUCCESS | Rows: " + to_string(total_rows) +
" | Duration: " + to_string(duration.count()) + "ms");
} catch (const Error& e) {
logger.log(log_prefix + "ERROR: " + string(e.what()));
throw;
}
}
void copy_schema(Session& source_session,
Session& target_session,
const string& source_schema_name,
const string& target_schema_name,
const vector<string>& tables) {
Schema source_schema = source_session.getSchema(source_schema_name);
Schema target_schema = target_session.getSchema(target_schema_name);
for (const auto& table : tables) {
for (int attempt = 0; attempt < config.max_retries; ++attempt) {
try {
copy_table_data(source_schema, target_schema, table);
break;
} catch (...) {
if (attempt == config.max_retries-1) {
logger.log("Table[" + table + "] FAILED after " +
to_string(config.max_retries) + " attempts");
}
this_thread::sleep_for(
chrono::seconds(config.retry_interval)
);
}
}
}
}
};
int main(int argc, char** argv) {
try {
// 加载配置文件
ifstream config_file("config.json");
json config_json = json::parse(config_file);
Config config;
config.source1 = config_json["source1"];
config.source2 = config_json["source2"];
config.target = config_json["target"];
config.schema_mapping = config_json["schema_mapping"];
Logger logger;
DBOperator db_operator(config, logger);
// 连接数据库
Session source1 = db_operator.connect_db(config.source1);
Session source2 = db_operator.connect_db(config.source2);
Session target = db_operator.connect_db(config.target);
// 执行复制任务
for (const auto& [source_schema, target_schema] : config.schema_mapping) {
vector<string> tables = config.schema_mapping[source_schema];
// 从第一个源复制
db_operator.copy_schema(source1, target,
source_schema, target_schema, tables);
// 从第二个源复制
db_operator.copy_schema(source2, target,
source_schema, target_schema, tables);
}
} catch (const exception& e) {
cerr << "Fatal error: " << e.what() << endl;
return 1;
}
return 0;
}
配置文件示例 (config.json):
{
"source1": {
"host": "source1.example.com",
"port": 3306,
"user": "user",
"password": "password"
},
"source2": {
"host": "source2.example.com",
"port": 3307,
"user": "user",
"password": "password"
},
"target": {
"host": "target.example.com",
"port": 3308,
"user": "user",
"password": "password"
},
"max_retries": 3,
"retry_interval": 60,
"schema_mapping": {
"source_schema1": ["table1", "table2"],
"source_schema2": ["table3", "table4"]
}
}
关键特性说明:
- 配置管理:
- 使用JSON配置文件存储数据库连接参数和复制参数
- 支持灵活的模式映射配置
- 可配置重试策略和批量大小
- 日志系统:
- 自动创建日志目录
- 按天滚动日志文件
- 详细记录每个操作的元数据
- 线程安全的日志写入
- 数据复制:
- 使用MySQL X DevAPI进行高效数据操作
- 批量插入机制提升性能
- 自动重试机制和错误隔离
- 表级并行复制(可扩展)
- 异常处理:
- 多级异常捕获机制
- 连接失败自动重试
- 数据操作错误隔离
- 资源自动释放保障
- 扩展性:
- 模块化设计便于功能扩展
- 支持动态表列表配置
- 可扩展的多线程支持
编译运行:
# 安装依赖
sudo apt-get install libmysqlcppconn-dev nlohmann-json3-dev
# 编译
g++ -std=c++17 -o db_sync main.cpp -lmysqlcppconn8 -lpthread
# 运行
./db_sync
建议的优化扩展方向:
- 并行处理:
- 使用线程池实现表级并行复制
- 异步IO提升吞吐量
- 连接池管理数据库连接
- 监控增强:
- 实时进度报告
- Prometheus指标导出
- 健康检查端点
- 高级功能:
- 增量复制支持
- 模式校验机制
- 数据一致性校验
- 自动断点续传
- 部署优化:
- Docker容器化封装
- Systemd服务集成
- 配置热加载支持