HiveMetastore 的架构简析
HiveMetastore 的架构简析
Hive Metastore 是 Hive 元数据管理的服务。可以把元数据存储在数据库中。对外通过 api 访问。
hive_metastore.thrift
对外提供的 Thrift 接口定义在文件 standalone-metastore/src/main/thrift/hive_metastore.thrift 中。
内容包括用到的结构体和枚举,和常量,和 rpc Service。
如分区定义如下:
struct Partition {
1: list<string> values // string value is converted to appropriate partition key type
2: string dbName,
3: string tableName,
4: i32 createTime,
5: i32 lastAccessTime,
6: StorageDescriptor sd,
7: map<string, string> parameters,
8: optional PrincipalPrivilegeSet privileges,
9: optional string catName
}
Service的定义了 client 和 server 的 RPC 请求。如增加分区的定义如下:
service ThriftHiveMetastore extends fb303.FacebookService
{
Partition add_partition(1:Partition new_part)
throws(1:InvalidObjectException o1,
2:AlreadyExistsException o2,
3:MetaException o3)
}
ThriftHiveMetastore.java
hive_metastore.thrift 编译之后生成文件 ThriftHiveMetastore.java
ThriftHiveMetastore 结构如下:
public class ThriftHiveMetastore {
// Iface 定义了 Service 的所有的接口。仅列出了 add_partition
public interface Iface extends com.facebook.fb303.FacebookService.Iface {
public Partition add_partition(Partition new_part) throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException;
// omit other methods
}
// AsyncIface 定义了异步接口。
public interface AsyncIface extends com.facebook.fb303.FacebookService .AsyncIface {
public void add_partition(Partition new_part, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
// omit other methods
}
// Client 的实现
public static class Client extends com.facebook.fb303.FacebookService.Client implements Iface {
public Partition add_partition(Partition new_part) throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException
{
send_add_partition(new_part);
return recv_add_partition();
}
// receiveBase 调用 result.read 方法,从 protocal
public Partition recv_add_partition() throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException
{
add_partition_result result = new add_partition_result();
receiveBase(result, "add_partition");
if (result.isSetSuccess()) {
return result.success;
}
if (result.o1 != null) {
throw result.o1;
}
if (result.o2 != null) {
throw result.o2;
}
if (result.o3 != null) {
throw result.o3;
}
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "add_partition failed: unknown result");
}
}
// Processor 是服务端处理框架,把所有要处理的 rpc 的名称和处理的映射放到 map里,客户端请求 rpc,先输出 rpc 的名称。
public static class Processor<I extends Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements org.apache.thrift.TProcessor {
private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
processMap.put("add_partition", new add_partition());
}
}
// AsyncProcessor
public static class AsyncProcessor<I extends AsyncIface> extends com.facebook.fb303.FacebookService.AsyncProcessor<I> {
}
}
IHMSHandler
IHMSHandler 是服务器端需要实现的接口。除了 ThriftHiveMetastore.Iface 外,还包括其他一些方法。
IHMSHandler extends ThriftHiveMetastore.Iface, Configurable
HMSHandler
HMSHandler 是服务器端的具体实现。创建服务时,创建 HMSHandler,多个线程调用同一个 HMSHandler 对象来处理 client 的请求。
public class HMSHandler extends FacebookBase implements IHMSHandler {
@Override
public Partition add_partition(final Partition part)
throws InvalidObjectException, AlreadyExistsException, MetaException {
return add_partition_with_environment_context(part, null);
}
// omit other methods
@Override
public Partition add_partition_with_environment_context(
final Partition part, EnvironmentContext envContext)
throws InvalidObjectException, AlreadyExistsException,
MetaException {
startTableFunction("add_partition",
part.getCatName(), part.getDbName(), part.getTableName());
Partition ret = null;
Exception ex = null;
try {
ret = add_partition_core(getMS(), part, envContext);
} catch (Exception e) {
ex = e;
if (e instanceof MetaException) {
throw (MetaException) e;
} else if (e instanceof InvalidObjectException) {
throw (InvalidObjectException) e;
} else if (e instanceof AlreadyExistsException) {
throw (AlreadyExistsException) e;
} else {
throw newMetaException(e);
}
} finally {
endFunction("add_partition", ret != null, ex, part != null ? part.getTableName(): null);
}
return ret;
}
}
getMS 从 ThreadLocal 里,每个线程单独的。
@Override
public RawStore getMS() throws MetaException {
Configuration conf = getConf();
return getMSForConf(conf);
}
public static RawStore getMSForConf(Configuration conf) throws MetaException {
RawStore ms = threadLocalMS.get();
if (ms == null) {
ms = newRawStoreForConf(conf);
ms.verifySchema();
threadLocalMS.set(ms);
ms = threadLocalMS.get();
}
return ms;
}
为什么 Handler 总是一个线程处理一个 client 的请求
如果不是一个线程处理一个 client 的请求,那么 client 先发送一个请求,然后再发送第二个请求时, RawStore ms = threadLocalMS.get();
就有可能拿到的是其他线程的 ms。
因为 org.apache.thrift.server.TThreadPoolServer.serve方法中。为每个 socket 创建一个 client 对象,并且把 client 的所有请求有 WorkerProcess 进行处理。WorkerProcess 是一个 Runnable。最终提交到 executorService_ 中。
while(!this.stopped_) {
try {
TTransport client = this.serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
while(true) {
try {
this.executorService_.execute(wp);
break;
} catch (Throwable var13) {
// omit
}
}
} catch (TTransportException var14) {
//
}
}
- WorkerProcess
WorkerProcess 也是除了设置停止标志外死循环。
do {
if (eventHandler != null) {
eventHandler.processContext(connectionContext, inputTransport, outputTransport);
}
} while(!TThreadPoolServer.this.stopped_ && processor.process(inputProtocol, outputProtocol));
processor 类型是 TUGIBasedProcessor。
- 当客户端正常退出时。
client 会调用 metastore 的 shutdown 方法。此方法里,清除所有的 threadlocal 对象。
public void shutdown() {
cleanupRawStore();
PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();
}
protected static void cleanupRawStore() {
try {
RawStore rs = HMSHandler.getRawStore();
if (rs != null) {
HMSHandler.logInfo("Cleaning up thread local RawStore...");
rs.shutdown();
}
} finally {
HMSHandler handler = HMSHandler.threadLocalHMSHandler.get();
if (handler != null) {
handler.notifyMetaListenersOnShutDown();
}
HMSHandler.threadLocalHMSHandler.remove();
HMSHandler.threadLocalConf.remove();
HMSHandler.threadLocalModifiedConfig.remove();
HMSHandler.removeRawStore();
HMSHandler.logInfo("Done cleaning up thread local RawStore");
}
}
- 异常退出时
WorkerProcess 的 finally 处理不论是否当前连接调用 shutdown,都执行eventHandler.deleteContext
finally {
if (eventHandler != null) {
eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
}
在 HiveMetaStore.java里定义了 eventHandler, 也调用了 cleanupRawStore
,和 shutdown 方法里调用的一样。
@Override
public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
openConnections.decrementAndGet();
// If the IMetaStoreClient#close was called, HMSHandler#shutdown would have already
// cleaned up thread local RawStore. Otherwise, do it now.
HMSHandler.cleanupRawStore();
}
threadLocal 对象的 remove 方法多次调用是没有副作用的。
其他考虑的点
在异常退出时,没有调用 PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();
, 是否会内存溢出。
调用 PerfLogger.getPerfLogger(false). 当参数是 false 时,如果 ThreadLocal 里已经有,则不会创建对象。处理线程的个数是固定的。不会导致内存问题