当前位置: 首页 > article >正文

【大数据学习 | HBASE高级】mr操作hbase

1. mr读取hbase数据

首先在hbase中准备数据

# 创建分数表
create 'score','info'
# 增加学生数据
put 'score','001','info:name','zhangsan'
put 'score','001','info:score','100'
put 'score','001','info:class','1'
put 'score','002','info:name','lisi'
put 'score','002','info:score','95'
put 'score','002','info:class','1'
put 'score','003','info:name','wangwu'
put 'score','003','info:score','98'
put 'score','003','info:class','2'
put 'score','004','info:name','zhaosi'
put 'score','004','info:score','92'
put 'score','004','info:class','2'

下面读取hbase的案例,使用mr读取数据并且求出每个班级的平均分。

读取HBASE中的数据需要继承TableMapper类。

class HMapper extends TableMapper<outkey,outvalue>

获取score表的数据然后输出到reducer端进行分组,将班级作为key,然后这班级的数据都会按照key分在一起,我们就可以计算出来这个班级的平均分了。

首先我们引入maven依赖。

在hbase中去除hadoop的所有依赖,这样就不会出现冲突问题。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hainiu.hbase</groupId>
    <artifactId>TestHBase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
</project>

整体代码如下:

package com.hainiu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class MapreduceRead {
    public static class HMapper extends TableMapper<Text, IntWritable>{
        Text k = new Text();
        IntWritable v = new IntWritable();
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            byte[] classBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("class"));
            byte[] scoreBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("score"));
            k.set(Bytes.toString(classBytes));
            String score = Bytes.toString(scoreBytes);
            v.set(Integer.valueOf(score));
            System.out.println(k);
            System.out.println(v);
            context.write(k,v);
        }
    }

    public static class HReducer extends Reducer<Text,IntWritable,Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            for (IntWritable value : values) {
                sum += value.get();
                count ++;
            }
            double avg = sum * 1.0 / count;
            context.write(key,new DoubleWritable(avg));
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();
//        conf.set("HADOOP_HOME","/hadoop");
//        conf.set("HBASE_HOME","/hbase");
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapreduceRead.class);

        TableMapReduceUtil.initTableMapperJob(
                "score",new Scan(),HMapper.class,Text.class,IntWritable.class,job
        );

        job.setReducerClass(HReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        TextOutputFormat.setOutputPath(job,new Path("res"));

        job.waitForCompletion(true);
    }
}

2. mr写出数据到hbase中

首先在本地data文件下面准备数据a.txt

hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack

统计每个单词的出现次数并且将结果存储到hbase的表中。

#在hbase中创建存储单词出现次数的表
create 'wordcount','info'
# 存储数据的时候rowkey设定为单词,info:count 记录单词出现次数

这个时候要存储数据到hbase中那么我们需要在reducer中增加TableReducer的类用于插入hbase中数据。

class HReducer extends TableReducer

整体代码如下:

package com.hainiu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class MapreduceWrite {
    public static class HMapper extends Mapper<LongWritable,Text, Text, IntWritable>{
        Text k = new Text();
        IntWritable v = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(" ");
            for (String str : strs) {
                k.set(str);
                context.write(k,v);
            }
        }
    }

    public static class HReducer extends TableReducer<Text,IntWritable, NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += 1;
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"),Bytes.toBytes(sum));
            context.write(NullWritable.get(),put);
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapreduceWrite.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapperClass(HMapper.class);

        TextInputFormat.addInputPath(job,new Path("data/a.txt"));

        TableMapReduceUtil.initTableReducerJob("wordcount",HReducer.class,job);

        job.waitForCompletion(true);
    }
}

http://www.kler.cn/a/392068.html

相关文章:

  • STM32F4分别驱动SN65HVD230和TJA1050进行CAN通信
  • MySQL 如何赶上 PostgreSQL 的势头?
  • C++ 如何将 gRPC集成到机器人系统中
  • Visio 画阀门 符号 : 电动阀的画法
  • Ubuntu上安装Apache Spark
  • spring boot 集成 knife4j
  • MySQL技巧之跨服务器数据查询:基础篇-更新语句如何写
  • 音视频入门基础:MPEG2-TS专题(3)——TS Header简介
  • 从零开始使用YOLOv11——Yolo检测detect数据集自建格式转换为模型训练格式:20w+图片1w+类别代码测试成功
  • PointMamba: A Simple State Space Model for Point Cloud Analysis——点云论文阅读(10)
  • 边缘计算与推理算力:智能时代的加速引擎
  • 开源大模型推理引擎现状及常见推理优化方法总结
  • ubontu安装anaconda
  • 简单理解回调函数
  • Jenkins配置步骤
  • Spring学习笔记_30——事务接口PlatformTransactionManager
  • 汽车牌照识别系统的设计与仿真(论文+源码)
  • Vue 组件间传值指南:Vue 组件通信的七种方法
  • 软考系统架构设计师论文:论多源数据集成及应用
  • 企业“3D官网”主要有哪些功能?
  • labview实现定时器的功能
  • ❤React-React 组件基础(类组件)
  • Redhat7.9 安装 KingbaseES 金仓数据库 V9单机版(命令行安装)
  • 【设计模式】单例设计模式
  • openresty入门教程:ngx.print ngx.say ngx.log
  • Java LeetCode练习