当前位置: 首页 > article >正文

深入MapReduce——计算模型设计

引入

通过引入篇,我们可以总结,MapReduce针对海量数据计算核心痛点的解法如下:

  • 统一编程模型,降低用户使用门槛
  • 分而治之,利用了并行处理提高计算效率
  • 移动计算,减少硬件瓶颈的限制

优秀的设计,总会借鉴使用到前人总结的精华。

在MapReduce设计中,就有很多经典的设计模式的影子:

  • 责任链模式,让每个组件发挥自己的作用,串联起来完成一个完整的分布式应用程序的执行。
  • 模板方法模式,在责任链的基础上,又用了模板的形式来定义数据处理的基本流程架构。
  • 策略模式,在模板方法的基础上,提供灵活的具体业务实现方式。

下面我们就深入了解一下,MapReduce这个所谓的通用计算模型,到底是如何设计落地的。

MapReduce计算模型设计

首先,我们要知道,任何通用的计算模型,本质都可以划分为输入->计算->输出三个模块。既然说MapReduce是一个通用的计算模型,那我们就来看看它是怎么设计实现的。

核心设计思路

我们先从核心设计思路方面入手,MapReduce的编程模型中的核心计算模块设计很简单,正如其名,分为Map和Reduce两个部分:

  • Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
  • Reduce负责“合”,即对map阶段的结果进行全局汇总。

可以看到,这个计算模块的设计非常简单,下面我们看下在代码层面,它是如何基于这个核心思路,去提供输入,计算,输出的能力给用户的。

编程组件设计

在代码层面,MapReduce结合了分布式场景的特殊性,针对这三个模块对外提供了5个可编程组件,分别是InputFormat、Mapper、Partitioner、Reducer和OutputFormat。

下面我们分别介绍一下:

InputFormat

  • 数据读取与分片:因为MapReduce是构建在HDFS上的,那要计算的数据肯定是以一个个Block块的形式,分散存储在不同的DataNode里。InputFormat 组件负责从各种数据源读取数据,并将数据切分成合适的分片(split),从而实现在多个计算节点上并行处理。例如,在处理大规模的文本数据时,InputFormat 可以按行或按固定大小对数据进行分片,使得每个 Mapper 任务可以独立处理一个数据分片,实现数据的并行读取和处理。
  • 数据格式适配:不同的数据源可能有不同的数据格式,如文本格式、二进制格式、数据库记录格式等。InputFormat 能够将各种不同格式的数据转换为 MapReduce 可以处理的键值对形式,为后续的处理提供统一的输入格式。

InputFormat主要用于描述输入数据的格式,核心就是以下两件事:

  1. 数据切分:按照某个策略将输入数据切分成若干个split,以便确定Map Task个数以及对应的split。
  2. 为Mapper提供输入数据:给定某个split,通过创建读取数据的工具(RecordReader)来将其解析成一个个 key-value 对。

  

这种设计有点类似工厂方法,主要有以下好处:

  1. 解耦数据划分和读取过程:
    通过这种设计,将输入数据的划分(InputFormat的职责)和具体的数据读取(RecordReader的职责)两个过程分开。这样可以让开发者独立地修改和扩展这两个部分。
    例如,如果要支持一种新的数据格式,只需要创建一个新的InputFormat子类和对应的RecordReader,而不会影响到其他部分的代码。
  2. 提高可维护性和可扩展性:
    这种设计使得MapReduce框架能够方便地支持多种输入数据格式。对于不同的数据来源和格式,只需要实现相应的InputFormat和RecordReader组合。
    比如,对于数据库数据、日志文件、二进制文件等不同类型的数据,都可以通过自定义的InputFormat和RecordReader来实现数据的有效处理。
  3. 支持数据局部性优化:
    InputFormat在划分数据分片时,可以考虑数据的存储位置等因素,使得RecordReader读取数据时能够更好地利用数据局部性。
    例如,将在同一物理存储位置的数据划分到一个分片,这样可以减少数据传输开销,提高MapReduce的整体性能。

其中文件切分算法在v1和v2版本有所区别:

  • v1:splitSize = max{minSize, min{goalSize, blockSize}}
  • v2:splitSize = max{minSize, min{maxSize, blockSize}}

新版本用 maxSize 替换了 goalSize ,可以更直接地对 splitSize 的上限进行严格控制。

例如,在处理一些对单个Map任务处理数据量上限有严格要求的场景(如资源有限的小型集群或者对任务响应时间敏感的场景),能够明确设置 maxSize ,避免出现因 goalSize 计算复杂而导致输入分片过大的情况。

在InputSplit切分方案确定后,会确定每个InputSplit的元数据信息。这通常由四部分组成:<file, start, length, hosts>,分别表示InputSplit所在的文件、起始位置、长度以及所在的host(节点)列表。

其中,前三项很容易确定,难点在于host列表的选择方法。

FileInputFormat设计了一个简单有效的启发式算法,核心就是尽量选择本地节点。

其实现主要考虑以下几点:

  • 性能提升。通过尽量选择本地和机架本地的节点,可以尽可能减少网络带宽带来的瓶颈,如果能走本地,更是可以完全利用本地磁盘IO,避免网络传输带来的延迟。
  • 资源优化利用。考虑节点的资源状况进行host选择,可以尽可能平衡各个节点的负载,并有效提升集群的吞吐能力。
  • 增强容错。尽可能的选择本地,避免网络传输,能很好的降低数据丢失风险,并提高故障恢复效率。

Mapper

  • 并行数据处理:Mapper 是 MapReduce 中实现并行计算的核心组件。对于大规模的数据处理任务,将数据分片后,每个 Mapper 任务在不同的计算节点上独立地对数据分片进行处理,实现了数据的并行处理,大大提高了处理效率。例如,在进行文本数据的词频统计时,每个 Mapper 可以对自己负责的数据分片中的文本进行单词拆分和初步计数。
  • 数据转换与过滤:Mapper 可以对输入数据进行各种转换和过滤操作,将原始数据转换为更适合后续处理的中间表示形式。比如,可以在 Mapper 中对数据进行清洗、格式转换、提取关键信息等操作,为后续的聚合和分析做准备。

Partitioner

  • 数据分区与分发:在分布式计算中,Mapper 任务的输出需要按照一定的规则分配到不同的 Reducer 任务中进行处理。Partitioner 组件负责根据键的特征将 Mapper 的输出数据划分到不同的分区,确保具有相同或相关键的数据能够被发送到同一个 Reducer 任务中,以便进行有效的聚合和处理。例如,在对大规模用户数据按用户 ID 进行统计分析时,Partitioner 可以根据用户 ID 的哈希值将数据分配到不同的 Reducer,使得同一用户的数据能够在同一个 Reducer 中进行处理。
  • 负载均衡:通过合理的分区策略,Partitioner 可以实现数据在 Reducer 任务之间的均衡分配,避免某些 Reducer 任务处理的数据量过大,而其他 Reducer 任务闲置的情况,从而充分利用集群资源,提高整个系统的性能和效率。

Partitioner的作用是对Mapper产生的中间结果进行分区,以便将同一分组的数据交给同一个Reducer处理,它直接影响Reduce阶段的负载均衡。

MapReduce提供了两个Partitioner实现:HashPartitioner和TotalOrderPartitioner。

  • HashPartitioner是默认实现,它是基于哈希值的分片方法实现的。
  • TotalOrderPartitioner提供了一种基于区间的分片方法,通常用在数据全排序中。

关于全排序,通常容易想到的是归并排序,主要是利用二分去提升效率,其与一些简单的排序算法如插入,冒泡,选择相比,核心就在于没有浪费比较行为

但由于作业只能有一个ReduceTask,因而Reduce阶段会成为作业的瓶颈。为了提高全局排序的性能和扩展性, MapReduce提供了TotalOrderPartitioner。它能够按照大小将数据分成若干个区间(分片),并保证后一个区间的所有数据均大于前一个区间数据。

TotalOrderPartitioner的全排序的步骤如下:

  1. 数据采样。
    在Client端通过采样获取分片的分割点。(Hadoop自带了几个采样算法,IntercalSampler、RandomSampler、SplitSampler等。
  2. Map阶段。
    本阶段涉及两个组件,分别是Mapper和Partitioner。其中,Mapper可选用不同的Mapper实现类,如IdentityMapper,直接将输入数据输出,但Partitioner必须选TotalOrderPartitioner,它将步骤1中获取的分割点保存到trie树(前缀树,字典树)中以便快速定位任意一个记录所在的区间,这样,每个MapTask产生R(Reduce Task个数)个区间,且区间之间有序。
  3. Reduce阶段。
    每个Reducer对分配到的区间数据进行局部排序,最终得到全排序数据。

从以上步骤可以看出,基于TotalOrderPartitioner全排序的效率跟key分布规律和采样算法有直接关系;key值分布越均匀且采样越具有代表性,则Reduce Task负载越均衡,全排序效率越高。

TotalOrderPartitioner有两个典型的应用实例:TeraSort和HBase批量数据导入。

  • TeraSort是Hadoop自带的一个应用程序实例。它曾在TB级数据排序基准评估中赢得第一名,而TotalOrderPartitioner正是从该实例中提炼出来的。
  • HBase是一个构建在Hadoop之上的NoSQL数据仓库。它以Region为单位划分数据,Region内部数据有序(按key排序),Region之间也有序。一个MapReduce全排序作业的R个输出文件正好可对应HBase的R个Region。

Reducer

  • 数据聚合与合并:Reducer 主要用于对 Mapper 输出的经过分区和排序的数据进行聚合和合并操作。在许多分布式计算场景中,需要对数据进行汇总、统计、合并等操作,Reducer 能够将具有相同键的值进行合并和计算,得到最终的结果。如在词频统计中,Reducer 将各个 Mapper 输出的相同单词的计数进行累加,得到最终的单词出现频率。
  • 复杂数据分析:对于一些需要全局视角或多轮处理的复杂数据分析任务,Reducer 可以在收到所有相关数据后进行综合处理。例如,在计算数据的平均值、中位数,或者进行数据的关联和整合等操作时,Reducer 可以根据具体的业务逻辑对数据进行进一步的分析和处理,得到最终的分析结果。

OutputFormat

  • 数据存储与持久化:在 MapReduce 任务完成后,需要将最终的计算结果存储到合适的位置,以便后续的查询和使用。OutputFormat 组件负责将 Reducer 的输出数据按照指定的格式和存储方式进行存储,如将结果存储为文本文件、二进制文件、数据库表等。
  • 结果格式定制:不同的应用场景可能对结果的输出格式有不同的要求,OutputFormat 允许用户根据实际需求定制输出结果的格式和内容,或者按照特定的文件结构和数据组织方式进行存储,方便与其他系统或工具进行集成和交互。

任务架构设计

用户通过借助前面MapReduce提供的编程组件,实现了业务逻辑以后,会将程序打包提交到Hadoop集群中,这里就涉及如何去调度执行任务。

如下图所示,是MRv1的架构设计(MRv2,也就是Yarn,可以看后面深入Yarn篇的内容

我们来介绍一下里面的涉及的核心模块:

Client

用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业运行状态。

在Hadoop内部用Job(任务)表示MapReduce程序。一个MapReduce程序可对应若干个Job,而每个作业会被分解成若干个 Map/Reduce Task​。

JobTracker

JobTracker主要负责资源监控和作业调度。

JobTracker监控所有TaskTracker与作业的健康状况,一旦发现失败情况后,其会将相应的任务转移到其他节点;同时,JobTracker会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。

TaskTracker

TaskTracker会周期性地通过Heartbeat(心跳),将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)​。

TaskTracker使用slot来等量划分本节点上的资源量。

slot是MapReduce针对CPU、内存等计算资源的一个抽象,它代表集群中计算节点上的一个基本资源分配单位。

其设计的核心目的,是为了控制同时运行的任务数量,并有效地管理和分配集群的计算资源,避免资源过度使用或闲置。

一个Task获取到一个slot后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Map slot和Reduce slot两种,分别供Map Task和Reduce Task使用。TaskTracker通过slot数目(可配置参数)限定Task的并发度。

Task

Task分为Map Task和Reduce Task两种,均由TaskTracker启动。

从深入HDFS篇章,我们知道HDFS会以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是Split。这是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split的多少决定了Map Task的数目,因为每个split会交由一个Map Task处理。

任务调度流程

MapReduce任务的调度流程如下:

Job提交

  • 客户端配置与提交:用户编写实现了 Mapper 和 Reducer 接口的 Java 程序,设置作业的各项参数,如输入输出路径、Mapper 和 Reducer 类等。接着,客户端调用 JobClient 类将作业提交给 JobTracker。
  • 作业检查:在提交作业前,客户端会检查作业的输入输出路径是否合法等,同时计算输入数据的分片信息。

Job初始化

  • JobTracker 接收作业:JobTracker 接收到客户端提交的作业后,为作业分配一个唯一的作业 ID,创建一个 JobInProgress 对象来跟踪该作业的执行进度。
  • 资源和任务初始化:JobTracker 会将作业相关信息(如作业配置、输入分片信息等)存储在 HDFS 上,同时为作业的 Map 和 Reduce 任务分配资源。

Job分配

  • Map 任务分配:JobTracker 根据输入数据的分片情况,将 Map 任务分配给 TaskTracker。一般会尽量将 Map 任务分配到存储有对应输入分片数据的节点上,以实现数据的本地化处理,减少数据传输开销。
  • Reduce 任务分配:JobTracker 会根据作业配置中指定的 Reduce 任务数量,将 Reduce 任务分配给合适的 TaskTracker。Reduce 任务的分配没有数据本地化的要求。

Map 阶段

  • TaskTracker 接收任务:TaskTracker 从 JobTracker 接收分配的 Map 任务后,为该任务启动一个新的 Java 进程。
  • 数据读取:该 Java 进程从 HDFS 读取对应的输入分片数据,将其解析成键值对形式,作为 Mapper 的输入。
  • Map 函数执行Mapper 对输入的键值对执行用户自定义的 map 方法,生成一系列中间键值对。这些中间键值对会先被写入到内存缓冲区。
  • 溢写磁盘:当内存缓冲区达到一定阈值(默认 80%)时,会触发溢写操作。在溢写过程中,数据会按照键进行分区和排序(默认使用哈希分区),并将排序后的结果写入本地磁盘。如果配置了 Combiner,还会在溢写前对相同键的值进行局部合并。
  • 多次溢写合并:如果在 Map 处理过程中发生了多次溢写,最终会将这些溢写文件合并成一个大的分区且排序好的文件。

Shuffle 阶段

  • 数据复制:Reduce 任务启动后,会从各个 Map 任务所在的 TaskTracker 上复制属于自己分区的数据。
  • 归并排序:Reduce 任务将复制过来的数据进行归并排序,确保相同键的值相邻排列。这个过程会将来自不同 Map 任务的相同分区的数据合并在一起。

Reduce 阶段

  • TaskTracker 接收并执行:TaskTracker 从 JobTracker 接收分配的 Reduce 任务后,为其启动一个新的 Java 进程。
  • Reduce 函数执行Reducer 对排序好的数据执行用户自定义的 reduce 方法,对相同键的值进行聚合处理,生成最终的输出结果。
  • 结果输出Reducer 将处理后的结果写入到 HDFS 等指定的输出存储系统中。

Job完成清理

  • 状态更新:当所有的 Map 任务和 Reduce 任务都成功完成后,JobTracker 将作业的状态标记为成功完成。
  • 资源清理:JobTracker 会清理作业运行过程中产生的临时文件和其他相关资源。同时,TaskTracker 也会清理本地磁盘上的中间数据文件。

总结

今天梳理了MapReduce这个通用计算模型的总体设计落地思路,后面我们基于源码去进一步深入它是如何实现的。


http://www.kler.cn/a/516817.html

相关文章:

  • SQL注入漏洞之基础数据类型注入 字符 数字 搜索 XX 以及靶场实例哟
  • MECD+: 视频推理中事件级因果图推理--VLM长视频因果推理
  • 【Postgres_Python】使用python脚本批量创建和导入多个PG数据库
  • 数据结构——算法基础
  • Spark Streaming的核心功能及其示例PySpark代码
  • 优化使用 Flask 构建视频转 GIF 工具
  • TaskBuilder数据添加页面前后端交互原理解析
  • 力扣-数组-704 二分查找
  • AIGC视频扩散模型新星:Video 版本的SD模型
  • 特征选择(机器学习)
  • Jetson Xavier NX 安装 CUDA 支持的 PyTorch 指南
  • Mongodb 慢查询日志分析 - 1
  • AI代码生成器赋能房地产:ScriptEcho如何革新VR/AR房产浏览体验
  • spandsp_start_dtmf的bug及修复
  • 023:到底什么是感受野?
  • 【ComfyUI】python调用生图API,实现批量出图
  • MySQL 中如何进行 SQL 调优?
  • 【ElasticSearch】 Java API Client 7.17文档
  • 【springboot加密传输】
  • 机器学习-手写数字识别
  • 基于Springboot + vue实现的美发门店管理系统
  • Pyside6(PyQT5)中的QTableView与QSqlQueryModel、QSqlTableModel的联合使用
  • Redis支持数据类型详解
  • 后端的config包中的常用配置
  • Java毕设项目:基于Springboot农机农业设备租赁网站系统设计与实现开题报告
  • 「全网最细 + 实战源码案例」设计模式——模式扩展(配置工厂)