当前位置: 首页 > article >正文

大数据 MapReduce基础实战

一、关于此次实践

1、实战简介

MapReduce是Hadoop的核心功能之一,掌握它对学习Hadoop至关重要。Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

本章我们来通过几个示例来学习MapReduce的用法。

2、全部任务

二、实践详解

1、第 1 关:成绩统计

命令行
touch file01
echo Hello World Bye World
cat file01
echo Hello World Bye World >file01
cat file01
touch file02
echo Hello Hadoop Goodbye Hadoop >file02
cat file02
start-dfs.sh
hadoop fs -mkdir /usr
hadoop fs -mkdir /usr/input
hadoop fs -ls /usr/output
hadoop fs -ls /
hadoop fs -ls /usr
hadoop fs -put file01 /usr/input
hadoop fs -put file02 /usr/input
hadoop fs -ls /usr/input
import java.io.IOException;
import java.util.StringTokenizer;
 
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    /********** Begin **********/
	//Mapper函数
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private int maxValue = 0;
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString(),"\n");
            while (itr.hasMoreTokens()) {
                String[] str = itr.nextToken().split(" ");
                String name = str[0];
                one.set(Integer.parseInt(str[1]));
                word.set(name);
                context.write(word,one);
            }
            //context.write(word,one);
        }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxAge = 0;
            int age = 0;
            for (IntWritable intWritable : values) {
                maxAge = Math.max(maxAge, intWritable.get());
            }
            result.set(maxAge);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        String inputfile = "/user/test/input";
        String outputFile = "/user/test/output/";
        FileInputFormat.addInputPath(job, new Path(inputfile));
        FileOutputFormat.setOutputPath(job, new Path(outputFile));
        job.waitForCompletion(true);
    /********** End **********/
    }
}

2、第 2 关:文件内容合并去重

import java.io.IOException;

import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Merge {

	/**
	 * @param args
	 * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C
	 */
	//在这重载map函数,直接将输入中的value复制到输出数据的key上 注意在map方法中要抛出异常:throws IOException,InterruptedException
	public static class Map  extends Mapper<Object, Text, Text, Text>{
	
    /********** Begin **********/

        public void map(Object key, Text value, Context content) 
            throws IOException, InterruptedException {  
            Text text1 = new Text();
            Text text2 = new Text();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                text1.set(itr.nextToken());
                text2.set(itr.nextToken());
                content.write(text1, text2);
            }
        }  
	/********** End **********/
	} 
		
	//在这重载reduce函数,直接将输入中的key复制到输出数据的key上  注意在reduce方法上要抛出异常:throws IOException,InterruptedException
	public static class  Reduce extends Reducer<Text, Text, Text, Text> {
    /********** Begin **********/
        
        public void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
            Set<String> set = new TreeSet<String>();
            for(Text tex : values){
                set.add(tex.toString());
            }
            for(String tex : set){
                context.write(key, new Text(tex));
            }
        }  
    
	/********** End **********/

	}
	
	public static void main(String[] args) throws Exception{

		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		conf.set("fs.default.name","hdfs://localhost:9000");
		
		Job job = Job.getInstance(conf,"Merge and duplicate removal");
		job.setJarByClass(Merge.class);
		job.setMapperClass(Map.class);
		job.setCombinerClass(Reduce.class);
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		String inputPath = "/user/tmp/input/";  //在这里设置输入路径
		String outputPath = "/user/tmp/output/";  //在这里设置输出路径

		FileInputFormat.addInputPath(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

3、第 3 关:信息挖掘 - 挖掘父子关系

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class simple_data_mining {
	public static int time = 0;

	/**
	 * @param args
	 * 输入一个child-parent的表格
	 * 输出一个体现grandchild-grandparent关系的表格
	 */
	//Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志
	public static class Map extends Mapper<Object, Text, Text, Text>{
		public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
			/********** Begin **********/
		String line = value.toString();
             String[] childAndParent = line.split(" ");
             List<String> list = new ArrayList<>(2);
              for (String childOrParent : childAndParent) {
                 if (!"".equals(childOrParent)) {
                     list.add(childOrParent);
                  } 
              } 
              if (!"child".equals(list.get(0))) {
                  String childName = list.get(0);
                  String parentName = list.get(1);
                  String relationType = "1";
                  context.write(new Text(parentName), new Text(relationType + "+"
                        + childName + "+" + parentName));
                  relationType = "2";
                  context.write(new Text(childName), new Text(relationType + "+"
                        + childName + "+" + parentName));
              }
			/********** End **********/
		}
	}

	public static class Reduce extends Reducer<Text, Text, Text, Text>{
		public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
				/********** Begin **********/

			    //输出表头
          if (time == 0) {
                context.write(new Text("grand_child"), new Text("grand_parent"));
                time++;
            }

				//获取value-list中value的child
List<String> grandChild = new ArrayList<>();

				//获取value-list中value的parent
 List<String> grandParent = new ArrayList<>();

				//左表,取出child放入grand_child
 for (Text text : values) {
                String s = text.toString();
                String[] relation = s.split("\\+");
                String relationType = relation[0];
                String childName = relation[1];
                String parentName = relation[2];
                if ("1".equals(relationType)) {
                    grandChild.add(childName);
                } else {
                    grandParent.add(parentName);
                }
            }

				//右表,取出parent放入grand_parent
 int grandParentNum = grandParent.size();
               int grandChildNum = grandChild.size();
               if (grandParentNum != 0 && grandChildNum != 0) {
                for (int m = 0; m < grandChildNum; m++) {
                    for (int n = 0; n < grandParentNum; n++) {
                        //输出结果
                    context.write(new Text(grandChild.get(m)), new Text(
                                grandParent.get(n)));
                    }
                }
            }
				/********** End **********/
		}
	}
	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf,"Single table join");
		job.setJarByClass(simple_data_mining.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		String inputPath = "/user/reduce/input";   //设置输入路径
		String outputPath = "/user/reduce/output";   //设置输出路径
		FileInputFormat.addInputPath(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}


http://www.kler.cn/a/417338.html

相关文章:

  • 软考高级-软件系统架构师-02-软件工程(重点)
  • 调用腾讯云批量文本翻译API翻译srt字幕
  • OpenBMC:通过qemu-system-arm运行编译好的image
  • python:csv文件批量导入mysql
  • DeepSeek私有化本地部署图文(Win+Mac)
  • 绿联NAS安装cpolar内网穿透工具实现无公网IP远程访问教程
  • 基于Java Springboot Vue3图书管理系统
  • 港科夜闻 |香港科大推出 InvestLM生成式人工智能平台,支持金融中小企应用AI技术潜力...
  • 【docker】docker常用命令汇总
  • SpringCloud 详解
  • 数据分析的尽头是web APP?
  • 使用C#开发VTK笔记(二)Part1-VTK系统结构解析
  • 使用Github Action将Docker镜像转存到阿里云私有仓库,供国内服务器使用,免费易用
  • TouchGFX源码分析1---(Event类 和Click Event类)
  • C++多态的实现原理
  • 最短距离和路径问题 ford
  • 数据结构-图-领接表存储
  • HDLCPPP原理与配置
  • 关于最近win11不能使用ie,而不能使用考试客户端的解决方法
  • 人工智能 实验2 jupyter notebook平台 打印出分类器的正确率
  • 11 设计模式之代理模式(送资料案例)
  • 373. 查找和最小的 K 对数字
  • QTableView 实现表格及相关用法(C++)(QStandardItemModel+QItemSelectionModel)
  • [Linux] 进程间通信——匿名管道命名管道
  • 提升异步编程性能:使用 uvloop 加速你的 Python 应用
  • 云硬盘挂载到新服务器,怎么恢复数据?