大数据处理技术:MapReduce综合实训
目录
1 实验名称
2 实验目的
3 实验内容
4 实验原理
5 实验过程或源代码
5.1 WordCount词频统计
5.2 HDFS文件读写
5.3 倒排索引
5.4 网页排序——PageRank算法
6 实验结果
6.1 WordCount词频统计
6.2 HDFS文件读写
6.3 倒排索引
6.4 网页排序——PageRank算法
1 实验名称
MapReduce综合实训
2 实验目的
1.了解什么是MapReduce框架。
2、理解MapReduce编程思想,学会编写MapReduce版本WordCount,会执行该程序,可以自行分析执行过程。
3、理解MapReduce编程思想,学会编写MapReduce版本计数器程序,并能执行该程序和分析执行过程
3 实验内容
(1)WordCount词频统计
(2)HDFS文件读写
(3)倒排索引
(4)网页排序——PageRank算法
4 实验原理
MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。Reduce负责“合”,即对map阶段的结果进行全局汇总。这两个阶段合起来正是MapReduce思想的体现。
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成–个完整的分布式运算程序,并发运行在Hadoop集上。既然是做计算的框架,那么表现形式就是有个输入( input ),MapReduce 操作这个输入( input),通过本身定义好的计算模型,得到一个输出(output)。
5 实验过程或源代码
5.1 WordCount词频统计
1.在主函数main中已初始化hadoop的系统设置,包括hadoop运行环境的连接。补全map函数内容,代码实现如下:
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
2.在main函数中,已经设置好了待处理文档路径(即input),以及结果输出路径(即output)。补全reduce对<k2, list(v2)> 进行合计得到list(<k3,v3>)过程,代码实现如下:
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
3.在main函数中,已经声明了job对象,程序运行的工作调度已经设定好。将list(<k3,v3>)统计输出,代码实现如下:
result.set(sum);
context.write(key, result);
4.为job设置类,代码实现如下:
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);//为job设置Combiner类
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
5.2 HDFS文件读写
1.在主函数main中已获取hadoop的系统设置,并在其中创建HDFS文件。在main函数中,指定创建文档路径。根据提示补全文件创建过程,代码实现如下:
Configuration conf = new Configuration(); //实例化设置文件,configuration类实现hadoop各模块之间值的传递
FileSystem fs = FileSystem.get(conf); //是hadoop访问系统的抽象类,获取文件系统, FileSystem的get()方法得到实例fs,然后fs调动create()创建文件,open()打开文件
System.out.println(fs.getUri());
Path file = new Path("/user/hadoop/myfile");
2.添加读取文件输出部分,补全使用文件流将字符写入文件过程,使用outStream.writeUTF()函数,代码实现如下:
FSDataOutputStream outStream = fs.create(file); //获取文件流
outStream.writeUTF("china cstor cstor cstor china"); //使用文件流写入文件内容
3.补全读取文件内容,代码实现如下:
FSDataInputStream inStream = fs.open(file);
String data = inStream.readUTF();
5.3 倒排索引
1.在主函数main中已初始化hadoop的系统设置,包括hadoop运行环境的连接。用hashmap定义的方法统计每一行中相同单词的个数,key为行值是每一行对应的偏移,代码实现如下:
for(;itr.hasMoreTokens(); )
{
word=itr.nextToken();
if(hashmap.containsKey(word)){
hashmap.put(word,hashmap.get(word)+1);
}else{
hashmap.put(word, one);
}
}
2.在main函数中,已经设置好了待处理文档路径(即input),以及结果输出路径(即output)。合并mapper函数的输出,并提取“文件@1”中‘@’后面的词频,以<K2,list(“单词 文件名@出现频次”)>的格式输出,代码实现如下:
String fileName="";
int sum=0;
String num;
String s;
for (Text val : values) {
s= val.toString();
fileName=s.substring(0, val.find("@"));
num=s.substring(val.find("@")+1, val.getLength()); //提取“doc1@1”中‘@’后面的词频
sum+=Integer.parseInt(num);
}
IntWritable frequence=new IntWritable(sum);
context.write(key,new Text(fileName+"@"+frequence.toString()));
3.在main函数中,已经声明了job对象,程序运行的工作调度已经设定好。输出最终键值对list(K3,“单词", “文件1@频次; 文件2@频次,代码实现如下:
context.write(key, new Text(all.toString()));
4.为job设置Combiner类,代码实现如下:
job.setCombinerClass(InvertedIndexCombiner.class);
5.设置输出value的类型,代码实现如下:
job.setOutputValueClass(Text.class);
5.4 网页排序——PageRank算法
1.通过url判断否则是外链pr,作计算前预处理;补全用完整PageRank计算公式计算输出过程,q取0.85,代码实现如下:
for(Text val:values)
{
//发现_标记则表明是url,否则是外链pr,要参与计算
if(!val.toString().contains("_"))
{
sum=sum+Double.valueOf(val.toString());
}
else
{
url=val.toString();
}
}
pr=0.15+0.85*sum;
String str=String.format("%.3f",pr);
result.set(new Text(str+" "+url));
context.write(key,result);
2.为job设置Combiner类,代码实现如下:
job.setCombinerClass(MyReducer.class);