当前位置: 首页 > article >正文

MapReduce简单应用(一)——WordCount

目录

  • 1. 执行过程
    • 1.1 分割
    • 1.2 Map
    • 1.3 Combine
    • 1.4 Reduce
  • 2. 代码和结果
    • 2.1 pom.xml中依赖配置
    • 2.2 工具类util
    • 2.3 WordCount
    • 2.4 结果
  • 参考

1. 执行过程

  假设WordCount的两个输入文本text1.txt和text2.txt如下。

Hello World
Bye World
Hello Hadoop
Bye Hadoop

1.1 分割

  将每个文件拆分成split分片,由于测试文件比较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如下图所示。这一步由MapReduce自动完成,其中key值为偏移量,由MapReduce自动计算出来,包括回车所占的字符数。
在这里插入图片描述

1.2 Map

  将分割好的<key,value>对交给用户定义的Map方法处理,生成新的<key,value>对。处理流程为先对每一行文字按空格拆分为多个单词,每个单词出现次数设初值为1,key为某个单词,value为1,如下图所示。
在这里插入图片描述

1.3 Combine

  得到Map方法输出的<key,value>对后,Mapper将它们按照key值进行升序排列,并执行Combine合并过程,将key值相同的value值累加,得到Mapper的最终输出结果,并写入磁盘,如下图所示。
在这里插入图片描述

1.4 Reduce

  Reducer先对从Mapper接受的数据进行排序,并将key值相同的value值合并到一个list列表中,再交由用户自定义的Reduce方法进行汇总处理,得到新的<key,value>对,并作为WordCount的输出结果,存入HDFS,如下图所示。
在这里插入图片描述

2. 代码和结果

2.1 pom.xml中依赖配置

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.3.6</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>3.3.6</version>
			<type>pom</type>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
			<version>3.3.6</version>
		</dependency>
	</dependencies>

2.2 工具类util

  util.removeALL的功能是删除hdfs上的指定输出路径(如果存在的话),而util.showResult的功能是打印wordcount的结果。

import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


public class util {
    public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {
        URI add = new URI(uri);
        return FileSystem.get(add, conf);
    }

    public static void removeALL(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        if (fs.exists(new Path(path))) {
            boolean isDeleted = fs.delete(new Path(path), true);
            System.out.println("Delete Output Folder? " + isDeleted);
        }
    }

    public static void  showResult(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        String regex = "part-r-";
        Pattern pattern = Pattern.compile(regex);

        if (fs.exists(new Path(path))) {
            FileStatus[] files = fs.listStatus(new Path(path));
            for (FileStatus file : files) {
                Matcher matcher = pattern.matcher(file.getPath().toString());
                if (matcher.find()) {
                    FSDataInputStream openStream = fs.open(file.getPath());
                    IOUtils.copyBytes(openStream, System.out, 1024);
                    openStream.close();
                }
            }
        }
    }
}

2.3 WordCount

  正常来说,MapReduce编程都是要把代码打包成jar文件,然后用hadoop jar jar文件名 主类名称 输入路径 输出路径。下面代码中直接给出了输入和输出路径,可以直接运行。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App {
	public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			System.out.println(key + " " + value);
			
			Text keyOut;
			IntWritable valueOut = new IntWritable(1);
			StringTokenizer token = new StringTokenizer(value.toString());
			while (token.hasMoreTokens()) {
				keyOut = new Text(token.nextToken());
				context.write(keyOut, valueOut);
			}
		}
	}

	public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			context.write(key, new IntWritable(sum));
		} 
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] myArgs = {
			"file:///home/developer/CodeArtsProjects/WordCount/text1.txt", 
			"file:///home/developer/CodeArtsProjects/WordCount/text2.txt", 
			"hdfs://localhost:9000/user/developer/wordcount/output"
		};
		util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
		Job job = Job.getInstance(conf, "wordcount");
		job.setJarByClass(App.class);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setCombinerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		for (int i = 0; i < myArgs.length - 1; i++) {
			FileInputFormat.addInputPath(job, new Path(myArgs[i]));
		}
		FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));
		int res = job.waitForCompletion(true) ? 0 : 1;
		if (res == 0) {
			System.out.println("WordCount结果:");
			util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
		}
		System.exit(res);
	}
}

2.4 结果

在这里插入图片描述

参考

吴章勇 杨强著 大数据Hadoop3.X分布式处理实战


http://www.kler.cn/a/526817.html

相关文章:

  • https数字签名手动验签
  • Go的内存逃逸
  • Redis学习之哨兵二
  • 【Julia】Julia预编译与外部库:从崩溃到完美集成
  • 多协议网关BL110钡铼6路RS485转MQTT协议云网关
  • 三天急速通关JavaWeb基础知识:Day 1 后端基础知识
  • CSS 基础:层叠、优先级与继承
  • DDD - 微服务架构模型_领域驱动设计(DDD)分层架构 vs 整洁架构(洋葱架构) vs 六边形架构(端口-适配器架构)
  • 搭建自己的专属AI——使用Ollama+AnythingLLM+Python实现DeepSeek本地部署
  • 动态规划DP 最长上升子序列模型 拦截导弹(题目分析+C++完整代码)
  • 【C++语言】卡码网语言基础课系列----5. A+B问题VIII
  • MySQL(导入sql文件)
  • 蓝桥杯思维训练营(一)
  • sleep和wait
  • 基于遗传优化GRNN和Hog特征提取的交通标志识别算法matlab仿真
  • Android Studio 正式版 10 周年回顾,承载 Androider 的峥嵘十年
  • 1.27刷题记录
  • 【leetcode练习·二叉树】计算完全二叉树的节点数
  • Git进阶之旅:Git Hub注册创建仓库
  • 解决运行npm时报错
  • 面向对象编程(OOP)基础:类与对象
  • 线性回归简介:从理论到应用
  • 01. 计算机系统
  • C++ 中的引用(Reference)
  • 第十一章 F - H 开头的术语
  • 数据结构与算法之哈希表: LeetCode 1797. 设计一个验证系统 (Ts版)