当前位置: 首页 > article >正文

大数据 HDFS和MapReduce综合实训

一、关于此次实践

1、实战简介

        Hadoop是一个由Apache基金会所开发的分布式系统基础架构,可以在不了解分布式底层细节的情况下,开发分布式程序,以满足在低性能的集群上实现对高容错,高并发的大数据集的高速运算和存储的需要。Hadoop支持超大文件(可达PB级),能够检测和快速应对硬件故障、支持流式数据访问、同时在简化的一致性模型的基础上保证了高容错性。因而被大规模部署在分布式系统中,应用十分广泛。

        本实训的主要目标是让大家学习Hadoop的基本概念如MapReduce、HDFS等,并掌握Hadoop的基本操作,主要包括MapReduce编程(词频统计)、HDFS文件流读取操作、MapReduce迭代等。通过本次实训,建立起对Hadoop云计算的初步了解,后续大家可以通过进阶学习来深入学习Hadoop内部实现机制进行高级的应用开发。

2、全部任务

二、实践详解

1、第1关:WordCount 词频统计

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    /*
    * MapReduceBase类:实现Mapper和Reducer接口的基类    
    * Mapper接口: 
    * WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类要实现此接口。    
    */  
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    /*
    *LongWritable,IntWritable,Text是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,
    *都能够被串行化,便于在分布式环境中进行数据交换,可以视为long,int,String数据类型的替代。
    */
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();//Text实现了BinaryComparable类,可以作为key值

    /*
    * Mapper接口中的map方法: 
    * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
    * 映射一个单个的输入<K1,V1>对到一个中间输出<K2,V2>对 
    * 中间输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。  
    * OutputCollector接口:收集Mapper和Reducer输出的<K,V>对。 
    * OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output 
    * Reporter 用于报告整个应用的运行进度
     */  

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
    /*
         * 原始数据(以test1.txt为例):
     *  tale as old as time
        true as it can be
        beauty and the beast
        map阶段,数据如下形式作为map的输入值:key为偏移量
            <0  tale as old as time>
            <21 world java hello>
            <39 you me too> 
         */

         /**
       * 解析(Spliting)后以得到键值对<K2,V2>(仅以test1.txt为例)
         * 格式如下:前者是键值,后者数字是值
         * tale 1
         * as 1
         * old 1
         * as 1
         * time 1
         * true 1
         * as 1
         * it 1
         * can 1
         * be 1
         * beauty 1
         * and 1
         * the 1
         * beast 1
         * 这些键值对作为map的输出数据
         */

    //****请补全map函数内容****//
    /*********begin*********/
    StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    /*********end**********/

    }
  }

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

     /*
     * reduce过程是对输入键值对洗牌(Shuffing)形成<K2,list(V2)>格式数据(仅以test1.txt为例):
     * (tablie [1])
     * (as [1,1,1])
     * (old [1])
     * (time [1])
     * (true [1])
     * (it [1])
     * (can [1])
     * (be [1])
     * (beauty [1])
     * (and [1])
     * (the [1])
     * (beast [1])
     * 作为reduce的输入
     * 
     */
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
    //****请补全reduce对<k2, list(v2)> 进行合计得到list(<k3,v3>)过程****//
    /*********begin*********/
    int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
    /*********end**********/

    //****请将list(<k3,v3>)统计输出****//

    /*********begin*********/
    result.set(sum);
    context.write(key, result);
    /*********end**********/
    }
}
  public static void main(String[] args) throws Exception {
      /**
       * JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 
       * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等 
       */  
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    /*
    * 需要配置输入和输出的HDFS的文件路径参数
    * 可以使用"Usage: wordcount <in> <out>"实现程序运行时动态指定输入输出
    */
    if (otherArgs.length != 2) {
         System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");//Job(Configuration conf,String jobName)设置job名称
    job.setJarByClass(WordCount.class);//为job设置Mapper类
      /*********begin*********/
      //****请为job设置Mapper类****//
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);//为job设置Combiner类
      //****请为job设置Reduce类****//
    job.setReducerClass(IntSumReducer.class);
      //****请设置输出key的参数类型****//
    job.setOutputKeyClass(Text.class);
      //****请设置输出value的类型****//
    job.setOutputValueClass(IntWritable.class);
      /*********end**********/
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//为map-reduce任务设置InputFormat实现类,设置输入路径
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类,设置输出路径
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

2、第 2 关:HDFS 文件读写

import java.io.IOException;
import java.sql.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class hdfs {
    public static void main(String[] args) throws IOException {
//throws IOException捕获异常声明

//****请根据提示补全文件创建过程****//
/*********begin*********/
        Configuration conf = new Configuration();  //实例化设置文件,configuration类实现hadoop各模块之间值的传递
        FileSystem fs = FileSystem.get(conf);  //是hadoop访问系统的抽象类,获取文件系统, FileSystem的get()方法得到实例fs,然后fs调动create()创建文件,open()打开文件       
        System.out.println(fs.getUri());
//实现文件读写主要包含以下步骤:
//读取hadoop文件系统配置
//实例化设置文件,configuration类实现hadoop各模块之间值的传递
//FileSystem是hadoop访问系统的抽象类,获取文件系统, FileSystem的get()方法得到实例fs,然后fs调动create()创建文件,调用open()打开文件,调用close()关闭文件


//*****请按照题目填写要创建的路径,其他路径及文件名无法被识别******//

        Path file = new Path("/user/hadoop/myfile");

/*********end**********/

        if (fs.exists(file)) {

             System.out.println("File exists.");

        } else
            {
//****请补全使用文件流将字符写入文件过程,使用outStream.writeUTF()函数****//
                /*********begin*********/

        FSDataOutputStream outStream = fs.create(file); //获取文件流 
        outStream.writeUTF("china cstor cstor cstor china"); //使用文件流写入文件内容


                /*********end**********/

        }


//****请补全读取文件内容****//
/*********begin*********/
// 提示:FSDataInputStream实现接口,使Hadoop中的文件输入流具有流式搜索和流式定位读取的功能
        FSDataInputStream inStream = fs.open(file);  
        String data = inStream.readUTF(); 
/*********end**********/

//输出文件状态
//FileStatus对象封装了文件的和目录的元数据,包括文件长度、块大小、权限等信息
        FileSystem hdfs = file.getFileSystem(conf);
        FileStatus[] fileStatus = hdfs.listStatus(file);
        for(FileStatus status:fileStatus)
         {
           System.out.println("FileOwer:"+status.getOwner());//所有者
           System.out.println("FileReplication:"+status.getReplication());//备份数
           System.out.println("FileModificationTime:"+new Date(status.getModificationTime()));//目录修改时间
           System.out.println("FileBlockSize:"+status.getBlockSize());//块大小
        }

        System.out.println(data);
        System.out.println("Filename:"+file.getName());
        inStream.close();
        fs.close();
    }
  }

3、第 3 关:倒排索引

import java.io.IOException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.util.Iterator;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

public class InvertedIndex {
    public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> 
    {
        public void map(LongWritable key, Text value, Context context)  
        throws IOException, InterruptedException 

        {   
            FileSplit fileSplit = (FileSplit)context.getInputSplit();
            String fileName = fileSplit.getPath().getName();

            String word;
            IntWritable frequence=new IntWritable();
            int one=1;
            Hashtable<String,Integer>   hashmap=new Hashtable();//key关键字设置为String
            StringTokenizer itr = new StringTokenizer(value.toString());

//****请用hashmap定义的方法统计每一行中相同单词的个数,key为行值是每一行对应的偏移****//
/*********begin*********/
         for(;itr.hasMoreTokens(); )   
            {     

                word=itr.nextToken();  
                if(hashmap.containsKey(word)){  
                    hashmap.put(word,hashmap.get(word)+1);  
             }else{  
                    hashmap.put(word, one);                         

                }  

            }  

/*********end**********/                                

for(Iterator<String> it=hashmap.keySet().iterator();it.hasNext();){
                word=it.next();
                frequence=new IntWritable(hashmap.get(word));
                Text fileName_frequence = new Text(fileName+"@"+frequence.toString());//以<K2,“单词 文件名@出现频次”> 的格式输出
                context.write(new Text(word),fileName_frequence);
            }

        }
    }

    public static class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text>{
         protected void reduce(Text key,Iterable<Text> values,Context context)
         throws IOException ,InterruptedException{ 
//****请合并mapper函数的输出,并提取“文件@1”中‘@’后面的词频,以<K2,list(“单词 文件名@出现频次”)>的格式输出****//
/*********begin*********/

            String fileName="";  
            int sum=0;  
            String num;  
            String s;  
            for (Text val : values) {  

                    s= val.toString();  
                    fileName=s.substring(0, val.find("@"));  
                    num=s.substring(val.find("@")+1, val.getLength());      //提取“doc1@1”中‘@’后面的词频  
                    sum+=Integer.parseInt(num);  
            }  
            IntWritable frequence=new IntWritable(sum);  
            context.write(key,new Text(fileName+"@"+frequence.toString()));  

/*********end**********/                

        }
    }

    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> 
    {   @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException 
        {   Iterator<Text> it = values.iterator();
            StringBuilder all = new StringBuilder();
            if(it.hasNext())  all.append(it.next().toString());
            for(;it.hasNext();) {
                all.append(";");
                all.append(it.next().toString());                   
            }
//****请输出最终键值对list(K3,“单词", “文件1@频次; 文件2@频次;...")****//
/*********begin*********/

 context.write(key, new Text(all.toString()));  

/*********end**********/        
        }
    }

    public static void main(String[] args) 
    {
        if(args.length!=2){
            System.err.println("Usage: InvertedIndex <in> <out>");
            System.exit(2);
        }

      try {
                Configuration conf = new Configuration();
                String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

                Job job = new Job(conf, "invertedindex");
                job.setJarByClass(InvertedIndex.class);
                job.setMapperClass(InvertedIndexMapper.class);
            //****请为job设置Combiner类****//
/*********begin*********/
                job.setCombinerClass(InvertedIndexCombiner.class); 

/*********end**********/                                
                job.setReducerClass(InvertedIndexReducer.class);

                job.setOutputKeyClass(Text.class);
            //****请设置输出value的类型****//
/*********begin*********/
               job.setOutputValueClass(Text.class);  

/*********end**********/                                    
                FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

                System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (Exception e) { 
            e.printStackTrace();
        }
    }
}

 4、第 4 关:网页排序—— PageRank 算法

import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.StringTokenizer;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank {

  public static class MyMapper   extends Mapper<Object, Text, Text, Text>
  {
        private Text id = new Text();
        public void map(Object key, Text value, Context context ) throws IOException, InterruptedException
        {
            String line = value.toString();
//判断是否为输入文件
            if(line.substring(0,1).matches("[0-9]{1}"))
            {
                  boolean flag = false;
                  if(line.contains("_"))
                  {
                        line = line.replace("_","");
                        flag = true;
                  }
//对输入文件进行处理
                  String[] values = line.split("\t");
                  Text t = new Text(values[0]);
                  String[] vals = values[1].split(" ");
                  String url="_";//保存url,用作下次计算
                  double pr = 0;
                  int i = 0;
                  int num = 0;

                  if(flag)
                  {
                      i=2;
                      pr=Double.valueOf(vals[1]);
                      num=vals.length-2;
                  }
                  else
                  {
                      i=1;
                      pr=Double.valueOf(vals[0]);
                      num=vals.length-1;
                  }

                  for(;i<vals.length;i++)
                  {
                      url=url+vals[i]+" ";
                      id.set(vals[i]);
                      Text prt = new Text(String.valueOf(pr/num));
                      context.write(id,prt);
                  }
                  context.write(t,new Text(url));
              }
          }
  }

  public static class MyReducer  extends Reducer<Text,Text,Text,Text>
  {
              private Text result = new Text();
              private Double pr = new Double(0);

         public void reduce(Text key, Iterable<Text> values,  Context context  ) throws IOException, InterruptedException
         {
              double sum=0;
              String url="";

//****请通过url判断否则是外链pr,作计算前预处理****//
/*********begin*********/
  for(Text val:values)  
              {  
                      //发现_标记则表明是url,否则是外链pr,要参与计算  
                  if(!val.toString().contains("_"))  
                  {  
                      sum=sum+Double.valueOf(val.toString());  
                  }  
                  else  
                 {  
                      url=val.toString();  
                  }  
              }  
              pr=0.15+0.85*sum;  
              String str=String.format("%.3f",pr);  
              result.set(new Text(str+" "+url));  
              context.write(key,result);  


/*********end**********/            


//****请补全用完整PageRank计算公式计算输出过程,q取0.85****//
/*********begin*********/


/*********end**********/    

          }
 }

    public static void main(String[] args) throws Exception
    {
             String paths="file:///tmp/input/Wiki0";//输入文件路径,不要改动
            String path1=paths;
            String path2="";

            for(int i=1;i<=5;i++)//迭代5次
              {
                System.out.println("This is the "+i+"th job!");
                System.out.println("path1:"+path1);
                System.out.println("path2:"+path2);
                Configuration conf = new Configuration();
                Job job = new Job(conf, "PageRank");
                path2=paths+i;    
                job.setJarByClass(PageRank.class);
                job.setMapperClass(MyMapper.class);
        //****请为job设置Combiner类****//
/*********begin*********/
job.setCombinerClass(MyReducer.class); 

/*********end**********/                    
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                FileInputFormat.addInputPath(job, new Path(path1));
                FileOutputFormat.setOutputPath(job, new Path(path2));
                path1=path2;      
             job.waitForCompletion(true);
            System.out.println(i+"th end!");
        }
      } 
 }


http://www.kler.cn/a/414770.html

相关文章:

  • Spring JDBC 和 事务控制——(2)
  • Linux环境下配置neo4j图数据库
  • 链动星海 质引未来|中信银行加码科技金融 “接力式”服务助力“新质生产力”释放
  • Scrapy管道设置和数据保存
  • 什么是串联谐振
  • PHP 生成分享海报
  • BAT WPS OFFICE免登录工具
  • hadoop_zookeeper详解
  • 云原生时代的轻量级反向代理Traefik
  • 《C++搭建神经网络基石:开启智能编程新征程》
  • IDEA 2024 Maven 设置为全局本地仓库,避免新建项目重新配置maven
  • 2024-11-25 二叉树的定义
  • Java基础之控制语句:开启编程逻辑之门
  • 国外媒体发布新闻稿/海外媒体网站发稿创历史新潮流
  • C# 基于WPF实现数据记录导出excel
  • COMSOL工作站:配置指南与性能优化
  • 单片机知识总结(完整)
  • 网络安全概论——网络加密与密钥管理
  • MTK 展锐 高通 sensorhub架构
  • 蓝桥杯备赛笔记(一)
  • python中的判断语句
  • Javaweb 前端 js事件监听
  • 升级智享 AI 直播三代:领航原生直播驶向自动化运营新航道
  • think php处理 异步 url 请求 记录
  • 【前端Vue】day04
  • pyhton+yaml+pytest+allure框架封装-全局变量渲染