数据库迁移脚本
数据库迁移脚本
本次迁移使用 Node.js 编写。
为什么选择 Node.js?
因为迁移主要是数据库读写这类 I/O 密集型操作,Node.js 的异步 I/O 模型处理起来很高效(但它仍然是单线程的,对于 CPU 密集型的操作可能不如其他语言)。
迁移的本质就是"先查再插":从源库分批查询,转换字段后再插入目标库。
例子(先是 package.json 的依赖声明,然后是迁移脚本):
{
"name": "数据库迁移",
"version": "1.0.0",
"dependencies": {
"log4js": "^6.9.1",
"mysql2": "^3.5.2"
}
}
// 引入依赖
const mysql = require('mysql2/promise');
const log4js = require('log4js');
// Logging setup (the layout/format still needs to be customised).
// A single file appender named `errorLog` writes to error.log, and the
// default category is routed to it at level 'error'.
const LOG4JS_OPTIONS = {
  appenders: {
    errorLog: {
      type: 'file',
      filename: 'error.log',
    },
  },
  categories: {
    default: {
      appenders: ['errorLog'],
      level: 'error',
    },
  },
};
log4js.configure(LOG4JS_OPTIONS);

// Logger for the default category; used by the migration functions below.
const logger = log4js.getLogger();
// Builds the mysql2 pool options for one database.
//
// The connection settings (host, port, user, password, database) can be
// overridden with environment variables named `${envPrefix}_HOST`, `_PORT`,
// `_USER`, `_PASSWORD` and `_DATABASE`; an unset or empty variable falls back
// to the value in `defaults`. `poolOptions` is copied in after the connection
// settings.
function buildDbOptions(envPrefix, defaults, poolOptions) {
  const env = process.env;
  const connection = {
    host: env[envPrefix + '_HOST'] || defaults.host,
    // mysql2 documents `port` as a number, so normalise it here.
    port: Number(env[envPrefix + '_PORT'] || defaults.port),
    user: env[envPrefix + '_USER'] || defaults.user,
    password: env[envPrefix + '_PASSWORD'] || defaults.password,
    database: env[envPrefix + '_DATABASE'] || defaults.database,
  };
  return Object.assign(connection, poolOptions);
}

// Data-migration settings.
//
// The previous version also set `acquireTimeout`, `acquireRetryCount` and
// `acquireRetryWait`. mysql2 does not implement these: it ignores them and
// prints an "invalid configuration option" warning, so they were removed
// rather than left in place suggesting timeouts/retries that never happen.
//
// TODO: the fallback credentials below are still hard-coded; drop them once
// every environment provides the MIGRATION_*_DB_* variables.
// NOTE(review): source and target currently default to the same host and
// database - confirm this is intended.
const migrationConfig = {
  batchSize: 1000, // number of rows read and written per batch
  sourceDB: buildDbOptions(
    'MIGRATION_SOURCE_DB',
    {
      host: 'db2.ilaw.com.cn',
      port: 3306,
      user: 'developer',
      password: 'cpsoft_873406',
      database: 'kinglex',
    },
    {
      connectionLimit: 10, // maximum number of pooled connections
      queueLimit: 100, // maximum number of queued connection requests
      waitForConnections: true, // queue requests when no connection is free
      supportBigNumbers: true,
      bigNumberStrings: true,
    }
  ),
  targetDB: buildDbOptions(
    'MIGRATION_TARGET_DB',
    {
      host: 'db2.ilaw.com.cn',
      port: 3306,
      user: 'developer',
      password: 'cpsoft_873406',
      database: 'kinglex',
    },
    {
      connectionLimit: 20, // maximum number of pooled connections
      queueLimit: 200, // maximum number of queued connection requests
      waitForConnections: true, // queue requests when no connection is free
      supportBigNumbers: true,
      bigNumberStrings: true,
    }
  ),
};
// Create one mysql2 connection pool per database, source first and then
// target, from the matching sections of migrationConfig.
const [sourcePool, targetPool] = [
  migrationConfig.sourceDB,
  migrationConfig.targetDB,
].map((poolOptions) => mysql.createPool(poolOptions));
// Legacy `cust_linkman.type` code -> `entityType` value stored in `cust_base_info`.
const LINKMAN_ENTITY_TYPE_BY_TYPE = new Map([
  [1, 501],
  [2, 503],
  [3, 506],
  [4, 505],
  [5, 504],
  [6, 507],
]);

// Returns the entityType for a legacy type code, or null when the code is
// missing or has no mapping.
function resolveLinkmanEntityType(type) {
  if (type == null) {
    return null;
  }
  // Number() keeps the loose matching of the former `==` checks, so a driver
  // returning '1' instead of 1 still maps to 501.
  const code = Number(type);
  return LINKMAN_ENTITY_TYPE_BY_TYPE.has(code)
    ? LINKMAN_ENTITY_TYPE_BY_TYPE.get(code)
    : null;
}

// Maps one `cust_linkman` row to a `cust_base_info` record.
// The key order is significant: the insert is positional (no column list).
function toLinkmanBaseInfo(row) {
  return {
    id: row.id,
    groupId: null,
    type: 5, // 5 = contact person
    name: row.name,
    nameIndex: row.firstName,
    nameGBK: null,
    simpleName: null,
    formerName: row.beforeName,
    codeType: row.cardType,
    code: row.IDCard,
    mobile1: row.mobile1,
    mobile2: row.mobile2,
    phone: null,
    country: row.country,
    province: row.province,
    city: row.city,
    district: row.district,
    businessArea: row.businessArea,
    countryCode: null,
    provinceCode: null,
    cityCode: null,
    districtCode: null,
    businessAreaCode: null,
    address: null,
    description: row.description,
    remark: row.memo,
    teamId: row.teamId,
    managerId: row.managerId,
    shareId: null,
    resource: row.isAuto,
    importanceEvaluate: null,
    caseSourceEmpId: null,
    creditedLevel: null,
    relationLevel: null,
    isImportance: null,
    importanceDesc: null,
    isinstancy: null,
    instancyDesc: null,
    status1: null,
    email: row.email,
    renameStatus: null,
    isGetGSData: null,
    isGetGSGJData: null,
    relation: null,
    deptId: null,
    dept: null,
    relationCustomer: null,
    company: row.company,
    createUserId: row.createUserId,
    createTime: row.createDate,
    modifyTime: row.updateDate,
    isDel: row.isDel,
    belongTo: row.custId,
    mobile3: row.mobile3,
    entityType: resolveLinkmanEntityType(row.type),
  };
}

// Maps one `cust_linkman` row to a `cust_person` record.
// The key order is significant: the insert is positional (no column list).
function toLinkmanPerson(row) {
  return {
    custId: row.id,
    groupId: null,
    type: 3, // 3 = contact person
    sex: row.sex,
    marriage: row.marriage,
    homePhone: row.homePhone,
    qq: row.qq,
    nation: row.nation,
    birthday: row.birthday,
    registeredAddress: row.address,
    bigDiploma: row.bigDiploma,
    origin: row.origin,
    face: row.face,
    religion: row.religion,
    workCompany: row.workCompany,
    workDept: row.workDept,
    workDimission: row.workDimission,
    workJob: row.workJob,
    workRight: row.workRight,
    workPhone: row.workPhone,
    workFax: row.workFax,
    workAddress: row.workAddress,
    cardDept: null,
    cardAddress: null,
    cardAddressZip: null,
    endUniversity: row.endUniversity,
    workZip: row.workZip,
    workWebsite: row.workWebsite,
    workLinePhone: row.workLinePhone,
    msn: row.msn,
    homeAddress: row.homeAddress,
    homeZip: row.homeZip,
    communicationAddress: row.communicationAddress,
    communicationZip: row.communicationZip,
    like: row.lkmLike,
    specialty: row.specialty,
    consortInfo: row.consortInfo,
    childInfo: row.childInfo,
    otherInfo: row.otherInfo,
    company: row.company,
    createUserId: row.createUserId,
    createTime: row.createDate,
    modifyTime: row.updateDate,
    isDel: row.isDel,
  };
}

/**
 * Copies the source `cust_linkman` table into the target `cust_base_info`
 * and `cust_person` tables, `migrationConfig.batchSize` rows at a time.
 *
 * A failed batch insert is logged (log4js logger and console) and the
 * migration continues with the next batch; the success message is printed
 * only for batches whose inserts both completed. Both connections are
 * released when the function finishes, whether it succeeds or throws.
 *
 * @returns {Promise<void>}
 * @throws Re-throws any error raised while acquiring a connection or reading
 *   from the source table.
 */
async function cust_linkman() {
  let sourceConnection;
  let targetConnection;
  try {
    sourceConnection = await getConnectionFromSourcePool();
    targetConnection = await getConnectionFromTargetPool();
    let offset = 0;
    while (true) {
      // ORDER BY makes the LIMIT/offset paging deterministic; without it the
      // server may return rows in a different order for each batch, which can
      // skip or repeat rows.
      const [rows] = await sourceConnection.query(
        `SELECT * FROM cust_linkman ORDER BY id LIMIT ${offset}, ${migrationConfig.batchSize}`
      );
      if (rows.length === 0) {
        // No more data: the migration is complete.
        break;
      }

      const baseValues = rows.map((row) => Object.values(toLinkmanBaseInfo(row)));
      const personValues = rows.map((row) => Object.values(toLinkmanPerson(row)));

      try {
        // NOTE(review): both inserts are positional, so they rely on the record
        // key order matching the target tables' column order - confirm against
        // the schema.
        await targetConnection.query('INSERT IGNORE INTO `cust_base_info` VALUES ?', [baseValues]);
        await targetConnection.query('INSERT IGNORE INTO `cust_person` VALUES ?', [personValues]);
        console.log('cust_linkman 迁移成功:' + baseValues.length + '条');
      } catch (err) {
        // Best effort: record the failure and carry on with the next batch.
        logger.error(err);
        console.error('出错: ', err);
      }
      offset += migrationConfig.batchSize;
    }
    console.log('cust_linkman 迁移完成');
  } catch (err) {
    console.error('出错: ', err);
    throw err;
  } finally {
    // Hand the connections back to their pools.
    if (sourceConnection) {
      sourceConnection.release();
    }
    if (targetConnection) {
      targetConnection.release();
    }
  }
}
// Maps one joined cst_opponent / cst_opponent_organization row to a
// `cust_company` record. The keys are the target column names; their order
// defines both the column list and the value order of the insert.
function toOpponentCompany(row) {
  return {
    custId: row.id,
    groupId: null,
    type: 2,
    scope: row.scope,
    property: row.property,
    fax: row.fax,
    website: row.website,
    officeArea: row.officeArea,
    turnover: row.turnover,
    employeeNumber: row.employeeNumber,
    creditCode: row.creditCode,
    bankroll: row.bankroll,
    code: row.code,
    establishDate: row.finishDate,
    chieftain: row.chieftain,
    businessScope: null,
    taxNo: row.taxNo,
    bizPhone: row.bizPhone,
    bizAddress: row.bizAddress,
    bank1: row.bankOne,
    bankAccountName1: row.bankAccountNameOne,
    bankAccountId1: row.bankAccountIdOne,
    bank2: row.bankTwo,
    bankAccountName2: row.bankAccountNameTwo,
    bankAccountId2: row.bankAccountIdTwo,
    bank3: row.bankThree,
    bankAccountName3: row.bankAccountNameThree,
    bankAccountId3: row.bankAccountIdThree,
    zip: row.zip,
    brand: row.brand,
    bizLicense: row.bizLicense,
    propertySubName: null,
    billCode: null,
    company: row.company,
    createUserId: row.createUserId,
    createTime: row.createTime,
    modifyTime: row.modifyTime,
    isDel: row.isDel,
  };
}

/**
 * Copies `cst_opponent` rows with `type = 1`, left-joined with their
 * non-deleted `cst_opponent_organization` rows, into the target
 * `cust_company` table, `migrationConfig.batchSize` rows at a time.
 *
 * A failed batch insert is logged (log4js logger and console) and the
 * migration continues with the next batch; the success message is printed
 * only for batches whose insert completed. Both connections are released
 * when the function finishes, whether it succeeds or throws.
 *
 * @returns {Promise<void>}
 * @throws Re-throws any error raised while acquiring a connection or reading
 *   from the source tables.
 */
async function cst_opponent_organization_company() {
  let sourceConnection;
  let targetConnection;
  try {
    sourceConnection = await getConnectionFromSourcePool();
    targetConnection = await getConnectionFromTargetPool();
    let offset = 0;
    while (true) {
      // The explicit aliases make the cst_opponent columns win over the
      // same-named columns of the joined table. ORDER BY makes the
      // LIMIT/offset paging deterministic; without it batches can skip or
      // repeat rows.
      const [rows] = await sourceConnection.query(`SELECT *, cst_opponent.id as id, cst_opponent.address as address, cst_opponent.company as company, cst_opponent.createTime as createTime,
cst_opponent.createUserId as createUserId, cst_opponent.modifyTime as modifyTime,
cst_opponent.isDel as isDel
FROM cst_opponent
left join (select * from cst_opponent_organization where isDel = 0) opponent_organization on opponent_organization.opponentId = cst_opponent.id
where type = 1
ORDER BY cst_opponent.id
LIMIT ${offset}, ${migrationConfig.batchSize}`);
      if (rows.length === 0) {
        // No more data: the migration is complete.
        break;
      }

      const records = rows.map(toOpponentCompany);
      // Derive the column list from the record keys so that columns and
      // values cannot drift apart.
      const columns = Object.keys(records[0]).join(', ');
      const values = records.map(Object.values);

      try {
        await targetConnection.query(
          'INSERT IGNORE INTO cust_company (' + columns + ') VALUES ?',
          [values]
        );
        console.log('cst_opponent_organization_company 迁移成功:' + records.length + '条');
      } catch (err) {
        // Best effort: record the failure and carry on with the next batch.
        logger.error(err);
        console.error('出错: ', err);
      }
      offset += migrationConfig.batchSize;
    }
    console.log('cst_opponent_organization_company 迁移完成');
  } catch (err) {
    console.error('出错: ', err);
    throw err;
  } finally {
    // Hand the connections back to their pools.
    if (sourceConnection) {
      sourceConnection.release();
    }
    if (targetConnection) {
      targetConnection.release();
    }
  }
}