当前位置: 首页 > article >正文

数据库迁移脚本

数据库迁移脚本

这次使用node.js作为工具去做

至于为啥用node.js??

可能是js在异步操作上非常高效? (但它仍然是单线程的,对于 CPU 密集型的操作可能不如其他语言。)

本质就是先查再插

例子:

{
  "name": "数据库迁移",
  "version": "1.0.0",
  "dependencies": {
    "log4js": "^6.9.1",
    "mysql2": "^3.5.2"
  }
}
// 引入依赖
const mysql = require('mysql2/promise');
const log4js = require('log4js');

// 配置日志,此处格式需要自定义
log4js.configure({
  appenders: {
    errorLog: { type: 'file', filename: 'error.log' }
  },
  categories: {
    default: { appenders: ['errorLog'], level: 'error' }
  }
})
const logger = log4js.getLogger();

// 数据迁移配置
const migrationConfig = {
  batchSize: 1000, // 每批处理的数据量
  sourceDB: {
    host: 'db2.ilaw.com.cn',
    port: '3306',
    user: 'developer',
    password: 'cpsoft_873406',
    database: 'kinglex',
    connectionLimit: 10,  // 设置连接数限制
    queueLimit: 100,  // 设置队列限制
    acquireTimeout: 30000,  // 设置获取连接的超时时间为30秒
    waitForConnections: true,  // 等待连接可用
    acquireRetryCount: 3,  // 获取连接的重试次数
    acquireRetryWait: 1000,  // 获取连接的重试间隔时间为1秒
    supportBigNumbers: true,
    bigNumberStrings: true
  },
  targetDB: {
    host: 'db2.ilaw.com.cn',
    port: '3306',
    user: 'developer',
    password: 'cpsoft_873406',
    database: 'kinglex',
    connectionLimit: 20,  // 设置连接数限制
    queueLimit: 200,  // 设置队列限制
    acquireTimeout: 5000,  // 设置获取连接的超时时间为5秒
    waitForConnections: true,  // 等待连接可用
    acquireRetryCount: 5,  // 获取连接的重试次数
    acquireRetryWait: 500,  // 获取连接的重试间隔时间为0.5秒
    supportBigNumbers: true,
    bigNumberStrings: true
  }
};

// 创建源数据库连接池
const sourcePool = mysql.createPool(migrationConfig.sourceDB);

// 创建目标数据库连接池
const targetPool = mysql.createPool(migrationConfig.targetDB);

async function cust_linkman() {
  let sourceConnection, targetConnection;

  try {
    sourceConnection = await getConnectionFromSourcePool();
    targetConnection = await getConnectionFromTargetPool();

    let offset = 0;
    while (true) {
      const [rows] = await sourceConnection.query(`SELECT * FROM cust_linkman LIMIT ${offset}, ${migrationConfig.batchSize}`);
      console.log(rows)
      if (rows.length === 0) {
        // 没有更多数据,数据迁移完成
        break;
      }
      const dataBase = []
      const dataPerson = []
      // 处理当前批次的数据
      for (const row of rows) {
        const typeO = row.type;
        let EntityType = null;

        if(typeO!=null && typeO==1){
          EntityType=501;
        }
        if(typeO!=null && typeO==2){
          EntityType=503;
        }
        if(typeO!=null && typeO==3){
          EntityType=506;
        }
        if(typeO!=null && typeO==4){
          EntityType=505;
        }
        if(typeO!=null && typeO==5){
          EntityType=504;
        }
        if(typeO!=null && typeO==6){
          EntityType=507;
        }
        const transformedUserBase = {
          id: row.id,
          groupId: null,
          type: 5, //5就是联系人
          name: row.name,
          nameIndex: row.firstName,
          nameGBK: null,
          simpleName: null,
          formerName:row.beforeName,
          codeType:row.cardType,
          code:row.IDCard,
          mobile1:row.mobile1,
          mobile2:row.mobile2,
          phone:null,
          country:row.country,
          province:row.province,
          city:row.city,
          district:row.district,
          businessArea:row.businessArea,
          countryCode:null,
          provinceCode:null,
          cityCode:null,
          districtCode:null,
          businessAreaCode:null,
          address:null,
          description: row.description,
          remark: row.memo,
          teamId: row.teamId,
          managerId: row.managerId,
          shareId:null,
          resource: row.isAuto,
          importanceEvaluate:null,
          caseSourceEmpId:null,
          creditedLevel:null,
          relationLevel:null,
          isImportance:null,
          importanceDesc:null,
          isinstancy:null,
          instancyDesc:null,
          status1 :null,
          email: row.email,
          renameStatus :null,
          isGetGSData :null,
          isGetGSGJData :null,
          relation :null,
          deptId :null,
          dept :null,
          relationCustomer :null,
          company:row.company,
          createUserId:row.createUserId,
          createTime:row.createDate,
          modifyTime:row.updateDate,
          isDel:row.isDel,
          belongTo:row.custId,
          mobile3:row.mobile3,
          entityType:EntityType
        }
        const transformedUserPerson = {
          custId: row.id,
          groupId: null,
          type: 3,  //3就是联系人
          sex : row.sex,
          marriage: row.marriage,
          homePhone:row.homePhone,
          qq: row.qq,
          nation:row.nation,
          birthday: row.birthday,
          registeredAddress: row.address,
          bigDiploma: row.bigDiploma,
          origin: row.origin,
          face: row.face,
          religion: row.religion,
          workCompany: row.workCompany,
          workDept: row.workDept,
          workDimission:row.workDimission,
          workJob:row.workJob,
          workRight: row.workRight,
          workPhone:row.workPhone,
          workFax: row.workFax,
          workAddress: row.workAddress,
          cardDept: null,
          cardAddress: null,
          cardAddressZip: null,
          endUniversity: row.endUniversity,
          workZip: row.workZip,
          workWebsite: row.workWebsite,
          workLinePhone: row.workLinePhone,
          msn: row.msn,
          homeAddress: row.homeAddress,
          homeZip: row.homeZip,
          communicationAddress: row.communicationAddress,
          communicationZip: row.communicationZip,
          like: row.lkmLike,
          specialty: row.specialty,
          consortInfo: row.consortInfo,
          childInfo: row.childInfo,
          otherInfo: row.otherInfo,
          company:row.company,
          createUserId:row.createUserId,
          createTime: row.createDate,
          modifyTime:row.updateDate,
          isDel : row.isDel
        }

        dataBase.push(transformedUserBase)
        dataPerson.push(transformedUserPerson)

      }
      const valueb = dataBase.map(Object.values)
      console.log("valueb----------------------------")
      console.log(valueb)
      const valuep = dataPerson.map(Object.values)
      try {
        await targetConnection.query('INSERT IGNORE INTO `cust_base_info`'  + 'VALUES ?',
            [valueb]);

        await targetConnection.query('INSERT IGNORE INTO `cust_person`'  +
            'VALUES ?',
            [valuep]);
      } catch (err) {
        // 处理迁移过程中的错误
        // 可以选择继续处理下一条数据或者中断迁移
        logger.error(err);
        console.error('出错: ', err);
      }
      //console.log('cst_opponent_organization_company 迁移成功:' + data.map(item => item.id + ''))
      console.log('cst_opponent_organization_company 迁移成功:' + valueb.length + '条')

      offset += migrationConfig.batchSize;
    }

    console.log('cst_opponent_organization_company 迁移完成')
  } catch (err) {
    console.error('出错: ', err);
    throw err;
  } finally {
    // 释放连接回连接池
    if (sourceConnection) {
      sourceConnection.release();
    }
    if (targetConnection) {
      targetConnection.release();
    }
  }
}

async function cst_opponent_organization_company() {
  let sourceConnection, targetConnection;

  try {
    sourceConnection = await getConnectionFromSourcePool();
    targetConnection = await getConnectionFromTargetPool();

    let offset = 0;
    while (true) {
      const [rows] = await sourceConnection.query(`SELECT *, cst_opponent.id as id, cst_opponent.address as address, cst_opponent.company as company, cst_opponent.createTime as createTime,
          cst_opponent.createUserId as createUserId, cst_opponent.modifyTime as modifyTime,
          cst_opponent.isDel as isDel
            FROM cst_opponent
            left join (select * from cst_opponent_organization where isDel = 0) opponent_organization  on opponent_organization.opponentId = cst_opponent.id
            where type = 1 LIMIT ${offset}, ${migrationConfig.batchSize}`);

      if (rows.length === 0) {
        // 没有更多数据,数据迁移完成
        break;
      }
      const data = []
      // 处理当前批次的数据
      for (const row of rows) {
        // 在这里执行具体的数据转换、处理和插入操作
        // 转换和映射数据
        const transformedUser = {
          custId: row.id,
          groupId: null,
          type: 2,
          scope: row.scope,
          property: row.property,
          fax: row.fax,
          website: row.website,
          officeArea: row.officeArea,
          turnover: row.turnover,
          employeeNumber: row.employeeNumber,
          creditCode: row.creditCode,
          bankroll: row.bankroll,
          code: row.code,
          establishDate: row.finishDate,
          chieftain: row.chieftain,
          businessScope: null,
          taxNo: row.taxNo,
          bizPhone: row.bizPhone,
          bizAddress: row.bizAddress,
          bank1: row.bankOne,
          bankAccountName1: row.bankAccountNameOne,
          bankAccountId1: row.bankAccountIdOne,
          bank2: row.bankTwo,
          bankAccountName2: row.bankAccountNameTwo,
          bankAccountId2: row.bankAccountIdTwo,
          bank3: row.bankThree,
          bankAccountName3: row.bankAccountNameThree,
          bankAccountId3: row.bankAccountIdThree,
          zip: row.zip,
          brand: row.brand,
          bizLicense: row.bizLicense,
          propertySubName: null,
          billCode: null,
          company :row.company,
          createUserId :row.createUserId,
          createTime :row.createTime,
          modifyTime :row.modifyTime,
          isDel :row.isDel
        };

        data.push(transformedUser)
      }
      const value = data.map(Object.values)
      try {
        await targetConnection.query('INSERT IGNORE INTO cust_company ' +
            '(custId, groupId, type, scope, property, ' +
            'fax, website, officeArea, turnover, employeeNumber, ' +
            'creditCode, bankroll, code, establishDate, chieftain, ' +
            'businessScope, taxNo, bizPhone, bizAddress, bank1, bankAccountName1, ' +
            'bankAccountId1, bank2, bankAccountName2, bankAccountId2, bank3, ' +
            'bankAccountName3, bankAccountId3, zip, brand, bizLicense, ' +
            'propertySubName, billCode, company ,createUserId ,createTime ,modifyTime ,isDel) ' +
            'VALUES ?',
            [value]);
      } catch (err) {
        // 处理迁移过程中的错误
        // 可以选择继续处理下一条数据或者中断迁移
        logger.error(err);
        console.error('出错: ', err);
      }
      //console.log('cst_opponent_organization_company 迁移成功:' + data.map(item => item.id + ''))
      console.log('cst_opponent_organization_company 迁移成功:' + data.length + '条')

      offset += migrationConfig.batchSize;
    }

    console.log('cst_opponent_organization_company 迁移完成')
  } catch (err) {
    console.error('出错: ', err);
    throw err;
  } finally {
    // 释放连接回连接池
    if (sourceConnection) {
      sourceConnection.release();
    }
    if (targetConnection) {
      targetConnection.release();
    }
  }
}

http://www.kler.cn/a/132052.html

相关文章:

  • Leetcode 剑指 Offer II 053. 二叉搜索树中的中序后继
  • 实现Vue3源码 isReactive 和 isReadonly
  • LeetCode235. Lowest Common Ancestor of a Binary Search Tree
  • 常见面试题-Netty线程模型以及TCP粘包拆包
  • RT-DETR优化改进:轻量级Backbone改进 | VanillaNet极简神经网络模型 | 华为诺亚2023
  • Java使用Redis的几种客户端介绍
  • Python 简易 HTTP 服务器
  • 现有文章汇总
  • Selenium操作已经打开的Chrome浏览器窗口
  • 小数背包问题
  • html所有标签和DOCTYPE的总结
  • iApp祁天社区UI成品源码 功能齐全的社区应用
  • 交换机的工作原理
  • 【SAP-QUERY】QUERY报表的创建
  • SQL ALTER TABLE 语句||SQL AUTO INCREMENT 字段
  • Java排序算法之贪心算法
  • springboot(ssm邮件过滤系统 在线邮箱平台Java(codeLW)
  • 【Linux】Linux进程间通信(三)
  • 信息系统项目管理师 第四版 第5章 信息系统工程
  • 服务器数据恢复—热备盘同步中断导致Raid5数据丢失的数据恢复案例