【Big Data Learning | HBase Advanced】Operating on HBase with MapReduce
1. Reading HBase data with MapReduce
First, prepare the data in HBase:
# create the score table
create 'score','info'
# insert student data
put 'score','001','info:name','zhangsan'
put 'score','001','info:score','100'
put 'score','001','info:class','1'
put 'score','002','info:name','lisi'
put 'score','002','info:score','95'
put 'score','002','info:class','1'
put 'score','003','info:name','wangwu'
put 'score','003','info:score','98'
put 'score','003','info:class','2'
put 'score','004','info:name','zhaosi'
put 'score','004','info:score','92'
put 'score','004','info:class','2'
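The same data can also be prepared from Java with the HBase client API (the hbase-client dependency introduced below). This is only a minimal sketch; the class name PrepareScoreData is illustrative, and only the first row is written out, the remaining puts follow the same pattern as the shell commands above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class PrepareScoreData {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("score");
            // equivalent of: create 'score','info'
            if (!admin.tableExists(tableName)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
                        .build());
            }
            try (Table table = connection.getTable(tableName)) {
                // equivalent of: put 'score','001','info:name','zhangsan' etc., one Put per rowkey
                Put put = new Put(Bytes.toBytes("001"));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("score"), Bytes.toBytes("100"));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("class"), Bytes.toBytes("1"));
                table.put(put);
                // rows 002, 003 and 004 are inserted the same way as in the shell commands above
            }
        }
    }
}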
The example below uses MapReduce to read data from HBase and compute the average score of each class.
To read data from HBase, the mapper has to extend the TableMapper class:
class HMapper extends TableMapper<KEYOUT, VALUEOUT>
The mapper fetches the rows of the score table and sends them to the reducer side for grouping, with the class as the key; all records of the same class are grouped together by key, so the reducer can compute that class's average score.
First, we add the Maven dependencies.
All Hadoop transitive dependencies are excluded from the HBase artifacts so that no version conflicts arise.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hainiu.hbase</groupId>
    <artifactId>TestHBase</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
</project>
The complete code is as follows:
package com.hainiu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class MapreduceRead {

    // The mapper receives one HBase row per call: the rowkey as ImmutableBytesWritable,
    // the row content as Result. It emits (class, score).
    public static class HMapper extends TableMapper<Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            byte[] classBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("class"));
            byte[] scoreBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("score"));
            k.set(Bytes.toString(classBytes));
            String score = Bytes.toString(scoreBytes);
            v.set(Integer.valueOf(score));
            // debug output
            System.out.println(k);
            System.out.println(v);
            context.write(k, v);
        }
    }

    // The reducer receives all scores of one class and computes the average.
    public static class HReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            for (IntWritable value : values) {
                sum += value.get();
                count++;
            }
            double avg = sum * 1.0 / count;
            context.write(key, new DoubleWritable(avg));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // conf.set("HADOOP_HOME","/hadoop");
        // conf.set("HBASE_HOME","/hbase");
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapreduceRead.class);
        // wires up TableInputFormat, the scan, the mapper and its output types
        TableMapReduceUtil.initTableMapperJob(
                "score", new Scan(), HMapper.class, Text.class, IntWritable.class, job
        );
        job.setReducerClass(HReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        TextOutputFormat.setOutputPath(job, new Path("res"));
        job.waitForCompletion(true);
    }
}
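The job above hands a default Scan to initTableMapperJob, which reads every column of the score table. If you only need the info column family, or want to tune the scan, the Scan object can be configured before the job wiring. A minimal sketch of that part of main, using the same table and classes as above (the caching value is only illustrative):
// Sketch: restrict and tune the scan handed to initTableMapperJob.
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info"));   // only read the info column family
scan.setCaching(500);                    // rows fetched per RPC from the region server
scan.setCacheBlocks(false);              // commonly disabled for MR full-table scans

TableMapReduceUtil.initTableMapperJob(
        "score", scan, HMapper.class, Text.class, IntWritable.class, job
);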
2. Writing data to HBase with MapReduce
First, prepare the data file a.txt under the local data directory:
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
We count how many times each word appears and store the results in an HBase table.
# create the HBase table that will store the word counts
create 'wordcount','info'
# the rowkey is the word itself, and info:count records how many times the word appears
To write the results into HBase, the reducer has to extend the TableReducer class, which is used to insert data into the table.
class HReducer extends TableReducer
The complete code is as follows:
package com.hainiu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class MapreduceWrite {

    // Plain text mapper: splits each line into words and emits (word, 1).
    public static class HMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(" ");
            for (String str : strs) {
                k.set(str);
                context.write(k, v);
            }
        }
    }

    // TableReducer<KEYIN, VALUEIN, KEYOUT> fixes the output value type to Mutation,
    // so the reducer writes Put objects directly into the wordcount table.
    public static class HReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // rowkey = word, info:count = occurrence count (stored as a 4-byte int)
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(sum));
            context.write(NullWritable.get(), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapreduceWrite.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapperClass(HMapper.class);
        TextInputFormat.addInputPath(job, new Path("data/a.txt"));
        // wires up TableOutputFormat with the wordcount table and the TableReducer
        TableMapReduceUtil.initTableReducerJob("wordcount", HReducer.class, job);
        job.waitForCompletion(true);
    }
}
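Note that the reducer stores the count with Bytes.toBytes(sum), i.e. as a 4-byte binary int, so the value will not be human-readable in a plain shell scan. A minimal sketch that reads one count back with the HBase client and decodes it with Bytes.toInt (the class name ReadWordCount and the sample rowkey "hello" are only illustrative):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("wordcount"))) {
            // fetch the row whose rowkey is the word "hello"
            Result result = table.get(new Get(Bytes.toBytes("hello")));
            byte[] countBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("count"));
            // the reducer wrote the count with Bytes.toBytes(int), so decode it with Bytes.toInt
            System.out.println("hello -> " + Bytes.toInt(countBytes));
        }
    }
}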