Simple MapReduce Applications (3): Advanced WordCount
Contents
- 1. Advanced WordCount
- 1.1 Sorting IntWritable in descending order
- 1.2 Input and output formats
- 1.3 Processing flow
- 2. Code and results
- 2.1 Dependencies in pom.xml
- 2.2 The util helper class
- 2.3 Advanced WordCount
- 2.4 Results
- References
The Apache Hadoop source code quoted in this article is distributed under the Apache License 2.0; see the Apache License 2.0 for details.
1. Advanced WordCount
The input text is the code listed in section 2.3 below. The goal is to count each whitespace-separated token, emit the count first and the token second, and sort the results by count in descending order.
1.1 Sorting IntWritable in descending order
The IntWritable class ships with a comparator that sorts in ascending order, shown below. To sort IntWritable in descending order, define a new class that extends IntWritable.Comparator and overrides public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) so that it returns the negation of the parent method's result. In addition, for IntWritable keys to actually be sorted in descending order, the new comparator must be registered in the MapReduce driver code with Job.setSortComparatorClass().
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
public static class Comparator extends WritableComparator {
  public Comparator() {
    super(IntWritable.class);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    int thisValue = readInt(b1, s1);
    int thatValue = readInt(b2, s2);
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }
}
```
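Putting the recipe together, a minimal descending-order comparator and its registration might look like the sketch below. The class and method names here are illustrative only; section 2.3 registers an equivalent comparator (IntWritableDecreaseingComparator) on the second job.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

public class DescendingIntWritableExample {
    // Negating the parent's ascending comparison reverses the sort order.
    public static class IntWritableDescendingComparator extends IntWritable.Comparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    // In the driver, register the comparator on the job whose keys are IntWritable.
    static void useDescendingSort(Job job) {
        job.setSortComparatorClass(IntWritableDescendingComparator.class);
    }
}
```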
1.2 Input and output formats

Java class | Role | Behavior |
---|---|---|
org.apache.hadoop.mapreduce.lib.input.TextInputFormat | Default MapReduce input format | Splits the input files by line; each line becomes one <key, value> pair, where key is the offset of the line within the file (starting at 0) and value is the content of the line |
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat | Default MapReduce output format | Writes the output as a text file, one <key, value> pair per line, with key and value separated by a tab (\t) |
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat | Input format for SequenceFiles | Reads Hadoop's binary SequenceFile format |
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat | Output format for SequenceFiles | Writes the output in Hadoop's binary SequenceFile format |

(A SequenceFile is Hadoop's efficient, splittable binary file format, with support for compression.)
(Hadoop defines many other input and output formats; since I have not used them in depth, they are not covered here.)
When a task requires several chained MapReduce jobs, the intermediate results can be stored as SequenceFiles, which speeds up the overall run.
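As a sketch of that hand-off, only the format-related calls of the two jobs need to match. The names countJob, sortJob, and intermediateDir below are placeholders; the complete driver appears in section 2.3.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileHandOff {
    // Sketch: pass <Text, IntWritable> pairs from one job to the next as a SequenceFile.
    static void chain(Job countJob, Job sortJob, String intermediateDir) throws Exception {
        // The first job writes its results in binary SequenceFile form.
        countJob.setOutputKeyClass(Text.class);
        countJob.setOutputValueClass(IntWritable.class);
        countJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(countJob, new Path(intermediateDir));

        // The second job reads the same directory back without re-parsing text.
        sortJob.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(sortJob, new Path(intermediateDir));
    }
}
```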
1.3 Processing flow
Like the ordinary WordCount, the advanced version must first count the tokens in the text, so the reduce function consumes <Text, IntWritable> pairs. The final result, however, must be <IntWritable, Text> pairs. If you simply switch the reducer's output to <IntWritable, Text> and try to do everything in a single job, the IntWritable keys will not come out in descending order (even if you have set SortComparatorClass), because the shuffle sorts only between the end of the map phase and the start of the reduce phase, and at that point the keys are still of type Text, not IntWritable.
The task therefore needs two jobs. The first job does the counting and writes its output as a SequenceFile: its map output and its reduce input and output are all <Text, IntWritable>, and its output format is SequenceFileOutputFormat. The second job reads the first job's SequenceFile back in and swaps keys and values using Hadoop's built-in org.apache.hadoop.mapreduce.lib.map.InverseMapper, which exchanges each key and value. Its input format is SequenceFileInputFormat, its map input is <Text, IntWritable> and its map output is <IntWritable, Text>; with SortComparatorClass set on this job, the IntWritable keys are finally sorted in descending order.
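Conceptually, InverseMapper just emits every pair with its key and value swapped. The simplified stand-in below shows the idea; SwapMapper is a hypothetical name, not the actual Hadoop source.

```java
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;

// Simplified stand-in for InverseMapper: emit (value, key) for every (key, value).
public class SwapMapper<K, V> extends Mapper<K, V, V, K> {
    @Override
    protected void map(K key, V value, Context context)
            throws IOException, InterruptedException {
        context.write(value, key); // the shuffle then sorts on the new key
    }
}
```

Because the second job does not set a reducer class, the default identity Reducer is used, so the swapped pairs pass straight through to the output in sorted order.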
2. Code and results
2.1 Dependencies in pom.xml
```xml
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.6</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.3.6</version>
    <type>pom</type>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>3.3.6</version>
  </dependency>
</dependencies>
```
2.2 The util helper class
```java
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class util {
    public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {
        URI add = new URI(uri);
        return FileSystem.get(add, conf);
    }

    // Recursively delete a single output path if it exists.
    public static void removeALL(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        if (fs.exists(new Path(path))) {
            boolean isDeleted = fs.delete(new Path(path), true);
            System.out.println("Delete Output Folder? " + isDeleted);
        }
    }

    // Recursively delete every path in the list if it exists.
    public static void removeALL(String uri, Configuration conf, String[] pathList) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        for (String path : pathList) {
            if (fs.exists(new Path(path))) {
                boolean isDeleted = fs.delete(new Path(path), true);
                System.out.println(String.format("Delete %s? %s", path, isDeleted));
            }
        }
    }

    // Print the contents of every part-r-* file under the given output directory.
    public static void showResult(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        String regex = "part-r-";
        Pattern pattern = Pattern.compile(regex);
        if (fs.exists(new Path(path))) {
            FileStatus[] files = fs.listStatus(new Path(path));
            for (FileStatus file : files) {
                Matcher matcher = pattern.matcher(file.getPath().toString());
                if (matcher.find()) {
                    System.out.println(file.getPath() + ":");
                    FSDataInputStream openStream = fs.open(file.getPath());
                    IOUtils.copyBytes(openStream, System.out, 1024);
                    openStream.close();
                }
            }
        }
    }
}
```
2.3 Advanced WordCount
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class App {
    // Sorts IntWritable keys in descending order by negating the default comparison.
    public static class IntWritableDecreaseingComparator extends IntWritable.Comparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splitStr = value.toString().split("\\s+");
            for (String str : splitStr) {
                context.write(new Text(str), new IntWritable(1));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String tempPath = "hdfs://localhost:9000/user/developer/Temp";
        String[] myArgs = {
                "file:///home/developer/CodeArtsProjects/advanced-word-count/AdvancedWordCount.txt",
                "hdfs://localhost:9000/user/developer/AdvancedWordCount/output"
        };
        util.removeALL("hdfs://localhost:9000", conf, new String[] { tempPath, myArgs[myArgs.length - 1] });

        // Job 1: count tokens and write <Text, IntWritable> pairs as a SequenceFile.
        Job job = Job.getInstance(conf, "AdvancedWordCount");
        job.setJarByClass(App.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setNumReduceTasks(2);
        for (int i = 0; i < myArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(myArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(tempPath));
        int res1 = job.waitForCompletion(true) ? 0 : 1;

        if (res1 == 0) {
            // Job 2: swap keys and values with InverseMapper and sort counts in descending order.
            Job sortJob = Job.getInstance(conf, "Sort");
            sortJob.setJarByClass(App.class);
            sortJob.setMapperClass(InverseMapper.class);
            sortJob.setInputFormatClass(SequenceFileInputFormat.class);
            sortJob.setOutputKeyClass(IntWritable.class);
            sortJob.setOutputValueClass(Text.class);
            sortJob.setSortComparatorClass(IntWritableDecreaseingComparator.class);
            FileInputFormat.addInputPath(sortJob, new Path(tempPath));
            FileOutputFormat.setOutputPath(sortJob, new Path(myArgs[myArgs.length - 1]));
            int res2 = sortJob.waitForCompletion(true) ? 0 : 1;
            if (res2 == 0) {
                System.out.println("高级WordCount结果为:");
                util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
            }
            System.exit(res2);
        }
        System.exit(res1);
    }
}
```
2.4 Results
The result file contains the following (count first, token second):
```text
64
14 {
13 }
12 import
8 int
7 public
7 =
4 static
4 class
4 -
4 new
4 @Override
3 for
3 :
3 void
3 throws
3 extends
2 l1,
2 1;
2 0;
2 String[]
2 s2,
2 s1,
2 i
2 context.write(new
2 context)
2 conf,
2 InterruptedException
2 key,
2 IntWritable,
2 return
2 IOException,
2 b2,
2 sum
2 Context
2 protected
2 myArgs[myArgs.length
2 Text,
2 1]);
1 };
1 values,
1 values)
1 value.toString().split("\\s+");
1 value,
1 val.get();
1 val
1 util.showResult("hdfs://localhost:9000",
1 util.removeALL("hdfs://localhost:9000",
1 str
1 splitStr)
1 splitStr
1 res
1 reduce(Text
1 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
1 org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
1 org.apache.hadoop.mapreduce.Reducer;
1 org.apache.hadoop.mapreduce.Mapper;
1 org.apache.hadoop.mapreduce.Job;
1 org.apache.hadoop.io.WritableComparable;
1 org.apache.hadoop.io.Text;
1 org.apache.hadoop.io.LongWritable;
1 org.apache.hadoop.io.IntWritable;
1 org.apache.hadoop.fs.Path;
1 org.apache.hadoop.conf.Configuration;
1 myArgs.length
1 myArgs
1 map(LongWritable
1 main(String[]
1 l2);
1 l2)
1 key);
1 job.waitForCompletion(true)
1 job.setSortComparatorClass(IntWritableDecreaseingComparator.class);
1 job.setReducerClass(MyReducer.class);
1 job.setOutputValueClass(Text.class);
1 job.setOutputKeyClass(IntWritable.class);
1 job.setMapperClass(MyMapper.class);
1 job.setJarByClass(App.class);
1 job.setCombinerClass(MyReducer.class);
1 job
1 java.io.IOException;
1 if
1 i++)
1 compare(byte[]
1 compare(WritableComparable
1 byte[]
1 b1,
1 b);
1 b)
1 args)
1 a,
1 WritableComparable
1 Text>
1 Text(str),
1 Text
1 System.out.println("高级WordCount结果为:");
1 System.exit(res);
1 Reducer<Text,
1 Path(myArgs[myArgs.length
1 Path(myArgs[i]));
1 MyReducer
1 MyMapper
1 Mapper<LongWritable,
1 Job.getInstance(conf,
1 Job
1 Iterable<IntWritable>
1 IntWritableDecreaseingComparator
1 IntWritable>
1 IntWritable.Comparator
1 IntWritable(sum),
1 IntWritable(1));
1 FileOutputFormat.setOutputPath(job,
1 FileInputFormat.addInputPath(job,
1 Exception
1 Configuration();
1 Configuration
1 App
1 ?
1 ==
1 <
1 1]));
1 0)
1 0
1 -super.compare(b1,
1 -super.compare(a,
1 +=
1 (res
1 (int
1 (String
1 (IntWritable
1 "hdfs://localhost:9000/user/developer/AdvancedWordCount/output"
1 "file:///home/developer/CodeArtsProjects/AdvancedWordCount.txt",
1 "AdvancedWordCount");
1 conf
```