当前位置: 首页 > article >正文

Linux上用C++和GCC开发程序实现两个不同MySQL实例下单个Schema稳定高效的数据迁移到其它MySQL实例

设计一个在Linux上运行的GCC C++程序,同时连接三个不同的MySQL实例,其中两个实例中分别有两个Schema的表结构分别与第三实例中两个Schema个结构完全相同,同时复制两个实例中两个Schema里的所有表的数据到第三个实例中两个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。

C++解决方案,采用模块化设计并包含详细注释:

该实现结合了C++的高效性和现代C++的特性,同时利用MySQL X DevAPI提供的高性能接口,能够有效处理大规模数据复制需求。

#include <iostream>
#include <fstream>
#include <chrono>
#include <thread>
#include <vector>
#include <map>
#include <mysqlx/xdevapi.h>
#include <nlohmann/json.hpp>
#include <filesystem>
#include <mutex>

using namespace std;
using namespace mysqlx;
using json = nlohmann::json;
namespace fs = filesystem;

// 配置结构体
struct Config {
    struct DBConfig {
        string host;
        int port;
        string user;
        string password;
    };

    DBConfig source1;
    DBConfig source2;
    DBConfig target;
    int max_retries = 3;
    int retry_interval = 60; // seconds
    map<string, vector<string>> schema_mapping;
};

// 日志管理器类
class Logger {
private:
    mutex log_mutex;
    string log_dir = "logs";
    
    string get_current_date() {
        auto now = chrono::system_clock::now();
        time_t t = chrono::system_clock::to_time_t(now);
        tm tm = *localtime(&t);
        char buffer[11];
        strftime(buffer, sizeof(buffer), "%Y%m%d", &tm);
        return string(buffer);
    }

public:
    Logger() {
        if (!fs::exists(log_dir)) {
            fs::create_directory(log_dir);
        }
    }

    void log(const string& message) {
        lock_guard<mutex> lock(log_mutex);
        ofstream log_file(
            log_dir + "/" + get_current_date() + ".log",
            ios::app
        );
        auto now = chrono::system_clock::now();
        time_t t = chrono::system_clock::to_time_t(now);
        log_file << put_time(localtime(&t), "%Y-%m-%d %H:%M:%S") 
                << " | " << message << endl;
    }
};

// 数据库操作类
class DBOperator {
    Config& config;
    Logger& logger;

    Session connect_db(const Config::DBConfig& db_conf) {
        try {
            return Session(
                db_conf.host, db_conf.port, 
                db_conf.user, db_conf.password
            );
        } catch (const Error& e) {
            logger.log("Connection error: " + string(e.what()));
            throw;
        }
    }

public:
    DBOperator(Config& cfg, Logger& log) : config(cfg), logger(log) {}

    void copy_table_data(Schema& source_schema, 
                       Schema& target_schema,
                       const string& table_name,
                       int batch_size = 1000) {
        auto start_time = chrono::system_clock::now();
        string log_prefix = "Table[" + table_name + "] ";

        try {
            Table source_table = source_schema.getTable(table_name);
            Table target_table = target_schema.getTable(table_name);

            int total_rows = 0;
            RowResult res = source_table.select("*").execute();
            while (const Row row = res.fetchOne()) {
                vector<Row> batch;
                for (int i = 0; i < batch_size && row; ++i) {
                    batch.push_back(row);
                    total_rows++;
                    if (i < batch_size-1) row = res.fetchOne();
                }
                target_table.insert().rows(batch).execute();
            }

            auto end_time = chrono::system_clock::now();
            auto duration = chrono::duration_cast<chrono::milliseconds>(end_time - start_time);

            logger.log(log_prefix + "SUCCESS | Rows: " + to_string(total_rows) +
                      " | Duration: " + to_string(duration.count()) + "ms");
        } catch (const Error& e) {
            logger.log(log_prefix + "ERROR: " + string(e.what()));
            throw;
        }
    }

    void copy_schema(Session& source_session, 
                    Session& target_session,
                    const string& source_schema_name,
                    const string& target_schema_name,
                    const vector<string>& tables) {
        Schema source_schema = source_session.getSchema(source_schema_name);
        Schema target_schema = target_session.getSchema(target_schema_name);

        for (const auto& table : tables) {
            for (int attempt = 0; attempt < config.max_retries; ++attempt) {
                try {
                    copy_table_data(source_schema, target_schema, table);
                    break;
                } catch (...) {
                    if (attempt == config.max_retries-1) {
                        logger.log("Table[" + table + "] FAILED after " + 
                                   to_string(config.max_retries) + " attempts");
                    }
                    this_thread::sleep_for(
                        chrono::seconds(config.retry_interval)
                    );
                }
            }
        }
    }
};

int main(int argc, char** argv) {
    try {
        // 加载配置文件
        ifstream config_file("config.json");
        json config_json = json::parse(config_file);
        
        Config config;
        config.source1 = config_json["source1"];
        config.source2 = config_json["source2"];
        config.target = config_json["target"];
        config.schema_mapping = config_json["schema_mapping"];

        Logger logger;
        DBOperator db_operator(config, logger);

        // 连接数据库
        Session source1 = db_operator.connect_db(config.source1);
        Session source2 = db_operator.connect_db(config.source2);
        Session target = db_operator.connect_db(config.target);

        // 执行复制任务
        for (const auto& [source_schema, target_schema] : config.schema_mapping) {
            vector<string> tables = config.schema_mapping[source_schema];
            
            // 从第一个源复制
            db_operator.copy_schema(source1, target, 
                                   source_schema, target_schema, tables);
            
            // 从第二个源复制
            db_operator.copy_schema(source2, target, 
                                   source_schema, target_schema, tables);
        }

    } catch (const exception& e) {
        cerr << "Fatal error: " << e.what() << endl;
        return 1;
    }
    return 0;
}

配置文件示例 (config.json):

{
    "source1": {
        "host": "source1.example.com",
        "port": 3306,
        "user": "user",
        "password": "password"
    },
    "source2": {
        "host": "source2.example.com",
        "port": 3307,
        "user": "user",
        "password": "password"
    },
    "target": {
        "host": "target.example.com",
        "port": 3308,
        "user": "user",
        "password": "password"
    },
    "max_retries": 3,
    "retry_interval": 60,
    "schema_mapping": {
        "source_schema1": ["table1", "table2"],
        "source_schema2": ["table3", "table4"]
    }
}

关键特性说明:

  1. 配置管理
  • 使用JSON配置文件存储数据库连接参数和复制参数
  • 支持灵活的模式映射配置
  • 可配置重试策略和批量大小
  1. 日志系统
  • 自动创建日志目录
  • 按天滚动日志文件
  • 详细记录每个操作的元数据
  • 线程安全的日志写入
  1. 数据复制
  • 使用MySQL X DevAPI进行高效数据操作
  • 批量插入机制提升性能
  • 自动重试机制和错误隔离
  • 表级并行复制(可扩展)
  1. 异常处理
  • 多级异常捕获机制
  • 连接失败自动重试
  • 数据操作错误隔离
  • 资源自动释放保障
  1. 扩展性
  • 模块化设计便于功能扩展
  • 支持动态表列表配置
  • 可扩展的多线程支持

编译运行:

# 安装依赖
sudo apt-get install libmysqlcppconn-dev nlohmann-json3-dev

# 编译
g++ -std=c++17 -o db_sync main.cpp -lmysqlcppconn8 -lpthread

# 运行
./db_sync

建议的优化扩展方向:

  1. 并行处理
  • 使用线程池实现表级并行复制
  • 异步IO提升吞吐量
  • 连接池管理数据库连接
  1. 监控增强
  • 实时进度报告
  • Prometheus指标导出
  • 健康检查端点
  1. 高级功能
  • 增量复制支持
  • 模式校验机制
  • 数据一致性校验
  • 自动断点续传
  1. 部署优化
  • Docker容器化封装
  • Systemd服务集成
  • 配置热加载支持

http://www.kler.cn/a/567150.html

相关文章:

  • 【Java项目】基于SpringBoot和Vue的“智慧食堂”系统
  • Android 布局系列(五):GridLayout 网格布局的使用
  • 一文掌握 Scrapy 框架的详细使用,包括实战案例
  • 两数之和 Hot100
  • Mysql 语法再巩固
  • GitHub 语析 - 基于大模型的知识库与知识图谱问答平台
  • 从零搭建Tomcat:深入理解Java Web服务器的工作原理
  • 【Linux基础】Linux下的C编程指南
  • redis slaveof 命令 执行后为什么需要清库重新同步
  • springboot集成langchain4j-实现简单的智能问答机器人
  • Android逆向:一文掌握 Frida 详细使用
  • SpringBoot 项目集成 Prometheus 和 Grafana
  • JAVA版本GDAL安装使用教程(详细步骤)
  • Lucene硬核解析专题系列(三):查询解析与执行
  • ​CNN神经网络概述
  • Docker项目部署-部署Java应用
  • 半音密码:解码音乐的最小量子单位
  • Vue.js 学习笔记
  • 近似最近邻(ANN)算法库实战
  • 5-1JVM内存区域