当前位置: 首页 > article >正文

【myXdb.stop()关闭时保存数据流程分析】xdb关服时数据落地源码

1)关闭xdb

myXdb.stop();

2)xdb.Xdb#close

 /**
  * Stops xdb: marks it as closed and then shuts its components down one
  * after another. The order of the calls below is preserved deliberately.
  */
 private final synchronized void close() {
        Trace.warn("xdb stop begin");

        // Mark xdb as no longer open before anything is torn down.
        this.openFlag = false;

        mbeans().unregisterAll();
        UniqName.stop();
        Engine.getInstance().close();
        Executor.stop();

        if (this.angelThread != null) {
            this.angelThread.shutdown();
            this.angelThread = null;
        }

        if (this.checkpoint != null) {
            this.checkpoint.shutdown();
            this.checkpoint = null;
        }

        // Key point: shut down the checkpoint cache. According to the
        // article, CheckpointCache extends ThreadHelper, so this is the
        // thread helper's shutdown; the thread's run() body then performs
        // the final save once it leaves its loop.
        if (this.checkpointcache != null) {
            this.checkpointcache.shutdown();
            this.checkpointcache = null;
        }

        if (this.tables != null) {
            this.tables.close();
            this.tables = null;
        }

        MySqlMgr.getInstance().exit();
        Trace.warn("xdb stop end");
    }

3)xdb.CheckpointCache#run // 既然是一个线程,那就看执行体

/**
 * Thread body of the checkpoint cache.
 *
 * <p>While the thread is running it repeatedly calls
 * {@code save(PROCESS_MAX_COUNT)} and then sleeps for whatever is left of
 * the {@code PROCESS_TIME} period (or 100 ms if the pass overran it).
 * Once the running flag drops, it performs a final
 * {@code save(Integer.MAX_VALUE)} and, if cached records remain, retries
 * up to 60 times with a 10 s sleep between attempts.
 */
public void run() {
    // Periodic phase: one save pass per PROCESS_TIME period.
    while (super.isRunning()) {
        final long startMillis = System.currentTimeMillis();
        this.save(PROCESS_MAX_COUNT);
        final long elapsedMillis = System.currentTimeMillis() - startMillis;

        if (elapsedMillis < (long) PROCESS_TIME) {
            // Finished early: sleep for the rest of the period.
            super.sleepIdle((long) PROCESS_TIME - elapsedMillis);
        } else {
            // Overran the period: take a short pause and report the time.
            super.sleepIdle(100L);
            Trace.info("checkpoint cache process time is " + elapsedMillis + "ms!");
        }
    }

    // Shutdown phase: this final unlimited save is what actually persists
    // the data when the server is stopped.
    Trace.warn("final CheckPointCache begin");
    this.save(Integer.MAX_VALUE);
    Trace.warn("final CheckPointCache end");

    int remaining = this.totalCacheCount();
    if (remaining <= 0) {
        return;
    }
    Trace.error("final CheckPointCache left " + remaining);

    // Something is still cached: keep retrying, at most 60 attempts.
    int attempt = 0;
    while (attempt < 60) {
        this.save(Integer.MAX_VALUE);
        remaining = this.totalCacheCount();
        if (remaining == 0) {
            return;
        }

        Trace.error("final CheckPointCache i=" + attempt + " left=" + remaining);
        super.sleepIdle(10000L);
        ++attempt;
    }
}

4)xdb.CheckpointCache#save // 保存

 /**
  * Runs a save pass over every cached table.
  *
  * @param maxCount limit handed unchanged to each table's own
  *                 {@code save(int)}
  */
 private void save(int maxCount) {
        // Visit every per-table cache entry and let it save itself.
        for (Object tableCache : this.cacheTables.values()) {
            ((CheckpointCacheInfo) tableCache).save(maxCount);
        }
 }

5)xdb.CheckpointCacheInfo#save

/**
 * Writes the cached records of this table to MySQL.
 *
 * <p>The method has up to three stages:
 * <ol>
 *   <li>Batch stage - taken when the cache holds at least
 *       {@code CheckpointCache.PROCESS_BATCH} records or when
 *       {@code maxCount} is {@code Integer.MAX_VALUE} (2147483647).
 *       Records are queued on two prepared statements (REPLACE / DELETE),
 *       executed as batches and committed; on success the handled keys
 *       are removed from {@code record}.</li>
 *   <li>Single stage - taken when the cache is below the batch threshold,
 *       when the batch stage left keys in {@code failedRecordKey}, or when
 *       {@code maxCount} is {@code Integer.MAX_VALUE}. Each record is
 *       written with its own statement, retried once on failure.</li>
 *   <li>Reporting - logs a summary; for failed keys it logs the payload
 *       and drops the record if {@code MySqlMgr.failedClearMap} contains
 *       an entry for it.</li>
 * </ol>
 *
 * <p>Each cached value is a {@code Treble<State, OctetsStream, Long>}.
 * The third element is compared against {@code System.currentTimeMillis()},
 * so it is presumably a timestamp - TODO confirm against the writer side.
 *
 * <p>NOTE(review): this is decompiler output. As written, the local
 * {@code key} is declared twice in nested scopes (near the top and again
 * in the reporting loop), which javac rejects; the original source
 * presumably used distinct variables.
 *
 * @param maxCount maximum number of records to queue in the batch stage;
 *                 {@code Integer.MAX_VALUE} additionally disables the
 *                 timestamp filter and forces both stages to run
 */
public void save(int maxCount) {
        // Nothing cached for this table: nothing to do.
        if (this.record.size() != 0) {
            int initRecordCount = this.record.size();
            // During the batch stage this set collects every key that was
            // touched, not only failures; it is cleared again once the
            // batch has committed successfully.
            HashSet<String> failedRecordKey = new HashSet();
            Iterator<Entry<String, Treble<State, OctetsStream, Long>>> iter = this.record.entrySet().iterator();
            int count = 0;
            String key;
            String var10000;
            // ---- Stage 1: batch write -------------------------------------
            if (initRecordCount >= CheckpointCache.PROCESS_BATCH || maxCount == 2147483647) {
                long now = System.currentTimeMillis();
                Connection conn = null;

                try {
                    conn = MySqlMgr.getInstance().getWriteConn();
                    // NOTE(review): both statements are closed only on the
                    // success path below; if an exception is thrown they are
                    // left to conn.close() in the finally block.
                    PreparedStatement pstReplace = conn.prepareStatement("REPLACE INTO " + this.tableName + " VALUES(?, ?)");
                    pstReplace.setQueryTimeout(10);
                    PreparedStatement pstRemove = conn.prepareStatement("DELETE FROM " + this.tableName + " WHERE k=?");
                    pstRemove.setQueryTimeout(10);

                    label436:
                    while(true) {
                        while(true) {
                            Entry entry;
                            // Advance to the next eligible entry. Entries whose
                            // third element is >= now are skipped unless
                            // maxCount is Integer.MAX_VALUE.
                            do {
                                if (!iter.hasNext()) {
                                    break label436;
                                }

                                entry = (Entry)iter.next();
                            } while((Long)((Treble)entry.getValue()).third >= now && maxCount < 2147483647);

                            if (this.isZero((String)entry.getKey())) {
                                // Keys rejected by isZero are logged and recorded,
                                // but never queued and never counted.
                                var10000 = (String)entry.getKey();
                                Trace.error("Key is Zero! Key=" + var10000 + " tableName=" + this.tableName);
                                failedRecordKey.add((String)entry.getKey());
                            } else {
                                if (((Treble)entry.getValue()).first != State.INDB_GET && ((Treble)entry.getValue()).first != State.INDB_ADD && ((Treble)entry.getValue()).first != State.ADD) {
                                    // Any state other than INDB_REMOVE is skipped
                                    // entirely in the batch stage.
                                    if (((Treble)entry.getValue()).first != State.INDB_REMOVE) {
                                        continue;
                                    }

                                    // INDB_REMOVE: queue a DELETE for this key.
                                    failedRecordKey.add((String)entry.getKey());
                                    pstRemove.setString(1, (String)entry.getKey());
                                    pstRemove.addBatch();
                                } else {
                                    // INDB_GET / INDB_ADD / ADD: queue a REPLACE.
                                    failedRecordKey.add((String)entry.getKey());
                                    pstReplace.setString(1, (String)entry.getKey());
                                    // The value is bound as a JSON string when the
                                    // JSON flag is set, otherwise as a binary stream.
                                    if (Xdb.getInstance().isJsonFlag()) {
                                        pstReplace.setString(2, ((OctetsStream)((Treble)entry.getValue()).second).unmarshalJson());
                                    } else {
                                        pstReplace.setBinaryStream(2, new ByteArrayInputStream(((OctetsStream)((Treble)entry.getValue()).second).array()), ((OctetsStream)((Treble)entry.getValue()).second).size());
                                    }

                                    pstReplace.addBatch();
                                }

                                // Stop queueing once maxCount records are batched.
                                ++count;
                                if (count >= maxCount) {
                                    break label436;
                                }
                            }
                        }
                    }

                    // Execute both batches and commit only if something was queued.
                    if (count > 0) {
                        pstReplace.executeBatch();
                        pstRemove.executeBatch();
                        conn.commit();
                    }

                    pstReplace.close();
                    pstRemove.close();
                    Iterator var34 = failedRecordKey.iterator();

                    // The batch went through: drop the handled keys from the
                    // cache. A key whose third element is >= now is kept unless
                    // maxCount is Integer.MAX_VALUE.
                    label410:
                    while(true) {
                        do {
                            if (!var34.hasNext()) {
                                // All keys processed: reset the set and log
                                // the elapsed time if it exceeds the thresholds.
                                failedRecordKey.clear();
                                long useTime = System.currentTimeMillis() - now;
                                if (useTime > (long)MySqlMgr.MS_DEBUG && Trace.isDebugEnabled()) {
                                    Trace.debug("Execute Batch Use " + useTime + " tableName is " + this.tableName);
                                }

                                if (useTime > (long)MySqlMgr.MS_INFO * 2L) {
                                    Trace.info("Execute Batch Use " + useTime + " tableName is " + this.tableName);
                                }
                                break label410;
                            }

                            key = (String)var34.next();
                        } while((Long)((Treble)this.record.get(key)).third >= now && maxCount < 2147483647);

                        this.record.remove(key);
                    }
                } catch (Exception var25) {
                    // Batch failed: roll back. failedRecordKey is not cleared on
                    // this path, so the single stage below will run.
                    if (conn != null) {
                        try {
                            conn.rollback();
                            Trace.error("save exec Batch Rollback Success " + this.tableName);
                        } catch (Exception var24) {
                            Trace.error("save exec Batch Rollback Exception " + this.tableName, var24);
                        }
                    }

                    Trace.error("save exec Batch ReplaceSQL Exception " + this.tableName, var25);
                } finally {
                    if (conn != null) {
                        try {
                            conn.close();
                        } catch (SQLException var22) {
                            Trace.error("write conn close", var22);
                        }
                    }

                }
            }

            // ---- Stage 2: one statement per record ------------------------
            if (initRecordCount < CheckpointCache.PROCESS_BATCH || failedRecordKey.size() > 0 || maxCount == 2147483647) {
                int countInit = count;
                long now = System.currentTimeMillis();
                // Iterates over everything still in the cache, not only over
                // the keys collected in failedRecordKey.
                iter = this.record.entrySet().iterator();

                label389:
                while(true) {
                    while(true) {
                        boolean bSuccess;
                        Entry entry;
                        // Same eligibility filter as in the batch stage.
                        do {
                            if (!iter.hasNext()) {
                                break label389;
                            }

                            bSuccess = true;
                            entry = (Entry)iter.next();
                        } while((Long)((Treble)entry.getValue()).third >= now && maxCount < 2147483647);

                        if (this.isZero((String)entry.getKey())) {
                            var10000 = (String)entry.getKey();
                            Trace.error("Key is Zero! Key=" + var10000 + " tableName=" + this.tableName);
                            failedRecordKey.add((String)entry.getKey());
                        } else {
                            switch((State)((Treble)entry.getValue()).first) {
                            case INDB_REMOVE:
                                // NOTE(review): the key is concatenated into the
                                // SQL text here, unlike the parameterized batch
                                // stage; a key containing a quote would break
                                // the statement.
                                var10000 = this.tableName;
                                key = "DELETE FROM " + var10000 + " WHERE k='" + (String)entry.getKey() + "'";
                                // Short-circuit &&: the second call is a retry
                                // made only if the first returned a negative value.
                                if (MySqlMgr.getInstance().execUpdate(key, (OctetsStream)null) < 0 && MySqlMgr.getInstance().execUpdate(key, (OctetsStream)null) < 0) {
                                    failedRecordKey.add((String)entry.getKey());
                                    bSuccess = false;
                                }
                            // Falls through into the break below.
                            case REMOVE:
                            default:
                                // No SQL is issued for these states; bSuccess
                                // stays true, so the entry may still be removed
                                // from the cache and counted below.
                                break;
                            case INDB_GET:
                            case INDB_ADD:
                            case ADD:
                                // NOTE(review): the key is concatenated into the
                                // SQL text; only the value is passed separately.
                                var10000 = this.tableName;
                                String replaceSql = "REPLACE INTO " + var10000 + " VALUES('" + (String)entry.getKey() + "', ?)";
                                // Same retry-once pattern as for DELETE.
                                if (MySqlMgr.getInstance().execUpdate(replaceSql, (OctetsStream)((Treble)entry.getValue()).second) < 0 && MySqlMgr.getInstance().execUpdate(replaceSql, (OctetsStream)((Treble)entry.getValue()).second) < 0) {
                                    failedRecordKey.add((String)entry.getKey());
                                    bSuccess = false;
                                }
                            }

                            // Re-reads the current value from the map before
                            // removing: the entry is dropped only if its third
                            // element is still < now, or when maxCount is
                            // Integer.MAX_VALUE.
                            if (bSuccess && ((Long)((Treble)this.record.get(entry.getKey())).third < now || maxCount == 2147483647)) {
                                iter.remove();
                            }

                            // Stop after PROCESS_SINGLE successes in this stage
                            // when maxCount is below PROCESS_MAX_COUNT.
                            if (bSuccess) {
                                ++count;
                                if (count - countInit >= CheckpointCache.PROCESS_SINGLE && maxCount < CheckpointCache.PROCESS_MAX_COUNT) {
                                    break label389;
                                }
                            }
                        }
                    }
                }
            }

            // ---- Stage 3: reporting ---------------------------------------
            if (failedRecordKey.size() > 0) {
                var10000 = this.tableName;
                Trace.error(var10000 + " success:" + count + " failed:" + failedRecordKey.size() + " left:" + this.record.size());
                Iterator var28 = failedRecordKey.iterator();

                while(var28.hasNext()) {
                    // NOTE(review): redeclares `key`, which is already declared
                    // in the enclosing scope - see the method comment.
                    String key = (String)var28.next();
                    Treble<State, OctetsStream, Long> info = (Treble)this.record.get(key);
                    if (info != null && info.second != null) {
                        try {
                            // Log the payload as JSON, or only its length when
                            // it is longer than 1000 characters.
                            String jsonStr = ((OctetsStream)info.second).unmarshalJson();
                            if (jsonStr.length() > 1000) {
                                Trace.error(this.tableName + " key=" + key + " json size=" + jsonStr.length());
                            } else {
                                Trace.error(this.tableName + " key=" + key + " json=" + jsonStr);
                            }
                        } catch (MarshalException var23) {
                            Trace.error(this.tableName + " key=" + key, var23);
                        }
                    }

                    // If failedClearMap has an entry for tableName + key, the
                    // failed record is dropped from the cache without having
                    // been saved.
                    if (MySqlMgr.failedClearMap.get(this.tableName + key) != null) {
                        this.record.remove(key);
                        Trace.error(this.tableName + " key=" + key + " is removed!");
                    }
                }
            } else {
                Trace.info(this.tableName + " success:" + count + " left:" + this.record.size());
            }

        }
    }

理解:

其实就是遍历缓存中的每条记录,取出 key 和 value 写入 MySQL:当 Xdb.getInstance().isJsonFlag() 为 true 时 value 以 JSON 字符串写入,否则以二进制流写入。

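当缓存记录数达到 PROCESS_BATCH,或者关服时(maxCount 为 2147483647)会先走批量处理(addBatch + executeBatch + commit);记录数不足、批量处理有失败记录,或者关服时,还会再逐条执行一遍,每条失败后重试一次。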

具体执行什么操作由记录的状态(State)决定:状态为 INDB_REMOVE 时执行 DELETE 删除数据;状态为 INDB_GET、INDB_ADD、ADD 时执行 REPLACE INTO 写入数据。


http://www.kler.cn/a/443912.html

相关文章:

  • 你好Python
  • Godot RPG 游戏开发指南
  • 堆【Lecode_HOT100】
  • 面试小札:Java后端闪电五连鞭_8
  • Windows安全中心(病毒和威胁防护)的注册
  • Unity 碎片化空间的产生和优化
  • 基于阿里云日志服务的程序优化策略与实践
  • 关于目标检测YOLO 各版本区别v1-v11/X/R/P
  • go语言并发读写数据队列,不停写的同时,一次最多读取指定量数据(逐行注释)
  • 【自动驾驶】Ubuntu20.04安装ROS1 Noetic
  • 在C#中,可以通过使用委托(delegate)或者是事件(event)来将方法作为参数传递。
  • Redis篇-14--数据结构篇6--Set内存模型(整数集合intset,哈希表hashtable)
  • 爬虫可能会遇到哪些反爬措施?
  • 【AI热点】小型语言模型(SLM)的崛起:如何在AI时代中找到你的“左膀右臂”?
  • 在 Go 中利用 ffmpeg 进行视频和音频处理
  • Java web概述
  • v-html详细解析与代码实例
  • GB_T 38636 《信息安全技术 传输层密码协议(TLCP)》题目
  • 如何在OneNote中高效使用copilot
  • Pytorch中关于Tensor的操作
  • 基于vue3实现小程序手机号一键登录
  • 常用的前端框架介绍
  • 蓝桥杯摆烂第三天
  • AutoMQ 流表一体新特性 Table Topic 发布: 无缝集成 AWS S3 Table 和 Iceberg
  • Ubuntu本地化安装MYSQL及Navicat
  • Unity 上好用的插件