【myXdb.stop()关闭时保存数据流程分析】xdb关服时数据落地源码
1)关闭xdb
myXdb.stop();
2)xdb.Xdb#close
/**
 * Stops xdb: marks it closed, stops the supporting services, shuts down the
 * background helpers, closes the tables and finally exits the MySQL manager.
 * Every helper field is cleared after its shutdown call, so a second
 * invocation skips that helper.
 */
private final synchronized void close() {
    Trace.warn("xdb stop begin");
    openFlag = false;
    mbeans().unregisterAll();
    UniqName.stop();
    Engine.getInstance().close();
    Executor.stop();

    if (angelThread != null) {
        angelThread.shutdown();
        angelThread = null;
    }

    if (checkpoint != null) {
        checkpoint.shutdown();
        checkpoint = null;
    }

    // Key step: shut down the checkpoint cache. This is really the
    // ThreadHelper shutdown, because CheckpointCache extends ThreadHelper.
    // NOTE(review): presumably this is what ends the loop in
    // CheckpointCache#run and triggers its final flush - confirm in ThreadHelper.
    if (checkpointcache != null) {
        checkpointcache.shutdown();
        checkpointcache = null;
    }

    if (tables != null) {
        tables.close();
        tables = null;
    }

    MySqlMgr.getInstance().exit();
    Trace.warn("xdb stop end");
}
3)xdb.CheckpointCache#run // 既然是一个线程,那就看执行体
/**
 * Thread body of the checkpoint cache.
 *
 * <p>While the helper is running, it saves up to {@code PROCESS_MAX_COUNT}
 * records per round and then idles for whatever is left of
 * {@code PROCESS_TIME}; a round that overruns idles for 100 ms and logs its
 * duration. Once the loop ends, everything is flushed with
 * {@code Integer.MAX_VALUE}; if records are still cached, the flush is
 * retried up to 60 times with a 10000 ms idle after each unsuccessful try.
 */
public void run() {
    while (super.isRunning()) {
        long roundStart = System.currentTimeMillis();
        this.save(PROCESS_MAX_COUNT);
        long elapsed = System.currentTimeMillis() - roundStart;
        if (elapsed < (long) PROCESS_TIME) {
            // Finished early: idle for the rest of the period.
            super.sleepIdle((long) PROCESS_TIME - elapsed);
        } else {
            super.sleepIdle(100L);
            Trace.info("checkpoint cache process time is " + elapsed + "ms!");
        }
    }

    Trace.warn("final CheckPointCache begin");
    // Key step: this unbounded save is what actually persists data on shutdown.
    this.save(Integer.MAX_VALUE);
    Trace.warn("final CheckPointCache end");

    int remaining = this.totalCacheCount();
    if (remaining <= 0) {
        return;
    }

    Trace.error("final CheckPointCache left " + remaining);
    for (int attempt = 0; attempt < 60; ++attempt) {
        this.save(Integer.MAX_VALUE);
        remaining = this.totalCacheCount();
        if (remaining == 0) {
            break;
        }
        Trace.error("final CheckPointCache i=" + attempt + " left=" + remaining);
        super.sleepIdle(10000L);
    }
}
4)xdb.CheckpointCache#save // 保存
/**
 * Asks every cached table to persist its pending records.
 *
 * @param maxCount limit handed unchanged to each table's
 *                 {@code CheckpointCacheInfo#save}
 */
private void save(int maxCount) {
    // Walk all table caches and save them one by one.
    for (Object tableCache : this.cacheTables.values()) {
        ((CheckpointCacheInfo) tableCache).save(maxCount);
    }
}
5)xdb.CheckpointCacheInfo#save
/**
 * Writes this table's pending records ({@code this.record}) to MySQL and
 * removes the ones that were persisted. Does nothing when no record is pending.
 *
 * <p>Two write paths exist and both may run in one call:
 * <ul>
 *   <li>Batch path - taken when the pending count is at least
 *       {@code CheckpointCache.PROCESS_BATCH} or on a full flush. Uses one
 *       write connection with batched REPLACE / DELETE statements and a
 *       single commit.</li>
 *   <li>Single path - taken when the pending count is below
 *       {@code PROCESS_BATCH}, when the batch path left keys in
 *       {@code failedRecordKey} (i.e. it threw), or on a full flush. Issues
 *       one statement per record through {@code MySqlMgr#execUpdate}.</li>
 * </ul>
 *
 * <p>This text is decompiler output (labelled breaks, {@code var10000}),
 * so the control flow is more convoluted than the original source likely was.
 *
 * @param maxCount upper bound on records staged by the batch path. The value
 *                 2147483647 ({@code Integer.MAX_VALUE}) means "full flush":
 *                 it forces both paths and disables the per-record
 *                 {@code third >= now} skip.
 */
public void save(int maxCount) {
if (this.record.size() != 0) {
int initRecordCount = this.record.size();
// In the batch path this set holds every key staged into the batch (it is
// cleared after a successful commit); in the single path it holds keys
// whose statement failed.
HashSet<String> failedRecordKey = new HashSet();
Iterator<Entry<String, Treble<State, OctetsStream, Long>>> iter = this.record.entrySet().iterator();
int count = 0;
String key;
String var10000;
// ---- Batch path ----
if (initRecordCount >= CheckpointCache.PROCESS_BATCH || maxCount == 2147483647) {
long now = System.currentTimeMillis();
Connection conn = null;
try {
conn = MySqlMgr.getInstance().getWriteConn();
PreparedStatement pstReplace = conn.prepareStatement("REPLACE INTO " + this.tableName + " VALUES(?, ?)");
// JDBC query timeout is expressed in seconds.
pstReplace.setQueryTimeout(10);
PreparedStatement pstRemove = conn.prepareStatement("DELETE FROM " + this.tableName + " WHERE k=?");
pstRemove.setQueryTimeout(10);
label436:
while(true) {
while(true) {
Entry entry;
do {
if (!iter.hasNext()) {
break label436;
}
entry = (Entry)iter.next();
// Skip entries whose `third` value is not older than `now`, unless this is
// a full flush. NOTE(review): `third` is presumably a timestamp in
// milliseconds - confirm against where records are inserted.
} while((Long)((Treble)entry.getValue()).third >= now && maxCount < 2147483647);
if (this.isZero((String)entry.getKey())) {
// A key rejected by isZero is only logged, never written. Because it is
// added to failedRecordKey it is removed from `record` together with the
// written keys once the batch succeeds.
var10000 = (String)entry.getKey();
Trace.error("Key is Zero! Key=" + var10000 + " tableName=" + this.tableName);
failedRecordKey.add((String)entry.getKey());
} else {
if (((Treble)entry.getValue()).first != State.INDB_GET && ((Treble)entry.getValue()).first != State.INDB_ADD && ((Treble)entry.getValue()).first != State.ADD) {
if (((Treble)entry.getValue()).first != State.INDB_REMOVE) {
// Any other state is neither written nor counted.
continue;
}
// INDB_REMOVE: stage a DELETE for this key.
failedRecordKey.add((String)entry.getKey());
pstRemove.setString(1, (String)entry.getKey());
pstRemove.addBatch();
} else {
// INDB_GET / INDB_ADD / ADD: stage a REPLACE carrying the value, as a JSON
// string when the JSON flag is set, otherwise as a binary stream.
failedRecordKey.add((String)entry.getKey());
pstReplace.setString(1, (String)entry.getKey());
if (Xdb.getInstance().isJsonFlag()) {
pstReplace.setString(2, ((OctetsStream)((Treble)entry.getValue()).second).unmarshalJson());
} else {
pstReplace.setBinaryStream(2, new ByteArrayInputStream(((OctetsStream)((Treble)entry.getValue()).second).array()), ((OctetsStream)((Treble)entry.getValue()).second).size());
}
pstReplace.addBatch();
}
++count;
// Stop staging once maxCount statements have been queued.
if (count >= maxCount) {
break label436;
}
}
}
}
if (count > 0) {
pstReplace.executeBatch();
pstRemove.executeBatch();
conn.commit();
}
// NOTE(review): both statements are closed only on this success path; if
// anything above throws, only `conn` is closed in the finally block.
pstReplace.close();
pstRemove.close();
// Batch succeeded: drop every staged key from `record`, then clear the set
// and log how long the batch took.
Iterator var34 = failedRecordKey.iterator();
label410:
while(true) {
do {
if (!var34.hasNext()) {
failedRecordKey.clear();
long useTime = System.currentTimeMillis() - now;
if (useTime > (long)MySqlMgr.MS_DEBUG && Trace.isDebugEnabled()) {
Trace.debug("Execute Batch Use " + useTime + " tableName is " + this.tableName);
}
if (useTime > (long)MySqlMgr.MS_INFO * 2L) {
Trace.info("Execute Batch Use " + useTime + " tableName is " + this.tableName);
}
break label410;
}
key = (String)var34.next();
// NOTE(review): record.get(key) is dereferenced without a null check; if
// another thread can remove the key in the meantime this throws
// NullPointerException - confirm the threading model of `record`.
} while((Long)((Treble)this.record.get(key)).third >= now && maxCount < 2147483647);
this.record.remove(key);
}
} catch (Exception var25) {
// On any failure roll back; failedRecordKey keeps the staged keys, which
// makes the single path below run as a fallback.
if (conn != null) {
try {
conn.rollback();
Trace.error("save exec Batch Rollback Success " + this.tableName);
} catch (Exception var24) {
Trace.error("save exec Batch Rollback Exception " + this.tableName, var24);
}
}
Trace.error("save exec Batch ReplaceSQL Exception " + this.tableName, var25);
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException var22) {
Trace.error("write conn close", var22);
}
}
}
}
// ---- Single path ----
if (initRecordCount < CheckpointCache.PROCESS_BATCH || failedRecordKey.size() > 0 || maxCount == 2147483647) {
int countInit = count;
long now = System.currentTimeMillis();
// Restart iteration from the beginning of `record`.
iter = this.record.entrySet().iterator();
label389:
while(true) {
while(true) {
boolean bSuccess;
Entry entry;
do {
if (!iter.hasNext()) {
break label389;
}
bSuccess = true;
entry = (Entry)iter.next();
// Same skip rule as in the batch path.
} while((Long)((Treble)entry.getValue()).third >= now && maxCount < 2147483647);
if (this.isZero((String)entry.getKey())) {
var10000 = (String)entry.getKey();
Trace.error("Key is Zero! Key=" + var10000 + " tableName=" + this.tableName);
failedRecordKey.add((String)entry.getKey());
} else {
switch((State)((Treble)entry.getValue()).first) {
case INDB_REMOVE:
var10000 = this.tableName;
// NOTE(review): the key is concatenated into the SQL text; a key containing
// a quote breaks the statement and is an injection risk - confirm that keys
// are constrained, or bind the key as a parameter.
key = "DELETE FROM " + var10000 + " WHERE k='" + (String)entry.getKey() + "'";
// The statement is tried at most twice; it counts as failed only when both
// attempts return a negative value.
if (MySqlMgr.getInstance().execUpdate(key, (OctetsStream)null) < 0 && MySqlMgr.getInstance().execUpdate(key, (OctetsStream)null) < 0) {
failedRecordKey.add((String)entry.getKey());
bSuccess = false;
}
// Falls through into the shared `break` below.
case REMOVE:
default:
break;
case INDB_GET:
case INDB_ADD:
case ADD:
var10000 = this.tableName;
// NOTE(review): same key-concatenation concern as the DELETE above.
String replaceSql = "REPLACE INTO " + var10000 + " VALUES('" + (String)entry.getKey() + "', ?)";
if (MySqlMgr.getInstance().execUpdate(replaceSql, (OctetsStream)((Treble)entry.getValue()).second) < 0 && MySqlMgr.getInstance().execUpdate(replaceSql, (OctetsStream)((Treble)entry.getValue()).second) < 0) {
failedRecordKey.add((String)entry.getKey());
bSuccess = false;
}
}
// Remove the entry once it has been handled without failure (this includes
// states that required no statement), subject to the same age rule.
if (bSuccess && ((Long)((Treble)this.record.get(entry.getKey())).third < now || maxCount == 2147483647)) {
iter.remove();
}
if (bSuccess) {
++count;
// Cap the work of one call at PROCESS_SINGLE entries, unless maxCount is at
// least PROCESS_MAX_COUNT.
if (count - countInit >= CheckpointCache.PROCESS_SINGLE && maxCount < CheckpointCache.PROCESS_MAX_COUNT) {
break label389;
}
}
}
}
}
}
// ---- Reporting ----
if (failedRecordKey.size() > 0) {
var10000 = this.tableName;
Trace.error(var10000 + " success:" + count + " failed:" + failedRecordKey.size() + " left:" + this.record.size());
Iterator var28 = failedRecordKey.iterator();
while(var28.hasNext()) {
// NOTE(review): this redeclares `key`, which is already declared in the
// enclosing scope; javac rejects a duplicate local variable, so this is a
// decompiler artifact and needs a rename before the text can compile.
String key = (String)var28.next();
Treble<State, OctetsStream, Long> info = (Treble)this.record.get(key);
if (info != null && info.second != null) {
try {
// Log the failed value as JSON; values longer than 1000 characters are
// reported by length only.
String jsonStr = ((OctetsStream)info.second).unmarshalJson();
if (jsonStr.length() > 1000) {
Trace.error(this.tableName + " key=" + key + " json size=" + jsonStr.length());
} else {
Trace.error(this.tableName + " key=" + key + " json=" + jsonStr);
}
} catch (MarshalException var23) {
Trace.error(this.tableName + " key=" + key, var23);
}
}
// Keys listed in MySqlMgr.failedClearMap (looked up by tableName + key) are
// dropped from `record` even though they were not persisted.
if (MySqlMgr.failedClearMap.get(this.tableName + key) != null) {
this.record.remove(key);
Trace.error(this.tableName + " key=" + key + " is removed!");
}
}
} else {
Trace.info(this.tableName + " success:" + count + " left:" + this.record.size());
}
}
}
理解:
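其实就是遍历每张表缓存的记录(key 和 value): isJsonFlag 为 true 时 value 以 JSON 字符串写入,否则以二进制流写入。
记录数达到 PROCESS_BATCH 或关服最终落地(maxCount == Integer.MAX_VALUE)时走批量处理(addBatch/executeBatch 后统一 commit); 记录数较少或批量失败时改为逐条执行, 关服最终落地时批量之后还会再逐条处理剩余记录。
根据记录的状态(State)决定操作: 状态是 INDB_REMOVE 时执行 DELETE 删除数据, 状态是 INDB_GET/INDB_ADD/ADD 时执行 REPLACE INTO 写入数据, 其他状态不落库。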