当前位置: 首页 > article >正文

Hive,Oracle,redis同步数据之-从其他数据源同步到本地数据库之一

前言
对于金融数据 证券财报信息等来源比较广泛,类型也很杂乱。需要处理成自己公司统一标准的数据集,就会使用到数据的同步和处理清理等操作。
这里给出将其他数据同步到本地数据作为一个方案的演变 分享给大家。
本文会使用 构造者 策略 模版,工厂得设计模式思想带你进行企业级的代码编写
实战开始
在这里插入图片描述
下属存放对应的配置文件的信息 和使用xstream流的方式解析获取对应的配置信息
在这里插入图片描述
部分xml文件如下
在这里插入图片描述
我们 将启动定时任务,同步其他数据源到本地数据库

public void startIn() {
        if (CollectionUtils.isEmpty(jobList)) {
            logger.info("DBSync-in配置读取错误或未配置任务");
            return;
        }
        for (int index = 0; index < jobList.size(); index++) {
            JobInfo jobInfo = jobList.get(index);
            String logTitle = "[" + code + "]-" + jobInfo.getName() + " ";
            try {
                //系统监听的方式调用
                Map<String, Object> jobMap = new HashMap<>();
                jobMap.put("srcDbs", srcDbs);
                jobMap.put("destDbs", destDbs);
                jobMap.put("jobInfo", jobInfo);
                jobMap.put("logTitle", logTitle);
                //设置多线程方式执行
                logger.info("执行多线程-----同步到本地");
                jobDBSync.executeIn(jobMap);
            } catch (Exception e) {
                logger.info(logTitle + e.getMessage());
                logger.info(logTitle + " run failed");
                continue;
            }
        }
    }

使用异步线程池注解启动

 @Async("taskExector")
    public void executeIn(Map<String, Object> data) {
        logger.info("开始任务调度: " + new Date() + Thread.currentThread().getName());
        Connection inConn = null;
        Connection outConn = null;
        List<DBInfo> srcDbs = (List<DBInfo>) data.get("srcDbs");
        List<DBInfo> destDbs = (List<DBInfo>) data.get("destDbs");
        JobInfo jobInfo = (JobInfo) data.get("jobInfo");
        String logTitle = (String) data.get("logTitle");
        if (!checkOptions(jobInfo)) {
            return;
        }
        List<String> sourceDBNos = jobInfo.getSourceDBNo();
        for (String sourceDBNo : sourceDBNos) {
            DBInfo sourceDbInfo = getDBInfo(srcDbs, sourceDBNo);
            DBSyncService dbSyncService = dbSyncServiceDelegate.getDBSyncService(sourceDbInfo.getDbtype());
            try {
                if (!dbSyncService.fromCache()) {
                    inConn = createConnection(sourceDbInfo);
                    if (inConn == null) {
                        logger.info("请检查源数据连接!");
                        continue;
                    }
                } else {
                    //实例化redisTemplate
                    dbSyncService.setTempLateWithADC(createCacheTemplate(sourceDbInfo));
                }

                for (String destDBNo : jobInfo.getDestDBNo()) {
                    DBInfo destDbInfo = getDBInfo(destDbs, destDBNo);
                    outConn = createConnection(destDbInfo);
                    if (outConn == null) {
                        logger.info("请检查目标数据连接!");
                        return;
                    }
                    if (dbSyncService.fromCache() && !Boolean.valueOf(jobInfo.getIsCache())) {
                        continue;
                    }
                    if (!dbSyncService.fromCache() && !Boolean.valueOf(jobInfo.getIsDb())) {
                        continue;
                    }
                    boolean flag = true;
                    if (jobInfo.getDbWipe() != null && jobInfo.getDbWipe()) {
                        flag = dbSyncService.wipeDb(jobInfo, outConn, destDbInfo) > 0;
                    }

                    do {
                        long start = System.currentTimeMillis();
                        Map<String, Object> returnMap = dbSyncService.assembleSQLIn(jobInfo.getSrcSql(), inConn,
                                outConn, jobInfo, destDbInfo.getDbtype());
                        logger.info("组装SQL耗时: " + (new Date().getTime() - start) + "ms");
                        if (MapUtils.isNotEmpty(returnMap)) {
                            logger.debug(returnMap.get("sql").toString());
                            long eStart = new Date().getTime();
                            if (flag) {
                                dbSyncService.executeSQL(returnMap, outConn);
                                logger.info(jobInfo.getName() + "任务执行成功,执行SQL语句耗时: " + (new Date().getTime() - eStart) + "ms");
                            }
                        }
                    } while (dbSyncService.getRecord(sourceDbInfo, jobInfo));
                    //关闭资源
                    logger.error("关闭目标数据库连接");
                    destoryConnection(outConn);
                }
                //destoryCacheConnection(dbSyncService.getTempLateWithADC(),srcDb);
            } catch (Exception e) {
                logger.error(logTitle + e.getMessage(), e);
                logger.error(logTitle + " SQL执行出错,请检查是否存在语法错误");
                logger.error("关闭目标数据库连接");
                destoryConnection(outConn);
                //destoryCacheConnection(dbSyncService.getTempLateWithADC(),srcDb);
                continue;
            }
            logger.error("关闭源数据库连接");
            destoryConnection(inConn);
        }
    }

由于不同的来源 需要使用对应的处理 故采用实现spring的bean方式 进行拓展
如上的注入

 @Autowired
    private DBSyncServiceDelegate dbSyncServiceDelegate;
@Service
public class DBSyncServiceDelegate implements InitializingBean {

    private Map<String, DBSyncService> serviceMap = new HashMap<>();

    @Autowired
    List<DBSyncService> serviceList;

    /**
     * 获取数据同步的服务的实体类
     *
     * @param dbType
     * @return
     */
    public DBSyncService getDBSyncService(String dbType) {
        return serviceMap.get(dbType);
    }


    @Override
    public void afterPropertiesSet() {
        if (!CollectionUtils.isEmpty(serviceList)) {
            for (DBSyncService dbSyncService : serviceList) {
                serviceMap.put(dbSyncService.getDbType(), dbSyncService);
            }
        }
    }
}

树代码中对应的类型链接 提供了不同的实现 如截图
在这里插入图片描述
如redis的实现

 public void setTempLateWithADC(Object tempLateWithADC) {
        if (tempLateWithADC instanceof RedisTemplate) {
            this.redisTemplateWithADC = (RedisTemplate<String, byte[]>) tempLateWithADC;
        }
    }

对应的redis的创建

 /**
     * 创建缓存摸板
     *
     * @param db
     * @return
     */
    public synchronized Object createCacheTemplate(DBInfo db) {
        if (Constants.CacheType.REDIS.getType().equals(db.getDbtype())) {
            if (source.get(db.getUrl()) != null) {
                return source.get(db.getUrl());
            }
            if (source.get(db.getNodes()) != null) {
                return source.get(db.getNodes());
            }
            // 单点redis
            RedisStandaloneConfiguration redisConfigStandalone = new RedisStandaloneConfiguration();
            // 哨兵redis
            RedisSentinelConfiguration redisConfigSentinel = new RedisSentinelConfiguration();
            // 集群redis
            // RedisClusterConfiguration redisConfig = new RedisClusterConfiguration();
            if (StringUtils.isNotBlank(db.getNodes())) {
                redisConfigSentinel.master(db.getMaster());
                redisConfigSentinel.setSentinels(this.createSentinels(db.getNodes()));
                redisConfigSentinel.setDatabase(Integer.valueOf(db.getDataBase()).intValue());
                if (StringUtils.isNotBlank(db.getPassword())) {
                    redisConfigSentinel.setPassword(RedisPassword.of(db.getPassword()));
                }
            }
            if (StringUtils.isNotBlank(db.getUrl())) {
                redisConfigStandalone.setHostName(db.getUrl());
                redisConfigStandalone.setDatabase(Integer.valueOf(db.getDataBase()).intValue());
                redisConfigStandalone.setPassword(RedisPassword.of(db.getPassword()));
                redisConfigStandalone.setPort(Integer.valueOf(db.getPort()).intValue());
            }
//            JedisConnectionFactory factory = new JedisConnectionFactory(redisConfig);
            GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
            genericObjectPoolConfig.setMaxIdle(Constants.maxIdle);
            genericObjectPoolConfig.setMinIdle(Constants.minIdle);
            genericObjectPoolConfig.setMaxTotal(Constants.maxActive);
            genericObjectPoolConfig.setMaxWaitMillis(Constants.maxWait);

            //redis客户端配置
            LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder
                    builder = LettucePoolingClientConfiguration.builder().
                    commandTimeout(Duration.ofMillis(Constants.redisTimeout));
            builder.shutdownTimeout(Duration.ofMillis(Constants.redisTimeout));
            builder.poolConfig(genericObjectPoolConfig);
            LettuceClientConfiguration lettuceClientConfiguration = builder.build();

            //根据配置和客户端配置创建连接
            LettuceConnectionFactory lettuceConnectionFactory = null;
            if (StringUtils.isNotBlank(db.getNodes())) {
                lettuceConnectionFactory = new LettuceConnectionFactory(redisConfigSentinel, lettuceClientConfiguration);
            }
            if (StringUtils.isNotBlank(db.getUrl())) {
                lettuceConnectionFactory = new LettuceConnectionFactory(redisConfigStandalone, lettuceClientConfiguration);
            }
            lettuceConnectionFactory.afterPropertiesSet();
            // 考虑健壮性,增加这一配置,在每次访问时先校验连接
            lettuceConnectionFactory.setValidateConnection(true);

            //配置tempalte
            RedisSerializer<byte[]> serializer = new RedisSerializer<byte[]>() {
                @Override
                public byte[] serialize(byte[] t) throws SerializationException {
                    return t;
                }

                @Override
                public byte[] deserialize(byte[] bytes) throws SerializationException {
                    return bytes;
                }
            };
            RedisTemplate<String, byte[]> template = new RedisTemplate<>();
            template.setConnectionFactory(lettuceConnectionFactory);
            template.setKeySerializer(new StringRedisSerializer());
            template.setValueSerializer(serializer);
            template.setHashKeySerializer(new StringRedisSerializer());
            template.setHashValueSerializer(serializer);
            template.setDefaultSerializer(new StringRedisSerializer());
            template.afterPropertiesSet();
            String key = StringUtils.isBlank(db.getUrl()) ? db.getNodes() : db.getUrl();
            source.put(key, template);
            return template;
        }
        return null;
    }

对应的组装sql的不同策略如截图
在这里插入图片描述

 @Override
    public Map<String, Object> assembleSQLIn(String srcSql, Connection conn, Connection outConnection, JobInfo jobInfo,
                                             String dialect) throws SQLException {
        //新建同步时间
        Timestamp create_sync_time = null;
        //更新同步时间
        Timestamp up_sync_time = null;

        List<String> columns;

        Timestamp timestamp = null;

        //插入脚本
        String insertSql = null;
        //更新脚本
        String updateSql = null;

        Map<String, Object> returnMap = Maps.newHashMap();
        Map<String, Map<String, Integer>> numMap = countMap.get() == null ? new HashMap<>() : countMap.get();
        if (numMap.get(jobInfo.getName()) == null) {
            HashMap<String, Integer> countMapIn = Maps.newHashMap();
            countMapIn.put("markNum", 0);
            numMap.put(jobInfo.getName(), countMapIn);
        } else {
            numMap.get(jobInfo.getName()).replace("markNum", 0);
        }
        String uniqueName = this.generateString(6) + "_" + jobInfo.getName();
        String[] fields = jobInfo.getSourceTableFields().split(",");
        String[] fields_ = jobInfo.getDestTableFields().split(",");
        fields = this.trimArrayItem(fields);
        if (fields.length == 0) {
            if (columnMap.get() != null && columnMap.get().get(jobInfo.getName()) != null) {
                fields = columnMap.get().get(jobInfo.getName());
                fields_ = columnMap.get().get(jobInfo.getName());
            } else {
                columns = this.getColumnNameList(outConnection, getRealDestTableName(jobInfo).toUpperCase());
                fields = columns.toArray(fields);
                fields_ = columns.toArray(fields_);
                HashMap columnNameMap = new HashMap();
                columnNameMap.put(jobInfo.getName(), fields);
                columnMap.set(columnNameMap);
            }
        }
        String[] conditionFields = jobInfo.getDestTableCondition().split(",");
        conditionFields = this.trimArrayItem(conditionFields);
        String destTable = getRealDestTableName(jobInfo);
        String[] destTableKeys = jobInfo.getDestTableKey().split(",");
        destTableKeys = this.trimArrayItem(destTableKeys);
        String[] sourceTableKeys = jobInfo.getSourceTableKey().split(",");
        sourceTableKeys = this.trimArrayItem(sourceTableKeys);

        QueryWrapper<DBSyncJob> DBSyncJobWrapper = new QueryWrapper<>();
        DBSyncJobWrapper.eq("JOB_NAME", jobInfo.getName());
        List<DBSyncJob> dbSyncJobs = dbSyncJobService.getBaseMapper().selectList(DBSyncJobWrapper);
        if (CollectionUtil.isNotEmpty(dbSyncJobs)) {
            Timestamp dmCreatedTime = dbSyncJobs.get(0).getDmCreatedTime();
            Timestamp dmUpdatedTime = dbSyncJobs.get(0).getDmUpdatedTime();
            if (!srcSql.contains("where") && !srcSql.contains("WHERE")) {
                srcSql += " where 1=1";
            }
            if (dmCreatedTime != null) {
                create_sync_time = dmCreatedTime;
                insertSql = srcSql;
                insertSql += " AND " + conditionFields[0] + " > to_timestamp('" + dmCreatedTime + "','yyyy-mm-dd hh24:mi:ss.ff') " +
                        "AND " + conditionFields[1] + " > to_timestamp('" + dmUpdatedTime + "','yyyy-mm-dd hh24:mi:ss.ff')";
                if (dmUpdatedTime != null) {
                    up_sync_time = dmUpdatedTime;
                    updateSql = srcSql;
                    updateSql += " AND " + conditionFields[1] + " > to_timestamp('" + dmUpdatedTime + "','yyyy-mm-dd hh24:mi:ss.ff') " +
                            "AND " + conditionFields[0] + " <= to_timestamp('" + dmCreatedTime + "','yyyy-mm-dd hh24:mi:ss.ff')";
                }
            }
        }
        if (conditionFields.length > 0) {
            srcSql += " order by " + conditionFields[0] + "," + conditionFields[1] + " asc";
        }

        StringBuffer sql = new StringBuffer();
        PreparedStatement pst = null;
        ResultSet rs = null;
        long count_insert_add = 0;
        long count_update_add = 0;
        Integer count_all = 0;
        String sql_begin = "INSERT ALL ";
        //执行增量
        if (StringUtils.isNotBlank(insertSql)) {
            pst = conn.prepareStatement(insertSql);
            rs = pst.executeQuery();
            StringBuffer insert_sql = new StringBuffer();
            while (rs.next()) {
                if (count_insert_add == 0) {
                    insert_sql.append(sql_begin);
                }
                if ((count_insert_add + 1) % maxRead == 0) {
                    insert_sql.append(" SELECT 1 FROM DUAL;");
                    insert_sql.append(" " + sql_begin);
                    count_insert_add = 0;
                }
                insert_sql.append("INTO " + destTable + " ").append(StringUtils.isBlank(jobInfo.getDestTableFields()) ? "" : "(" + jobInfo.getDestTableFields() + ") ").append("values (");
                for (int index = 0; index < fields.length; index++) {
                    insert_sql.append(this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect)).append(index == (fields.length - 1) ? "" : ",");
                    if (StringUtils.equals(fields[index], conditionFields[0])) {
                        timestamp = getTimestamp(rs, fields[index], jobInfo);
                        create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
                    }
                    if (StringUtils.equals(fields[index], conditionFields[1])) {
                        timestamp = getTimestamp(rs, fields[index], jobInfo);
                        up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
                    }
                }
                insert_sql.append(") ");
                count_insert_add++;
            }
            if (count_insert_add > 0) {
                insert_sql.append(" SELECT 1 FROM DUAL;");
                sql.append(insert_sql);
            }
            pst = conn.prepareStatement(updateSql);
            rs = pst.executeQuery();
            StringBuffer update_sql = new StringBuffer();
            while (rs.next()) {
                update_sql.append("UPDATE  " + destTable + " SET ");
                for (int index = 0; index < fields.length; index++) {
                    boolean tag = false;
                    for (String sourceTableKey : sourceTableKeys) {
                        if (fields[index].equals(sourceTableKey)) {
                            tag = true;
                        }
                    }
                    if (tag) {
                        continue;
                    }
                    update_sql.append(fields_[index] + " = " + this.copyValueFromSourceDb(
                            getObject(rs, fields[index], jobInfo), dialect))
                            .append(index == (fields.length - 1) ? "" : ",");
                    if (StringUtils.equals(fields[index], conditionFields[0])) {
                        timestamp = getTimestamp(rs, fields[index], jobInfo);
                        create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
                    }
                    if (StringUtils.equals(fields[index], conditionFields[1])) {
                        timestamp = getTimestamp(rs, fields[index], jobInfo);
                        up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
                    }
                }
                update_sql.append(" WHERE 1 = 1");
                for (int i = 0; i < destTableKeys.length; i++) {
                    update_sql.append(" AND " + destTableKeys[i] + " = " + this.copyValueFromSourceDb(
                            getObject(rs, sourceTableKeys[i], jobInfo), dialect))
                            .append(i == (destTableKeys.length - 1) ? ";" : "");
                }
                count_update_add++;
            }
            if (count_update_add > 0) {
                sql.append(update_sql);
            }
        } else {//执行全量
            if (rsMap.get() == null || rsMap.get().get(jobInfo.getName()) == null) {
                pst = conn.prepareStatement(srcSql);
                rs = pst.executeQuery();
                Map<String, ResultSet> rsMapIn = Maps.newHashMap();
                rsMapIn.put(jobInfo.getName(), rs);
                pstMap.set(pst);
                rsMap.set(rsMapIn);
            } else {
                rs = rsMap.get().get(jobInfo.getName());
                pst = pstMap.get();
            }
            StringBuffer src_sql = new StringBuffer();
            //src_sql.append(sql_begin);
            while (rs.next()) {
                if (count_all == 0) {
                    src_sql.append(sql_begin);
                }
                src_sql.append("INTO " + destTable + " ").append(StringUtils.isBlank(jobInfo.getDestTableFields()) ? getColumnsFromMeta(fields_) : "(" + jobInfo.getDestTableFields() + ") ").append("values (");
                for (int index = 0; index < fields.length; index++) {
                    //rs.getObject("");
                    src_sql.append(this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect))
                            .append(index == (fields.length - 1) ? "" : ",");
                    if (conditionFields.length > 0) {
                        if (StringUtils.equals(fields[index], conditionFields[0])) {
                            timestamp = getTimestamp(rs, fields[index], jobInfo);
                            if (timestamp != null) {
                                create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
                            }
                        }
                        if (StringUtils.equals(fields[index], conditionFields[1])) {
                            timestamp = getTimestamp(rs, fields[index], jobInfo);
                            if (timestamp != null) {
                                up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
                            }
                        }
                    }
                }
                src_sql.append(") ");
                count_all++;
                if ((count_all) % maxRead == 0) {
                    src_sql.append(" SELECT 1 FROM DUAL");
//                    src_sql.append(" " + sql_begin);
                    numMap.get(jobInfo.getName()).put("rowNum", count_all);
                    numMap.get(jobInfo.getName()).put("markNum", 1);
                    break;
                }
            }
            if (count_all > 0) {
                if (numMap.get(jobInfo.getName()).get("markNum") == 0) {
                    src_sql.append(" SELECT 1 FROM DUAL");
                }
                sql.append(src_sql);
            }
        }

        if (rs != null && numMap.get(jobInfo.getName()).get("markNum") == 0) {
            rs.close();
        }
        if (pst != null && numMap.get(jobInfo.getName()).get("markNum") == 0) {
            pst.close();
        }
        countMap.set(numMap);
        if (count_insert_add + count_update_add + count_all > 0) {
//            sql.append(" SELECT 1 FROM DUAL;");
//            if (!jobInfo.getDestTableKey().equals("")) {
//                return new StringBuffer("alter table ").append(destTable).append(" add constraint ").append(uniqueName).append(" unique (").append(destTableKey).append(");").append(sql.toString())
//                        .append(";alter table ").append(destTable).append(" drop constraint ").append(uniqueName).append(";").toString();
//            }
            returnMap.put("sql", sql.toString());
            returnMap.put("createSyncTime", create_sync_time);
            returnMap.put("upSyncTime", up_sync_time);
            returnMap.put("dbSyncJobs", dbSyncJobs);
            returnMap.put("jobName", jobInfo.getName());
            // 这里简单考虑,仅填写全量的数量
            returnMap.put("count", count_all);
            return returnMap;
        }
        return new HashMap<>();
    }

其余的redis和oracle由于很多 这里就不一一提供了 可关注私信我获取源码。

好了 至此 Hive,Oracle,redis同步数据之-从其他数据源同步到本地数据库之一 点点关注不迷路 老铁们!!!!!


http://www.kler.cn/a/502294.html

相关文章:

  • 无源器件-电容
  • Java100道面试题
  • Java Web开发进阶——错误处理与日志管理
  • Pandas常用数据类型
  • 网络应用技术 实验七:实现无线局域网
  • Copula算法原理和R语言股市收益率相依性可视化分析
  • macos python环境安装
  • Appium:Android 和 iOS 的capabilities是否需要前缀?
  • 监督学习、无监督学习和强化学习的特点和应用场景
  • Python目录结构参考
  • 基于华为atlas的重车(满载)空车(空载)识别
  • 某漫画网站JS逆向反混淆流程分析
  • Vue2+OpenLayers添加/删除点、点击事件功能实现(提供Gitee源码)
  • 正点原子STM32F103战舰版电容触摸键学习
  • LabVIEW光流算法的应用
  • HTML 闪烁动画(Blink Animation)
  • 前端练习题
  • MACPA:fMRI连接性分析的新工具
  • 第 5 场 算法季度赛
  • 51c大模型~合集104
  • 计算机网络之---网络层的基本功能
  • 使用python调用JIRA6 REST API及遇到的问题
  • 自然语言处理之jieba分词和TF-IDF分析
  • Linux 高级路由 —— 筑梦之路
  • 链条缺陷检测数据集VOC+YOLO格式1422张7类别
  • [Android]service命令的使用