
MapReduce Partitioning

Contents

  • 1. MapReduce Partitioning
    • 1.1 Hash partitioning
    • 1.2 Custom partitioning
  • 2. Grouping Scores
    • 2.1 Map
    • 2.2 Partition
    • 2.3 Reduce
  • 3. Code and Results
    • 3.1 Dependencies in pom.xml
    • 3.2 Utility class util
    • 3.3 GroupScores
    • 3.4 Results
  • References

  The Apache Hadoop source code quoted in this article is licensed under the Apache License 2.0; see the Apache License 2.0 for details.

1. MapReduce Partitioning

  By default, MapReduce treats the Reduce function as a data-summarization step: it operates on the relatively small data set left after a Map function has cleaned the input, and it must process the intermediate output of all Maps in the cluster as a whole, so only a single Reduce task is started.
  This does not match every application's needs. Sometimes developers want to start more concurrent Reduce tasks to speed up the final aggregation and lower processing latency, which takes only a little setup code; in more customized scenarios, developers want intermediate Map output that matches specific rules to be handled by specific Reduce tasks. This is what MapReduce partitioning provides, and developers can also supply custom partitioning rules.
  When there are many Reduce tasks, each Map task partitions its output, creating one partition per Reduce task. A partition can contain many keys, but all key-value records for a given key stay in the same partition. If no custom partitioning rule is supplied, Hadoop uses its default hash-based partitioner, which is efficient.
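
  The "little setup code" mentioned above can look like the following minimal driver sketch (the class name and job name here are placeholders of ours, not part of the original example); it requests 5 parallel Reduce tasks instead of the default single reducer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "reducer-count-demo");
        // Ask the framework for 5 parallel reduce tasks instead of the default single reducer.
        job.setNumReduceTasks(5);
        // Mapper, Reducer, input and output settings are omitted here;
        // see the full driver in section 3.3.
    }
}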

1.1 Hash partitioning

  Below is the source code of the default hash partitioner in Apache Hadoop. Under this rule, the Reduce task that receives a record is computed as (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; the & Integer.MAX_VALUE mask clears the sign bit so the result is never negative. A small worked example follows the listing.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
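
  To make the formula concrete, the following small sketch (our own illustration; the sample keys and the reducer count of 3 are arbitrary) feeds a few Text keys through HashPartitioner and prints which Reduce task each one would be routed to:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class HashPartitionDemo {
    public static void main(String[] args) {
        HashPartitioner<Text, IntWritable> partitioner = new HashPartitioner<>();
        int numReduceTasks = 3; // hypothetical number of reduce tasks
        for (String k : new String[] {"alice", "bob", "carol"}) {
            // Equivalent to (new Text(k).hashCode() & Integer.MAX_VALUE) % numReduceTasks
            int partition = partitioner.getPartition(new Text(k), new IntWritable(1), numReduceTasks);
            System.out.println(k + " -> reduce task " + partition);
        }
    }
}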

1.2 Custom partitioning

  To implement custom partitioning, extend the partitioner class org.apache.hadoop.mapreduce.Partitioner provided by Hadoop; its declaration is shown below. A subclass must implement public abstract int getPartition(KEY key, VALUE value, int numPartitions), which defines how the intermediate Map output is assigned to partitions; numPartitions is the total number of partitions. In addition, however many partitions the custom class can return, the job driver must configure the same number of Reduce tasks with Job.setNumReduceTasks(number of custom partitions). A minimal custom-partitioner sketch follows the class declaration.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;

/** 
 * Partitions the key space.
 * 
 * <p><code>Partitioner</code> controls the partitioning of the keys of the 
 * intermediate map-outputs. The key (or a subset of the key) is used to derive
 * the partition, typically by a hash function. The total number of partitions
 * is the same as the number of reduce tasks for the job. Hence this controls
 * which of the <code>m</code> reduce tasks the intermediate key (and hence the 
 * record) is sent for reduction.</p>
 *
 * <p>Note: A <code>Partitioner</code> is created only when there are multiple
 * reducers.</p>
 *
 * <p>Note: If you require your Partitioner class to obtain the Job's
 * configuration object, implement the {@link Configurable} interface.</p>
 * 
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {
  
  /** 
   * Get the partition number for a given key (hence record) given the total 
   * number of partitions i.e. number of reduce-tasks for the job.
   *   
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
  
}
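
  As a minimal sketch of the pattern (the class name and the "err_" prefix are hypothetical, not taken from Hadoop or from the example below), the following partitioner routes keys that start with "err_" to partition 0 and everything else to partition 1; the matching driver would then call job.setPartitionerClass(PrefixPartitioner.class) and job.setNumReduceTasks(2). Section 2 below builds a fuller example that partitions by value (the score) rather than by key.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PrefixPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Two partitions in total: 0 for keys with the "err_" prefix, 1 for the rest.
        return key.toString().startsWith("err_") ? 0 : 1;
    }
}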

2. Grouping Scores

  The score file is shown below; the first column is the person's name and the second is the score. The goal is to split the scores into 5 bands: [0, 20), [20, 40), [40, 60), [60, 80), and [80, 100].

1 23
2 78
3 45
4 12
5 67
6 34
7 89
8 56
9 9
10 78
11 21
12 54
13 83
14 10
15 65
16 39
17 92
18 47
19 28
20 72

2.1 Map

  Assume this MapReduce job uses a single Map task. The Map step extracts <name, score> key-value pairs from the text and keeps only records whose score falls within the valid range.

2.2 Partition

  Records are partitioned by score: partition 0 covers [80, 100], partition 1 covers [60, 80), partition 2 covers [40, 60), partition 3 covers [20, 40), and partition 4 covers [0, 20), matching the MyPartitioner class in section 3.3.

2.3 Reduce

  The Reduce step performs no aggregation; it simply writes the records of each partition, sorted by key, into 5 output files.

3. Code and Results

3.1 Dependencies in pom.xml

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.6</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.3.6</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>3.3.6</version>
    </dependency>
  </dependencies>

3.2 Utility class util

import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


public class util {
    // Returns a FileSystem handle for the given URI (for example, an HDFS namenode address).
    public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {
        URI add = new URI(uri);
        return FileSystem.get(add, conf);
    }

    // Recursively deletes the given path if it exists, so the job's output directory can be reused.
    public static void removeALL(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        if (fs.exists(new Path(path))) {
            boolean isDeleted = fs.delete(new Path(path), true);
            System.out.println("Delete Output Folder? " + isDeleted);
        }
    }

    // Prints the contents of every part-r-* file under the given output path.
    public static void showResult(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        String regex = "part-r-";
        Pattern pattern = Pattern.compile(regex);

        if (fs.exists(new Path(path))) {
            FileStatus[] files = fs.listStatus(new Path(path));
            for (FileStatus file : files) {
                Matcher matcher = pattern.matcher(file.getPath().toString());
                if (matcher.find()) {
                    System.out.println(file.getPath() + ":");
                    FSDataInputStream openStream = fs.open(file.getPath());
                    IOUtils.copyBytes(openStream, System.out, 1024);
                    openStream.close();
                }
            }
        }
    }
}

3.3 GroupScores

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line has the form "<name> <score>"; keep only scores in [0, 100].
            String[] splitStr = value.toString().split(" ");
            Text keyOut = new Text(splitStr[0]);
            int grade = Integer.parseInt(splitStr[1]);
            if (grade >= 0 && grade <= 100) {
                IntWritable valueOut = new IntWritable(grade);
                context.write(keyOut, valueOut);
            }
        }
    }

    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            // Partition 0 gets [80, 100], partition 1 gets [60, 80), partition 2 gets [40, 60),
            // partition 3 gets [20, 40), and partition 4 gets [0, 20).
            if (value.get() >= 80) {
                return 0;
            } else if (value.get() >= 60) {
                return 1;
            } else if (value.get() >= 40) {
                return 2;
            } else if (value.get() >= 20) {
                return 3;
            } else {
                return 4;
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Pass every <name, score> pair through unchanged; the framework has already
            // grouped records by partition and sorted them by key.
            for (IntWritable value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] myArgs = {
            "file:///home/developer/CodeArtsProjects/mapreduce-partitioner/values.txt",
            "hdfs://localhost:9000/user/developer/GroupScores/output"
        };
        util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
        Job job = Job.getInstance(conf, "GroupScores");
        job.setMapperClass(MyMapper.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setReducerClass(MyReducer.class);
        // One reduce task per partition returned by MyPartitioner.
        job.setNumReduceTasks(5);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < myArgs.length - 1; i++) {
            FileInputFormat.setInputPaths(job, new Path(myArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));
        int res = job.waitForCompletion(true) ? 0 : 1;
        if (res == 0) {
            System.out.println("GroupScores结果为:");
            util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
        }
        System.exit(res);
    }
}

3.4 Results

[Figure: console output of the job, listing the five part-r-0000* result files and their contents]

References

吴章勇, 杨强. 大数据Hadoop3.X分布式处理实战 (Big Data Hadoop 3.X Distributed Processing in Action).

