大数据技术-Hadoop(三)Mapreduce的介绍与使用
目录
一、概念和定义
二、WordCount案例
1、WordCountMapper
2、WordCountReducer
3、WordCountDriver
三、序列化
1、为什么序列化
2、为什么不用Java的序列化
3、Hadoop序列化特点:
4、自定义bean对象实现序列化接口(Writable)
4.1、bean
4.2、FlowBeanMapper
4.3、FlowReducer
4.4、FlowDriver
四、MapReduce框架原理
1、mapreduce流程
2、Shuffle机制
3、Partition分区
3.1、 默认分区方法
3.2、自定义分区
4、WritableComparable
5、Combiner合并
6、自定义FileOutputFormat
7、Map Join
8、数据清洗 ETL
五、数据压缩
1、参数说明
2、代码示例
六、完整代码
七、参考
一、概念和定义
请参阅 https://blog.csdn.net/weixin_48935611/article/details/137856999,该文章概括得很全面,本文主要展示MapReduce的使用。
二、WordCount案例
1、WordCountMapper
package com.xiaojie.hadoop.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Word-count mapper: splits each input line into words and emits {@code (word, 1)} per word.
 *
 * <p>The input key is the line's offset within the input and is not used; the input value is
 * the line text. Words are separated by any run of whitespace, and empty tokens are skipped.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 9:00
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** Output key (the word), reused across map() calls to avoid one allocation per record. */
    private final Text kOut = new Text();
    /** Constant output value: every occurrence counts as 1. */
    private final IntWritable vOut = new IntWritable(1);

    /**
     * Emits one {@code (word, 1)} pair for every word on the line.
     *
     * <p>Example: the lines "hello world" and "hello mapreduce" produce
     * (hello,1) (world,1) (hello,1) (mapreduce,1).
     *
     * @param key     offset of the line (unused)
     * @param value   text of one input line
     * @param context task context that receives the output pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split on runs of whitespace. The former split(" ") produced empty tokens for blank
        // lines and doubled spaces, and those were then counted as the word "".
        for (String word : line.split("\\s+")) {
            if (word.isEmpty()) {
                // Leading whitespace yields one empty leading token; never count it.
                continue;
            }
            kOut.set(word);
            context.write(kOut, vOut);
        }
    }
}
2、WordCountReducer
package com.xiaojie.hadoop.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Word-count reducer: sums the per-word counts produced by the mapper and writes
 * {@code (word, total)}.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 9:17
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /** Output value holder, reused across reduce() calls to avoid one allocation per key. */
    private final IntWritable v = new IntWritable();

    /**
     * Adds up all counts for one word.
     *
     * @param key     the word (map output key)
     * @param values  the counts emitted for that word
     * @param context task context that receives {@code (word, total)}
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // The running total is per-key state, so it lives in a local rather than a shared field.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}
3、WordCountDriver
package com.xiaojie.hadoop.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the word-count job.
 *
 * <p>Wires {@link WordCountMapper} and {@link WordCountReducer} together, reads
 * {@code D:\hadoop\hello.txt} and writes the results under {@code D:\hadoop\wordcount}.
 * The process exit code is 0 on success and 1 on failure.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 9:23
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job wordCountJob = Job.getInstance(new Configuration());
        wordCountJob.setJarByClass(WordCountDriver.class);

        // Processing stages
        wordCountJob.setMapperClass(WordCountMapper.class);
        wordCountJob.setReducerClass(WordCountReducer.class);

        // Intermediate and final key/value types are both (Text word, IntWritable count)
        wordCountJob.setMapOutputKeyClass(Text.class);
        wordCountJob.setMapOutputValueClass(IntWritable.class);
        wordCountJob.setOutputKeyClass(Text.class);
        wordCountJob.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(wordCountJob, new Path("D:\\hadoop\\hello.txt"));
        FileOutputFormat.setOutputPath(wordCountJob, new Path("D:\\hadoop\\wordcount"));

        // Submit, wait for completion, and translate the outcome into an exit code
        int exitCode = wordCountJob.waitForCompletion(true) ? 0 : 1;
        System.exit(exitCode);
    }
}
三、序列化
1、为什么序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
2、为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
3、Hadoop序列化特点:
- (1)紧凑:高效使用存储空间。
- (2)快速:读写数据的额外开销小。
- (3)互操作:支持多语言的交互
4、自定义bean对象实现序列化接口(Writable)
4.1、bean
package com.xiaojie.hadoop.mapreduce.flow;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Traffic statistics for one phone number: upstream, downstream and total traffic.
 *
 * <p>Implements Hadoop's {@link Writable}. {@link #write} and {@link #readFields} must handle
 * the fields in the same order (upFlow, downFlow, sumFlow).
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 10:25
 */
public class FlowBean implements Writable {
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic; see setSumFlow()

    /**
     * Public no-arg constructor. NOTE(review): presumably required because the framework
     * instantiates Writables reflectively before calling readFields; do not remove.
     */
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Overload (not an override): derives sumFlow from the current upFlow and downFlow. */
    public void setSumFlow() {
        sumFlow = upFlow + downFlow;
    }

    /** Serializes the three counters in the fixed order upFlow, downFlow, sumFlow. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserializes the counters in exactly the order used by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Renders as "up\tdown\tsum", which is how the bean appears in text job output. */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(upFlow).append('\t')
                .append(downFlow).append('\t')
                .append(sumFlow)
                .toString();
    }
}
4.2、FlowBeanMapper
package com.xiaojie.hadoop.mapreduce.flow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Traffic mapper: parses one tab-separated record and emits {@code (phone, FlowBean)} with
 * the record's upstream, downstream and total traffic.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 10:32
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    /** Output key (phone number), reused across map() calls. */
    private final Text outKey = new Text();
    /** Output value, reused across map() calls. */
    private final FlowBean outValue = new FlowBean();

    /**
     * Extracts phone number and traffic figures from one input line.
     *
     * @param key     offset of the line (unused)
     * @param value   one tab-separated record
     * @param context task context that receives {@code (phone, FlowBean)}
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        // Column 1 holds the phone number.
        String phone = split[1];
        // Some records lack optional middle columns, so traffic is indexed from the end:
        // third-to-last = upstream, second-to-last = downstream. Primitive longs avoid the
        // needless boxing of the former `Long` locals.
        long upFlow = Long.parseLong(split[split.length - 3]);
        long downFlow = Long.parseLong(split[split.length - 2]);

        outKey.set(phone);
        outValue.setUpFlow(upFlow);
        outValue.setDownFlow(downFlow);
        outValue.setSumFlow();
        context.write(outKey, outValue);
    }
}
4.3、FlowReducer
package com.xiaojie.hadoop.mapreduce.flow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Traffic reducer: sums upstream and downstream traffic per phone number and writes
 * {@code (phone, FlowBean)} with the recomputed total.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 10:46
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    /** Output value, reused across reduce() calls. */
    private final FlowBean finalOutV = new FlowBean();

    /**
     * Aggregates all traffic records of one phone number.
     *
     * @param key     phone number
     * @param values  traffic records for that phone
     * @param context task context that receives the aggregated bean
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean bean : values) {
            totalUp += bean.getUpFlow();
            // Bug fix: downstream traffic used to be added to totalUp, so every output row
            // reported downFlow = 0 and an inflated upFlow.
            totalDown += bean.getDownFlow();
        }
        finalOutV.setUpFlow(totalUp);
        finalOutV.setDownFlow(totalDown);
        finalOutV.setSumFlow();
        context.write(key, finalOutV);
    }
}
4.4、FlowDriver
package com.xiaojie.hadoop.mapreduce.flow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the per-phone traffic aggregation job.
 *
 * <p>Reads {@code d://hadoop//phone.txt} through {@link FlowBeanMapper}, aggregates per phone
 * in {@link FlowReducer}, and writes {@code (phone, FlowBean)} lines to {@code d://hadoop//phone}.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 10:55
 */
public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job flowJob = Job.getInstance(new Configuration());
        flowJob.setJarByClass(FlowDriver.class);

        flowJob.setMapperClass(FlowBeanMapper.class);
        flowJob.setReducerClass(FlowReducer.class);

        // Map output and final output share the (Text phone, FlowBean traffic) types
        flowJob.setMapOutputKeyClass(Text.class);
        flowJob.setMapOutputValueClass(FlowBean.class);
        flowJob.setOutputKeyClass(Text.class);
        flowJob.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(flowJob, new Path("d://hadoop//phone.txt"));
        FileOutputFormat.setOutputPath(flowJob, new Path("d://hadoop//phone"));

        boolean succeeded = flowJob.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
四、MapReduce框架原理
1、mapreduce流程
直观的效果,图片来自 https://blog.csdn.net/weixin_48935611/article/details/137856999
2、Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle
(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(3)多个溢出文件会被合并成大的溢出文件
(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
(5)ReduceTask根据自己的分区号,去各个MapTask机器上拉取相应的结果分区数据
(6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
(7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
注意:
(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。
3、Partition分区
3.1、 默认分区方法
/**
 * Default partitioning logic: maps a key to a reduce-task index by hashing.
 *
 * @param key            map output key; only its hashCode() is used
 * @param value          map output value; ignored
 * @param numReduceTasks number of reduce tasks; must be greater than 0 (used as a modulus)
 * @return a partition index in [0, numReduceTasks)
 */
public int getPartition(K key, V value,
int numReduceTasks) {
// Masking with Integer.MAX_VALUE clears the sign bit, so a negative hashCode()
// still produces a non-negative remainder.
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
当ReduceTask个数不大于1(即为1)时,不会调用上面的分区方法,所有数据都进入0号分区;ReduceTask个数为0时没有Reduce阶段,也不进行分区。
3.2、自定义分区
package com.xiaojie.hadoop.mapreduce.partitioner;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Custom partitioner keyed by phone-number prefix.
 *
 * <p>Numbers starting with 136, 137, 138 and 139 go to partitions 0, 1, 2 and 3 respectively.
 * Every other number, and any blank key, goes to partition 4. The job therefore needs exactly
 * five reduce tasks.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/29 15:52
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    /** Partition for blank keys and for prefixes other than 136-139. */
    private static final int OTHER_PARTITION = 4;

    /**
     * Chooses the partition for one record.
     *
     * @param text          phone number (map output key)
     * @param flowBean      traffic record (unused)
     * @param numPartitions number of reduce tasks; not consulted, since results are always 0-4
     * @return partition index in [0, 4]
     */
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();
        if (StringUtils.isBlank(phone)) {
            return OTHER_PARTITION;
        }
        // A number shorter than three characters cannot match any of the prefixes.
        String prefix = phone.length() >= 3 ? phone.substring(0, 3) : "";
        switch (prefix) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return OTHER_PARTITION;
        }
    }
}
package com.xiaojie.hadoop.mapreduce.partitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the traffic job with custom partitioning: output is split into one file per
 * partition returned by {@link ProvincePartitioner}.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 10:55
 */
public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job partitionedJob = Job.getInstance(new Configuration());
        partitionedJob.setJarByClass(FlowDriver.class);

        partitionedJob.setMapperClass(FlowBeanMapper.class);
        partitionedJob.setReducerClass(FlowReducer.class);

        // Map output and final output share the (Text phone, FlowBean traffic) types
        partitionedJob.setMapOutputKeyClass(Text.class);
        partitionedJob.setMapOutputValueClass(FlowBean.class);
        partitionedJob.setOutputKeyClass(Text.class);
        partitionedJob.setOutputValueClass(FlowBean.class);

        // The reduce-task count should equal the number of partitions (0-4 here, so 5):
        //  - more tasks than partitions: the extra output files are empty;
        //  - more than 1 but fewer than partitions: records in the missing partitions cause an exception;
        //  - exactly 1: everything goes into a single output file.
        // Partition numbers must start at 0 and increase consecutively.
        partitionedJob.setNumReduceTasks(5);
        partitionedJob.setPartitionerClass(ProvincePartitioner.class);

        FileInputFormat.setInputPaths(partitionedJob, new Path("d://hadoop//phone.txt"));
        FileOutputFormat.setOutputPath(partitionedJob, new Path("d://hadoop//phone33"));

        boolean succeeded = partitionedJob.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
4、WritableComparable
/**
 * Orders beans by total traffic descending, breaking ties by upstream traffic descending.
 *
 * @param o the bean to compare against
 * @return -1 if this bean sorts first, 1 if it sorts after {@code o}, 0 if both keys tie
 */
@Override
public int compareTo(FlowBean o) {
    // Arguments are swapped relative to natural order to get a descending sort.
    int bySum = Long.compare(o.sumFlow, this.sumFlow);
    if (bySum != 0) {
        return bySum;
    }
    return Long.compare(o.upFlow, this.upFlow);
}
5、Combiner合并
package com.xiaojie.hadoop.mapreduce.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Combiner for the word-count job: pre-sums the counts of one word on the map side, using
 * the same summing logic as the reducer.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/29 18:50
 */
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    /** Output value holder reused across reduce() calls. */
    IntWritable outV = new IntWritable(0);

    /**
     * Emits {@code (word, partialCount)} for the counts seen by this combiner invocation.
     *
     * @param key     the word
     * @param values  counts for that word
     * @param context task context that receives the partial sum
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int partialCount = 0;
        for (IntWritable count : values) {
            partialCount += count.get();
        }
        outV.set(partialCount);
        context.write(key, outV);
    }
}
package com.xiaojie.hadoop.mapreduce.combiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Word-count driver variant that also registers {@link WordCountCombiner} to pre-aggregate
 * map output.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 9:23
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job wordCountJob = Job.getInstance(new Configuration());
        wordCountJob.setJarByClass(WordCountDriver.class);

        wordCountJob.setMapperClass(WordCountMapper.class);
        wordCountJob.setReducerClass(WordCountReducer.class);

        // (word, count) pairs both between map and reduce and in the final output
        wordCountJob.setMapOutputKeyClass(Text.class);
        wordCountJob.setMapOutputValueClass(IntWritable.class);
        wordCountJob.setOutputKeyClass(Text.class);
        wordCountJob.setOutputValueClass(IntWritable.class);

        // Local pre-aggregation on the map side
        wordCountJob.setCombinerClass(WordCountCombiner.class);

        FileInputFormat.setInputPaths(wordCountJob, new Path("D:\\hadoop\\hello.txt"));
        FileOutputFormat.setOutputPath(wordCountJob, new Path("D:\\hadoop\\wordcount13"));

        boolean succeeded = wordCountJob.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
6、自定义FileOutputFormat
package com.xiaojie.hadoop.mapreduce.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Custom output format that hands every record to a {@link LogRecordWriter}.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/29 20:29
 */
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
    /**
     * Creates the record writer for one task.
     *
     * @param job context of the task attempt, passed through to the writer
     * @return a new {@link LogRecordWriter}
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new LogRecordWriter(job);
    }
}
package com.xiaojie.hadoop.mapreduce.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
/**
 * RecordWriter that routes each log line to one of two files by content.
 *
 * <p>Lines containing {@code "atguigu"} go to {@code d:/hadoop/file.log}; all other lines go
 * to {@code d:/hadoop/other.log}. Both files are created through the FileSystem returned by
 * {@code FileSystem.get} for the job configuration.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/29 20:31
 */
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream fileOut;
    private FSDataOutputStream otherOut;

    /**
     * Opens both output files.
     *
     * @param job task context whose configuration selects the FileSystem
     * @throws java.io.UncheckedIOException if either file cannot be created
     */
    public LogRecordWriter(TaskAttemptContext job) {
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            fileOut = fs.create(new Path("d:/hadoop/file.log"));
            otherOut = fs.create(new Path("d:/hadoop/other.log"));
        } catch (IOException e) {
            // Fail fast. Previously the trace was printed and construction "succeeded" with
            // null streams, which surfaced later as a NullPointerException in write().
            // Close the first stream if only it was opened, so it is not leaked.
            IOUtils.closeStream(fileOut);
            throw new java.io.UncheckedIOException("Failed to open log output files", e);
        }
    }

    /**
     * Writes one log line, followed by a newline, to the matching file.
     *
     * @param key   the log line
     * @param value unused
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        FSDataOutputStream target = log.contains("atguigu") ? fileOut : otherOut;
        // Write the Text's own UTF-8 bytes. DataOutputStream.writeBytes(String) keeps only the
        // low 8 bits of each char, which corrupted any non-ASCII (e.g. Chinese) log content.
        target.write(key.getBytes(), 0, key.getLength());
        target.write('\n');
    }

    /**
     * Closes both streams and propagates close failures, instead of swallowing them the way
     * IOUtils.closeStream does. A failed close can mean buffered data never reached the file.
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            if (fileOut != null) {
                fileOut.close();
            }
        } finally {
            if (otherOut != null) {
                otherOut.close();
            }
        }
    }
}
7、Map Join
package com.xiaojie.hadoop.mapreduce.join2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
 * Driver for the map-side join.
 *
 * <p>Ships the small product table as a cache file and joins it into each order record inside
 * {@link MapJoinMapper}. The job has no reduce phase.
 */
public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        Job joinJob = Job.getInstance(new Configuration());
        joinJob.setJarByClass(MapJoinDriver.class);
        joinJob.setMapperClass(MapJoinMapper.class);

        // The mapper's (Text, NullWritable) output is also the final output
        joinJob.setMapOutputKeyClass(Text.class);
        joinJob.setMapOutputValueClass(NullWritable.class);
        joinJob.setOutputKeyClass(Text.class);
        joinJob.setOutputValueClass(NullWritable.class);

        // Register the small table; the mapper loads it in setup()
        joinJob.addCacheFile(new URI("file:///D:/hadoop/pd.txt"));
        // The join finishes in the mapper, so no reduce tasks are needed
        joinJob.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(joinJob, new Path("D:\\hadoop\\order"));
        FileOutputFormat.setOutputPath(joinJob, new Path("D:\\hadoop\\output2222"));

        boolean succeeded = joinJob.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
package com.xiaojie.hadoop.mapreduce.join2;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
 * Map-side join mapper.
 *
 * <p>In {@link #setup}, loads the first cache file (tab-separated "pid, product name") into
 * memory. Each order line then has its pid column replaced by the product name.
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    /** pid -> product name, built from the cached small table. */
    private Map<String, String> pdMap = new HashMap<>();
    /** Output key holder reused across map() calls. */
    private Text text = new Text();

    /**
     * Loads the cached small table into {@link #pdMap} before any records are mapped.
     *
     * @throws IOException if no cache file was registered or it cannot be read
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null || cacheFiles.length == 0) {
            throw new IOException("No cache file registered; add pd.txt with job.addCacheFile(...)");
        }
        Path path = new Path(cacheFiles[0]);
        // Resolve the FileSystem from the path's own scheme (e.g. file:///). FileSystem.get(conf)
        // returns the default FileSystem, which rejects a file:// path when the default is HDFS.
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        // try-with-resources closes the stream even if parsing throws.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path), "UTF-8"))) {
            String line;
            // Read to end of file. The former isNotEmpty(...) loop condition stopped at the first
            // blank line and silently dropped every product after it.
            while ((line = reader.readLine()) != null) {
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                // Example line: "01\t小米" (pid, product name)
                String[] split = line.split("\t");
                if (split.length < 2) {
                    // Skip malformed rows that lack a product-name column.
                    continue;
                }
                pdMap.put(split[0], split[1]);
            }
        }
    }

    /**
     * Replaces the pid column of one order line with the product name.
     *
     * @param key     offset of the line (unused)
     * @param value   tab-separated order line, e.g. "1001\t01\t1"; column 1 is the pid
     * @param context task context that receives the joined line
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // A pid missing from the small table yields the literal string "null" in the output.
        String pname = pdMap.get(fields[1]);
        // Columns 0 and 2 pass through unchanged.
        text.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(text, NullWritable.get());
    }
}
8、数据清洗 ETL
package com.xiaojie.hadoop.mapreduce.etl;
import com.xiaojie.hadoop.mapreduce.outputformat.LogDriver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the map-only log-cleaning (ETL) job that runs {@link WebLogMapper}.
 *
 * <p>Usage: {@code WebLogDriver <input> <output>}. If fewer than two arguments are given, the
 * local demo paths {@code D:\hadoop\weblog} and {@code D:\hadoop\outlog} are used.
 */
public class WebLogDriver {
    public static void main(String[] args) throws Exception {
        // Respect command-line paths. Previously args were unconditionally overwritten, so the
        // job could only ever run against the hard-coded demo paths.
        if (args.length < 2) {
            args = new String[]{"D:\\hadoop\\weblog", "D:\\hadoop\\outlog"};
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Use this driver class to locate the job jar. The former LogDriver.class was a
        // copy-paste slip pointing at an unrelated job in another package.
        job.setJarByClass(WebLogDriver.class);
        job.setMapperClass(WebLogMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Map-only job: cleaning needs no aggregation, so the mapper output is the final output.
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
package com.xiaojie.hadoop.mapreduce.etl;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Data-cleaning mapper: passes through only log lines with more than 11 space-separated
 * fields and drops everything else.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/29 21:37
 */
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        boolean wellFormed = parseLog(value.toString(), context);
        if (wellFormed) {
            // Emit the original line unchanged; the value side carries no data.
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Decides whether a raw log line is valid.
     *
     * @param line    one raw input line
     * @param context task context (currently unused)
     * @return {@code true} when splitting on single spaces yields more than 11 fields
     */
    private boolean parseLog(String line, Context context) {
        return line.split(" ").length > 11;
    }
}
五、数据压缩
1、参数说明
参数 | 默认值 | 阶段 | 建议 |
io.compression.codecs (在core-site.xml中配置) | 无,这个需要在命令行输入hadoop checknative查看 | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress(在mapred-site.xml中配置) | false | mapper输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 企业多使用LZO或Snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reducer输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
2、代码示例
package com.xiaojie.hadoop.mapreduce.zip;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Word-count driver variant that compresses the final job output with BZip2.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2024/12/27 9:23
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job zipJob = Job.getInstance(new Configuration());
        zipJob.setJarByClass(WordCountDriver.class);

        zipJob.setMapperClass(WordCountMapper.class);
        zipJob.setReducerClass(WordCountReducer.class);

        // (word, count) pairs both between map and reduce and in the final output
        zipJob.setMapOutputKeyClass(Text.class);
        zipJob.setMapOutputValueClass(IntWritable.class);
        zipJob.setOutputKeyClass(Text.class);
        zipJob.setOutputValueClass(IntWritable.class);

        // Enable compression of the final output and select the BZip2 codec
        FileOutputFormat.setCompressOutput(zipJob, true);
        FileOutputFormat.setOutputCompressorClass(zipJob, BZip2Codec.class);

        FileInputFormat.setInputPaths(zipJob, new Path("D:\\hadoop\\hello.txt"));
        FileOutputFormat.setOutputPath(zipJob, new Path("D:\\hadoop\\wordcount111"));

        if (zipJob.waitForCompletion(true)) {
            System.exit(0);
        } else {
            System.exit(1);
        }
    }
}
六、完整代码
spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com
七、参考
https://blog.csdn.net/weixin_48935611/article/details/137856999
参考内容来自尚硅谷大数据学习