hadoop源码解读
一、hadoop rpc总结
1、RPC指的是不同进程的方法调用,分为客户端和服务端,客户端调用服务端的方法,方法的执行在服务端。
2、如何实现Hadoop RPC的调用,必须要实现协议,这个协议其实就是一个接口,但是这个接口必须要有一个重要的特征,里面必须有VersionID.
3、RPC的服务端必须实现这些协议(接口)。
4、如何构建RPV的调用。
服务端:(构建者模式)
Server server = new RPC.Builder(new Configuration())
.setBindAddress("localhost")
.setPort(9999)
.setProtocol(ClientProtocol.class)
.setInstance(new NameNodeRpcServer())
.build();
客户端:获取代理以及各种参数
ClientProtocol namenode = RPC.getProxy(ClientProtocol.class, 1234L,
new InetSocketAddress("localhost",9999),
new Configuration());
Hadoop源码中有两种RPC,一种Hadoop RPC,另一种是HttpServer RPC,有什么区别?
应对的数据量不同,如果传输的数据量比较大,比如读写日志,用httpserver rpc,数据量比较小时,就是RPC之间的调用,用Hadoop RPC.
二、源码流程解读 启动
NameNode启动流程:
在createNameNode方法中通过不同的场景switch …… case进入(format,rollBack,checkPoint,recover)默认进入实例化 NameNode(new NameNode)
默认进入实例化NameNode(new NameNode)-> initialize(conf)初始化方法。
-
startHttpServer方法 -> 设置主机名和端口号(50070),绑定多个servlet(功能)
-
加载元数据
loadFromDisk(conf) -> loadFsImage(startOpt)
1)合并元数据,将fsimage和editlog合并
2)把合并出来新的fsimage写到磁盘,老的删掉
3)打开一个新的editlog,开始写日志。
3. 创建RPC服务端
createRpcServer(conf) -> NameNodeRpcServer -> 启动ServiceRpcServer
4. 启动公共服务,NameNode RPC的服务就在里面启动的
1)进行资源检查,检查存储元数据的磁盘空间是否足够
a. 如果磁盘空间不足;会在日志里打印告警,且hasResourceAvailable = false
2)进入安全模式检查,检查是否可以退出安全模式
HDFS进入安全模式的三个条件(或关系):
条件一:计算阈值,block 块数 * 0.999,判断目录元数据是否大于阈值
threshold != 0 && blockSafe < blockThreshold
HDFS的元数据那儿程序总计分析出来上一次关闭集群之前
假设有1000个complete的block,默认是阈值的计算比例是0.999
这样blockThreshold的值是999
现在集群起来了以后,发现累计datanode汇报过来的complete的block个数(blockSafe)
如果小于999就让集群处于安全模式。
条件二:判断存活dataNode个数是否大于配置数目
datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold
如果存活的datanode的个数小于一定的数目的时候,也会进去安全模式
默认是0,所以相当于没启用,但是我们也可以配置,如果存活的datanode个数
少于多少就让HDFS集群出入安全模式。
条件三:检查NameNode写元数据目录是否大于100M
!nameNodeHasResourcesAvailable()
就是前面 检查NameNode写的元数据的目录空间是否大于100M,
如果目录的空间小于100M,nameNodeHasResourcesAvailable 就为false
hdfs就会进入安全模式。
DataNode 总结:
1)一个集群里面可以有很多个DataNode,这些DataNode就是用来存储数据(hdfs上block文件块)。
2)DataNode启动了以后会周期性的跟NameNode进行通信(心跳,块汇报),客户端也可以跟DataNode进行交互或者DataNode之间也可以进行相互通信。
3)NameNode不能直接操作DataNode.而是通信心跳返回值指令的方式操做DataNode.DataNode也会去响应NameNode,响应NameNode发送过来的一些指令,比如:删除block,复制block等操作。
4) DataNode启动了以后开放了一个socket的服务(RPC),等待别人去调用他。DataNode启动的时候会把自己的主机名和端口号汇报给NameNode.也就是说如果Client和DataNode想要去访问某个DataNode.首先要跟NameNode进行通信;从NameNode那儿获取到目标DataNode的主机名和端口号。这样才可以访问到对应的DataNode了。
DataNode启动流程:
secureMain -> createDataNode(初始化DataNode) -> instantiateDataNode(实例化DataNode) -> makeInstance -> new DataNode -> startDataNode(启动DataNode)
initDataXceiver (初始化DataXceiver,dataNode用来接收客户端和其它DataNode传来block数据的服务)
startInforServer (启动HttpServer服务,绑定了多个servlet)
initRpcServer (初始化RPC服务)
new BlockPoolManager 创建了BlockPoolManager对象
refreshNameNodes
1. 向NameNode进行注册
2. 跟NameNode进行心跳
doRefreshNameNode
1)如果是联邦架构,里面会有多个NameService
2)一个联邦就是一个NameService
a. 一个联邦对应一个BPOfferService
b. 一个联邦的一个NameNode就是一个BPServiceActor
c. 正常来说一个BPOfferService对应NameNode个数个BPServiceActor
3. startAll(DataNode向NameNode进行注册和心跳)
HDFS心跳流程:
心跳主要就是两个目的:
1. 更改存储信息
2. 更新上一次的心跳时间
总结:在分布式场景下
注册:从节点向主节点进行注册本质上,就是把自己的主机名、端口号等信息写到主机的各种内存结构中。
心跳:对于分布式文件系统,就是把自己的存储信息告诉主节点,更新上一次的心跳时间
三、源码流程解读 写数据场景
HDFS元数据管理流程
HDFS双缓冲机制
思考三个问题:
1. 交换内存的条件是什么?
2. 将磁盘写改为内存写,会不会存在丢数据的风险?
3. 当数据从SyncBufffer内存往磁盘写数据还没写完的同时,client请求由于高并发的原因往CurrentBuffer内存中写数据写满,NameNode会有什么表现形式?
元数据创建流程:
-
创建元数据目录树
-
通过双缓冲机制将元数据写到本地和Journalnode(通过自己实现的NIO)
-
standBy NameNode从JournalNode读取元数据(跨服务跨进程读取,后台的线程),把获取到的元数据作用到自己的元数据里面。
通过创建了一个HttpURLConnection对象,发送一个Http请求(相当于一个RPC),读取数据流。通过流对烤方式将元数据写到standBy NameNode目录树上。
-
定期checkPoint,将内存中的目录树合并元数据并持久化到磁盘上,替换fsImage,将已经合并完的日志删除。
checkPoint两个条件(或):
1. 时间 (距上次checkPoint时间,默认一个小时)
2. 数量(比如多少条日志,默认100万条)
checkPoint步骤:
1. 把元数据持久化到磁盘
2. 开启一个异步线程,把刚从内存里面的数据持久化到磁盘,上传数据到active namenode上面
HDFS上传文件源码流程 :
-
create抽象方法,DistributedFileSystem实现类中的create方法实现(客户端)
a) 创建了一个DFSOutputStream,做很多初始化操作
1)往文件目录树里面添加INodeFile
2)添加了[契约(Lease)]管理
1⃣️ 先查看这个契约是否已经存在
a. 如果没有(第一次进来)肯定创建一个契约
存储到数据结构中(可以排序<实现compare进行升序排序>,底层是红黑树数据结构)
2⃣️ 如果有(第二次进来)那就是续约
3)启动了DataStreamer(写数据流程关键服务)/重要
第一次进入时,dataQuene没有数据,会启用线程阻塞
b) 开启续约(契约)
调用线程run方法,进行周期性续约
超过30秒没有进行续约就进行续约(当前时间-上一次续约时间)
续约和心跳类似,获取namenode的代理进行续约,续约完修改上一次续约时间,如果有契约,先从数据结构中删除契约,修改上一次的契约心跳时间,再把修改完以后的契约加入到数据结构中。同样会有个类似于心跳的监控线程,去检查契约是否过期;从最老的契约开始检查。
2. write方法
HDFS文件 -> Block块(128M)-> packet(64K)= chunk(127个chunk构成一个packet) -> chunk(521 bit) + chunksum(校验值 4 Bit) = 516 Bit
1. 计算出chunk的校验和
2. 按照chunk的大小遍历数据
一个一个的chunk去写数据
创建packet
往packet里面写chunk的校验和(4 Bit)
往packet里面写chunk(512 Bit)
写满127个chunk就是一个完整的packet
写满128M就是一个block
写满一个packet,就把这个packet写入队列(如果队列写满就等待)
唤醒之前睡眠的队列(因为此时已经有数据了)
3. 从dataQuene队列里面获取到数据(packet)
4. 建立数据通道
A. 向namenode申请block
因为申请block或者建立数据管道,这些都是重要的操作,肯定要执行成功,但是这些操作都涉及到网络请求,网络这个事,就不好说了,可能会有网络抖动什么的,所以代码中执行一次,不是说失败就失败了,肯定要多次尝试,所以HDFS源码里面很多地方就会用到循环。
服务端那边的操作:
1⃣️ 创建一个block,往文件目录树中挂载了block的信息
2⃣️ 在磁盘上记录了元数据信息
3⃣️ 往blockManager里面记录了block的元数据信息
B. 建立数据通道
1建立数据管道的目的就是提前将就收数据的线程或者socket服务启动起来,启动起来以后就构建好数据管道。
2 HDFS中就是客户端往hadoop1中写,在从hadoop1往hadoop2写。。。
这样设计的目的:
1. 减少客户端网络带宽连接压力
2. 客户端和hadoop1服务夸机房或者夸地域,这样传输的性能会差
3然后发送写数据请求,通过之前初始化好的DataXceiver来写数据
1.接收socket请求,每发送过来一个block,都启动一个DataVceiver去处理这个block,就是启动一个线程去处理。先去读取此次请求的类型(option)
2.根据请求类型进行处理,(写block)
3.通过writeBlock实现,里面创建BlockReceiver,并且查看是否有下游的服务器,有的话就创建镜像(副本),接着往下游发送socket连接
4⃣️ 建立管道时,有可能遇到管道建立不上,某个服务器连接不上
如果管道建立不成功,客户端调用服务器(namenode)代码,去放弃这个block,并且重新去申请Block,同时记录记录出问题那台服务器的编号。(记录原因:需要重新去申请block,namenode根据负载均衡和机架感知去重新申请,就得记录下来失败的那台机器,再一次重试的时候,就排除有故障的服务器)
5. 启动了ResponseProcessor,用来监听一个packet是否发送成功
DataStream会将数据(packet)发送到datanode上面,datanode到底有没有写成功,需要返回一个成功的响应(ACK),最终向客户端汇报处理的结果。
这个过程中会有一个AckQueue配合使用,会将这个packet先放到AckQueue中(把当前接收到的packet放大ackQueue,唤醒wait的线程,同时将dataQueue中的packet移除),再把当前的这个packet发送给下游的节点(数据管道里面),然后校验数据,没问题,就将数据写到本地磁盘上面;写成功的话就返回写成功,写失败的话,先重试,不行就会将AckQueue中的这个packet重新返回给dataQueue,dataQueue有这个数据后,就会将这个数据重新写一遍。(写到各个磁盘上面应该是同步的)
如果写成功,就会将这个packet从AckQueue中移除。
容错,写的过程中,很可能会遇到问题,通过try…catch捕获异常,捕获到异常,就会将hasError标识改为true,本身就是分布式的代码,循环执行的,他会再次进入代码,但是会有判断,进入时就会进,关闭流和线程的代码,并且进入processDatanodeError方法去处理,首先关闭流,重新把ackQueue的数据加入到dataQueue中,并将ackQueue中的packet清空,重新建立数据管道,这次建立管道会将有问题的服务器排除,直接传输正常的服务器节点。那么这样一来,副本数就会少一个,不用担心,等到datanode和namenode心跳的时候,会进行容错,将正常节点上的副本复制到之前有问题的节点上。还有一种情况,集群中超过一半的节点有问题,问题就比较大了,这时候就需要推倒重来,重新申请block,重新建立管道。
先引入一个小的背景,假如多个客户端同时要并发的写Hadoop HDFS上的一个文件,这个事儿能成吗? 明显不可以接受啊,因为HDFS上的文件是不允许并发写的,比如并发的追加一些数据什么。所以HDFS里有一个机制,叫做文件契约机制。也就是说,同一时间只能有一个客户端获取NameNode上面一个文件的契约,然后才可以向获取契约的文件写入数据。
此时如果其他客户端尝试获取文件契约的时候,就获取不到,只能干等着。通过这个机制,可以保证同一时间只有一个客户端在写一个文件。在获取到了文件契约之后,在写文件的过程期间,那个客户端需要开启一个线程,不停的发送请求给 NameNode进行文件续约,告诉NameNode: NameNode大哥,我还在写文件啊,你给我一直保留那个契约好吗? 而NameNode内部有一个专门的后台线程,负责监控各个契约的续约时间。如果某个契约很长时间没续约了,此时就自动过期掉这个契约,让别的客户端来写。
1. 创建文件
2. 创建契约
3. 启动了DataStramer线程
4. 开启了续约
5. 契约的检查
6. 创建packet
7. 申请Block
8. 建立数据管道
9. ResponseProcessor线程
10. PacketResponder线程
四、RPC示例:
/**
*在pom.xml引入依赖
*/
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
/**
* 网络协议
*/
public interface Protocol {
//定义版本号,可自定义
long versionID=123456789L;
void hello(String msg);
void add(int num);
}
/**
*定义服务端实现类
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import java.io.IOException;
public class NameNodeRPCServer implements Protocol {
public void hello(String msg) {
System.out.println(" hello " + msg);
}
public void add(int num) {
}
public static void main(String[] args) throws IOException {
Server server = new RPC.Builder(new Configuration())
.setBindAddress("localhost")
.setPort(9999)
.setProtocol(Protocol.class)
.setInstance(new NameNodeRPCServer())
.build();
//启动服务端
System.out.println("我是RPC服务端,我准备启动了");
server.start();
System.out.println("启动完成");
}
}
/**
*定义客户端类
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
import java.net.InetSocketAddress;
public class Client {
public static void main(String[] args) throws IOException {
Protocol namenode = RPC.getProxy(Protocol.class,
Protocol.versionID,
new InetSocketAddress("localhost", 9999),
new Configuration());
namenode.hello("hadoop architechure");
}
}
启动NameNodeRPCServer,到服务器控制台执行jps,你会发现多了一个NameNodeRPCServer进程,所以不管是NameNode还是DataNode,其实都是一个RPC进程,于是我们可以从NameNode和DataNode这两个类入手.
NameNode服务既管理了HDFS的集群的命名空间和 "inode table"。
一个HDFS集群里面只有一个NameNode.(除了HA方案,或者是联邦)
Namenode管理了两张极其重要的表:
1)一张表管理了文件与block之间的映射关系。
2)另一张表管理了block文件块与 DataNode主机之间的映射关系。
第一张表存储到了磁盘上面。(因为文件与block块之间的关系是不会发生变化的)
每次NameNode重启的时候重新构建第二张映射表。
Namenode服务是由三个重要的类支撑的:
1)NameNode类:
管理配置的参数
2)NameNode server:
IPC Server:
NameNodeRPCServer:开放端口,等待别人调用.比如:8020/9000
HTTP Server:
NameNodeHttpServer:开放50070界面,我们可以通过这个界面了解HDFS的情况
3) FSNameSystem:
这个类非常重要,管理了HDFS的元数据。