Hadoop架构详解
Hadoop 是一个开源的分布式计算系统,用于存储和处理大规模数据集。Hadoop 主要由HDFS(Hadoop Distributed File System)、MapReduce、Yarn(Jobtracker,TaskTracker)三大核心组件组成。其中HDFS是分布式文件系统,用于存储文件,MapReducer是计算框架,可以分为Map和Reduce两部分,简单来说就是先分组,后计算,而Yarn则是对主机资源的协调,辅助计算的顺利进行。
1. HDFS(Hadoop Distributed File System)
HDFS基本架构
HDFS负责存储所有文件。他将大型文件分成若干个数据块,默认情况下,HDFS 的数据块大小为 128MB(可以配置),方便计算的分布式进行,提高计算效率。每个数据块还可以生成多个副本,存储在不同主机中提高系统容错率。
HDFS中有三个角色发挥着主要作用,分别是NameNode,DataNode,Secondary NameNod。
NameNode:主要负责管理集群中的所有数据,包括DataNode节点信息,以及文件保存的位置信息等等。
DataNode:实际存储数据的节点,一个集群中存在多个DataNode,互相不知道对方的信息,需要和NameNode保持心跳,汇报存储状态。当DataNode没有定期向NameNode发送心跳时,会触发NameNode的故障恢复,例如副本重新分配。
Secondary NameNode:从名字来看,难道是NameNode的备份节点?当NameNode宕机时代替NameNode发挥作用?实际上并不是,他的作用是帮助NameNode优化磁盘空间。和大多数持久化数据的中间件一样,HDFS对于集群元数据的持久化也是通过快照和日志来持久化进磁盘,不过Hadoop作为大型文件分布式处理系统,其操作日志非常庞大,如果靠操作日志来持久化文件,将要占用极高的磁盘空间,使用快照文件能够显著的压缩信息持久化体积,不过由于操作日志的内容过于巨大,将操作日志变为快照的过程极为耗时,如果这一操作由NameNode完成,可能会导致Hadoop集群的正常服务受到影响,所以Hadoop使用Secondary NameNode这一角色来完成这一过程,Secondary NameNode会定期的向NameNode获取快照文件(FsImage ),以及操作日志(EditLog),并且讲操作日志的内容,补充进快照文件,再将快照文件返回给NameNode,帮助NameNode完成一次信息压缩。
通常上面的学习,我们可以得到一个简单的HDFS架构图如下:
HDFS存储数据流程
- 当一个客户端想要向Hadoop中的HDFS中存储数据时,首先他需要将大型文件按照要求的文件大小进行分块,
- 将文件进行分块后,客户端会向NameNode发送请求(多个数据块可能会并发请求),NameNode再确认文件不存在后(如果已经存在会抛出错误),集群的元数据信息,给这个数据块,及其副本分配位置。
NameNode在给数据块分配位置时,会考虑到节点当前的负载程度,存储空间,节点是否存活等因素,并且还会尽可能将其副本分配到不同主机,甚至机架中(机架需要在配置文件中配置机架感知)。
- 客户端在接收到NameNode返回的信息后,会按照NameNode的安排,开始将数据块传向第一个DataNode。当第一个DataNode接收完成后,会继续将数据发向下一个DataNode(注意这里是第一个DataNode向第二个DataNode发送数据块,而不是客户端向第二个DataNode发送)。在此期间,DataNode会持续向NameNode以及客户端汇报进度。
- 当所有数据传输完成后,客户端会向NameNode发送一个写入完成的请求,NameNode会根据客户端发送的信息来更新自己的集群元数据。
流程图如下:
2. MapReduce
MapReduce是建立在HDFS基础之上的Hadoop计算框架之一(还有很多其他的计算框架),用于处理大量数据块并发计算的计算框架,MapReduce可以分为四个阶段。
输入分割(Input Splitting)
在这一阶段,输入数据被分割成较小的块,称为输入分割(Input Splits)。每个输入分割的大小通常与 Hadoop 分布式文件系统(HDFS)的块大小一致,用户可以通过配置参数(如 mapreduce.input.fileinputformat.split.minsize 和 mapreduce.input.fileinputformat.split.maxsize)调整大小。这些分割被分配给不同的映射任务(Map Tasks)进行处理。客户端也可以自由选择分块大小,甚至大于HDFS分块大小,因为MapReducer的分块是逻辑分块,是指向实际文件的引用,并不是物理分块,不受HDFS分块大小的限制。
Map阶段(Mapping)
分块完毕后,每个Map任务通过RecordReader
从输入分片中解析出一个个键值对。这个过程涉及到如何定义记录边界,例如在文本文件中,可能每一行被视为一个记录。RecordReader
的作用是将输入分片的内容转换为可以作为Map函数输入的键值对形式。例如,在单词计数中,RecordReader
可能将文本行分割成单词,并输出如 <“Hadoop”, 1> 的键值对。
Hadoop会将映射任务尽量分配到其数据所在节点上,以节省网络带宽和提升性能。也就是计算资源向数据移动,这被称为数据本地性(Data Locality)。当无法将计算任务分配到目标主机时,Hadopp考虑机架感知,将任务分配到同一机架的主机中(机架感知需要手动配置)。
洗牌和排序阶段(Shuffling and Sorting)
在Map输出键值对后、Reducer输入前进行一次局部规约操作,称为Combiner。这一步骤是可选的优化项,可以看作是对数据的局部计算,比如说在单词计数的例子中,规约就会将当前分块的单词进行局部汇总。然后将得到的结果传入分区,通过规约的方式可以减少传输到Reducer的数据量,提高整体效率。
Combiner完成后,会进一步对结果进行分区(Partition)、排序(Sort)等多个子阶段。在这个过程中,Map输出的结果会被写入内存缓冲区(默认大小 100MB,可通过 mapreduce.task.io.sort.mb 配置)。当缓冲区达到阈值(如 80%,可通过 mapreduce.map.sort.spill.percent 配置)时,后台线程会将其溢写到磁盘;随后,Reduce任务会从各个Map节点拉取属于自己的那部分数据,并对其进行合并、排序、分组等预处理操作。
分区操作是将键值对发向目标reducer,而排序是将分区结果进一步分类,将相同的key,放在一起,比如说如果分区结果一个reducer需要处理的键值对如下:
(world, 1) (hadoop, 1) (world, 1) (mapreduce, 1)
那么经过排序后,结果是:
(hadoop, 1) (mapreduce, 1) (world, 1) (world, 1)
如果在Combiner阶段出现两个(world, 1),可能会变成(world, 2),这就叫规约。
reducer在开始拉取数据时,还需要再次进行合并、排序和分组,因为在洗牌和排序阶段的排序是单节点的,而reducer需要从多个节点拉取数据,所以需要将结果进行合并,在排序,并且按照既定规则进行分组。分组规则可以自定义,reducer每次会根据分组处理一组数据。
Reducer阶段
分组结束后,Hadoop会将这些分组数据依次交给对应的Reducer执行,Reducer的数量由用户通过配置参数 mapreduce.job.reduces
预先指定,默认值为 1。Reduce 任务数会影响并行度和输出文件数量。Reducer 的输入是一个迭代器(Iterator),它指向当前分组的所有值。每次调用Reducer时都会传入一个分组。Reducer 会依次按照用户定义的Reducer函数(一个传入HDFS的jar包)处理每一组数据。并将结果将处理结果输出到指定的目标位置,通常是 HDFS,且默认以键值对形式存储(可以通过 OutputFormat 自定义输出格式,例如文本或序列化文件)。
流程图
3. Yarn
对于计算框架中,计算像向数据移动的理念,需要一个调度器来辅助执行,Yarn就是这样的一个调度器,他承接这在计算过程中的资源管理和任务调度的工作,让每个一个任务都能分配到最佳节点,并监控整个任务的执行情况。
在Hadoop1.x版本中,这一功能是由jobtracker和tasktracker完成的。
3.1 JobTracker和TaskTracker
JobTracker
是一个全局服务组件,它在整个Hadoop集群中是唯一的,并且通常运行在一个专门配置的主节点上。他的主要职责包括:
- 作业调度:接收来自客户端提交的作业,并将其分解为多个任务(
Map
任务和Reduce
任务)。 - 资源管理:监控整个集群的资源使用情况,并决定哪些
TaskTracker
节点可以执行新任务。 - 状态监控:持续跟踪所有正在运行的任务的状态,并处理任务失败或节点失效的情况,必要时重新调度任务
当用户通过客户端提交一个MapReduce作业时,首先会创建一个JobClient
实例。这个JobClient
负责与JobTracker
进行交互。它会将作业所需的所有文件(如输入分片信息、客户端配置文件、jar包(jar包就是客户端编写的计算任务)等)上传到HDFS上,并向JobTracker发送请求来注册该作业。
JobTracker
接收到新的作业请求后,会根据客户端的配置参数将整个作业拆分为多个Map和Reducer任务,并向NameNode
请求数据块所在位置,以及TaskTracker
的状态信息,尽可能将任务分配给数据所在主机的TaskTracker。当数据所在的主机的TaskTracker过于繁忙时,也会根据机架感知,分配给一个机架的TaskTracker
。
TaskTracker
是运行在每个工作节点上的从属服务。每个节点上只会有一个TaskTracker
实例,它负责以下任务:
- 任务执行:根据
JobTracker
的指令执行分配给它的具体任务。 - 状态汇报:定期向
JobTracker
发送心跳信号,报告自身健康状况及所执行任务的进展。
TaskTracker
通过心跳机制,和JobTracker
保持连接,并在发送心跳时发送自己的状态信息。JobTracker
会根据这些状态信息以及数据所在位置,合理的分配任务,并在返回心跳时,返回给目标TaskTracker
为其分配的任务。
TaskTracker
接收到来自JobTracker
分配下来的具体任务之后,会为每一个任务生成一个Task实例,并且启动相应的Java虚拟机(JVM)去实际运行这个任务(因为任务的计算本质上是执行jar包的内容)。根据接收到的不同类型的动作命令(例如启动任务、提交结果、终止任务等),TaskTracker会采取相应措施来满足要求。
如果某个TaskTracker失去联系超过一定时间,则会被标记为不可用,并且其上正在运行的任务可能需要重新分配给其他可用节点。对于任何失败的任务,JobTracker会尝试重新启动它们直到达到最大重试次数为止。
当所有的Map和Reduce任务都顺利完成之后,TaskTracker
会通知JobTracker
。随后,JobTracker
将正式宣布该作业已完成,并清理相关的临时资源。同时,如果存在输出数据的话,也会告知客户端可以从指定位置下载最终结果。
流程图
这个资源调度架构有一些明显缺陷:
- 首先所有调度任务都会集中在一个
JobTracker
上,这样随着集群的扩展和任务的增加,jobtrakcer的性能会成为集群扩展的瓶颈。 - 一旦jobtracker故障,所有计算任务都无法进行。
- Hadoop1.x中将
TaskTracker
中可用资源抽象为插槽,这些插槽的数量以及分类(map还是reducer)由启动时的hadoop配置决定,这就导致了如果map任务和reducer任务的比例和插槽分类的比例不一致,就会导致资源浪费的问题。 - jobtracker不能满足不同计算框架的任务调度需求。
为了解决这些问题,在hadoop2.x版本中重新引入了Yarn
。
3.2 Yarn
Yarn将原本的jobtracker负责的任务一分为二,将资源管理和任务调度,分别用两套结构分别负责。
资源管理
资源管理由ResourceManager和NodeManager负责管理,每个主机上都有一个NodeManager,NodeManager会向集群中唯一的ResourceManager发送主机资源信息,并维持心跳。
当客户端需要发起一个工作请求时,首先需要携带AppMaster、启动AppMaster的命令、用户程序等向ResourceManager发起请求,ResourceManager接受请求后会根据资源情况为其分配一个Contarin,并且寻找对应的NodeManager。ResourceManager会将任务分配到对应的NodeManager,NodeManager在接收到任务后会生成一个Container,负责容纳AppMaster。
Container是用来代替hadoop1.x中的插槽概念的,和插槽不同的是Container不仅可以随意容纳map和reducer任务,还可以容纳AppMaster。
AppMaster是负责任务调度的组件。在Yarn的架构中,为了兼容更多的计算框架的不同的任务调度需求。任务调度组件由计算框架自己实现,也就是说不同的计算框架会使用不同的调度组件,所以需要有客户端发送。
资源调度
AppMaster在被启动后,需要向ResourceManager进行注册,并汇报任务运行状态。和jobTracker不同的是,ResourceManager不在负责管理大量的map,reducer任务,而是由AppMaster进行管理。
AppMaster会通过心跳向ResourceManager申请任务资源,申请成功后会通知NodeManager,为任务创建Contrainer,并启动任务。各个任务需要和AppMaster维持心跳并汇报工作进度。在程序运行时,客户端可以随时向AppMaster发起请求查看任务进度。
当应用程序运行完成后,ApplicationMaster通知ResourceManager释放已分配的资源。
通过这些改变,Hadoop2.x解决了Hadoop1.x中的集群扩展问题,ResourceManager的负载能力不再是集群扩展的瓶颈。Container解决了插槽中对于资源利用的问题。多计算框架的兼容问题也随着AppMaster的出现而被解决。于此同时Hadoop2.x还可以配置多个ResourceManager来解决集群的高可用问题。
集群高可用
ResourceManager的高可用性是通过Active/Standby架构模式实现的,这种设计确保了在任意时刻只有一个ResourceManager处于Active状态,其余的则处于Standby状态。Active状态的ResourceManager会正常处理客户端的请求,而Standby状态则处于待机状态,随时等待Active状态的ResourceManager宕机时接管其任务。
为了保证故障切换时的状态一致性,Active状态的ResourceManager会将其状态信息写入到一个共享的状态存储系统中。这个状态存储系统可以是基于ZooKeeper的state-store或基于FileSystem的state-store。
FileSystem是Hadoop的一个抽象类,它定义了文件系统的基本操作接口,如创建文件、删除文件、打开文件、重命名文件等。通过
FileSystem
抽象类,Hadoop 可以轻松地支持多种文件系统。基于FileSystem的state-store就是利用HDFS自身的能力为ResourceManager提供状态存储系统能力,可以通过配置实现。
Yarn依赖于Zookeeper来实现自动故障转移,当Active节点故障时,Standby会通过抢占Zookeeper节点的方式获取Active状态,并读取共享的状态存储系统来恢复功能。
如果未启用自动故障转移,则管理员必须手动将其中一个ResourceManager转换为Active。要从一个ResourceManager到另一个ResourceManager进行故障转移,他们应该先将Active状态的ResourceManager转换为Standby,然后将Standby状态的ResourceManager转换为Active。所有这些都可以使用yarn rmadmin
命令完成。