当前位置: 首页 > article >正文

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

文章目录

  • Hadoop中MapReduce过程中Shuffle过程实现自定义排序
    • 一、引言
    • 二、实现WritableComparable接口
      • 1、自定义Key类
    • 三、使用Job.setSortComparatorClass方法
      • 2、设置自定义排序器
      • 3、自定义排序器类
    • 四、使用示例
    • 五、总结

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

在这里插入图片描述

一、引言

MapReduce框架中的Shuffle过程是连接Map阶段和Reduce阶段的桥梁,负责将Map任务的输出结果按照key进行分组和排序,并将相同key的数据传递给对应的Reduce任务进行处理。Shuffle过程的性能直接影响到整个MapReduce作业的执行效率。在默认情况下,Hadoop使用TotalOrderPartitioner进行排序,但有时我们需要根据特定的业务逻辑进行自定义排序。本文将介绍两种方法来实现自定义排序:实现WritableComparable接口和使用Job.setSortComparatorClass方法。下面是详细的步骤和代码示例。

二、实现WritableComparable接口

1、自定义Key类

首先,我们需要定义一个类并实现WritableComparable接口,该接口要求实现compareTo方法,用于定义排序逻辑。

package mr;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Employee implements WritableComparable<Employee> {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    @Override
    public String toString(){
        return "Employee[empno="+empno+",ename="+ename+",sal="+sal+",deptno="+deptno+"]";
    }

    @Override
    public int compareTo(Employee o) {
        // 多个列的排序:select * from emp order by deptno,sal;
        // 首先按照deptno排序
        if(this.deptno > o.getDeptno()){
            return 1;
        }else if(this.deptno < o.getDeptno()){
            return -1;
        }
        // 如果deptno相等,按照sal排序
        if(this.sal >= o.getSal()){
            return 1;
        }else{
            return -1;
        }
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 序列化
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // 反序列化
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }
}

三、使用Job.setSortComparatorClass方法

2、设置自定义排序器

除了实现WritableComparable接口外,我们还可以使用Job.setSortComparatorClass方法来设置自定义排序器。这种方法允许我们在不修改Key类的情况下实现自定义排序。

package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CustomSort {
    public static class Map extends Mapper<Object, Text, Employee, IntWritable> {
        private static Employee emp = new Employee();
        private static IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            emp.setEmpno(Integer.parseInt(line[0]));
            emp.setEname(line[1]);
            emp.setJob(line[2]);
            emp.setMgr(Integer.parseInt(line[3]));
            emp.setHiredate(line[4]);
            emp.setSal(Integer.parseInt(line[5]));
            emp.setComm(Integer.parseInt(line[6]));
            emp.setDeptno(Integer.parseInt(line[7]));
            context.write(emp, one);
        }
    }

    public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {
        @Override
        protected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CustomSort");
        job.setJarByClass(CustomSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Employee.class);
        job.setOutputValueClass(IntWritable.class);
        // 设置自定义排序器
        job.setSortComparatorClass(EmployeeComparator.class);
        
        Path in = new Path("hdfs://localhost:9000/mr/in/customsort");
        Path out = new Path("hdfs://localhost:9000/mr/out/customsort");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3、自定义排序器类

package mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class EmployeeComparator extends WritableComparator {
    protected EmployeeComparator() {
        super(Employee.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Employee e1 = (Employee) w1;
        Employee e2 = (Employee) w2;
        // 首先按照deptno排序
        int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());
        if (deptCompare != 0) {
            return deptCompare;
        }
        // 如果deptno相等,按照sal排序
        return Integer.compare(e1.getSal(), e2.getSal());
    }
}

四、使用示例

下面是一个简单的MapReduce示例,展示了Shuffle过程在实际应用中的使用。这个示例中,我们使用了自定义的Employee类作为Key,并设置了自定义的排序器EmployeeComparator

五、总结

通过实现WritableComparable接口和使用Job.setSortComparatorClass方法,我们可以在Hadoop MapReduce过程中实现自定义排序。这两种方法提供了灵活的排序机制,允许我们根据不同的业务需求对数据进行排序处理,从而提高数据处理的效率和准确性。


版权声明:本博客内容为原创,转载请保留原文链接及作者信息。

参考文章

  • Hadoop之mapreduce数据排序案例(详细代码)
  • Java Job.setSortComparatorClass方法代码示例

http://www.kler.cn/a/451548.html

相关文章:

  • 三维动画的常用“视觉特效”有哪些?
  • vue-axios+springboot实现文件流下载
  • flask-admin的modelview 实现list列表视图中扩展修改状态按钮
  • 第十五章 C++ 数组
  • nlp新词发现——浅析 TF·IDF
  • webserver log日志系统的实现
  • 演讲 | 学好语文的经验介绍
  • [react]不能将类型“string | undefined”分配给类型“To”。 不能将类型“undefined”分配给类型“To”
  • cudnn版本gpu架构
  • 用 ElementUI 的日历组件 Calendar 自定义渲染
  • 面试经验分享 | 北京渗透测试岗位
  • Spring Boot框架结合MongoDB实现日志数据的保存和归档
  • 50.第二阶段x86游戏实战2-lua获取本地寻路,跨地图寻路和获取当前地图id
  • H3C交换机远程登录基本配置
  • es 中 terms set 使用
  • 爬虫代码的适应性:如何调整以匹配速卖通新商品页面
  • 牛客--迷宫问题
  • k8s备份 ETCD , 使用velero工具进行备份
  • MySQL45讲 第三十六讲 为什么临时表可以重名?——阅读总结
  • vue3入门教程:ref函数
  • 在C#中制作一个字符串扩展来确定字符串是否与正则表达式匹配
  • RTMW:实时多人2D和3D 全人体姿态估计
  • 纯相位全息图优化算法综述
  • 抖音电商的崛起:API接口在其中的作用
  • OpenCV相机标定与3D重建(28)估计两个三维点集之间的最优平移变换函数estimateTranslation3D()的使用
  • 【C++】18___list容器