当前位置: 首页 > article >正文

mapreduce 将数据清洗后保存到 hbase

mapreduce 将数据清洗后保存到 hbase

数据格式

{"年份":"1990","国家补贴(亿元)":"5.4","地方补贴(亿元)":"3.2","企业补贴(亿元)":"0.8","其他补贴(亿元)":"0.5"}
{"年份":"1991","国家补贴(亿元)":"5.8","地方补贴(亿元)":"3.4","企业补贴(亿元)":"0.9","其他补贴(亿元)":"0.6"}
{"年份":"1992","国家补贴(亿元)":"6.2","地方补贴(亿元)":"3.7","企业补贴(亿元)":"1","其他补贴(亿元)":"0.7"}
{"年份":"1993","国家补贴(亿元)":"7","地方补贴(亿元)":"4.1","企业补贴(亿元)":"1.2","其他补贴(亿元)":"0.8"}
{"年份":"1994","国家补贴(亿元)":"7.8","地方补贴(亿元)":"4.5","企业补贴(亿元)":"1.4","其他补贴(亿元)":"0.9"}
{"年份":"1995","国家补贴(亿元)":"8.5","地方补贴(亿元)":"4.9","企业补贴(亿元)":"1.6","其他补贴(亿元)":"1"}
{"年份":"1996","国家补贴(亿元)":"9.2","地方补贴(亿元)":"5.3","企业补贴(亿元)":"1.8","其他补贴(亿元)":"1.1"}
{"年份":"1997","国家补贴(亿元)":"10","地方补贴(亿元)":"5.7","企业补贴(亿元)":"2","其他补贴(亿元)":"1.2"}
{"年份":"1998","国家补贴(亿元)":"10.8","地方补贴(亿元)":"6.1","企业补贴(亿元)":"2.2","其他补贴(亿元)":"1.3"}
{"年份":"1999","国家补贴(亿元)":"11.6","地方补贴(亿元)":"6.6","企业补贴(亿元)":"2.5","其他补贴(亿元)":"1.4"}
{"年份":"2000","国家补贴(亿元)":"12.5","地方补贴(亿元)":"7.2","企业补贴(亿元)":"2.8","其他补贴(亿元)":"1.6"}
{"年份":"2001","国家补贴(亿元)":"13.5","地方补贴(亿元)":"7.9","企业补贴(亿元)":"3.2","其他补贴(亿元)":"1.8"}
{"年份":"2002","国家补贴(亿元)":"14.5","地方补贴(亿元)":"8.7","企业补贴(亿元)":"3.7","其他补贴(亿元)":"2"}
{"年份":"2003","国家补贴(亿元)":"15.6","地方补贴(亿元)":"9.6","企业补贴(亿元)":"4.3","其他补贴(亿元)":"2.2"}
{"年份":"2004","国家补贴(亿元)":"16.8","地方补贴(亿元)":"10.6","企业补贴(亿元)":"5","其他补贴(亿元)":"2.5"}
{"年份":"2005","国家补贴(亿元)":"18.2","地方补贴(亿元)":"11.7","企业补贴(亿元)":"5.8","其他补贴(亿元)":"2.8"}
{"年份":"2006","国家补贴(亿元)":"19.8","地方补贴(亿元)":"12.9","企业补贴(亿元)":"6.7","其他补贴(亿元)":"3.2"}
{"年份":"2007","国家补贴(亿元)":"21.5","地方补贴(亿元)":"14.3","企业补贴(亿元)":"7.7","其他补贴(亿元)":"3.7"}
{"年份":"2008","国家补贴(亿元)":"23.3","地方补贴(亿元)":"15.9","企业补贴(亿元)":"8.8","其他补贴(亿元)":"4.3"}
{"年份":"2009","国家补贴(亿元)":"25.2","地方补贴(亿元)":"17.6","企业补贴(亿元)":"10.1","其他补贴(亿元)":"5"}
{"年份":"2010","国家补贴(亿元)":"27.2","地方补贴(亿元)":"19.4","企业补贴(亿元)":"11.6","其他补贴(亿元)":"5.8"}
{"年份":"2011","国家补贴(亿元)":"29.2","地方补贴(亿元)":"21.3","企业补贴(亿元)":"13.3","其他补贴(亿元)":"6.7"}
{"年份":"2012","国家补贴(亿元)":"31.3","地方补贴(亿元)":"23.4","企业补贴(亿元)":"15.2","其他补贴(亿元)":"7.7"}
{"年份":"2013","国家补贴(亿元)":"33.5","地方补贴(亿元)":"25.6","企业补贴(亿元)":"17.3","其他补贴(亿元)":"8.8"}
{"年份":"2014","国家补贴(亿元)":"35.8","地方补贴(亿元)":"27.9","企业补贴(亿元)":"19.6","其他补贴(亿元)":"10"}
{"年份":"2015","国家补贴(亿元)":"38.2","地方补贴(亿元)":"30.3","企业补贴(亿元)":"22.1","其他补贴(亿元)":"11.4"}
{"年份":"2016","国家补贴(亿元)":"40.7","地方补贴(亿元)":"32.8","企业补贴(亿元)":"24.9","其他补贴(亿元)":"13.1"}
{"年份":"2017","国家补贴(亿元)":"43.3","地方补贴(亿元)":"35.5","企业补贴(亿元)":"27.9","其他补贴(亿元)":"15.2"}
{"年份":"2018","国家补贴(亿元)":"46.2","地方补贴(亿元)":"38.3","企业补贴(亿元)":"31.2","其他补贴(亿元)":"17.6"}
{"年份":"2019","国家补贴(亿元)":"49.3","地方补贴(亿元)":"41.3","企业补贴(亿元)":"34.8","其他补贴(亿元)":"20.3"}
{"年份":"2020","国家补贴(亿元)":"52.5","地方补贴(亿元)":"44.6","企业补贴(亿元)":"38.7","其他补贴(亿元)":"23.5"}
{"年份":"2021","国家补贴(亿元)":"55.9","地方补贴(亿元)":"48.2","企业补贴(亿元)":"42.8","其他补贴(亿元)":"27.1"}
{"年份":"2022","国家补贴(亿元)":"59.4","地方补贴(亿元)":"52.1","企业补贴(亿元)":"47.3","其他补贴(亿元)":"31.4"}
{"年份":"2023","国家补贴(亿元)":"63.1","地方补贴(亿元)":"56.5","企业补贴(亿元)":"52.4","其他补贴(亿元)":"36.2"}

javabean

package cn.lhz.bean;

import cn.lhz.util.annotation.RowKeyAnnotation;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * 教育历年补贴
 *
 * @author 李昊哲
 * @version 1.0.0
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class SubsidyYear {
  /**
   * 年份
   */
  @RowKeyAnnotation
  private Integer year;

  /**
   * 国家
   */
  private double country;

  /**
   * 地方
   */
  private double local;

  /**
   * 企业
   */
  private double enterprise;

  /**
   * 其它
   */
  private double other;

  @Override
  public String toString() {
    return this.year + "\t" + this.country + "," + this.local + "," + this.enterprise + "," + this.other;
  }
}

mapreduce

package cn.lhz.etl;

import cn.lhz.bean.SubsidyYear;
import cn.lhz.util.hbase.HbaseUtil;
import cn.lhz.util.string.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

/**
 * 教育历年补贴
 *
 * @author 李昊哲
 * @version 1.0.0
 */
public class SubsidyYear2Hbase {
  public static class SubsidyYearMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
      // 将读取到的每行内容转为 java 字符串
      String json = value.toString();
      // 将读取到的 json 格式字符串 转为 csv 格式字符串
      String csv = StringUtil.extractValuesToString(json);
      System.out.println(csv);
      System.out.println("key >>> " + csv.substring(0, csv.indexOf(",")));
      System.out.println("value >>> " + csv.substring(csv.indexOf(",") + 1));
      // 截取 csv 格式字符串中第一个单元格的字符串作为输出的 key
      Text outKey = new Text(csv.substring(0, csv.indexOf(",")));
      // 截取 csv 格式字符串中除了第一个单元所有的字符串作为输出的 value
      Text outValue = new Text(csv.substring(csv.indexOf(",") + 1));
      // map输出
      context.write(outKey, outValue);
    }
  }

  public static class SubsidyYearReducer extends Reducer<Text, Text, Text, Text> {
    private Connection connection;
    public Table table;

    @Override
    protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
      // 与 hbase 建立连接
      connection = HbaseUtil.getConnection();
      // 数据表名称
      String tableName = "SUBSIDY_YEAR";
      // 获取数据表
      table = HbaseUtil.getTable(connection, tableName);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
      String csv = "";
      for (Text value : values) {
        csv = value.toString();
      }
      try {
        SubsidyYear subsidyYear = StringUtil.csv2Bean(csv, false, SubsidyYear.class);
        subsidyYear.setYear(Integer.parseInt(key.toString()));
        HbaseUtil.upsert(table, "OVER_THE_YEARS", subsidyYear);
      } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException | InstantiationException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    protected void cleanup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
      if (table != null) {
        // 释放与 table 资源
        table.close();
      }
      if (connection != null) {
        // 释放与 hbase 之间的连接
        connection.close();
      }
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    // 设置环境变量 hadoop 用户名 为 root
    System.setProperty("HADOOP_USER_NAME", "root");

    // 参数配置对象
    Configuration conf = new Configuration();

    // 跨平台提交
    conf.set("mapreduce.app-submission.cross-platform", "true");

    // 本地运行
    conf.set("mapreduce.framework.name", "local");
    // 设置集群本地文件系统路径
    conf.set("mapreduce.cluster.local.dir", "file:///home/lhz/hadoop");
    // 设置默认文件系统为 本地文件系统
    // conf.set("fs.defaultFS", "file:///");

    // 声明Job对象 就是一个应用
    // 为当前 job 设置名称 默认名称为打包后在的jar文件名称
    Job job = Job.getInstance(conf, "教育历年补贴");
    // 指定当前Job的驱动类
    job.setJarByClass(SubsidyYear2Hbase.class);
    // 指定当前Job的 Mapper
    job.setMapperClass(SubsidyYearMapper.class);
    // 设置 reduce 输出 value 的数据类型
    job.setReducerClass(SubsidyYearReducer.class);
    // 指定当前Job的 Reducer
    job.setOutputKeyClass(Text.class);
    // 设置 reduce 输出 key 的数据类型
    job.setOutputValueClass(Text.class);
    // 定义 map 输入的路径 注意:该路径默认为hdfs路径
    FileInputFormat.addInputPath(job, new Path("/edu-ods/教育补贴.log"));
    // 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
    Path path = new Path("/edu-dwd");
    // 根据配置项获取 HDFS 文件系统
    FileSystem fs = path.getFileSystem(conf);
    if (fs.exists(path)) {
      // 如果 数据输出目录存在 则将数据输出目录删除
      fs.delete(path, true);
    }
    FileOutputFormat.setOutputPath(job, path);
    // 提交 job
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


http://www.kler.cn/a/391454.html

相关文章:

  • 基于CiteSpace的知网专利文献计量分析与可视化
  • Python教程丨Python环境搭建 (含IDE安装)——保姆级教程!
  • Kbengine+Unity3D多人在线游戏服务器+客户端从源码详细搭建教程
  • 当当网热销书籍数据采集与可视化分析
  • 探秘 JMeter (Interleave Controller)交错控制器:解锁性能测试的隐藏密码
  • 5、波分复用 WDM
  • YOLOv8改进 | 利用YOLOv8进行视频划定区域目标统计计数
  • 软件架构技术深入解析:AOP、系统安全架构、企业集成平台与微服务架构
  • go语言进阶之并发模式
  • 产品经理如何优化项目管理流程
  • 哇喔!20种单例模式的实现与变异总结
  • 【LeetCode】【算法】55. 跳跃游戏
  • PyQt入门指南五十四 依赖管理与打包发布
  • 基于标签相关性的多标签学习
  • Ubuntu24.04安装搜狗输入法详细教程
  • Python的Web请求:requests库入门与应用
  • uniapp h5实现录音
  • 鸿蒙与团结引擎c#与ts简单交互
  • 【Linux】基础IO及文件描述符相关内容详细梳理
  • 深入剖析 Web HTTP 请求:从浏览器到服务器的完整流程
  • python:用 sklearn 构建 K-Means 聚类模型
  • 【Vue3】知识汇总,附详细定义和源码详解,后续出微信小程序项目(4)
  • Python爬虫:国家代码(ISO 3166-1)国家货币代码(ISO 4217)
  • 前端学习八股资料CSS(二)
  • requests库如何处理 - POST请求常见的两种请求体格式:表单格式JSON格式
  • 【H3C华三 】VRRP与BFD、Track联动配置案例