当前位置: 首页 > article >正文

Linux上用C++和GCC开发程序实现不同MySQL实例下单个Schema之间的稳定高效的数据迁移

设计一个在Linux上运行的GCC C++程序,同时连接两个不同的MySQL实例(两个实例中各有一个Schema,且二者的表结构完全相同),把源实例该Schema里所有表的数据复制到目标实例的Schema中。要求使用下面给出的快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数;每张表的复制运行状态和记录条数、开始结束时间戳、运行时间,以及每个批次的运行状态、开始结束时间戳、运行时间、总记录条数都要记录到运行日志文件里;日志文件全部放在logs目录下,每天一个以.log为扩展名的文本文件,文件名带日期;所有运行参数写在json配置文件中。

C++程序分步解决方案:


文件结构

├── config.json          # 配置文件
├── include/             # 头文件目录
│   ├── DatabaseManager.h
│   ├── Logger.h
│   └── DataReplicator.h
├── src/                # 源文件目录
│   ├── main.cpp
│   ├── DatabaseManager.cpp
│   ├── Logger.cpp
│   └── DataReplicator.cpp
└── CMakeLists.txt      # 编译配置

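1. 配置文件 (config.json)

注意:程序通过 X DevAPI (mysqlx) 连接 MySQL,使用的是 X Protocol,因此 port 应填写各实例的 X Plugin 端口(mysqlx_port,默认 33060),而不是经典协议端口 3306/3307。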

{
    "source": {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "password": "source_pass",
        "schema": "source_db"
    },
    "target": {
        "host": "127.0.0.1",
        "port": 3307,
        "user": "root",
        "password": "target_pass",
        "schema": "target_db"
    },
    "retry": {
        "max_attempts": 3,
        "interval_seconds": 5
    },
    "batch_size": 1000,
    "log_dir": "./logs"
}

2. 日志类 (Logger.h/Logger.cpp)

// Logger.h
#pragma once

#include <chrono>
#include <fstream>
#include <mutex>
#include <string>

/// Process-wide logger that appends timestamped lines to one log file per day.
/// Calls to log() are serialized by an internal mutex.
class Logger {
public:
    /// Returns the single shared instance (created on first use).
    static Logger& getInstance();

    /// Appends "[timestamp] message" to the current day's log file.
    void log(const std::string& message);

    // A singleton that owns a file stream and a mutex must never be copied.
    Logger(const Logger&) = delete;
    Logger& operator=(const Logger&) = delete;

private:
    Logger();
    ~Logger();
    std::ofstream logFile;     // currently open daily log file
    std::mutex mtx;            // guards logFile and currentDate
    std::string getCurrentDate();
    std::string getCurrentTimestamp();
    void openNewLogFile();
    std::string currentDate;   // date of the file behind logFile; log() compares it to detect day rollover
};

3. 数据库管理类 (DatabaseManager.h/DatabaseManager.cpp)

// DatabaseManager.h
#pragma once

#include <string>

#include <mysqlx/xdevapi.h>

/// Owns one X DevAPI session to a MySQL instance and hands out schema handles from it.
class DatabaseManager {
public:
    /// Opens the session described by `settings`.
    explicit DatabaseManager(const mysqlx::SessionSettings& settings);

    // The session is a live connection; copies would be meaningless.
    DatabaseManager(const DatabaseManager&) = delete;
    DatabaseManager& operator=(const DatabaseManager&) = delete;

    /// Direct access to the owned session (e.g. for raw SQL statements).
    mysqlx::Session& getSession();

    /// Returns a handle to schema `name` on this connection.
    mysqlx::Schema getSchema(const std::string& name);

private:
    mysqlx::Session session;
};

4. 数据复制核心类 (DataReplicator.h/DataReplicator.cpp)

// DataReplicator.h
#include <vector>
#include <string>
#include <functional>

class DataReplicator {
public:
    struct TableStats {
        std::string table_name;
        size_t total_records = 0;
        std::chrono::system_clock::time_point start_time;
        std::chrono::system_clock::time_point end_time;
        bool success = false;
    };

    void replicateAllTables();
    
private:
    std::vector<std::string> getSourceTables();
    void replicateTable(const std::string& tableName);
    void batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize);
    void withRetry(std::function<void()> operation, const std::string& context);
};

5. 主程序 (main.cpp)

#include "DatabaseManager.h"
#include "DataReplicator.h"
#include "Logger.h"
#include <nlohmann/json.hpp>

#include <fstream>
#include <stdexcept>
#include <string>

using json = nlohmann::json;

/// Reads and parses the JSON configuration file.
/// @param path file to read; defaults to "config.json" in the working directory
/// @throws std::runtime_error if the file cannot be opened
/// @throws nlohmann::json::parse_error if the content is not valid JSON
json loadConfig(const std::string& path = "config.json") {
    std::ifstream configFile(path);
    if (!configFile.is_open()) {
        throw std::runtime_error("Cannot open config file: " + path);
    }
    json config;
    configFile >> config;
    return config;
}

/// Builds X DevAPI session settings from one "source"/"target" object of the config.
/// A missing key throws nlohmann::json::out_of_range instead of yielding a null value.
/// NOTE: X DevAPI uses the X Protocol, so "port" must be the X Plugin port (33060 by default).
mysqlx::SessionSettings createSettings(const json& db) {
    mysqlx::SessionSettings settings;
    settings.set(mysqlx::SessionOption::HOST, db.at("host").get<std::string>());
    settings.set(mysqlx::SessionOption::PORT, db.at("port").get<unsigned>());
    settings.set(mysqlx::SessionOption::USER, db.at("user").get<std::string>());
    settings.set(mysqlx::SessionOption::PWD, db.at("password").get<std::string>());
    return settings;
}

int main() {
    try {
        auto config = loadConfig();

        DatabaseManager sourceManager(createSettings(config.at("source")));
        DatabaseManager targetManager(createSettings(config.at("target")));

        DataReplicator replicator(sourceManager, targetManager, config);
        replicator.replicateAllTables();
    } catch (const std::exception& e) {
        // Config and connection failures would otherwise terminate without a log entry.
        Logger::getInstance().log(std::string("FATAL: ") + e.what());
        return 1;
    }
    return 0;
}

6. 关键实现细节

  1. 批量数据复制
/// Streams every row of `source` into `target`, one multi-row INSERT per `batchSize` rows.
/// @throws std::invalid_argument if batchSize is not positive
void DataReplicator::batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize) {
    // With batchSize <= 0 the size check below would never (or always) trigger,
    // so reject it up front instead of buffering the whole table.
    if (batchSize <= 0) {
        throw std::invalid_argument("batchSize must be positive");
    }
    const auto limit = static_cast<std::size_t>(batchSize);

    auto result = source.select("*").execute();
    std::vector<mysqlx::Row> buffer;

    while (auto row = result.fetchOne()) {
        buffer.push_back(std::move(row));
        if (buffer.size() >= limit) {
            // rows() adds every buffered Row to one INSERT; values() builds a single row.
            target.insert().rows(buffer).execute();
            buffer.clear();
        }
    }
    if (!buffer.empty()) {
        target.insert().rows(buffer).execute();
    }
}
  2. 带重试的逻辑
/// Runs `operation`, retrying after `retryInterval` on mysqlx::Error up to `maxRetries` attempts.
/// @param context label included in every error log line (e.g. table/batch name)
/// @throws mysqlx::Error the last error once all attempts are exhausted
void DataReplicator::withRetry(std::function<void()> operation, const std::string& context) {
    // Always make at least one attempt; a non-positive maxRetries would otherwise
    // skip the operation silently and let the caller treat it as a success.
    const int allowedAttempts = maxRetries > 0 ? maxRetries : 1;
    for (int attempt = 1;; ++attempt) {
        try {
            operation();
            return;
        } catch (const mysqlx::Error& e) {
            Logger::getInstance().log("Error: " + std::string(e.what()) + " (" + context + " Attempt " +
                                      std::to_string(attempt) + "/" + std::to_string(allowedAttempts) + ")");
            if (attempt >= allowedAttempts) throw;
            std::this_thread::sleep_for(retryInterval);
        }
    }
}
  3. 日志记录实现
/// Appends "[timestamp] message" to the current day's log file, switching files at day rollover.
void Logger::log(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx);
    // Reopen only when nothing is open yet or the date changed. An ofstream that is
    // already open rejects a second open() (it sets failbit), so close and reset first.
    if (!logFile.is_open() || getCurrentDate() != currentDate) {
        if (logFile.is_open()) {
            logFile.close();
        }
        logFile.clear();
        openNewLogFile();
    }
    // std::endl flushes, so each line reaches the file even if the process dies later.
    logFile << "[" << getCurrentTimestamp() << "] " << message << std::endl;
}

7. 编译配置 (CMakeLists.txt)

cmake_minimum_required(VERSION 3.10)
project(DataReplicator)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# The sources use the X DevAPI (<mysqlx/xdevapi.h>), which is provided by
# libmysqlcppconn8 -- not by the legacy JDBC library libmysqlcppconn.
# Connector/C++ 8.0.33+ ships a "mysql-concpp" CMake package; older
# installations have none, so fall back to locating the library directly.
find_package(mysql-concpp QUIET)
if(TARGET mysql::concpp-xdevapi)
    set(MYSQLX_LIBRARY mysql::concpp-xdevapi)
else()
    find_path(MYSQLX_INCLUDE_DIR NAMES mysqlx/xdevapi.h)
    find_library(MYSQLX_LIBRARY NAMES mysqlcppconn8)
    if(NOT MYSQLX_INCLUDE_DIR OR NOT MYSQLX_LIBRARY)
        message(FATAL_ERROR "MySQL Connector/C++ X DevAPI (mysqlcppconn8) not found")
    endif()
    include_directories(${MYSQLX_INCLUDE_DIR})
endif()

find_package(nlohmann_json REQUIRED)
find_package(Threads REQUIRED)

add_executable(data_replicator
    src/main.cpp
    src/DatabaseManager.cpp
    src/Logger.cpp
    src/DataReplicator.cpp
)

target_include_directories(data_replicator PRIVATE include)

target_link_libraries(data_replicator
    PRIVATE
        ${MYSQLX_LIBRARY}
        nlohmann_json::nlohmann_json
        Threads::Threads
)

8. 运行方式

mkdir -p build && cd build
cmake ..
make
# config.json and the logs/ directory are resolved relative to the working
# directory, so start the binary from the project root.
cd .. && ./build/data_replicator

9. 日志示例

[2023-10-05 14:30:45] [users] STARTED
[2023-10-05 14:30:47] [users] BATCH 1-1000 inserted (1.2s)
[2023-10-05 14:30:49] [users] COMPLETED 1500 records (4.5s)
[2023-10-05 14:30:50] [products] FAILED: Duplicate entry 'XYZ' (Attempt 1/3)
[2023-10-05 14:30:55] [products] RETRY SUCCESS (2nd attempt)

优化建议

  1. 并行处理
// Sketch: replicate each table on its own thread via std::async.
// NOTE(review): every task calls replicateTable(), which in the full implementation
// goes through the same sourceManager/targetManager sessions. X DevAPI sessions are
// generally not safe for concurrent use -- confirm, and give each task its own
// session before enabling this.
std::vector<std::future<void>> futures;
for (auto& table : tables) {
    // [&] captures `table` by reference; this is safe only because every future
    // is waited on below while `tables` is still alive.
    futures.push_back(std::async(std::launch::async, [&](){
        replicateTable(table);
    }));
}
// wait() does not rethrow an exception stored in the future; get() would surface it.
for (auto& f : futures) f.wait();
  2. 索引优化
-- Before the copy: relax per-row checks on the session that loads the data.
-- ALTER TABLE ... DISABLE KEYS only affects MyISAM tables (it stops updating their
-- non-unique indexes); InnoDB ignores it with a warning, so the session settings
-- below are what speed up an InnoDB load. Disabling FK checks also lets tables be
-- loaded in any order.
SET SESSION unique_checks = 0;
SET SESSION foreign_key_checks = 0;
ALTER TABLE target_table DISABLE KEYS;

-- After the copy: rebuild the (MyISAM) indexes and restore the checks.
ALTER TABLE target_table ENABLE KEYS;
SET SESSION foreign_key_checks = 1;
SET SESSION unique_checks = 1;
  3. 内存与连接管理
  • 使用智能指针管理数据库连接
  • 限制同时打开的连接数
  • 实现连接池机制

这个解决方案实现了:

  • 配置驱动的数据库连接
  • 带重试机制的可靠数据传输
  • 详细的运行日志记录
  • 批量处理优化
  • 异常安全保证
  • 可扩展的架构设计

完整的C++代码实现,包括各个头文件和源文件以及 CMakeLists.txt 的完整内容:

请确保安装了MySQL C++ Connector和 nlohmann_json 库,并按照运行方式中的步骤进行编译和运行。

include/DatabaseManager.h

#pragma once

#include <string>

#include <mysqlx/xdevapi.h>

/// Owns one X DevAPI session to a MySQL instance and hands out schema handles from it.
class DatabaseManager {
public:
    /// Opens the session described by `settings`.
    explicit DatabaseManager(const mysqlx::SessionSettings& settings);

    // The session is a live connection; copies would be meaningless.
    DatabaseManager(const DatabaseManager&) = delete;
    DatabaseManager& operator=(const DatabaseManager&) = delete;

    /// Direct access to the owned session (e.g. for raw SQL statements).
    mysqlx::Session& getSession();

    /// Returns a handle to schema `name` on this connection.
    mysqlx::Schema getSchema(const std::string& name);

private:
    mysqlx::Session session;
};

include/Logger.h

#pragma once

#include <chrono>
#include <fstream>
#include <mutex>
#include <string>

/// Process-wide logger that appends timestamped lines to one log file per day.
/// Calls to log() are serialized by an internal mutex.
class Logger {
public:
    /// Returns the single shared instance (created on first use).
    static Logger& getInstance();

    /// Appends "[timestamp] message" to the current day's log file.
    void log(const std::string& message);

    // A singleton that owns a file stream and a mutex must never be copied.
    Logger(const Logger&) = delete;
    Logger& operator=(const Logger&) = delete;

private:
    Logger();
    ~Logger();
    std::ofstream logFile;     // currently open daily log file
    std::mutex mtx;            // guards logFile and currentDate
    std::string getCurrentDate();
    std::string getCurrentTimestamp();
    void openNewLogFile();
    std::string currentDate;   // date of the file behind logFile; used to detect day rollover
};

include/DataReplicator.h

#pragma once

#include <chrono>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

#include <mysqlx/xdevapi.h>
#include <nlohmann/json.hpp>

using json = nlohmann::json;

// Only references are stored here, so a forward declaration is enough and keeps
// this header independent of DatabaseManager.h.
class DatabaseManager;

/// Copies every table of the configured source schema into the identically
/// structured target schema, batch by batch, with per-operation retries.
class DataReplicator {
public:
    /// Outcome of copying a single table.
    struct TableStats {
        std::string table_name;                              // table being copied
        size_t total_records = 0;                            // rows written to the target
        std::chrono::system_clock::time_point start_time;    // when copying of the table began
        std::chrono::system_clock::time_point end_time;      // when it finished (or failed)
        bool success = false;                                // set once the copy finished without error
    };

    /// @param sourceManager connection to read from; must outlive this object
    /// @param targetManager connection to write to; must outlive this object
    /// @param config parsed config.json ("source", "target", "retry", "batch_size")
    DataReplicator(DatabaseManager& sourceManager, DatabaseManager& targetManager, const json& config);

    /// Replicates every table returned by getSourceTables().
    void replicateAllTables();

private:
    std::vector<std::string> getSourceTables();
    void replicateTable(const std::string& tableName);
    void batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize);
    void withRetry(std::function<void()> operation, const std::string& context);

    DatabaseManager& sourceManager;
    DatabaseManager& targetManager;
    json config;
    int maxRetries = 1;                        // retry.max_attempts
    std::chrono::seconds retryInterval{0};     // retry.interval_seconds
    int batchSize = 1000;                      // batch_size
};

src/DatabaseManager.cpp

#include "../include/DatabaseManager.h"

/// Opens the session and verifies it with one trivial round-trip.
/// @throws std::runtime_error if the server cannot be reached or rejects the session
DatabaseManager::DatabaseManager(const mysqlx::SessionSettings& settings) : session(settings) {
    // mysqlx::Session offers no isOpen(); the constructor above is expected to throw on
    // connection failure, and this ping turns any remaining problem into one clear error.
    try {
        session.sql("SELECT 1").execute();
    } catch (const mysqlx::Error& e) {
        throw std::runtime_error(std::string("Failed to connect to database: ") + e.what());
    }
}

/// Exposes the session owned by this manager; the reference stays valid for the manager's lifetime.
mysqlx::Session& DatabaseManager::getSession() {
    mysqlx::Session& owned = this->session;
    return owned;
}

/// Looks up schema `name` through this manager's session.
mysqlx::Schema DatabaseManager::getSchema(const std::string& name) {
    mysqlx::Session& owned = getSession();
    return owned.getSchema(name);
}

src/Logger.cpp

#include "../include/Logger.h"

#include <ctime>
#include <filesystem>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <system_error>

namespace {

// Directory that receives the daily "<YYYY-MM-DD>.log" files; matches the "log_dir"
// value shipped in config.json.
// NOTE(review): Logger is created without access to the parsed config, so "log_dir"
// is not read here -- plumb it through if it must be configurable.
constexpr const char* kLogDirectory = "./logs";

// Formats the current local time with a strftime-style pattern.
std::string formatLocalNow(const char* pattern) {
    const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::tm local{};
    localtime_r(&now, &local);  // reentrant variant; log() may be called from several threads
    std::ostringstream oss;
    oss << std::put_time(&local, pattern);
    return oss.str();
}

}  // namespace

Logger::Logger() {
    openNewLogFile();
}

Logger::~Logger() {
    logFile.close();
}

Logger& Logger::getInstance() {
    // Function-local static: created on first use; initialization is thread-safe.
    static Logger instance;
    return instance;
}

std::string Logger::getCurrentDate() {
    return formatLocalNow("%Y-%m-%d");
}

std::string Logger::getCurrentTimestamp() {
    return formatLocalNow("%Y-%m-%d %H:%M:%S");
}

/// Switches logFile to "<kLogDirectory>/<today>.log" (append mode), creating the directory if needed.
/// Failures are reported on stderr; log() retries on its next call while no file is open.
void Logger::openNewLogFile() {
    // An ofstream that is already open rejects a second open() (it sets failbit),
    // so close and reset the stream state before switching files.
    if (logFile.is_open()) {
        logFile.close();
    }
    logFile.clear();

    currentDate = getCurrentDate();

    std::error_code ec;
    std::filesystem::create_directories(kLogDirectory, ec);
    if (ec) {
        std::cerr << "Failed to create log directory " << kLogDirectory << ": " << ec.message() << std::endl;
    }

    const std::string logFileName = std::string(kLogDirectory) + "/" + currentDate + ".log";
    logFile.open(logFileName, std::ios::app);
    if (!logFile.is_open()) {
        std::cerr << "Failed to open log file: " << logFileName << std::endl;
    }
}

/// Appends "[timestamp] message" to the current day's log file, switching files at day rollover.
void Logger::log(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx);
    if (!logFile.is_open() || getCurrentDate() != currentDate) {
        openNewLogFile();
    }
    // std::endl flushes, so each line reaches the file even if the process dies later.
    logFile << "[" << getCurrentTimestamp() << "] " << message << std::endl;
}

src/DataReplicator.cpp

// DatabaseManager must be a complete type here: the methods below call through its references.
#include "../include/DatabaseManager.h"
#include "../include/DataReplicator.h"
#include "../include/Logger.h"

#include <cstddef>
#include <ctime>
#include <future>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

/// Stores the two connections and reads retry/batch settings from `config`.
/// @throws nlohmann::json::out_of_range / type_error if a required key is missing or mistyped
/// @throws std::invalid_argument if a setting is out of range
DataReplicator::DataReplicator(DatabaseManager& sourceManager, DatabaseManager& targetManager, const json& config)
    : sourceManager(sourceManager),
      targetManager(targetManager),
      config(config),
      // .at() reports missing keys; operator[] on a const json is undefined for them.
      maxRetries(config.at("retry").at("max_attempts").get<int>()),
      retryInterval(config.at("retry").at("interval_seconds").get<int>()),
      batchSize(config.at("batch_size").get<int>()) {
    if (maxRetries < 1) {
        throw std::invalid_argument("retry.max_attempts must be >= 1");
    }
    if (retryInterval.count() < 0) {
        throw std::invalid_argument("retry.interval_seconds must be >= 0");
    }
    if (batchSize < 1) {
        throw std::invalid_argument("batch_size must be >= 1");
    }
}

std::vector<std::string> DataReplicator::getSourceTables() {
    auto schema = sourceManager.getSchema(config["source"]["schema"]);
    auto tables = schema.getTables();
    std::vector<std::string> tableNames;
    for (auto& table : tables) {
        tableNames.push_back(table.getName());
    }
    return tableNames;
}

void DataReplicator::replicateTable(const std::string& tableName) {
    auto sourceSchema = sourceManager.getSchema(config["source"]["schema"]);
    auto targetSchema = targetManager.getSchema(config["target"]["schema"]);
    auto sourceTable = sourceSchema.getTable(tableName);
    auto targetTable = targetSchema.getTable(tableName);

    TableStats stats;
    stats.table_name = tableName;
    stats.start_time = std::chrono::system_clock::now();

    try {
        withRetry([&]() {
            batchInsert(sourceTable, targetTable, batchSize);
        }, tableName);
        stats.success = true;
    } catch (const mysqlx::Error& e) {
        Logger::getInstance().log("Error replicating table " + tableName + ": " + std::string(e.what()));
        stats.success = false;
    }

    stats.end_time = std::chrono::system_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::seconds>(stats.end_time - stats.start_time).count();
    std::string status = stats.success? "COMPLETED" : "FAILED";
    Logger::getInstance().log("[" + tableName + "] " + status + " " + std::to_string(stats.total_records) + " records (" + std::to_string(duration) + "s)");
}

void DataReplicator::batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize) {
    auto result = source.select("*").execute();
    std::vector<mysqlx::Row> buffer;
    int batchCount = 0;
    auto batchStartTime = std::chrono::system_clock::now();

    while (auto row = result.fetchOne()) {
        buffer.push_back(row);
        stats.total_records++;
        if (buffer.size() >= batchSize) {
            withRetry([&]() {
                target.insert().values(buffer).execute();
            }, stats.table_name + " batch " + std::to_string(++batchCount));
            auto batchEndTime = std::chrono::system_clock::now();
            auto batchDuration = std::chrono::duration_cast<std::chrono::seconds>(batchEndTime - batchStartTime).count();
            Logger::getInstance().log("[" + stats.table_name + "] BATCH " + std::to_string((batchCount - 1) * batchSize + 1) + "-" + std::to_string(batchCount * batchSize) + " inserted (" + std::to_string(batchDuration) + "s)");
            batchStartTime = batchEndTime;
            buffer.clear();
        }
    }
    if (!buffer.empty()) {
        withRetry([&]() {
            target.insert().values(buffer).execute();
        }, stats.table_name + " batch " + std::to_string(++batchCount));
        auto batchEndTime = std::chrono::system_clock::now();
        auto batchDuration = std::chrono::duration_cast<std::chrono::seconds>(batchEndTime - batchStartTime).count();
        Logger::getInstance().log("[" + stats.table_name + "] BATCH " + std::to_string((batchCount - 1) * batchSize + 1) + "-" + std::to_string(stats.total_records) + " inserted (" + std::to_string(batchDuration) + "s)");
    }
}

void DataReplicator::withRetry(std::function<void()> operation, const std::string& context) {
    int attempts = 0;
    while (attempts < maxRetries) {
        try {
            operation();
            return;
        } catch (const mysqlx::Error& e) {
            Logger::getInstance().log("Error: " + std::string(e.what()) + " (" + context + " Attempt " + std::to_string(attempts + 1) + "/" + std::to_string(maxRetries) + ")");
            if (++attempts >= maxRetries) throw;
            std::this_thread::sleep_for(retryInterval);
        }
    }
}

void DataReplicator::replicateAllTables() {
    auto tables = getSourceTables();
    std::vector<std::future<void>> futures;
    for (auto& table : tables) {
        futures.push_back(std::async(std::launch::async, [&](){
            replicateTable(table);
        }));
    }
    for (auto& f : futures) f.wait();
}

src/main.cpp

#include "../include/DatabaseManager.h"
#include "../include/DataReplicator.h"
#include "../include/Logger.h"
#include <nlohmann/json.hpp>
#include <fstream>

using json = nlohmann::json;

/// Reads and parses the JSON configuration file.
/// @param path file to read; defaults to "config.json" in the working directory
/// @throws std::runtime_error if the file cannot be opened
/// @throws nlohmann::json::parse_error if the content is not valid JSON
json loadConfig(const std::string& path = "config.json") {
    std::ifstream configFile(path);
    // Without this check a missing file surfaces as an opaque JSON parse error.
    if (!configFile.is_open()) {
        throw std::runtime_error("Cannot open config file: " + path);
    }
    json config;
    configFile >> config;
    return config;
}

/// Builds X DevAPI session settings from one "source"/"target" object of config.json
/// (keys: host, port, user, password). A missing key throws nlohmann::json::out_of_range.
/// NOTE: X DevAPI uses the X Protocol, so "port" must be the server's X Plugin port
/// (mysqlx_port, 33060 by default), not the classic 3306 port.
mysqlx::SessionSettings createSettings(const json& config) {
    // SessionSettings is populated through set(SessionOption, value); it has no setHost()/setPort().
    mysqlx::SessionSettings settings;
    settings.set(mysqlx::SessionOption::HOST, config.at("host").get<std::string>());
    settings.set(mysqlx::SessionOption::PORT, config.at("port").get<unsigned>());
    settings.set(mysqlx::SessionOption::USER, config.at("user").get<std::string>());
    settings.set(mysqlx::SessionOption::PWD, config.at("password").get<std::string>());
    return settings;
}

int main() {
    auto config = loadConfig();
    
    mysqlx::SessionSettings sourceSettings = createSettings(config["source"]);
    mysqlx::SessionSettings targetSettings = createSettings(config["target"]);
    
    DatabaseManager sourceManager(sourceSettings);
    DatabaseManager targetManager(targetSettings);
    
    DataReplicator replicator(sourceManager, targetManager, config);
    replicator.replicateAllTables();
    
    return 0;
}

CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(DataReplicator)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# The sources use the X DevAPI (<mysqlx/xdevapi.h>), which is provided by
# libmysqlcppconn8 -- not by the legacy JDBC library libmysqlcppconn.
# Connector/C++ 8.0.33+ ships a "mysql-concpp" CMake package; older
# installations have none, so fall back to locating the library directly.
find_package(mysql-concpp QUIET)
if(TARGET mysql::concpp-xdevapi)
    set(MYSQLX_LIBRARY mysql::concpp-xdevapi)
else()
    find_path(MYSQLX_INCLUDE_DIR NAMES mysqlx/xdevapi.h)
    find_library(MYSQLX_LIBRARY NAMES mysqlcppconn8)
    if(NOT MYSQLX_INCLUDE_DIR OR NOT MYSQLX_LIBRARY)
        message(FATAL_ERROR "MySQL Connector/C++ X DevAPI (mysqlcppconn8) not found")
    endif()
    include_directories(${MYSQLX_INCLUDE_DIR})
endif()

find_package(nlohmann_json REQUIRED)
find_package(Threads REQUIRED)

add_executable(data_replicator
    src/main.cpp
    src/DatabaseManager.cpp
    src/Logger.cpp
    src/DataReplicator.cpp
)

target_include_directories(data_replicator PRIVATE include)

target_link_libraries(data_replicator
    PRIVATE
        ${MYSQLX_LIBRARY}
        nlohmann_json::nlohmann_json
        Threads::Threads
)

http://www.kler.cn/a/566861.html

相关文章:

  • 鸿蒙 ArkUI 实现 2048 小游戏
  • Spring系列学习之Spring CredHub
  • 1160 拼写单词
  • TP-LINK路由器如何设置网段、网关和DHCP服务
  • 网络层(IP)
  • c#实现485协议
  • TCP基本入门-简单认识一下什么是TCP
  • 【deepseek】本地部署+webui访问
  • Redis使用手册
  • Spring Boot 启动与 Service 注入的 JVM 运行细节
  • DeepSeek接入问题-Xshell5连接Ubuntu22失败解决方案
  • 【欢迎来到Git世界】Github入门
  • 【FL0086】基于SSM和微信小程序的垃圾分类小程序
  • 火语言RPA--Word写入文本段
  • MySQL数据库基本概念
  • DeepSeek开源周Day5: 3FS存储系统与AI数据处理新标杆
  • Github 2025-02-28 Java开源项目日报 Top9
  • 13.重新设计oj_model|综合测试|顶层makefile(C++)
  • SAP-ABAP:SAP数据库视图(Database View)详解-创建
  • 学习dify第二天-web前篇