大数据 HDFS和MapReduce综合实训
一、关于此次实践
1、实战简介
Hadoop是一个由Apache基金会所开发的分布式系统基础架构,可以在不了解分布式底层细节的情况下,开发分布式程序,以满足在低性能的集群上实现对高容错,高并发的大数据集的高速运算和存储的需要。Hadoop支持超大文件(可达PB级),能够检测和快速应对硬件故障、支持流式数据访问、同时在简化的一致性模型的基础上保证了高容错性。因而被大规模部署在分布式系统中,应用十分广泛。
本实训的主要目标是让大家学习Hadoop的基本概念如MapReduce、HDFS等,并掌握Hadoop的基本操作,主要包括MapReduce编程(词频统计)、HDFS文件流读取操作、MapReduce迭代等。通过本次实训,建立起对Hadoop云计算的初步了解,后续大家可以通过进阶学习来深入学习Hadoop内部实现机制进行高级的应用开发。
2、全部任务
二、实践详解
1、第1关:WordCount 词频统计
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
    /*
     * Mapper: tokenizes each input line and emits a <word, 1> pair per token.
     * Text/IntWritable are Hadoop's WritableComparable wrappers for
     * String/int so keys and values can be serialized across the cluster.
     */
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        // Reused across map() calls to avoid a per-record allocation.
        private Text word = new Text();

        /**
         * Maps one input record to zero or more <word, 1> pairs.
         *
         * @param key     byte offset of the line in the split (unused)
         * @param value   one line of input text
         * @param context output collector for the intermediate pairs
         */
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /*
     * Reducer (also used as the combiner): sums all counts for a word.
     * After the shuffle the input is <word, [1, 1, ...]>; the output is
     * <word, total>.
     */
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /**
     * Configures and submits the word-count job.
     * Usage: wordcount &lt;in&gt; &lt;out&gt; (HDFS input and output paths).
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // Summation is associative and commutative, so the reducer is a valid combiner.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
2、第 2 关:HDFS 文件读写
import java.io.IOException;
import java.sql.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class hdfs {
    /**
     * Creates /user/hadoop/myfile if absent, writes a UTF string into it,
     * then reads it back and prints the file's metadata.
     *
     * @throws IOException on any HDFS access failure
     */
    public static void main(String[] args) throws IOException {
        // Configuration carries the Hadoop settings; FileSystem.get() returns
        // the file system those settings point at.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        System.out.println(fs.getUri());
        // Path required by the exercise; other paths/names are not recognized by the grader.
        Path file = new Path("/user/hadoop/myfile");
        if (fs.exists(file)) {
            System.out.println("File exists.");
        } else {
            // Create the file and write a UTF string into it via the output stream.
            FSDataOutputStream outStream = fs.create(file);
            outStream.writeUTF("china cstor cstor cstor china");
            // BUG FIX: the output stream must be closed. Without close() the
            // data may never be flushed to HDFS, so the readUTF() below could
            // fail or observe an empty file.
            outStream.close();
        }
        // Re-open the file and read back the UTF string written above.
        FSDataInputStream inStream = fs.open(file);
        String data = inStream.readUTF();
        // FileStatus exposes per-file metadata: owner, replication factor,
        // modification time and block size.
        FileSystem hdfs = file.getFileSystem(conf);
        FileStatus[] fileStatus = hdfs.listStatus(file);
        for (FileStatus status : fileStatus) {
            System.out.println("FileOwer:" + status.getOwner());
            System.out.println("FileReplication:" + status.getReplication());
            System.out.println("FileModificationTime:" + new Date(status.getModificationTime()));
            System.out.println("FileBlockSize:" + status.getBlockSize());
        }
        System.out.println(data);
        System.out.println("Filename:" + file.getName());
        inStream.close();
        fs.close();
    }
}
3、第 3 关:倒排索引
import java.io.IOException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.util.Iterator;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
String word;
IntWritable frequence=new IntWritable();
int one=1;
Hashtable<String,Integer> hashmap=new Hashtable();//key关键字设置为String
StringTokenizer itr = new StringTokenizer(value.toString());
//****请用hashmap定义的方法统计每一行中相同单词的个数,key为行值是每一行对应的偏移****//
/*********begin*********/
for(;itr.hasMoreTokens(); )
{
word=itr.nextToken();
if(hashmap.containsKey(word)){
hashmap.put(word,hashmap.get(word)+1);
}else{
hashmap.put(word, one);
}
}
/*********end**********/
for(Iterator<String> it=hashmap.keySet().iterator();it.hasNext();){
word=it.next();
frequence=new IntWritable(hashmap.get(word));
Text fileName_frequence = new Text(fileName+"@"+frequence.toString());//以<K2,“单词 文件名@出现频次”> 的格式输出
context.write(new Text(word),fileName_frequence);
}
}
}
public static class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text>{
protected void reduce(Text key,Iterable<Text> values,Context context)
throws IOException ,InterruptedException{
//****请合并mapper函数的输出,并提取“文件@1”中‘@’后面的词频,以<K2,list(“单词 文件名@出现频次”)>的格式输出****//
/*********begin*********/
String fileName="";
int sum=0;
String num;
String s;
for (Text val : values) {
s= val.toString();
fileName=s.substring(0, val.find("@"));
num=s.substring(val.find("@")+1, val.getLength()); //提取“doc1@1”中‘@’后面的词频
sum+=Integer.parseInt(num);
}
IntWritable frequence=new IntWritable(sum);
context.write(key,new Text(fileName+"@"+frequence.toString()));
/*********end**********/
}
}
public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>
{ @Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{ Iterator<Text> it = values.iterator();
StringBuilder all = new StringBuilder();
if(it.hasNext()) all.append(it.next().toString());
for(;it.hasNext();) {
all.append(";");
all.append(it.next().toString());
}
//****请输出最终键值对list(K3,“单词", “文件1@频次; 文件2@频次;...")****//
/*********begin*********/
context.write(key, new Text(all.toString()));
/*********end**********/
}
}
public static void main(String[] args)
{
if(args.length!=2){
System.err.println("Usage: InvertedIndex <in> <out>");
System.exit(2);
}
try {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "invertedindex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
//****请为job设置Combiner类****//
/*********begin*********/
job.setCombinerClass(InvertedIndexCombiner.class);
/*********end**********/
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
//****请设置输出value的类型****//
/*********begin*********/
job.setOutputValueClass(Text.class);
/*********end**********/
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4、第 4 关:网页排序—— PageRank 算法
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.StringTokenizer;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class PageRank {
    /*
     * Mapper: parses one adjacency line "page \t pr link1 link2 ..." (a
     * leading '_' marks lines produced by a previous iteration). Emits
     * <link, pr/outDegree> for every outgoing link, plus
     * <page, "_link1 link2 ..."> so the link structure survives the iteration.
     */
    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        private Text id = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Only process data lines, which start with a digit page id.
            if (line.substring(0, 1).matches("[0-9]{1}")) {
                boolean flag = false;
                if (line.contains("_")) {
                    line = line.replace("_", "");
                    flag = true;
                }
                String[] values = line.split("\t");
                Text t = new Text(values[0]);
                String[] vals = values[1].split(" ");
                String url = "_"; // '_' tags the adjacency list for the next iteration
                double pr = 0;
                int i = 0;
                int num = 0;
                if (flag) {
                    // Line from a previous iteration: vals[1] is the rank, links start at index 2.
                    i = 2;
                    pr = Double.valueOf(vals[1]);
                    num = vals.length - 2;
                } else {
                    // Original input line: vals[0] is the rank, links start at index 1.
                    i = 1;
                    pr = Double.valueOf(vals[0]);
                    num = vals.length - 1;
                }
                // Distribute this page's rank evenly over its outgoing links.
                for (; i < vals.length; i++) {
                    url = url + vals[i] + " ";
                    id.set(vals[i]);
                    Text prt = new Text(String.valueOf(pr / num));
                    context.write(id, prt);
                }
                context.write(t, new Text(url));
            }
        }
    }

    /*
     * Reducer: sums the incoming rank contributions of a page, applies the
     * damped PageRank formula pr = (1 - q) + q * sum with q = 0.85, and
     * re-attaches the '_'-tagged adjacency list for the next iteration.
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            double sum = 0;
            String url = "";
            for (Text val : values) {
                // A '_' marks the adjacency list; every other value is a rank contribution.
                if (!val.toString().contains("_")) {
                    sum = sum + Double.valueOf(val.toString());
                } else {
                    url = val.toString();
                }
            }
            // Damped PageRank with q = 0.85, formatted to three decimals.
            double pr = 0.15 + 0.85 * sum;
            String str = String.format("%.3f", pr);
            result.set(new Text(str + " " + url));
            context.write(key, result);
        }
    }

    /**
     * Runs five chained PageRank iterations; each job's output directory
     * becomes the next job's input.
     */
    public static void main(String[] args) throws Exception {
        String paths = "file:///tmp/input/Wiki0"; // input file path, do not change
        String path1 = paths;
        String path2 = "";
        for (int i = 1; i <= 5; i++) { // iterate 5 times
            System.out.println("This is the " + i + "th job!");
            System.out.println("path1:" + path1);
            System.out.println("path2:" + path2);
            Configuration conf = new Configuration();
            // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
            Job job = Job.getInstance(conf, "PageRank");
            path2 = paths + i;
            job.setJarByClass(PageRank.class);
            job.setMapperClass(MyMapper.class);
            // BUG FIX: MyReducer must NOT be registered as a combiner. It applies
            // the damping formula and emits "pr _links" values; because those
            // contain '_', the real reduce would misclassify them as adjacency
            // lists and drop the summed rank contributions, corrupting every
            // iteration. PageRank with damping has no valid combiner here.
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(path1));
            FileOutputFormat.setOutputPath(job, new Path(path2));
            path1 = path2;
            job.waitForCompletion(true);
            System.out.println(i + "th end!");
        }
    }
}