当前位置: 首页 > article >正文

HiveMetastore 的架构简析

HiveMetastore 的架构简析

Hive Metastore 是 Hive 元数据管理的服务。可以把元数据存储在数据库中。对外通过 api 访问。


对外提供的 Thrift 接口定义在文件 standalone-metastore/src/main/thrift/hive_metastore.thrift 中。

内容包括用到的结构体和枚举,和常量,和 rpc Service。

struct Partition {
  1: list<string> values // string value is converted to appropriate partition key type
  2: string       dbName,
  3: string       tableName,
  4: i32          createTime,
  5: i32          lastAccessTime,
  6: StorageDescriptor   sd,
  7: map<string, string> parameters,
  8: optional PrincipalPrivilegeSet privileges,
  9: optional string catName

Service的定义了 client 和 server 的 RPC 请求。如增加分区的定义如下:

service ThriftHiveMetastore extends fb303.FacebookService
	Partition add_partition(1:Partition new_part)
        throws(1:InvalidObjectException o1, 
        	   2:AlreadyExistsException o2, 
        	   3:MetaException o3)


hive_metastore.thrift 编译之后生成文件 ThriftHiveMetastore.java

ThriftHiveMetastore 结构如下:

public class ThriftHiveMetastore {
	// Iface 定义了 Service 的所有的接口。仅列出了 add_partition
	public interface Iface extends com.facebook.fb303.FacebookService.Iface {
		public Partition add_partition(Partition new_part) throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException;
		// omit other methods


   // AsyncIface 定义了异步接口。
	public interface AsyncIface extends com.facebook.fb303.FacebookService .AsyncIface {
		public void add_partition(Partition new_part, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
		// omit other methods


// Client 的实现
	public static class Client extends com.facebook.fb303.FacebookService.Client implements Iface {

		public Partition add_partition(Partition new_part) throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException
      return recv_add_partition();
    // receiveBase 调用 result.read 方法,从 protocal
    public Partition recv_add_partition() throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException
      add_partition_result result = new add_partition_result();
      receiveBase(result, "add_partition");
      if (result.isSetSuccess()) {
        return result.success;
      if (result.o1 != null) {
        throw result.o1;
      if (result.o2 != null) {
        throw result.o2;
      if (result.o3 != null) {
        throw result.o3;
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "add_partition failed: unknown result");

// Processor 是服务端处理框架,把所有要处理的 rpc 的名称和处理的映射放到 map里,客户端请求 rpc,先输出 rpc 的名称。
	public static class Processor<I extends Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements org.apache.thrift.TProcessor {

	private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
			processMap.put("add_partition", new add_partition());

    // AsyncProcessor
	public static class AsyncProcessor<I extends AsyncIface> extends com.facebook.fb303.FacebookService.AsyncProcessor<I> {


IHMSHandler 是服务器端需要实现的接口。除了 ThriftHiveMetastore.Iface 外,还包括其他一些方法。

IHMSHandler extends ThriftHiveMetastore.Iface, Configurable


HMSHandler 是服务器端的具体实现。创建服务时,创建 HMSHandler,多个线程调用同一个 HMSHandler 对象来处理 client 的请求。

public class HMSHandler extends FacebookBase implements IHMSHandler {
    public Partition add_partition(final Partition part)
        throws InvalidObjectException, AlreadyExistsException, MetaException {
      return add_partition_with_environment_context(part, null);
    // omit other methods

    public Partition add_partition_with_environment_context(
        final Partition part, EnvironmentContext envContext)
        throws InvalidObjectException, AlreadyExistsException,
        MetaException {
          part.getCatName(), part.getDbName(), part.getTableName());
      Partition ret = null;
      Exception ex = null;
      try {
        ret = add_partition_core(getMS(), part, envContext);
      } catch (Exception e) {
        ex = e;
        if (e instanceof MetaException) {
          throw (MetaException) e;
        } else if (e instanceof InvalidObjectException) {
          throw (InvalidObjectException) e;
        } else if (e instanceof AlreadyExistsException) {
          throw (AlreadyExistsException) e;
        } else {
          throw newMetaException(e);
      } finally {
        endFunction("add_partition", ret != null, ex, part != null ?  part.getTableName(): null);
      return ret;

getMS 从 ThreadLocal 里,每个线程单独的。

    public RawStore getMS() throws MetaException {
      Configuration conf = getConf();
      return getMSForConf(conf);
    public static RawStore getMSForConf(Configuration conf) throws MetaException {
      RawStore ms = threadLocalMS.get();
      if (ms == null) {
        ms = newRawStoreForConf(conf);
        ms = threadLocalMS.get();
      return ms;

为什么 Handler 总是一个线程处理一个 client 的请求

如果不是一个线程处理一个 client 的请求,那么 client 先发送一个请求,然后再发送第二个请求时, RawStore ms = threadLocalMS.get(); 就有可能拿到的是其他线程的 ms。

因为 org.apache.thrift.server.TThreadPoolServer.serve方法中。为每个 socket 创建一个 client 对象,并且把 client 的所有请求有 WorkerProcess 进行处理。WorkerProcess 是一个 Runnable。最终提交到 executorService_ 中。

while(!this.stopped_) {
        try {
            TTransport client = this.serverTransport_.accept();
            WorkerProcess wp = new WorkerProcess(client);

            while(true) {
                try {
                } catch (Throwable var13) {
                    // omit
        } catch (TTransportException var14) {
  • WorkerProcess
    WorkerProcess 也是除了设置停止标志外死循环。
do {
        if (eventHandler != null) {
            eventHandler.processContext(connectionContext, inputTransport, outputTransport);
    } while(!TThreadPoolServer.this.stopped_ && processor.process(inputProtocol, outputProtocol));

processor 类型是 TUGIBasedProcessor。

  • 当客户端正常退出时。
    client 会调用 metastore 的 shutdown 方法。此方法里,清除所有的 threadlocal 对象。
	public void shutdown() {
    protected static void cleanupRawStore() {
        try {
            RawStore rs = HMSHandler.getRawStore();
            if (rs != null) {
                HMSHandler.logInfo("Cleaning up thread local RawStore...");
        } finally {
            HMSHandler handler = HMSHandler.threadLocalHMSHandler.get();
            if (handler != null) {
            HMSHandler.logInfo("Done cleaning up thread local RawStore");
  • 异常退出时
    WorkerProcess 的 finally 处理不论是否当前连接调用 shutdown,都执行 eventHandler.deleteContext
finally {
        if (eventHandler != null) {
            eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);

在 HiveMetaStore.java里定义了 eventHandler, 也调用了 cleanupRawStore,和 shutdown 方法里调用的一样。

public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
  // If the IMetaStoreClient#close was called, HMSHandler#shutdown would have already
  // cleaned up thread local RawStore. Otherwise, do it now.

threadLocal 对象的 remove 方法多次调用是没有副作用的。


在异常退出时,没有调用 PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();, 是否会内存溢出。
调用 PerfLogger.getPerfLogger(false). 当参数是 false 时,如果 ThreadLocal 里已经有,则不会创建对象。处理线程的个数是固定的。不会导致内存问题



  • Node.js回调函数以及事件循环使用介绍(基础介绍 三)
  • spring-boot(thymeleaf前端框架,简单了解)、( 跨域请求)
  • 玩转「HF/魔搭/魔乐」平台
  • 解决 Ubuntu ‘InRelease is not valid yet’ 报错:内网源 apt update 详细教程
  • 端侧小模型新星,SmolLM2 1.7B击败了Llama 3.2、Qwen 2.5
  • 基础算法练习--滑动窗口(日更中)
  • 青少年编程与数学 02-003 Go语言网络编程 12课题、Go语言Soket编程
  • RabbitMQ 管理平台(控制中心)的介绍
  • SpringBoot健身房管理:提升效率与体验
  • STM32中,在哪些时候需要配置复用推挽/开漏输出?
  • 3种方法轻松从硬盘恢复已删除文件!
  • 零基础学习Java AI Spring AI
  • 舜宇光学科技入职测评:北森商业推理40分钟28题真题解析、网盘资料下载、答题技巧
  • stable diffusion 大模型
  • 腾讯轻量云服务器docker拉取不到镜像的问题:拉取超时
  • 如何不封禁UDP协议同时防止UDP攻击
  • swagger 报错查看
  • 深度学习中的多头注意力机制:原理与实现解析
  • 科技查新在医药健康领域的应用
  • 计算机网络:运输层 —— 运输层概述