Hadoop学习笔记(持续更新中)
Hadoop
Hadoop 的组成
- Hadoop1.x
- MapReduce(计算+资源调度)
- HDFS(数据存储)
- Common(辅助工具)
- Hadoop2.x、3.x
- Yarn(资源调度): A framework for job scheduling and cluster resource management.
- MapReduce(计算): A YARN-based system for parallel processing of large data sets.
- HDFS(数据存储): A distributed file system that provides high-throughput access to application data.
- Common(辅助工具): The common utilities that support the other Hadoop modules.
Hadoop1.x 时 代 ,Hadoop中 的MapReduce同时处理业务逻辑运算和资源的调度,耦合性较大。
在Hadoop2.x时 代,增加 了Yarn。Yarn只负责资 源 的 调 度 ,MapReduce 只负责运算。
Hadoop3.x在组成上没有变化。
HDFS
Hadoop Distributed File System
特点
以 高吞吐 的 流式数据访问 模式来存储 单写入多读取仅末尾追加 的 超大文件,运行于 商用硬件 集群上
流式数据访问:一次写入、多次读取
超大文件:HDFS中的典型文件大小为GB到TB
商用硬件:即轻易能买到的普通硬件,对庞大集群来说节点故障率不低,于是HDFS核心目标是能检测故障并快速自动地从故障中恢复,且在故障时继续运行,不让用户察觉到明显的中断
高吞吐:HDFS重点在于数据访问的高吞吐量,而不是低延迟,低延迟场景不适合用HDFS(几十毫秒)
简单一致性:单用户写入(不可多用户同时写入),写操作仅支持“文件末尾追加”,不支持任意位置修改
可移植性:HDFS是使用Java语言构建的,任何支持Java的机器都可以运行NameNode或DataNode软件。使用高度可移植的Java语言意味着HDFS可以部署在各种各样的机器上。
NameNode和DataNode
HDFS集群具有主/从架构,由一个NameNode和多个DataNode组成。
NameNode是一个主服务器,它管理文件系统命名空间,并规范客户端对文件的访问。NameNode执行文件系统命名空间操作,如打开、关闭和重命名文件和目录。它还决定了块到datanode的映射。——HDFS元数据
DataNode通常集群中每个节点一个,用于管理与它们所运行的节点相连的存储。文件将被分割成一个或多个块,存储在一组datanode中。datanode负责为来自文件系统客户端的读写请求提供服务。datanode还根据NameNode的指令执行块的创建、删除和复制。——用户数据
文件系统命名空间
HDFS支持传统的分层文件组织,其层次结构与大多数其他现有文件系统相似。
用户或应用程序可以创建目录并将文件存储在这些目录中,可以进行创建、删除、移动、重命名等操作。
支持用户配额和访问权限。
NameNode维护文件系统名称空间。对文件系统名称空间或其属性的任何更改均由NameNode记录。
数据块
HDFS上的文件被分为多个块,除最后一个块外的所有块都具有相同的大小。
可以为文件指定复制因子,将数据块复制到其他结点,确保发生故障时数据不丢失(故障时读取副本的过程对用户透明)。
复制因子可以在文件创建时指定,以后可以更改。
副本的最大数量是当时DataNode的总数,因为一个DataNode不能具有同一块的多个副本。
小于一个块大小的文件不会占据整个块的空间。
MapReduce
Hadoop MapReduce是一个软件框架,可以编写用于大规模数据处理的分布式应用程序(TB级别),并发、可靠可容错地运行在hadoop集群上。
job、task、input split
MapReduce作业(Job)被Hadoop分成若干个任务(task),包括map任务和reduce任务,由YARN进行调度。多个map并发执行。
输入数据会被分成等长的小数据块,称为输入分片(input split)。每个分片对应一个map任务,该任务会调用用户自定义的map()函数以处理该分片的数据。一般建议分片大小与HDFS块大小相同,以减少网络传输。
通常Hadoop在存有输入数据的HDFS节点上运行map任务,这样可以减少网络传输。
combiner
combiner函数属于可选的优化方案,可对一个map任务指定一个combiner,在进入reduce之前先处理map的输出,以减少reduce的输入。
一般可以和reduce函数一样的实现。
其他
map和reduce的输入输出都是键值对,其键
和值
类必须序列化, 此外,关键类必须实现WritableComparable接口,以利于框架进行排序。
Hadoop提供了一套可优化网络序列化传输的基本类型,在org.apache.hadoop.io中,例如LongWritable相当于Java的Long、Text相当于String
示例
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}