MapReduce,Yarn,Spark理解与执行流程
MapReduce的API理解
Mapper
- 如果是单词计数:hello:1, hello:1, world:1
public void map(Object key, // 首字符偏移量
Text value, // 文件的一行内容
Context context) // Mapper端的上下文,与 OutputCollector 和 Reporter 的功能类似
throws IOException, InterruptedException {
Reduce
- 注意拿到的是value的集合
- 如果是单词计数:hello:【1,1】,world:【1】
public void reduce(Text key, // Map端 输出的 key 值
Iterable<IntWritable> values, // Map端 输出的 Value 集合(相同key的集合)
Context context) // Reduce 端的上下文,与 OutputCollector 和 Reporter 的功能类似
throws IOException, InterruptedException
{
整体架构
- NameNode: 负责存储文件的元信息(如文件的大小、块信息、存储位置等)。通常会部署一个 Secondary NameNode 来周期性合并 NameNode 的 edit log 以减少恢复时间,但它并不是热备,而是辅助管理。避免单点故障
- JobTracker:负责 Hadoop MapReduce 任务的调度和资源管理,接收作业请求并将任务分配给不同的 TaskTracker(工作节点)
- DataNode:实际存储数据的节点,存有多个数据块(Block),128MB
- TaskTracker:实际进行mapper和reduce工作的节点,可以看到图中TaskTracker接近DataNode,这是“移动计算比移动数据更便宜” 的设计理念,目的是减少数据传输,提高计算效率
输入阶段
- 文件存储到 HDFS(Hadoop Distributed File System):
- 当文件上传到 HDFS 时,文件会先到达
NameNode
,NameNode
负责存储文件的元信息(如文件的大小、块信息、存储位置等)。 - 随后,文件会被分割成固定大小的数据块(Block),默认每块大小是 128 MB(可以通过配置调整)。
- 这些数据块会被分布式地存储到集群中的不同
DataNode
节点上,通常每个块有多个副本(默认是 3 个),以保证数据的可靠性,同步到指定数量的副本后,才向NameNode确认写操作成功,保证一致性。TiDB的底层TiKV的存储也类似这个结构,根据键值对分为多个region,且不同的节点保存奇数个副本,并有raft协议保证一致性。
Mapper阶段
- TaskTracker 和 Mapper 的运行:
- 当执行 MapReduce 作业时,
JobTracker
会负责任务的调度。 - 根据
NameNode
提供的块位置信息,JobTracker
会在包含该块数据的DataNode
上启动Mapper
,这是数据本地化优化的核心。 - 每个
Mapper
会处理一个或多个数据块。 Mapper
会将每一行的文件处理成键值对形式,也可以进行数据预处理(过滤、清洗):
- 当执行 MapReduce 作业时,
// input.txt:
apple 10
banana 20
apple 5
// after mapper:
(apple, 5), (apple, 10), (banana, 20)
Mapper
的输出结果存储在本地磁盘的缓存文件中或者磁盘中,并分为多个分区,每个分区对应一个Reducer
。
Shuffle 阶段
什么是 Shuffle?
Shuffle 是将 Mapper
输出的中间数据(键值对)分发给 Reducer
的过程。
其主要任务包括:
- 对 Mapper 输出的数据进行分区。
- 将分区数据从 Mapper 节点移动到 Reducer 节点。
- 对分区数据进行排序和合并。
Shuffle 的执行角色
-
Mapper 节点执行分区:
- 在 Mapper 阶段结束后,输出结果会被分区(默认使用
HashPartitioner
)。 - 每个分区对应一个 Reducer。Mapper 的数据会按照分区规则存储到本地磁盘的多个文件中。
- 在 Mapper 阶段结束后,输出结果会被分区(默认使用
-
JobTracker(或 Yarn)负责协调:
JobTracker
会通知各 Reducer 节点**从相应的 Mapper 节点拉取(pull)属于自己的分区数据。
-
Reducer 节点的数据迁移:
- 每个 Reducer 从多个 Mapper 节点拉取自己的数据。
- 拉取的数据会临时存储在 Reducer 节点上,并在内存中进行排序和合并,以便 Reducer 处理。
Shuffle 阶段会涉及到数据从 Mapper 节点到 Reducer 节点的迁移,这也是整个 MapReduce 流程中最耗时的一部分。
举个例子,更好理解如何分区以及数据传输:
Shuffle例子
输入文件内容:
File1: Hello Hadoop Hello
File2: Hadoop MapReduce Hadoop
分块:
- Block1(File1)
- Block2(File2)
Mapper 输出:
假设有 2 个 Mapper
和 2 个 Reducer
,并用 key.hashCode() % 2
作为分区规则。
Mapper | Key | Partition (Reducer) | Output |
---|---|---|---|
Mapper1 | Hello | 0 | {Hello: 1} |
Mapper1 | Hadoop | 1 | {Hadoop: 1} |
Mapper1 | Hello | 0 | {Hello: 1} |
Mapper2 | Hadoop | 1 | {Hadoop: 1} |
Mapper2 | MapReduce | 0 | {MapReduce: 1} |
Mapper2 | Hadoop | 1 | {Hadoop: 1} |
Shuffle 阶段:
在 Shuffle 阶段,每个 Reducer 会从多个 Mapper 拉取数据:
Reducer | Data Source | Received Data |
---|---|---|
Reducer0 | Mapper1 + Mapper2 | {Hello: [1, 1], MapReduce: [1]} |
Reducer1 | Mapper1 + Mapper2 | {Hadoop: [1, 1, 1]} |
Reducer 聚合:
最终,Reducer 聚合数据,输出结果:
Reducer0: {Hello: 2, MapReduce: 1}
Reducer1: {Hadoop: 3}
Reduce阶段
前面提到,reduce时会向map的节点获取数据,它是如何直到那个mapper节点的呢?
具体是这样的:map任务成功完成后,它们会使用心跳机制通知它们JobTracker。因此,对于指定作业JobTracker 知道 map输出和主机位置之间的映射关系。reducer 中的一个线程定期询问 JobTracker 以便获取 map输出主机的位置,直到获得所有输出位置。
Reduce 任务执行完毕后,输出结果会直接存储到 HDFS,但是Reduce 节点不会主动通知 NameNode 数据位置,而是 HDFS 负责数据存储的元数据管理,Reduce 任务会通过 HDFS 客户端 API 将数据写入 HDFS
写数据到 HDFS 的过程(详)
-
客户端请求写入文件:
客户端(例如 Reducer 或用户程序)向 HDFS 的 NameNode 发起写入请求。
客户端需要告诉 NameNode 文件的元信息(如文件名、大小等)
NameNode 分配数据块NameNode 根据文件大小、HDFS 的配置(如块大小和副本数量),分配该文件需要的 数据块(Block)
对于每个块,NameNode 会选择多个 DataNode(通常是 3 个)作为存储目标,并将这些位置信息返回给客户端。
-
分配数据块:
通常会优先选择离客户端最近的节点,或者是同一个机架的节点,来减少网络延迟。副本的存储节点也会尽量分布在不同的机架上,提高数据可靠性
客户端直接写入 DataNode:客户端根据 NameNode 返回的块位置信息,开始向第一组目标 DataNode 写入数据。写入过程是 流式传输,数据被切分为块后,直接发送给第一个 DataNode
DataNode 进行副本复制:第一台 DataNode 在接收到数据后,会立即将该数据块传输到下一台 DataNode,依此类推,直到完成所有副本的写入(链式复制)
-
DataNode 汇报块信息:
每个 DataNode 在数据写入完成后,会向 NameNode 汇报存储的块信息(如块 ID、块大小、存储位置)
-
写入完成:
当所有数据块都写入成功,并且所有副本都存储完成后,HDFS 客户端通知 NameNode 文件写入完成,保证数据的一致性NameNode 将文件的元数据标记为 “完成状态”
MapReduce框架的Java实现
这里手写一个简易的java实现的框架,方便大家理解
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
// 定义 Mapper 接口
interface Mapper {
List<Pair<String, Integer>> map(String input);
}
// 定义 Reducer 接口
interface Reducer {
Pair<String, Integer> reduce(String key, List<Integer> values);
}
// 定义 Pair 类,用于存储键值对
class Pair<K, V> {
public final K key;
public final V value;
public Pair(K key, V value) {
this.key = key;
this.value = value;
}
@Override
public String toString() {
return key + ": " + value;
}
}
// 实现支持多个 Mapper 和 Reducer 的 MapReduce 框架
class ParallelMapReduceFramework {
private List<Mapper> mappers;
private List<Reducer> reducers;
private int reducerCount;
public ParallelMapReduceFramework(List<Mapper> mappers, List<Reducer> reducers) {
this.mappers = mappers;
this.reducers = reducers;
this.reducerCount = reducers.size();
}
public Map<String, Integer> execute(List<String> inputs) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(mappers.size());
// 1. Map 阶段:将输入数据分给多个 Mapper 并行处理
List<Future<List<Pair<String, Integer>>>> mapResults = new ArrayList<>();
int chunkSize = inputs.size() / mappers.size();
for (int i = 0; i < mappers.size(); i++) {
int start = i * chunkSize;
int end = (i == mappers.size() - 1) ? inputs.size() : (i + 1) * chunkSize;
List<String> chunk = inputs.subList(start, end);
Mapper mapper = mappers.get(i);
mapResults.add(executor.submit(() -> {
List<Pair<String, Integer>> results = new ArrayList<>();
for (String input : chunk) {
results.addAll(mapper.map(input));
}
return results;
}));
}
// 收集所有 Mapper 生成的键值对
List<Pair<String, Integer>> allMappedData = new ArrayList<>();
for (Future<List<Pair<String, Integer>>> future : mapResults) {
allMappedData.addAll(future.get());
}
// 2. Shuffle 阶段:将键值对分片,分配给不同的 Reducer
Map<Integer, List<Pair<String, Integer>>> reducerInput = new HashMap<>();
for (int i = 0; i < reducerCount; i++) {
reducerInput.put(i, new ArrayList<>());
}
for (Pair<String, Integer> pair : allMappedData) {
int reducerIndex = Math.abs(pair.key.hashCode() % reducerCount);
reducerInput.get(reducerIndex).add(pair);
}
// 3. Reduce 阶段:每个 Reducer 处理一个分片数据
List<Future<Map<String, Integer>>> reduceResults = new ArrayList<>();
for (int i = 0; i < reducers.size(); i++) {
int index = i;
Reducer reducer = reducers.get(i);
List<Pair<String, Integer>> inputForReducer = reducerInput.get(index);
reduceResults.add(executor.submit(() -> {
// 按键分组
Map<String, List<Integer>> groupedData = new HashMap<>();
for (Pair<String, Integer> pair : inputForReducer) {
groupedData.computeIfAbsent(pair.key, k -> new ArrayList<>()).add(pair.value);
}
// Reduce 操作
Map<String, Integer> result = new HashMap<>();
for (Map.Entry<String, List<Integer>> entry : groupedData.entrySet()) {
result.put(entry.getKey(), reducer.reduce(entry.getKey(), entry.getValue()).value);
}
return result;
}));
}
// 收集所有 Reducer 的结果
Map<String, Integer> finalResult = new HashMap<>();
for (Future<Map<String, Integer>> future : reduceResults) {
finalResult.putAll(future.get());
}
executor.shutdown();
return finalResult;
}
}
// 实现单词统计的 Mapper 和 Reducer
class WordCountMapper implements Mapper {
@Override
public List<Pair<String, Integer>> map(String input) {
String[] words = input.split("\\s+");
List<Pair<String, Integer>> result = new ArrayList<>();
for (String word : words) {
result.add(new Pair<>(word.toLowerCase(), 1));
}
return result;
}
}
class WordCountReducer implements Reducer {
@Override
public Pair<String, Integer> reduce(String key, List<Integer> values) {
int sum = values.stream().mapToInt(Integer::intValue).sum();
return new Pair<>(key, sum);
}
}
// 测试并行 MapReduce 框架
public class ParallelWordCountExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 输入数据
List<String> inputs = Arrays.asList(
"Hello world hello",
"MapReduce is powerful",
"Hello MapReduce world",
"Java is great",
"Hello from the other side"
);
// 创建多个 Mapper 和 Reducer 实例
List<Mapper> mappers = Arrays.asList(new WordCountMapper(), new WordCountMapper());
List<Reducer> reducers = Arrays.asList(new WordCountReducer(), new WordCountReducer());
// 执行 MapReduce
ParallelMapReduceFramework framework = new ParallelMapReduceFramework(mappers, reducers);
Map<String, Integer> wordCounts = framework.execute(inputs);
// 输出结果
wordCounts.forEach((word, count) -> System.out.println(word + ": " + count));
}
}
Yarn(MapReduce2)
MapReducer1的问题:
前面提到的MapReducer的模型的问题:
- JobTracker的负载(可扩展性):MapReduce 1 中,,obtracker 同时负责作业调度(将任务与 tasktracker 匹配)和任务进度监控(跟踪任务、重启失败或迟缓的任务;记录任务流水,如维护计数器的计数)。当任务实例过多时,会导致系统无法再扩展
- JobTracker的可用性:由于JobTracker管理所有应用的状态,为了实现其可用性,就要创建副本并同步内存中的数据,强一致性意味着性能的损耗,弱一致性意味着故障恢复时的数据的差异。
- 节点的利用率:MapReduce 1中,每个tasktracker都配置有若干固定长度和类型的slot,这些slot是静态分配的,在配置的时候就被划分为map slot和reduce slot。一个map slot仅能用于运行一个map任务,一个reduce slot仅能用于运行一个reduce任务,所以分配不当就会导致系统性能低下
针对以上几个问题,Yarn将jobTracker的工作拆分,分为资源管理器(负责作业调度),以及application Master(负责任务进度监控,一个MapperReducer应用对应一个application Master),通过合理的资源分配提高节点的利用率,每个应用交由一个master管理,可以无限扩展资源避免单点的负载过大,还可以通过zookeeper等机制分别实现资源管理器和application master的高可用(如失败后再次申请资源)。
还有一个优点就是实现了多租户,对于资源的抽象和分配机制,可以在Yarn上构建不同的应用,如Spark等
MapReducer2的工作流程
申明几个概念:
- Yarn的资源管理器,申请的资源即为一个container,可以指定其计算机的资源数量(内存和CPU),可以理解为之前版本的DataNode拆分成了多个容器
- Map,reduce的执行是容器中的进程,而前面提到的Application Master实际上也是容器中的进程,只是功能较为特殊
- 一个MapReduce应用对应一个Application Master
- 一个节点对应一个Node Manager,负责管理该节点上的所有容器和心跳
- 1-5的步骤较为简单,就是创建一个container用于生成application master。具体是资源管理器收到调用它的submitApplication()消息后,便将请求传递给 YARN调度器(scheduler)。调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动application master的进程
- 接下来,它接受来自共享文件系统的、在客户端计算的输入分片(步骤7)。然后对每一个分片创建一个 map任务对象以及由mapreduce.job.reduces 属性(通过作业的 setNumReduceTasks()方法设置),根据配置确定多个reduce任务对象。
- application master就会为该作业中的所有map任务和reduce任务向资源管理器请求执行具体任务的容器
- 一旦资源管理器的调度器为任务分配了一个特定节点上的容器,application master就通过与节点管理器通信来启动容器(步骤9a和9b)。该任务由主类为YarnChild的一个 Java 应用程序执行。在它运行任务之前,首先将任务需要的资源本地化,包括作业的配置、JAR 文件和所有来自分布式缓存的文件(步骤 10)。最后,运行map任务或reduce任务(步骤11)。
Spark
适用场景
Spark 最突出的表现在于它能将作业与作业之间产生的大规模的工作数据集存储在内存中。MapReduce的数据集始终需要从磁盘上加载。从Spark 处理模型中获益最大的两种应用类型分别为迭代算法(即对一个数据集重复应用某个函数,直至满足退出条件)和交互式分析(用户向数据集发出一系列专用的探索性查询,比如查出数据后,根据数据在进行多次筛选分析)。
相关概念
RDD
RDD(Resilient Distributed Dataset)是 Spark 的核心抽象,用于表示分布式、不变、容错的数据集。
RDD的分区(Partition) 是 Spark 对数据的基本并行处理单元。RDD会被分割成多个分区,并在多个节点上并行处理,以实现高效的分布式计算。分区的大小就是HDFS 文件默认块大小(HDFS block size),通常为 128MB。可以理解为从HDFS中读取block到内存中,并且对这个block进行计算的抽象
举个例子,我们在 Spark 中使用 textFile(“hdfs://path/to/my_data.txt”) 读取时,RDD 会被划分为2 个分区,分别对应:
Partition 1 在 Node A 处理 Block 1 数据
Partition 2 在 Node B 处理 Block 2 数据
RDD的操作
RDD的生成,一个是读取文件,在不同的block中生成分区,还有一个就是对现有的RDD进行转换
JavaRDD<String> rdd = sc.textFile("hdfs://path/to/my_data.txt");
JavaRDD<String> filteredRDD = rdd.filter(line -> line.contains("error"));
JavaPairRDD<String, Integer> counts = filteredRDD
.mapToPair(line -> new Tuple2<>(line, 1))
.reduceByKey((a, b) -> a + b);
// 触发RDD开始转换,foreach函数也可以触发
counts.saveAsTextFile("hdfs://path/to/output");
以上的代码中的filter,mapToPair,reduceByKey就是一系列动作,类似于响应式编程中的订阅发布,当实际订阅(也就是这里的saveAsTextFile执行时),才会触发发布(RDD的开始转换),即惰性转换
RDD的持久化
spark的特性就是能够保存中间数据在内存,默认RDD不会保存到内存中,当我们需要某部分数据时,可以手动将其保存到内存中,方便下一次计算,以下调用cache缓存
当执行RDD转换时,提示已经保存:
当下一次对该RDD重新进行不同的转换时,提示用到了缓存:
DAG和Stage
多个RDD的转换形成一个有向无环图,当一些可以基于本地的RDD的操作进行的转换的执行链,即每个分区的数据只依赖于上游 RDD (在本地)的一个分区的话(如 map(), filter()),我们当然可以在同一个节点中进行这个转换操作,这称为窄依赖
如果当前 RDD 的分区需要依赖多个上游 RDD 分区(如 reduceByKey(), groupBy()),那么会发生 Shuffle,相当于触发了一次reduce操作,这成为宽依赖
而这个DAG会因为出现宽依赖而进行stage的划分,将执行链拆分成不同的stage部分,每一个stage交给一个节点运行
这个很好理解,相当于上游RDD的reduceByKey需要进行一个类似于mapreducer中的shuffle操作,下游RDD的reduceByKey需要进行一个类似于mapreducer中的reduce操作,而reduce的数据非本地的,且对应的所需要的reduce的任务数量也不等同于map阶段的任务数,所以重新分配