当前位置: 首页 > article >正文

WordCount 源码解析 Mapper,Reducer,Driver

创建包 com.nefu.mapreduce.wordcount ,开始编写 Mapper Reducer
Driver
用户编写的程序分成三个部分: Mapper Reducer Driver
1 Mapper 阶段
用户自定义的 Mapper 要继承自己的父类
Mapper 的输入数据是 KV 对的形式 (KV 的类型可自定义 )
Mapper 中的业务逻辑写在 map () 方法中
Mapper 的输出数据是 KV 对的形式 (KV 的类型可自定义 )
map () 方法 (MapTask 进程 ) 对每一个 <K.V> 调用一次
package com.nefu.mapreducer.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordcountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    private Text outK=new Text();
    private IntWritable outV=new IntWritable(1);
    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
        String line=value.toString();
        String[] words=line.split(" ");
        for(String word:words){
            //封装
            outK.set(word);
            //写出
            context.write(outK,outV);
        }
    }
}
2 Reducer 阶段
用户自定义的 Reducer 要继承自己的父类
Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
Reducer 的业务逻辑写在 reduce() 方法中
ReduceTask 进程对每一组相同 k <k,v> 组调用一 次 reduce () 方法,迭代
器类型
package com.nefu.mapreducer.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordcountReducer extends Reducer<Text,IntWritable,Text, IntWritable> {
    private IntWritable outV=new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        int sum=0;
        for(IntWritable value:values){
            sum=sum+value.get();
        }
        outV.set(sum);
        context.write(key,outV);
    }
}
3 Driver 阶段
相当于 YARN 集群的客户端,用于提交我们整个程序到 YARN 集群,提交的是
封装了 MapReduce 程序相关运行参数的 job 对象
package com.nefu.mapreducer.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordcountDriver {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        //获取job
        Configuration conf=new Configuration();
        Job job=Job.getInstance(conf);
        //设置jar包
        job.setJarByClass(WordcountDriver.class);

        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job,new Path("D:\\cluster\\mapreduce.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\cluster\\partion"));
        boolean result=job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}


 

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>





http://www.kler.cn/news/161931.html

相关文章:

  • 【环境搭建】ubuntu22安装ros2
  • 麒麟KYLINOS操作系统修改GRUB字体大小
  • lodash常用方法
  • 2023.12.1 --数据仓库之 拉链表
  • ⭐Unity 搭建UDP客户端(01) 配合网络调试助手测试
  • BUUCTF-[GYCTF2020]FlaskApp flask爆破pin
  • Docker入门:容器化原理
  • Clean 架构下的现代 Android 架构指南
  • 实验3.5 路由器的单臂路由配置
  • 装配式技术助力EHS平台系统:打造全方位的安全在线监测平台!
  • 【PTA-C语言】编程练习4 - 数组Ⅱ
  • 【面试经典150 | 二叉树】翻转二叉树
  • ubuntu内移除snap
  • VUE2+THREE.JS 按照行动轨迹移动人物模型并相机视角跟随人物
  • 智能优化算法应用:基于材料生成算法无线传感器网络(WSN)覆盖优化 - 附代码
  • Thymeleaf生成pdf表格合并单元格描边不显示
  • SpringDataJPA基础
  • Cypress:前端自动化测试的终极利器
  • Leetcode刷题笔记题解(C++):165. 比较版本号
  • 安路Anlogic FPGA下载器的驱动安装教程
  • 【mysql】下一行减去上一行数据、自增序列场景应用
  • 2023年4K投影仪怎么选?极米H6 4K高亮版怎么样?
  • Leetcode—1466.重新规划路线【中等】
  • 【PTA题目】7-7 自守数 分数 15
  • 芯知识 | 如何选择合适的单片机语音芯片?
  • 使用单例模式+观察者模式实现参数配置实时更新
  • 算术运算(这么简单?进来坐坐?)
  • 复杂gRPC之go调用go
  • C++标准模板(STL)- 类型支持 (杂项变换,确定一组类型的公共类型,std::common_type)
  • C#-using处理非托管资源