当前位置: 首页 > article >正文

Starrocks 命令 Alter table DISTRIBUTED 重分布数据的实现

背景

在前文Starrocks 写入报错 primary key memory usage exceeds the limit中,可以通过ALTER TABLE xxxx DISTRIBUTED BY HASH(xx) BUCKETS 50;来改变数据的分布状态,具体的执行过程是怎么样的呢?

分析

首先对应的g4文件中为 alterTableStatement ,这里最终的调用是 AlterJobExecutor.visitAlterTableStatement:

if (statement.hasSchemaChangeOp()) {
   Locker locker = new Locker();
   locker.lockTableWithIntensiveDbLock(db, table.getId(), LockType.WRITE);
   try {
       SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
       assert table instanceof OlapTable;
       schemaChangeHandler.process(statement.getAlterClauseList(), db, (OlapTable) table);
   } catch (UserException e) {
       throw new AlterJobException(e.getMessage());
   } finally {
       locker.unLockTableWithIntensiveDbLock(db, table, LockType.WRITE);
   }

   isSynchronous = false;

schemaChangeHandler.process会创建OptimizeJobV2 实例去优化对象,数据链路如下:

SchemaChangeHandler.process
      ||
      \/
analyzeAndCreateJob
      ||
      \/
createOptimizeTableJob
      ||
      \/
OptimizeJobV2Builder.build()
      ||
      \/
new OptimizeJobV2()

SchemaChangeHandler.process 会把当前的OptimizeJobV2 job 放入要执行的队列中,之后SchemaChangeHandler 以 alter_scheduler_interval_millisecond (10000ms)的轮询间隔从队列中取出要执行的任务,并调用run方法.run方法如下:

public synchronized void run() {
        if (isTimeout()) {
            cancelImpl("Timeout");
            return;
        }

        // create connectcontext
        createConnectContextIfNeeded();

        try {
            while (true) {
                JobState prevState = jobState;
                switch (prevState) {
                    case PENDING:
                        runPendingJob();
                        break;
                    case WAITING_TXN:
                        runWaitingTxnJob();
                        break;
                    case RUNNING:
                        runRunningJob();
                        break;
                    case FINISHED_REWRITING:
                        runFinishedRewritingJob();
                        break;
                    default:
                        break;
                }
                if (jobState == prevState) {
                    break;
                } // else: handle the new state
            }
        } catch (AlterCancelException e) {
            cancelImpl(e.getMessage());
        }
    }
  • PENDING
    创建完任务初始状态就是PENDING,所以调用 runPendingJob() 方法,这里有几个关键点是
  1. 创建该Alter语句涉及到的所有的分区
  2. 检查改任务所涉及到表的状态,必须该表的tablet都为健康状态才可以进行下一步,否则设置该表的状态为WAITING_STABLE,并直接跳过该任务
  3. 会获取到在一个事务的ID
  4. 改变该作业的状态为WAITING_TXN
  • WAITING_TXN
    如果任务所涉到的表为正常状态,则会进入runWaitingTxnJob()方法,这里的几个关键点是
  1. 会等待在该任务对应的事务之前的事务都运行完才会执行该任务
  2. 每个分区建立一个任务,并把分区写入一个临时分区中
  3. 改变该作业的状态为RUNNING
  • RUNNING
    如果任务正常运行的话,则会进入runRunningJob()方法,这里的几个关键点是
  1. 等待所有的写入临时分区的任务完成
  2. 锁住该表所在库以及该表,并且是排他锁,所以读取该库的操作也是不可行的
  3. 替换临时分区到对应的分区上去
  4. 改变该作业的状态为FINISHED

http://www.kler.cn/a/597501.html

相关文章:

  • Java常用类
  • 使用Aspera高速上传文件到ncbi
  • Mac | Excel | 列数改为和行数一样用数字表示
  • 操作系统为ubantu的服务器上部署nginx软件基础步骤总结
  • 5(五)Jmeter监控服务器性能
  • 【机器学习】机器学习工程实战-第2章 项目开始前
  • 网络安全应急入门到实战
  • pytorch3d学习(五)——批量输出图片+对渲染器的位姿解读+npy文件解读
  • 图像回归评价的常用指标
  • 3. 轴指令(omron 机器自动化控制器)——>MC_ImmediateStop
  • 基于STC89C52的8255并行口拓展实验
  • Transformers x SwanLab:可视化NLP模型训练(2025最新版)
  • vue3:十一、主页面布局(优化页面跳转方式)
  • Android:蓝牙设置配套设备配对
  • 机器学习--DBSCAN聚类算法详解
  • 【空地协同异构机器人系统之无人机点云引导无人车实时避障技术研究】
  • 考研复习之队列
  • 反反爬虫技术指南:原理、策略与合规实践
  • k8s高可用集群安装
  • python写入excel多个sheet表 以及追加sheet表