Syncing Hive, Oracle, and Redis Data, Part 1: Syncing from Other Data Sources to the Local Database
Preface
Financial data such as securities filings comes from a wide range of sources and arrives in messy formats. To turn it into a dataset that follows your company's unified standard, you need to sync, process, and clean the data.
Here I share the evolution of one approach: syncing other data sources into the local database.
This article walks through enterprise-grade code built on the Builder, Strategy, Template Method, and Factory design patterns.
Hands-on walkthrough
The configuration file described below stores the datasource and job definitions, and we parse it with XStream to obtain the corresponding settings.
Part of the XML file is shown in a screenshot (omitted here); a hypothetical parsing sketch follows.
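A minimal sketch, assuming JobInfo exposes fields matching these tag names (the tags and values here are illustrative, not the project's actual config):
import com.thoughtworks.xstream.XStream;

public class ConfigParseSketch {
    public static void main(String[] args) {
        String xml =
                "<job>\n" +
                "  <name>sync_report</name>\n" +
                "  <srcSql>select * from t_report</srcSql>\n" +
                "  <sourceTableFields>id,create_time,update_time</sourceTableFields>\n" +
                "  <destTableFields>id,create_time,update_time</destTableFields>\n" +
                "  <destTableCondition>create_time,update_time</destTableCondition>\n" +
                "</job>";
        XStream xstream = new XStream();
        xstream.alias("job", JobInfo.class);            // map <job> elements onto the JobInfo class
        xstream.allowTypes(new Class[]{JobInfo.class}); // security whitelist required by newer XStream versions
        JobInfo jobInfo = (JobInfo) xstream.fromXML(xml);
        System.out.println(jobInfo.getName());
    }
}
List-valued fields such as sourceDBNo would additionally need xstream.addImplicitCollection(...) to map repeated elements.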
We then start a scheduled task that syncs the other data sources into the local database:
public void startIn() {
if (CollectionUtils.isEmpty(jobList)) {
logger.info("DBSync-in配置读取错误或未配置任务");
return;
}
for (int index = 0; index < jobList.size(); index++) {
JobInfo jobInfo = jobList.get(index);
String logTitle = "[" + code + "]-" + jobInfo.getName() + " ";
try {
//invoked in the system-listener style
Map<String, Object> jobMap = new HashMap<>();
jobMap.put("srcDbs", srcDbs);
jobMap.put("destDbs", destDbs);
jobMap.put("jobInfo", jobInfo);
jobMap.put("logTitle", logTitle);
//dispatch to the async thread pool
logger.info("Dispatching async task: sync to local database");
jobDBSync.executeIn(jobMap);
} catch (Exception e) {
logger.error(logTitle + e.getMessage(), e);
logger.error(logTitle + " run failed");
}
}
}
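The startIn() entry point is meant to be fired on a schedule. A minimal sketch of that wiring, assuming a hypothetical DBSyncStarter bean owns startIn() and using an illustrative cron expression:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DBSyncInTrigger {
    @Autowired
    private DBSyncStarter dbSyncStarter; // hypothetical stand-in for the class that owns startIn()

    @Scheduled(cron = "0 0/30 * * * ?")  // every 30 minutes; tune to your sync window
    public void trigger() {
        dbSyncStarter.startIn();
    }
}
Remember to enable scheduling with @EnableScheduling on a configuration class.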
The job is dispatched through Spring's async thread-pool annotation:
@Async("taskExector")
public void executeIn(Map<String, Object> data) {
logger.info("开始任务调度: " + new Date() + Thread.currentThread().getName());
Connection inConn = null;
Connection outConn = null;
List<DBInfo> srcDbs = (List<DBInfo>) data.get("srcDbs");
List<DBInfo> destDbs = (List<DBInfo>) data.get("destDbs");
JobInfo jobInfo = (JobInfo) data.get("jobInfo");
String logTitle = (String) data.get("logTitle");
if (!checkOptions(jobInfo)) {
return;
}
List<String> sourceDBNos = jobInfo.getSourceDBNo();
for (String sourceDBNo : sourceDBNos) {
DBInfo sourceDbInfo = getDBInfo(srcDbs, sourceDBNo);
DBSyncService dbSyncService = dbSyncServiceDelegate.getDBSyncService(sourceDbInfo.getDbtype());
try {
if (!dbSyncService.fromCache()) {
inConn = createConnection(sourceDbInfo);
if (inConn == null) {
logger.info("请检查源数据连接!");
continue;
}
} else {
//instantiate the RedisTemplate for cache-type sources
dbSyncService.setTempLateWithADC(createCacheTemplate(sourceDbInfo));
}
for (String destDBNo : jobInfo.getDestDBNo()) {
DBInfo destDbInfo = getDBInfo(destDbs, destDBNo);
outConn = createConnection(destDbInfo);
if (outConn == null) {
logger.info("请检查目标数据连接!");
return;
}
if (dbSyncService.fromCache() && !Boolean.valueOf(jobInfo.getIsCache())) {
continue;
}
if (!dbSyncService.fromCache() && !Boolean.valueOf(jobInfo.getIsDb())) {
continue;
}
boolean flag = true;
if (jobInfo.getDbWipe() != null && jobInfo.getDbWipe()) {
flag = dbSyncService.wipeDb(jobInfo, outConn, destDbInfo) > 0;
}
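//each pass assembles and executes one batch; getRecord reports whether more rows remain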
do {
long start = System.currentTimeMillis();
Map<String, Object> returnMap = dbSyncService.assembleSQLIn(jobInfo.getSrcSql(), inConn,
outConn, jobInfo, destDbInfo.getDbtype());
logger.info("组装SQL耗时: " + (new Date().getTime() - start) + "ms");
if (MapUtils.isNotEmpty(returnMap)) {
logger.debug(returnMap.get("sql").toString());
long eStart = System.currentTimeMillis();
if (flag) {
dbSyncService.executeSQL(returnMap, outConn);
logger.info(jobInfo.getName() + " executed successfully, SQL execution took: " + (System.currentTimeMillis() - eStart) + "ms");
}
}
} while (dbSyncService.getRecord(sourceDbInfo, jobInfo));
//release resources
logger.info("Closing destination database connection");
destoryConnection(outConn);
}
//destoryCacheConnection(dbSyncService.getTempLateWithADC(),srcDb);
} catch (Exception e) {
logger.error(logTitle + e.getMessage(), e);
logger.error(logTitle + " SQL execution failed, please check for syntax errors");
logger.info("Closing destination database connection");
destoryConnection(outConn);
//destoryCacheConnection(dbSyncService.getTempLateWithADC(),srcDb);
}
logger.info("Closing source database connection");
destoryConnection(inConn);
}
}
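@Async("taskExector") only works if a matching executor bean exists (the bean name keeps the original spelling "taskExector"). A minimal sketch, with illustrative pool sizes:
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean("taskExector")
    public Executor taskExector() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);   // illustrative sizes; tune to the number of sync jobs
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("db-sync-");
        executor.initialize();
        return executor;
    }
}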
Because each data source needs its own handling, we extend the system through Spring beans.
The dbSyncServiceDelegate injected above:
@Autowired
private DBSyncServiceDelegate dbSyncServiceDelegate;
@Service
public class DBSyncServiceDelegate implements InitializingBean {
private Map<String, DBSyncService> serviceMap = new HashMap<>();
@Autowired
List<DBSyncService> serviceList;
/**
* Look up the data-sync service implementation for the given database type.
*
* @param dbType the configured database type key
* @return the matching DBSyncService, or null if none is registered
*/
public DBSyncService getDBSyncService(String dbType) {
return serviceMap.get(dbType);
}
@Override
public void afterPropertiesSet() {
if (!CollectionUtils.isEmpty(serviceList)) {
for (DBSyncService dbSyncService : serviceList) {
serviceMap.put(dbSyncService.getDbType(), dbSyncService);
}
}
}
}
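Pieced together from the calls made above, the DBSyncService contract looks roughly like this (a sketch inferred from usage, not necessarily the author's exact interface):
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;

//sketch of the DBSyncService contract, reconstructed from the calls in executeIn above
public interface DBSyncService {
    String getDbType();                        //lookup key used by DBSyncServiceDelegate
    boolean fromCache();                       //true for cache sources such as Redis
    void setTempLateWithADC(Object template);  //inject the RedisTemplate for cache sources
    Object getTempLateWithADC();
    int wipeDb(JobInfo jobInfo, Connection outConn, DBInfo destDbInfo) throws SQLException;
    Map<String, Object> assembleSQLIn(String srcSql, Connection inConn, Connection outConn,
                                      JobInfo jobInfo, String dialect) throws SQLException;
    void executeSQL(Map<String, Object> sqlMap, Connection outConn) throws SQLException;
    boolean getRecord(DBInfo sourceDbInfo, JobInfo jobInfo);  //true while more rows remain
}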
The source tree provides a separate implementation for each source type (shown in a screenshot, omitted here).
For example, the Redis implementation:
public void setTempLateWithADC(Object tempLateWithADC) {
if (tempLateWithADC instanceof RedisTemplate) {
this.redisTemplateWithADC = (RedisTemplate<String, byte[]>) tempLateWithADC;
}
}
And the corresponding creation of the Redis template:
/**
* Create the cache template.
*
* @param db the cache datasource configuration
* @return the cached or newly created template, or null for unsupported types
*/
public synchronized Object createCacheTemplate(DBInfo db) {
if (Constants.CacheType.REDIS.getType().equals(db.getDbtype())) {
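//"source" caches one template per endpoint, keyed by url (standalone) or nodes (sentinel)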
if (source.get(db.getUrl()) != null) {
return source.get(db.getUrl());
}
if (source.get(db.getNodes()) != null) {
return source.get(db.getNodes());
}
// standalone Redis
RedisStandaloneConfiguration redisConfigStandalone = new RedisStandaloneConfiguration();
// sentinel Redis
RedisSentinelConfiguration redisConfigSentinel = new RedisSentinelConfiguration();
// cluster Redis (not used here)
// RedisClusterConfiguration redisConfig = new RedisClusterConfiguration();
if (StringUtils.isNotBlank(db.getNodes())) {
redisConfigSentinel.master(db.getMaster());
redisConfigSentinel.setSentinels(this.createSentinels(db.getNodes()));
redisConfigSentinel.setDatabase(Integer.valueOf(db.getDataBase()).intValue());
if (StringUtils.isNotBlank(db.getPassword())) {
redisConfigSentinel.setPassword(RedisPassword.of(db.getPassword()));
}
}
if (StringUtils.isNotBlank(db.getUrl())) {
redisConfigStandalone.setHostName(db.getUrl());
redisConfigStandalone.setDatabase(Integer.valueOf(db.getDataBase()).intValue());
redisConfigStandalone.setPassword(RedisPassword.of(db.getPassword()));
redisConfigStandalone.setPort(Integer.valueOf(db.getPort()).intValue());
}
// JedisConnectionFactory factory = new JedisConnectionFactory(redisConfig);
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(Constants.maxIdle);
genericObjectPoolConfig.setMinIdle(Constants.minIdle);
genericObjectPoolConfig.setMaxTotal(Constants.maxActive);
genericObjectPoolConfig.setMaxWaitMillis(Constants.maxWait);
//Redis client configuration
LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder
builder = LettucePoolingClientConfiguration.builder().
commandTimeout(Duration.ofMillis(Constants.redisTimeout));
builder.shutdownTimeout(Duration.ofMillis(Constants.redisTimeout));
builder.poolConfig(genericObjectPoolConfig);
LettuceClientConfiguration lettuceClientConfiguration = builder.build();
//build the connection factory from the server and client configuration
LettuceConnectionFactory lettuceConnectionFactory = null;
if (StringUtils.isNotBlank(db.getNodes())) {
lettuceConnectionFactory = new LettuceConnectionFactory(redisConfigSentinel, lettuceClientConfiguration);
}
if (StringUtils.isNotBlank(db.getUrl())) {
lettuceConnectionFactory = new LettuceConnectionFactory(redisConfigStandalone, lettuceClientConfiguration);
}
if (lettuceConnectionFactory == null) {
//neither url nor nodes configured: nothing to connect to
return null;
}
// for robustness, validate the connection before each use
lettuceConnectionFactory.setValidateConnection(true);
lettuceConnectionFactory.afterPropertiesSet();
//configure the template
RedisSerializer<byte[]> serializer = new RedisSerializer<byte[]>() {
@Override
public byte[] serialize(byte[] t) throws SerializationException {
return t;
}
@Override
public byte[] deserialize(byte[] bytes) throws SerializationException {
return bytes;
}
};
RedisTemplate<String, byte[]> template = new RedisTemplate<>();
template.setConnectionFactory(lettuceConnectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(serializer);
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
template.setDefaultSerializer(new StringRedisSerializer());
template.afterPropertiesSet();
String key = StringUtils.isBlank(db.getUrl()) ? db.getNodes() : db.getUrl();
source.put(key, template);
return template;
}
return null;
}
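With the byte[] value serializer configured above, a cache-backed sync service reads raw bytes and decodes them itself. A hypothetical usage (the key name and newline-separated UTF-8 record encoding are illustrative assumptions, not the project's actual format):
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.springframework.data.redis.core.RedisTemplate;

public class RedisReadSketch {
    //read one cached value and split it into rows
    public List<String> readRows(RedisTemplate<String, byte[]> template, String key) {
        byte[] raw = template.opsForValue().get(key); //e.g. key = "report:600000" (hypothetical)
        return raw == null
                ? Collections.emptyList()
                : Arrays.asList(new String(raw, StandardCharsets.UTF_8).split("\n"));
    }
}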
Different strategies assemble the SQL for each source (shown in a screenshot, omitted here). Below is the incremental/full implementation of assembleSQLIn:
@Override
public Map<String, Object> assembleSQLIn(String srcSql, Connection conn, Connection outConnection, JobInfo jobInfo,
String dialect) throws SQLException {
//creation-side sync watermark
Timestamp create_sync_time = null;
//update-side sync watermark
Timestamp up_sync_time = null;
List<String> columns;
Timestamp timestamp = null;
//insert statement
String insertSql = null;
//update statement
String updateSql = null;
Map<String, Object> returnMap = Maps.newHashMap();
Map<String, Map<String, Integer>> numMap = countMap.get() == null ? new HashMap<>() : countMap.get();
if (numMap.get(jobInfo.getName()) == null) {
HashMap<String, Integer> countMapIn = Maps.newHashMap();
countMapIn.put("markNum", 0);
numMap.put(jobInfo.getName(), countMapIn);
} else {
numMap.get(jobInfo.getName()).replace("markNum", 0);
}
String uniqueName = this.generateString(6) + "_" + jobInfo.getName();
String[] fields = jobInfo.getSourceTableFields().split(",");
String[] fields_ = jobInfo.getDestTableFields().split(",");
fields = this.trimArrayItem(fields);
if (fields.length == 0) {
if (columnMap.get() != null && columnMap.get().get(jobInfo.getName()) != null) {
fields = columnMap.get().get(jobInfo.getName());
fields_ = columnMap.get().get(jobInfo.getName());
} else {
columns = this.getColumnNameList(outConnection, getRealDestTableName(jobInfo).toUpperCase());
fields = columns.toArray(fields);
fields_ = columns.toArray(fields_);
HashMap columnNameMap = new HashMap();
columnNameMap.put(jobInfo.getName(), fields);
columnMap.set(columnNameMap);
}
}
String[] conditionFields = jobInfo.getDestTableCondition().split(",");
conditionFields = this.trimArrayItem(conditionFields);
String destTable = getRealDestTableName(jobInfo);
String[] destTableKeys = jobInfo.getDestTableKey().split(",");
destTableKeys = this.trimArrayItem(destTableKeys);
String[] sourceTableKeys = jobInfo.getSourceTableKey().split(",");
sourceTableKeys = this.trimArrayItem(sourceTableKeys);
QueryWrapper<DBSyncJob> DBSyncJobWrapper = new QueryWrapper<>();
DBSyncJobWrapper.eq("JOB_NAME", jobInfo.getName());
List<DBSyncJob> dbSyncJobs = dbSyncJobService.getBaseMapper().selectList(DBSyncJobWrapper);
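//the DBSyncJob record stores two watermarks per job: the max create time and max update
//time synced so far; they split the source rows into an insert set and an update set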
if (CollectionUtil.isNotEmpty(dbSyncJobs)) {
Timestamp dmCreatedTime = dbSyncJobs.get(0).getDmCreatedTime();
Timestamp dmUpdatedTime = dbSyncJobs.get(0).getDmUpdatedTime();
if (!srcSql.contains("where") && !srcSql.contains("WHERE")) {
srcSql += " where 1=1";
}
//the insert query references both watermarks, so require both to run incrementally
if (dmCreatedTime != null && dmUpdatedTime != null) {
create_sync_time = dmCreatedTime;
up_sync_time = dmUpdatedTime;
insertSql = srcSql + " AND " + conditionFields[0] + " > to_timestamp('" + dmCreatedTime + "','yyyy-mm-dd hh24:mi:ss.ff') " +
"AND " + conditionFields[1] + " > to_timestamp('" + dmUpdatedTime + "','yyyy-mm-dd hh24:mi:ss.ff')";
updateSql = srcSql + " AND " + conditionFields[1] + " > to_timestamp('" + dmUpdatedTime + "','yyyy-mm-dd hh24:mi:ss.ff') " +
"AND " + conditionFields[0] + " <= to_timestamp('" + dmCreatedTime + "','yyyy-mm-dd hh24:mi:ss.ff')";
}
}
//assumes two condition fields: [0] = create-time column, [1] = update-time column
if (conditionFields.length > 1) {
srcSql += " order by " + conditionFields[0] + "," + conditionFields[1] + " asc";
}
StringBuffer sql = new StringBuffer();
PreparedStatement pst = null;
ResultSet rs = null;
long count_insert_add = 0;
long count_update_add = 0;
Integer count_all = 0;
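//Oracle multi-row insert: INSERT ALL INTO ... INTO ... SELECT 1 FROM DUAL; every maxRead
//rows the statement is terminated and a fresh INSERT ALL begins, keeping batches bounded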
String sql_begin = "INSERT ALL ";
//incremental sync: pull rows created/updated after the stored watermarks
if (StringUtils.isNotBlank(insertSql)) {
pst = conn.prepareStatement(insertSql);
rs = pst.executeQuery();
StringBuffer insert_sql = new StringBuffer();
while (rs.next()) {
if (count_insert_add == 0) {
insert_sql.append(sql_begin);
}
if ((count_insert_add + 1) % maxRead == 0) {
insert_sql.append(" SELECT 1 FROM DUAL;");
insert_sql.append(" " + sql_begin);
count_insert_add = 0;
}
insert_sql.append("INTO " + destTable + " ").append(StringUtils.isBlank(jobInfo.getDestTableFields()) ? "" : "(" + jobInfo.getDestTableFields() + ") ").append("values (");
for (int index = 0; index < fields.length; index++) {
insert_sql.append(this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect)).append(index == (fields.length - 1) ? "" : ",");
if (StringUtils.equals(fields[index], conditionFields[0])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
}
if (StringUtils.equals(fields[index], conditionFields[1])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
}
}
insert_sql.append(") ");
count_insert_add++;
}
if (count_insert_add > 0) {
insert_sql.append(" SELECT 1 FROM DUAL;");
sql.append(insert_sql);
}
pst = conn.prepareStatement(updateSql);
rs = pst.executeQuery();
StringBuffer update_sql = new StringBuffer();
while (rs.next()) {
update_sql.append("UPDATE " + destTable + " SET ");
for (int index = 0; index < fields.length; index++) {
boolean tag = false;
for (String sourceTableKey : sourceTableKeys) {
if (fields[index].equals(sourceTableKey)) {
tag = true;
}
}
if (tag) {
continue;
}
update_sql.append(fields_[index] + " = " + this.copyValueFromSourceDb(
getObject(rs, fields[index], jobInfo), dialect))
.append(index == (fields.length - 1) ? "" : ",");
if (StringUtils.equals(fields[index], conditionFields[0])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
}
if (StringUtils.equals(fields[index], conditionFields[1])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
}
}
update_sql.append(" WHERE 1 = 1");
for (int i = 0; i < destTableKeys.length; i++) {
update_sql.append(" AND " + destTableKeys[i] + " = " + this.copyValueFromSourceDb(
getObject(rs, sourceTableKeys[i], jobInfo), dialect))
.append(i == (destTableKeys.length - 1) ? ";" : "");
}
count_update_add++;
}
if (count_update_add > 0) {
sql.append(update_sql);
}
} else {//full sync
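//the open cursor is parked in thread-local maps (pstMap/rsMap) so the caller's do/while
//loop in executeIn resumes reading the same ResultSet on the next pass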
if (rsMap.get() == null || rsMap.get().get(jobInfo.getName()) == null) {
pst = conn.prepareStatement(srcSql);
rs = pst.executeQuery();
Map<String, ResultSet> rsMapIn = Maps.newHashMap();
rsMapIn.put(jobInfo.getName(), rs);
pstMap.set(pst);
rsMap.set(rsMapIn);
} else {
rs = rsMap.get().get(jobInfo.getName());
pst = pstMap.get();
}
StringBuffer src_sql = new StringBuffer();
//src_sql.append(sql_begin);
while (rs.next()) {
if (count_all == 0) {
src_sql.append(sql_begin);
}
src_sql.append("INTO " + destTable + " ").append(StringUtils.isBlank(jobInfo.getDestTableFields()) ? getColumnsFromMeta(fields_) : "(" + jobInfo.getDestTableFields() + ") ").append("values (");
for (int index = 0; index < fields.length; index++) {
//rs.getObject("");
src_sql.append(this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect))
.append(index == (fields.length - 1) ? "" : ",");
if (conditionFields.length > 0) {
if (StringUtils.equals(fields[index], conditionFields[0])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
if (timestamp != null) {
create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
}
}
if (StringUtils.equals(fields[index], conditionFields[1])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
if (timestamp != null) {
up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
}
}
}
}
src_sql.append(") ");
count_all++;
if ((count_all) % maxRead == 0) {
src_sql.append(" SELECT 1 FROM DUAL");
// src_sql.append(" " + sql_begin);
numMap.get(jobInfo.getName()).put("rowNum", count_all);
numMap.get(jobInfo.getName()).put("markNum", 1);
break;
}
}
if (count_all > 0) {
if (numMap.get(jobInfo.getName()).get("markNum") == 0) {
src_sql.append(" SELECT 1 FROM DUAL");
}
sql.append(src_sql);
}
}
if (rs != null && numMap.get(jobInfo.getName()).get("markNum") == 0) {
rs.close();
}
if (pst != null && numMap.get(jobInfo.getName()).get("markNum") == 0) {
pst.close();
}
countMap.set(numMap);
if (count_insert_add + count_update_add + count_all > 0) {
// sql.append(" SELECT 1 FROM DUAL;");
// if (!jobInfo.getDestTableKey().equals("")) {
// return new StringBuffer("alter table ").append(destTable).append(" add constraint ").append(uniqueName).append(" unique (").append(destTableKey).append(");").append(sql.toString())
// .append(";alter table ").append(destTable).append(" drop constraint ").append(uniqueName).append(";").toString();
// }
returnMap.put("sql", sql.toString());
returnMap.put("createSyncTime", create_sync_time);
returnMap.put("upSyncTime", up_sync_time);
returnMap.put("dbSyncJobs", dbSyncJobs);
returnMap.put("jobName", jobInfo.getName());
// keeping it simple here: only the full-sync row count is recorded
returnMap.put("count", count_all);
return returnMap;
}
return new HashMap<>();
}
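To make the assembled statement concrete: with maxRead = 2, one full-sync batch is shaped like the following (the table and column names are hypothetical, and the literal for each value is produced by copyValueFromSourceDb for the target dialect):
INSERT ALL
INTO T_REPORT (ID,CREATE_TIME,UPDATE_TIME) values (...)
INTO T_REPORT (ID,CREATE_TIME,UPDATE_TIME) values (...)
 SELECT 1 FROM DUAL
The trailing SELECT 1 FROM DUAL is what terminates Oracle's INSERT ALL syntax; executeSQL then runs the whole batch against the destination connection.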
The remaining Redis and Oracle implementations are fairly long, so they are not all listed here; follow me and send a private message to get the full source code.
And that wraps up Part 1 of syncing Hive, Oracle, and Redis data from other data sources to the local database. Give it a follow so you don't miss the next part, folks!