Starrocks 命令 Alter table DISTRIBUTED 重分布数据的实现
背景
在前文Starrocks 写入报错 primary key memory usage exceeds the limit中,可以通过ALTER TABLE xxxx DISTRIBUTED BY HASH(xx) BUCKETS 50;
来改变数据的分布状态,具体的执行过程是怎么样的呢?
分析
首先对应的g4文件中为 alterTableStatement ,这里最终的调用是 AlterJobExecutor.visitAlterTableStatement:
if (statement.hasSchemaChangeOp()) {
Locker locker = new Locker();
locker.lockTableWithIntensiveDbLock(db, table.getId(), LockType.WRITE);
try {
SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
assert table instanceof OlapTable;
schemaChangeHandler.process(statement.getAlterClauseList(), db, (OlapTable) table);
} catch (UserException e) {
throw new AlterJobException(e.getMessage());
} finally {
locker.unLockTableWithIntensiveDbLock(db, table, LockType.WRITE);
}
isSynchronous = false;
schemaChangeHandler.process
会创建OptimizeJobV2
实例去优化对象,数据链路如下:
SchemaChangeHandler.process
||
\/
analyzeAndCreateJob
||
\/
createOptimizeTableJob
||
\/
OptimizeJobV2Builder.build()
||
\/
new OptimizeJobV2()
SchemaChangeHandler.process 会把当前的OptimizeJobV2
job 放入要执行的队列中,之后SchemaChangeHandler 以 alter_scheduler_interval_millisecond (10000ms)的轮询间隔从队列中取出要执行的任务,并调用run
方法.run
方法如下:
public synchronized void run() {
if (isTimeout()) {
cancelImpl("Timeout");
return;
}
// create connectcontext
createConnectContextIfNeeded();
try {
while (true) {
JobState prevState = jobState;
switch (prevState) {
case PENDING:
runPendingJob();
break;
case WAITING_TXN:
runWaitingTxnJob();
break;
case RUNNING:
runRunningJob();
break;
case FINISHED_REWRITING:
runFinishedRewritingJob();
break;
default:
break;
}
if (jobState == prevState) {
break;
} // else: handle the new state
}
} catch (AlterCancelException e) {
cancelImpl(e.getMessage());
}
}
- PENDING
创建完任务初始状态就是PENDING
,所以调用runPendingJob()
方法,这里有几个关键点是
- 创建该Alter语句涉及到的所有的分区
- 检查改任务所涉及到表的状态,必须该表的tablet都为健康状态才可以进行下一步,否则设置该表的状态为
WAITING_STABLE
,并直接跳过该任务 - 会获取到在一个事务的ID
- 改变该作业的状态为
WAITING_TXN
- WAITING_TXN
如果任务所涉到的表为正常状态,则会进入runWaitingTxnJob()
方法,这里的几个关键点是
- 会等待在该任务对应的事务之前的事务都运行完才会执行该任务
- 每个分区建立一个任务,并把分区写入一个临时分区中
- 改变该作业的状态为
RUNNING
- RUNNING
如果任务正常运行的话,则会进入runRunningJob()
方法,这里的几个关键点是
- 等待所有的写入临时分区的任务完成
- 锁住该表所在库以及该表,并且是排他锁,所以读取该库的操作也是不可行的
- 替换临时分区到对应的分区上去
- 改变该作业的状态为
FINISHED