Linux上用C++和GCC开发程序实现不同MySQL实例下单个Schema之间的稳定高效的数据迁移
设计一个在Linux上运行的GCC C++程序,同时连接两个不同的MySQL实例,两个实例中各有一个表结构完全相同的Schema,把其中一个实例的Schema里所有表的数据复制到另一个实例的Schema里。要求:使用以下快速高效的方法;加入异常处理,支持每隔固定时间重试一定次数;每张表的复制运行状态、记录条数、开始和结束时间戳、运行时间,以及每个批次的运行状态、开始和结束时间戳、运行时间、总记录条数,都要写到运行日志文件里;日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期;所有运行参数写在json配置文件中。
C++程序分步解决方案:
文件结构
├── config.json # 配置文件
├── include/ # 头文件目录
│ ├── DatabaseManager.h
│ ├── Logger.h
│ └── DataReplicator.h
├── src/ # 源文件目录
│ ├── main.cpp
│ ├── DatabaseManager.cpp
│ ├── Logger.cpp
│ └── DataReplicator.cpp
└── CMakeLists.txt # 编译配置
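1. 配置文件 (config.json)。注意:后面的代码通过 X DevAPI(mysqlx,即 X Protocol)建立连接,因此 port 应填写各实例的 mysqlx_port(默认 33060),而不是经典协议端口(默认 3306)。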
{
"source": {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "source_pass",
"schema": "source_db"
},
"target": {
"host": "127.0.0.1",
"port": 3307,
"user": "root",
"password": "target_pass",
"schema": "target_db"
},
"retry": {
"max_attempts": 3,
"interval_seconds": 5
},
"batch_size": 1000,
"log_dir": "./logs"
}
2. 日志类 (Logger.h/Logger.cpp)
// Logger.h
#include <fstream>
#include <string>
#include <mutex>
#include <chrono>
/// Process-wide logger that writes timestamped lines to a per-day log file.
///
/// The only instance is obtained through getInstance(); construction and
/// destruction are private. log() takes an internal mutex before writing.
class Logger {
public:
    /// Returns the single shared Logger instance.
    static Logger& getInstance();

    /// Writes `message` as one line, prefixed with the current timestamp.
    void log(const std::string& message);

    // The singleton must never be copied or reassigned.
    Logger(const Logger&) = delete;
    Logger& operator=(const Logger&) = delete;

private:
    Logger();
    ~Logger();

    std::ofstream logFile;    ///< Stream of the log file currently in use.
    std::mutex mtx;           ///< Locked by log() for the duration of a write.
    std::string currentDate;  ///< Date of the open file; log() compares it with
                              ///< getCurrentDate() to detect a day change.

    std::string getCurrentDate();
    std::string getCurrentTimestamp();
    void openNewLogFile();
};
3. 数据库管理类 (DatabaseManager.h/DatabaseManager.cpp)
// DatabaseManager.h
#include <mysqlx/xdevapi.h>
/// Owns one X DevAPI session to a single MySQL instance.
class DatabaseManager {
    // The connection held for the lifetime of this manager.
    mysqlx::Session session;

public:
    /// Opens the session described by `settings`.
    DatabaseManager(const mysqlx::SessionSettings& settings);

    /// Looks up the schema called `name` through the owned session.
    mysqlx::Schema getSchema(const std::string& name);

    /// Gives direct access to the owned session.
    mysqlx::Session& getSession();
};
4. 数据复制核心类 (DataReplicator.h/DataReplicator.cpp)
// DataReplicator.h
#include <vector>
#include <string>
#include <functional>
/// Copies the rows of every source table into the matching target table.
///
/// NOTE(review): this outline declares neither a constructor nor data
/// members, although main() constructs the class with three arguments and
/// withRetry() reads maxRetries / retryInterval.
class DataReplicator {
public:
    /// Bookkeeping for the copy of a single table.
    struct TableStats {
        std::string table_name;                            // table being copied
        size_t total_records{0};                           // rows counted for this table
        std::chrono::system_clock::time_point start_time;  // when the copy began
        std::chrono::system_clock::time_point end_time;    // when the copy ended
        bool success{false};                               // true once the copy finished
    };

    /// Replicates every table returned by getSourceTables().
    void replicateAllTables();

private:
    /// Runs `operation`, retrying on failure; `context` identifies the work.
    void withRetry(std::function<void()> operation, const std::string& context);

    /// Copies the rows of `source` into `target`, `batchSize` rows per insert.
    void batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize);

    /// Copies one table identified by name.
    void replicateTable(const std::string& tableName);

    /// Names of the tables to copy.
    std::vector<std::string> getSourceTables();
};
5. 主程序 (main.cpp)
#include "DatabaseManager.h"
#include "DataReplicator.h"
#include "Logger.h"
#include <nlohmann/json.hpp>
using json = nlohmann::json;
/// Loads the run parameters from `config.json` in the current working directory.
///
/// @return The parsed JSON document.
/// @throws std::runtime_error if the file cannot be opened.
/// @throws nlohmann::json::parse_error if the content is not valid JSON.
json loadConfig() {
    std::ifstream configFile("config.json");
    if (!configFile.is_open()) {
        // Without this check a missing file would only surface as a parse error.
        throw std::runtime_error("Cannot open config.json");
    }
    return json::parse(configFile);
}
int main() {
auto config = loadConfig();
DatabaseManager sourceManager(createSettings(config["source"]));
DatabaseManager targetManager(createSettings(config["target"]));
DataReplicator replicator(sourceManager, targetManager, config);
replicator.replicateAllTables();
return 0;
}
6. 关键实现细节
- 批量数据复制:
/// Copies every row of `source` into `target`, inserting `batchSize` rows at a time.
///
/// @param source     Table that is read with `select *`.
/// @param target     Table that receives the rows.
/// @param batchSize  Rows per insert statement; values below 1 are treated as 1.
void DataReplicator::batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize) {
    // Compare sizes as unsigned values; a negative batchSize would otherwise
    // convert to a huge number and buffer the whole table in memory.
    const std::size_t rowsPerBatch = batchSize > 0 ? static_cast<std::size_t>(batchSize) : 1;

    std::vector<mysqlx::Row> buffer;
    buffer.reserve(rowsPerBatch);

    auto result = source.select("*").execute();
    while (auto row = result.fetchOne()) {
        buffer.push_back(row);
        if (buffer.size() >= rowsPerBatch) {
            // rows() adds each element as one row; values() describes a single row.
            target.insert().rows(buffer).execute();
            buffer.clear();
        }
    }
    if (!buffer.empty()) {
        target.insert().rows(buffer).execute();
    }
}
- 带重试的逻辑:
/// Runs `operation`, retrying after a mysqlx::Error.
///
/// At most `maxRetries` attempts are made (always at least one), sleeping
/// `retryInterval` between attempts.
///
/// @param operation  Work to execute.
/// @param context    Short description of the work, included in log lines.
/// @throws mysqlx::Error  The error of the last attempt once all attempts failed.
void DataReplicator::withRetry(std::function<void()> operation, const std::string& context) {
    // With maxRetries <= 0 the original loop never ran the operation and
    // still returned as if it had succeeded.
    const int attemptLimit = maxRetries > 0 ? maxRetries : 1;
    for (int attempt = 1;; ++attempt) {
        try {
            operation();
            return;
        } catch (const mysqlx::Error& e) {
            Logger::getInstance().log("Error: " + std::string(e.what()) + " (" + context +
                                      " Attempt " + std::to_string(attempt) + "/" +
                                      std::to_string(attemptLimit) + ")");
            if (attempt >= attemptLimit) throw;
            std::this_thread::sleep_for(retryInterval);
        }
    }
}
- 日志记录实现:
/// Writes `message` as one timestamped line to the log file of the current day.
///
/// A new file is opened when no file is open or when the date has changed
/// since the current file was opened.
void Logger::log(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx);
    // The former `tellp() == 0` test re-opened an already open (still empty)
    // stream; open() on an open stream fails and every later write was dropped.
    if (!logFile.is_open() || getCurrentDate() != currentDate) {
        if (logFile.is_open()) {
            logFile.close();
        }
        logFile.clear();  // drop error flags left by a previous failure
        openNewLogFile();
    }
    logFile << "[" << getCurrentTimestamp() << "] " << message << std::endl;
}
7. 编译配置 (CMakeLists.txt)
cmake_minimum_required(VERSION 3.10)
project(DataReplicator)

# The sources need C++17; REQUIRED makes configuration fail instead of
# silently falling back to an older standard.
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# NOTE(review): confirm that the installed Connector/C++ (or a project Find
# module) really provides a package named MySQLCppConn and the imported
# target MySQLCppConn::mysqlcppconn.
find_package(MySQLCppConn REQUIRED)
find_package(nlohmann_json REQUIRED)
# Portable replacement for linking the raw "pthread" library name.
find_package(Threads REQUIRED)

add_executable(data_replicator
    src/main.cpp
    src/DatabaseManager.cpp
    src/Logger.cpp
    src/DataReplicator.cpp
)

target_include_directories(data_replicator
    PRIVATE
        ${CMAKE_CURRENT_SOURCE_DIR}/include
        ${MySQLCppConn_INCLUDE_DIRS}
)

target_link_libraries(data_replicator
    PRIVATE
        MySQLCppConn::mysqlcppconn
        nlohmann_json::nlohmann_json
        Threads::Threads
        ssl
        crypto
)
8. 运行方式
# Configure and build inside build/; the subshell keeps the caller in the project root.
mkdir -p build logs
(cd build && cmake .. && make)
# Run from the project root: the program opens "config.json" and "./logs"
# relative to the current working directory.
./build/data_replicator
9. 日志示例
[2023-10-05 14:30:45] [users] STARTED
[2023-10-05 14:30:47] [users] BATCH 1-1000 inserted (1.2s)
[2023-10-05 14:30:49] [users] COMPLETED 1500 records (4.5s)
[2023-10-05 14:30:50] [products] FAILED: Duplicate entry 'XYZ' (Attempt 1/3)
[2023-10-05 14:30:55] [products] RETRY SUCCESS (2nd attempt)
优化建议
- 并行处理:
std::vector<std::future<void>> futures;
for (auto& table : tables) {
futures.push_back(std::async(std::launch::async, [&](){
replicateTable(table);
}));
}
for (auto& f : futures) f.wait();
- 索引优化:
-- Disable index maintenance before the copy.
-- NOTE(review): per the MySQL ALTER TABLE reference, DISABLE KEYS / ENABLE KEYS
-- only affects nonunique indexes of MyISAM tables; confirm the storage engine
-- of the target tables before relying on this.
ALTER TABLE target_table DISABLE KEYS;
-- Rebuild the indexes after the copy.
ALTER TABLE target_table ENABLE KEYS;
- 内存管理:
- 使用智能指针管理数据库连接
- 限制同时打开的连接数
- 实现连接池机制
这个解决方案实现了:
- 配置驱动的数据库连接
- 带重试机制的可靠数据传输
- 详细的运行日志记录
- 批量处理优化
- 异常安全保证
- 可扩展的架构设计
完整的C++代码实现,包括各个头文件和源文件以及 CMakeLists.txt 的完整内容:
请确保安装了MySQL C++ Connector和 nlohmann_json 库,并按照运行方式中的步骤进行编译和运行。
include/DatabaseManager.h
#include <mysqlx/xdevapi.h>
/// Wraps a single X DevAPI session and hands out schemas from it.
class DatabaseManager {
    // Session constructed from the settings passed to the constructor.
    mysqlx::Session session;

public:
    /// Connects using `settings`.
    DatabaseManager(const mysqlx::SessionSettings& settings);

    /// Returns the schema named `name` as seen through the owned session.
    mysqlx::Schema getSchema(const std::string& name);

    /// Returns a reference to the owned session.
    mysqlx::Session& getSession();
};
include/Logger.h
#include <fstream>
#include <string>
#include <mutex>
#include <chrono>
/// Singleton logger writing timestamped lines to one log file per day.
class Logger {
    // --- state (declaration order kept: it is the construction order) ---
    std::ofstream logFile;    // stream of the file currently being written
    std::mutex mtx;           // locked by log() while it writes
    std::string currentDate;  // date assigned by openNewLogFile()

    // --- helpers ---
    void openNewLogFile();              // opens "<log dir>/<date>.log" in append mode
    std::string getCurrentTimestamp();  // local time as "YYYY-MM-DD HH:MM:SS"
    std::string getCurrentDate();       // local date as "YYYY-MM-DD"

    // Construction and destruction are reserved for getInstance().
    Logger();
    ~Logger();

public:
    /// Writes `message` as one line prefixed with the current timestamp.
    void log(const std::string& message);

    /// Returns the shared instance, created on first use.
    static Logger& getInstance();
};
include/DataReplicator.h
#include <vector>
#include <string>
#include <functional>
#include <mysqlx/xdevapi.h>
#include <nlohmann/json.hpp>
using json = nlohmann::json;
/// Copies the data of every table of the source schema into the target schema.
class DataReplicator {
public:
    /// Outcome and timing of copying one table.
    struct TableStats {
        std::string table_name;                            // table being copied
        size_t total_records{0};                           // rows counted for the table
        std::chrono::system_clock::time_point start_time;  // set when the copy starts
        std::chrono::system_clock::time_point end_time;    // set when the copy ends
        bool success{false};                               // true when no error escaped
    };

    /// Stores both managers and reads retry / batch parameters from `config`.
    DataReplicator(DatabaseManager& sourceManager, DatabaseManager& targetManager, const json& config);

    /// Replicates every table reported by getSourceTables().
    void replicateAllTables();

private:
    // State; the declaration order below is the initialisation order.
    DatabaseManager& sourceManager;      // connection to the instance that is read
    DatabaseManager& targetManager;      // connection to the instance that is written
    json config;                         // copy of the whole configuration document
    int maxRetries;                      // config["retry"]["max_attempts"]
    std::chrono::seconds retryInterval;  // config["retry"]["interval_seconds"]
    int batchSize;                       // config["batch_size"]

    /// Runs `operation` under the retry policy; `context` appears in log lines.
    void withRetry(std::function<void()> operation, const std::string& context);

    /// Copies the rows of `source` into `target`, `batchSize` rows per insert.
    void batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize);

    /// Copies the table called `tableName` and logs the outcome.
    void replicateTable(const std::string& tableName);

    /// Names of the tables found in the source schema.
    std::vector<std::string> getSourceTables();
};
src/DatabaseManager.cpp
#include "../include/DatabaseManager.h"
/// Opens the X DevAPI session described by `settings`.
///
/// No separate "is open" probe is made: constructing mysqlx::Session already
/// throws when the connection cannot be established, so a successfully
/// constructed member is a usable session.
///
/// @throws mysqlx::Error if the session cannot be created.
DatabaseManager::DatabaseManager(const mysqlx::SessionSettings& settings)
    : session(settings) {}
/// Exposes the session owned by this manager; no copy is made.
mysqlx::Session& DatabaseManager::getSession() {
    mysqlx::Session& ownedSession = this->session;
    return ownedSession;
}
/// Returns the schema called `name`, obtained from the owned session.
mysqlx::Schema DatabaseManager::getSchema(const std::string& name) {
    mysqlx::Schema requested = this->session.getSchema(name);
    return requested;
}
src/Logger.cpp
#include "../include/Logger.h"
#include <ctime>
#include <filesystem>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <system_error>
/// Opens the log file for the current day as soon as the logger is created.
Logger::Logger() {
    this->openNewLogFile();
}
/// Closes the log file stream when the logger is destroyed.
Logger::~Logger() {
    this->logFile.close();
}
/// Returns the shared logger, a function-local static created on first call.
Logger& Logger::getInstance() {
    static Logger sharedLogger;
    return sharedLogger;
}
/// Formats today's local date as "YYYY-MM-DD".
std::string Logger::getCurrentDate() {
    const std::time_t rawNow =
        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    std::tm localParts;
    localtime_r(&rawNow, &localParts);  // reentrant variant of localtime()

    char text[32];
    const std::size_t length = std::strftime(text, sizeof text, "%Y-%m-%d", &localParts);
    return std::string(text, length);
}
/// Formats the current local time as "YYYY-MM-DD HH:MM:SS".
std::string Logger::getCurrentTimestamp() {
    const std::time_t rawNow =
        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    std::tm localParts;
    localtime_r(&rawNow, &localParts);  // reentrant variant of localtime()

    char text[32];
    const std::size_t length =
        std::strftime(text, sizeof text, "%Y-%m-%d %H:%M:%S", &localParts);
    return std::string(text, length);
}
/// Opens "<log dir>/<YYYY-MM-DD>.log" in append mode and records its date in
/// `currentDate`.
///
/// Any previously open file is closed first and the log directory is created
/// when missing. Failures are reported on stderr; the stream is then left
/// closed.
void Logger::openNewLogFile() {
    // Logger has no access to the parsed configuration (the former `config`
    // lookup did not compile), so the directory that config.json ships as
    // "log_dir" is used.
    // TODO(review): pass the configured directory in once Logger gets a setter.
    static const std::string kLogDir = "./logs";

    // open() fails on a stream that is still open, so release the old file
    // and clear any error flags first.
    if (logFile.is_open()) {
        logFile.close();
    }
    logFile.clear();

    currentDate = getCurrentDate();

    std::error_code dirError;
    std::filesystem::create_directories(kLogDir, dirError);
    if (dirError) {
        std::cerr << "Failed to create log directory: " << kLogDir << " ("
                  << dirError.message() << ")" << std::endl;
    }

    const std::string logFileName = kLogDir + "/" + currentDate + ".log";
    logFile.open(logFileName, std::ios::app);
    if (!logFile.is_open()) {
        std::cerr << "Failed to open log file: " << logFileName << std::endl;
    }
}
/// Writes `message` as one timestamped line to the log file of the current day.
///
/// A new file is opened when no file is open or when the date has changed
/// since the current file was opened.
void Logger::log(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx);
    // The former `tellp() == 0` test re-opened an already open (still empty)
    // stream; open() on an open stream fails and every later write was dropped.
    if (!logFile.is_open() || getCurrentDate() != currentDate) {
        if (logFile.is_open()) {
            logFile.close();
        }
        logFile.clear();  // drop error flags left by a previous failure
        openNewLogFile();
    }
    logFile << "[" << getCurrentTimestamp() << "] " << message << std::endl;
}
src/DataReplicator.cpp
#include "../include/DataReplicator.h"
#include <iostream>
#include <future>
/// Stores both database managers, keeps a copy of `config` and reads the
/// retry and batch parameters from it.
///
/// @param sourceManager  Connection to the instance that is read.
/// @param targetManager  Connection to the instance that is written.
/// @param config         Whole configuration document; must contain
///                       "retry.max_attempts", "retry.interval_seconds" and
///                       "batch_size".
/// @throws nlohmann::json::exception  if a key is missing or is not a number.
/// @throws std::invalid_argument      if a value is outside its valid range.
DataReplicator::DataReplicator(DatabaseManager& sourceManager, DatabaseManager& targetManager, const json& config)
    : sourceManager(sourceManager),
      targetManager(targetManager),
      config(config),
      // Explicit get<int>() avoids relying on implicit json conversions.
      maxRetries(config.at("retry").at("max_attempts").get<int>()),
      retryInterval(config.at("retry").at("interval_seconds").get<int>()),
      batchSize(config.at("batch_size").get<int>()) {
    if (maxRetries < 1) {
        throw std::invalid_argument("retry.max_attempts must be at least 1");
    }
    if (retryInterval.count() < 0) {
        throw std::invalid_argument("retry.interval_seconds must not be negative");
    }
    if (batchSize < 1) {
        throw std::invalid_argument("batch_size must be at least 1");
    }
}
std::vector<std::string> DataReplicator::getSourceTables() {
auto schema = sourceManager.getSchema(config["source"]["schema"]);
auto tables = schema.getTables();
std::vector<std::string> tableNames;
for (auto& table : tables) {
tableNames.push_back(table.getName());
}
return tableNames;
}
void DataReplicator::replicateTable(const std::string& tableName) {
auto sourceSchema = sourceManager.getSchema(config["source"]["schema"]);
auto targetSchema = targetManager.getSchema(config["target"]["schema"]);
auto sourceTable = sourceSchema.getTable(tableName);
auto targetTable = targetSchema.getTable(tableName);
TableStats stats;
stats.table_name = tableName;
stats.start_time = std::chrono::system_clock::now();
try {
withRetry([&]() {
batchInsert(sourceTable, targetTable, batchSize);
}, tableName);
stats.success = true;
} catch (const mysqlx::Error& e) {
Logger::getInstance().log("Error replicating table " + tableName + ": " + std::string(e.what()));
stats.success = false;
}
stats.end_time = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(stats.end_time - stats.start_time).count();
std::string status = stats.success? "COMPLETED" : "FAILED";
Logger::getInstance().log("[" + tableName + "] " + status + " " + std::to_string(stats.total_records) + " records (" + std::to_string(duration) + "s)");
}
void DataReplicator::batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize) {
auto result = source.select("*").execute();
std::vector<mysqlx::Row> buffer;
int batchCount = 0;
auto batchStartTime = std::chrono::system_clock::now();
while (auto row = result.fetchOne()) {
buffer.push_back(row);
stats.total_records++;
if (buffer.size() >= batchSize) {
withRetry([&]() {
target.insert().values(buffer).execute();
}, stats.table_name + " batch " + std::to_string(++batchCount));
auto batchEndTime = std::chrono::system_clock::now();
auto batchDuration = std::chrono::duration_cast<std::chrono::seconds>(batchEndTime - batchStartTime).count();
Logger::getInstance().log("[" + stats.table_name + "] BATCH " + std::to_string((batchCount - 1) * batchSize + 1) + "-" + std::to_string(batchCount * batchSize) + " inserted (" + std::to_string(batchDuration) + "s)");
batchStartTime = batchEndTime;
buffer.clear();
}
}
if (!buffer.empty()) {
withRetry([&]() {
target.insert().values(buffer).execute();
}, stats.table_name + " batch " + std::to_string(++batchCount));
auto batchEndTime = std::chrono::system_clock::now();
auto batchDuration = std::chrono::duration_cast<std::chrono::seconds>(batchEndTime - batchStartTime).count();
Logger::getInstance().log("[" + stats.table_name + "] BATCH " + std::to_string((batchCount - 1) * batchSize + 1) + "-" + std::to_string(stats.total_records) + " inserted (" + std::to_string(batchDuration) + "s)");
}
}
/// Runs `operation`, retrying after a mysqlx::Error.
///
/// At most `maxRetries` attempts are made (always at least one), sleeping
/// `retryInterval` between attempts. Every failed attempt is logged together
/// with `context`.
///
/// NOTE(review): std::this_thread requires <thread>, which this translation
/// unit does not include directly.
///
/// @param operation  Work to execute.
/// @param context    Short description of the work, included in log lines.
/// @throws mysqlx::Error  The error of the last attempt once all attempts failed.
void DataReplicator::withRetry(std::function<void()> operation, const std::string& context) {
    // With maxRetries <= 0 the original loop never ran the operation and
    // still returned as if it had succeeded.
    const int attemptLimit = maxRetries > 0 ? maxRetries : 1;
    for (int attempt = 1;; ++attempt) {
        try {
            operation();
            return;
        } catch (const mysqlx::Error& e) {
            Logger::getInstance().log("Error: " + std::string(e.what()) + " (" + context +
                                      " Attempt " + std::to_string(attempt) + "/" +
                                      std::to_string(attemptLimit) + ")");
            if (attempt >= attemptLimit) throw;
            std::this_thread::sleep_for(retryInterval);
        }
    }
}
void DataReplicator::replicateAllTables() {
auto tables = getSourceTables();
std::vector<std::future<void>> futures;
for (auto& table : tables) {
futures.push_back(std::async(std::launch::async, [&](){
replicateTable(table);
}));
}
for (auto& f : futures) f.wait();
}
src/main.cpp
#include "../include/DatabaseManager.h"
#include "../include/DataReplicator.h"
#include "../include/Logger.h"
#include <nlohmann/json.hpp>
#include <fstream>
using json = nlohmann::json;
/// Loads the run parameters from `config.json` in the current working directory.
///
/// @return The parsed JSON document.
/// @throws std::runtime_error if the file cannot be opened.
/// @throws nlohmann::json::parse_error if the content is not valid JSON.
json loadConfig() {
    std::ifstream configFile("config.json");
    if (!configFile.is_open()) {
        // Without this check a missing file would only surface as a parse error.
        throw std::runtime_error("Cannot open config.json");
    }
    json config;
    configFile >> config;
    return config;
}
/// Builds X DevAPI session settings from one connection section of the
/// configuration (keys: "host", "port", "user", "password").
///
/// mysqlx::SessionSettings is configured through SessionOption / value pairs;
/// it has no setHost / setPort / setUser / setPassword members.
///
/// @param config  The "source" or "target" object of the configuration.
/// @throws nlohmann::json::exception if a key is missing or has the wrong type.
mysqlx::SessionSettings createSettings(const json& config) {
    return mysqlx::SessionSettings(
        mysqlx::SessionOption::HOST, config.at("host").get<std::string>(),
        mysqlx::SessionOption::PORT, config.at("port").get<unsigned>(),
        mysqlx::SessionOption::USER, config.at("user").get<std::string>(),
        mysqlx::SessionOption::PWD, config.at("password").get<std::string>());
}
int main() {
auto config = loadConfig();
mysqlx::SessionSettings sourceSettings = createSettings(config["source"]);
mysqlx::SessionSettings targetSettings = createSettings(config["target"]);
DatabaseManager sourceManager(sourceSettings);
DatabaseManager targetManager(targetSettings);
DataReplicator replicator(sourceManager, targetManager, config);
replicator.replicateAllTables();
return 0;
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(DataReplicator)

# The sources need C++17; REQUIRED makes configuration fail instead of
# silently falling back to an older standard.
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# NOTE(review): confirm that the installed Connector/C++ (or a project Find
# module) really provides a package named MySQLCppConn and the imported
# target MySQLCppConn::mysqlcppconn.
find_package(MySQLCppConn REQUIRED)
find_package(nlohmann_json REQUIRED)
# Portable replacement for linking the raw "pthread" library name.
find_package(Threads REQUIRED)

add_executable(data_replicator
    src/main.cpp
    src/DatabaseManager.cpp
    src/Logger.cpp
    src/DataReplicator.cpp
)

target_include_directories(data_replicator
    PRIVATE
        ${CMAKE_CURRENT_SOURCE_DIR}/include
        ${MySQLCppConn_INCLUDE_DIRS}
)

target_link_libraries(data_replicator
    PRIVATE
        MySQLCppConn::mysqlcppconn
        nlohmann_json::nlohmann_json
        Threads::Threads
        ssl
        crypto
)