
Java APIs for Hadoop and Related Big Data Frameworks

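Hadoop is a widely used big data framework for storing and processing large datasets. Its core consists of HDFS (Hadoop Distributed File System) and MapReduce, with YARN (Yet Another Resource Negotiator) handling cluster resource management since Hadoop 2. Around this core sits a wider ecosystem of projects such as HBase and Hive. The sections below introduce the Java APIs of Hadoop and these related components and show how to use them.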

Hadoop

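Main components of Hadoop and its ecosystem
  1. HDFS: a distributed file system for storing large-scale data.
  2. MapReduce: a distributed computing framework for processing large-scale data.
  3. YARN: a resource management system that schedules and manages cluster resources.
  4. HBase: a NoSQL database for storing massive amounts of data.
  5. Hive: a data warehouse tool that provides a SQL interface for querying data stored in Hadoop.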

HDFS Java API

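The HDFS Java API, centered on the org.apache.hadoop.fs.FileSystem class, supports file system operations such as creating, reading, and writing files.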

Example code
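  1. Create an HDFS configuration
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HDFSDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the client at the NameNode; paths without a scheme resolve against this URI
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        // FileSystem is Closeable; try-with-resources releases the client when done
        try (FileSystem fs = FileSystem.get(conf)) {
            System.out.println("Connected to " + fs.getUri());
        }
    }
}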
  2. Upload a file to HDFS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class UploadFileToHDFS {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        try (FileSystem fs = FileSystem.get(conf)) {
            Path localPath = new Path("/path/to/local/file.txt");  // source on the local disk
            Path hdfsPath = new Path("/hdfs/path/file.txt");       // destination in HDFS

            // Copies the local file into HDFS; the local file is kept
            fs.copyFromLocalFile(localPath, hdfsPath);
            System.out.println("File uploaded successfully.");
        }
    }
}
  3. Download a file from HDFS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class DownloadFileFromHDFS {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        try (FileSystem fs = FileSystem.get(conf)) {
            Path hdfsPath = new Path("/hdfs/path/file.txt");       // source in HDFS
            Path localPath = new Path("/path/to/local/file.txt");  // destination on the local disk

            // Also writes a .crc checksum file next to the local copy;
            // fs.copyToLocalFile(false, hdfsPath, localPath, true) skips it
            fs.copyToLocalFile(hdfsPath, localPath);
            System.out.println("File downloaded successfully.");
        }
    }
}
  4. List the files under an HDFS directory
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;

public class ListFilesInHDFS {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        try (FileSystem fs = FileSystem.get(conf)) {
            Path path = new Path("/hdfs/path");

            // listFiles returns LocatedFileStatus entries for files only (no directories);
            // the second argument (true) walks the whole subtree recursively
            RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);
            while (files.hasNext()) {
                LocatedFileStatus file = files.next();
                System.out.println(file.getPath());
            }
        }
    }
}
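`listFiles(path, true)` returns only files and walks the whole subtree; with `false` it returns just the files directly under `path`. To try these semantics without a cluster, a rough local analog can be written with `java.nio.file.Files.walk`. This is a JDK-only sketch for the local disk, not the Hadoop API, and the class name `LocalListFiles` is hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Local-disk analog of FileSystem.listFiles(path, recursive) (sketch, not Hadoop code):
// returns regular files only, descending into subdirectories when recursive is true.
final class LocalListFiles {
    static List<Path> listFiles(Path root, boolean recursive) throws IOException {
        // Depth 1 = root plus its direct children; MAX_VALUE = the whole subtree
        int depth = recursive ? Integer.MAX_VALUE : 1;
        try (Stream<Path> walk = Files.walk(root, depth)) {
            return walk.filter(Files::isRegularFile)   // drop directories, like listFiles
                       .sorted()                        // deterministic order for printing
                       .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("listing-demo");
        Files.createDirectories(root.resolve("sub"));
        Files.writeString(root.resolve("a.txt"), "a");
        Files.writeString(root.resolve("sub").resolve("b.txt"), "b");
        System.out.println(listFiles(root, true).size());  // 2: a.txt and sub/b.txt
        System.out.println(listFiles(root, false).size()); // 1: only a.txt
    }
}
```

Alternatively, Hadoop's own `FileSystem` API can run against the local disk by setting `fs.defaultFS` to `file:///`, which exercises the real `listFiles` implementation instead of this analog.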

MapReduce Java API

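MapReduce is Hadoop's distributed computing framework for processing large datasets. Its Java API (the org.apache.hadoop.mapreduce package) lets you write a job as a Mapper, a Reducer, and a driver that configures and submits the job.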

Example code
  1. Write the Mapper class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: (byte offset of the line, line text); output: (word, 1) for each word in the line
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Writable objects are reused across calls; context.write() serializes them immediately
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split on runs of non-word characters; a line starting with one yields an empty first token
        String[] words = line.split("\\W+");
        for (String w : words) {
            if (!w.isEmpty()) {
                word.set(w.toLowerCase());
                context.write(word, one);
            }
        }
    }
}
  2. Write the Reducer class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Receives each word together with all of its counts (grouped by the framework) and emits the total
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
  3. Write the Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        // Summing is associative and commutative, so the reducer can also run as a combiner
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // The output directory must not exist yet, otherwise the job fails at submission
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
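To see what the Mapper, the framework's shuffle, and the Reducer do together, the following sketch replays the WordCount data flow in plain Java on an in-memory list of lines. It is an illustration under simplifying assumptions (one "reducer", everything in memory, no Hadoop types); the class and method names are hypothetical, and the real framework distributes these steps across the cluster:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hadoop-free sketch of the WordCount data flow: map -> shuffle (group by key) -> reduce.
final class WordCountSketch {
    // Map step: emit (word, 1) for every non-empty token, same rules as WordCountMapper
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : line.split("\\W+")) {
            // split() yields a leading "" when the line starts with a non-word character
            if (!w.isEmpty()) {
                out.add(new SimpleEntry<>(w.toLowerCase(), 1));
            }
        }
        return out;
    }

    // Shuffle + reduce: group values per key (TreeMap keeps keys sorted, roughly as the
    // framework sorts map output keys), then sum each group like WordCountReducer
    static Map<String, Integer> shuffleAndReduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : pairs) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            result.put(e.getKey(), sum);
        }
        return result;
    }

    // Runs the map step over every line, then shuffles and reduces all emitted pairs
    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        return shuffleAndReduce(pairs);
    }

    public static void main(String[] args) {
        System.out.println(wordCount(List.of("Hello, hello world!", "  World of Hadoop")));
        // {hadoop=1, hello=2, of=1, world=2}
    }
}
```

One consequence of the mapper's regex worth knowing: in Java, `\W` is ASCII-based unless the `UNICODE_CHARACTER_CLASS` flag (inline `(?U)`) is set, so words made of non-ASCII letters, such as Chinese text, are treated as separators and never counted.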

YARN Java API

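YARN is Hadoop's resource management system; it schedules and manages cluster resources. The YARN client API (org.apache.hadoop.yarn.client.api.YarnClient) lets a program submit applications to the ResourceManager and monitor their progress.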

Example code
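  1. Submit an application
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.Collections;

public class YARNSubmitJob {
    public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Obtain a new application ID and its submission context from the ResourceManager
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("MyApp");
        appContext.setQueue("default");
        // Memory (MB) and virtual cores for the ApplicationMaster container
        appContext.setResource(Resource.newInstance(1024, 1));

        // The AM jar must already be in HDFS; YARN localizes it into the container as "app.jar"
        FileSystem fs = FileSystem.get(conf);
        Path jarPath = fs.makeQualified(new Path("/path/to/resource.jar"));
        FileStatus jarStatus = fs.getFileStatus(jarPath);
        LocalResource amJar = LocalResource.newInstance(URL.fromPath(jarPath),
                LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                jarStatus.getLen(), jarStatus.getModificationTime());

        // Command that launches the ApplicationMaster; assumes app.jar bundles its dependencies
        String command = ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java"
                + " -cp app.jar com.example.MyApplicationMaster"
                + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"
                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr";
        appContext.setAMContainerSpec(ContainerLaunchContext.newInstance(
                Collections.singletonMap("app.jar", amJar), Collections.emptyMap(),
                Collections.singletonList(command), null, null, null));

        ApplicationId appId = yarnClient.submitApplication(appContext);
        System.out.println("Application submitted with ID: " + appId);

        // Poll the ResourceManager until the application reaches a terminal state
        while (true) {
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = report.getYarnApplicationState();
            if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED || state == YarnApplicationState.FAILED) {
                System.out.println("Application completed with state: " + state
                        + ", final status: " + report.getFinalApplicationStatus());
                break;
            }
            Thread.sleep(1000);
        }

        yarnClient.stop();
    }
}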
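The polling loop in the submission example waits for a terminal state but has no upper bound. A JDK-only sketch of the same pattern with a poll limit follows. `AppState` is a local stand-in for `YarnApplicationState` (it lists only a subset of its values), `PollUntilDone` is a hypothetical name, and in real code the supplier would wrap `yarnClient.getApplicationReport(appId).getYarnApplicationState()`:

```java
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Sketch of "poll until terminal state, but give up after maxPolls", decoupled from YARN.
final class PollUntilDone {
    // Local stand-in mirroring some YarnApplicationState values (not the YARN enum)
    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    static final Set<AppState> TERMINAL =
            EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

    // Returns the first terminal state seen, or throws if none appears within maxPolls polls
    static AppState waitForCompletion(Supplier<AppState> poll, long intervalMillis, int maxPolls)
            throws InterruptedException {
        for (int i = 0; i < maxPolls; i++) {
            AppState state = poll.get();
            if (TERMINAL.contains(state)) {
                return state;
            }
            Thread.sleep(intervalMillis);
        }
        throw new IllegalStateException("Application not finished after " + maxPolls + " polls");
    }

    public static void main(String[] args) throws InterruptedException {
        Iterator<AppState> states =
                List.of(AppState.ACCEPTED, AppState.RUNNING, AppState.FINISHED).iterator();
        System.out.println(waitForCompletion(states::next, 10, 5)); // FINISHED
    }
}
```

Note that reaching `FINISHED` only means the ApplicationMaster exited; whether the work succeeded is reported separately by `ApplicationReport.getFinalApplicationStatus()` (for example `SUCCEEDED` or `FAILED`).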

HBase Java API

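HBase is the NoSQL database of the Hadoop ecosystem and is designed to store massive amounts of data. The HBase Java client API supports creating tables and reading and writing their data.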

Example code
  1. Create an HBase table
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

import java.io.IOException;

public class CreateHBaseTable {
    public static void main(String[] args) throws IOException {
        // Reads hbase-site.xml from the classpath (ZooKeeper quorum etc.)
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {

            TableName tableName = TableName.valueOf("mytable");
            if (!admin.tableExists(tableName)) {
                // HBase 2.x builder API; HTableDescriptor/HColumnDescriptor are deprecated
                TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                        .build();
                admin.createTable(tableDesc);
                System.out.println("Table created successfully.");
            } else {
                System.out.println("Table already exists.");
            }
        }
    }
}
  2. Insert data into an HBase table
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class InsertDataIntoHBase {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("mytable"))) {

            // One Put targets one row key; each addColumn adds a cell (family, qualifier, value)
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes("value1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col2"), Bytes.toBytes("value2"));
            table.put(put);

            System.out.println("Data inserted successfully.");
        }
    }
}
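HBase stores row keys, column names, and values as byte arrays (`Bytes.toBytes(String)` encodes strings as UTF-8), and keeps rows sorted by row key in unsigned lexicographic byte order. That ordering determines what a scan returns first and matters for row-key design. The JDK-only sketch below imitates it; `RowKeyOrder` and its methods are illustrative names, not part of HBase:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: sorts string row keys the way HBase orders rows, by comparing their
// UTF-8 bytes as unsigned values, lexicographically.
final class RowKeyOrder {
    static byte[] toBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static List<String> sortAsHBase(List<String> rowKeys) {
        List<String> sorted = new ArrayList<>(rowKeys);
        sorted.sort((a, b) -> Arrays.compareUnsigned(toBytes(a), toBytes(b)));
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sortAsHBase(List.of("row2", "row10", "row1")));
        // [row1, row10, row2]: "row10" sorts before "row2" because '1' < '2'
    }
}
```

Because the comparison is byte by byte, numeric suffixes do not sort numerically; zero-padding them (`row01`, `row02`, `row10`) is a common way to make byte order match numeric order.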

Hive Java API

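Hive is the data warehouse tool of the Hadoop ecosystem and provides a SQL interface (HiveQL) for querying data stored in Hadoop. Java programs usually talk to Hive through the HiveServer2 JDBC driver (org.apache.hive.jdbc.HiveDriver from the hive-jdbc artifact) and the standard java.sql API.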

Example code
  1. Create a Hive table
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateHiveTable {
    // HiveServer2 over HTTP on port 10000 with HTTP path "cliservice" (adjust to your
    // hive.server2.thrift.http.port, default 10001). For the default binary transport,
    // use "jdbc:hive2://localhost:10000/default" instead.
    private static final String URL =
            "jdbc:hive2://localhost:10000/default;transportMode=http;httpPath=cliservice";

    public static void main(String[] args) throws Exception {
        // HiveServer2 JDBC driver from the hive-jdbc artifact
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(URL);
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE IF NOT EXISTS mytable (id INT, name STRING)");
            System.out.println("Table created successfully.");
        }
    }
}
  2. Insert data into a Hive table
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class InsertDataIntoHive {
    // HiveServer2 over HTTP on port 10000 with HTTP path "cliservice" (adjust to your setup)
    private static final String URL =
            "jdbc:hive2://localhost:10000/default;transportMode=http;httpPath=cliservice";

    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(URL);
             // INSERT ... VALUES requires Hive 0.14 or later
             PreparedStatement pstmt = conn.prepareStatement("INSERT INTO TABLE mytable VALUES (?, ?)")) {
            pstmt.setInt(1, 1);
            pstmt.setString(2, "John Doe");
            pstmt.executeUpdate();
            System.out.println("Data inserted successfully.");
        }
    }
}
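In Hive each `INSERT ... VALUES` statement usually runs as its own job and writes at least one new small file, so inserting rows one at a time scales poorly. Hive accepts several rows in a single `VALUES` clause, so a common workaround is to batch rows into one statement. The helper below is a hypothetical sketch that only builds the SQL text for a `PreparedStatement`; the table name is concatenated as-is and must come from trusted code:

```java
import java.util.Collections;

// Hypothetical helper: builds "INSERT INTO TABLE <table> VALUES (?, ?), (?, ?), ..."
// with rows x columns JDBC placeholders.
final class HiveInsertSql {
    static String multiRowInsert(String table, int columns, int rows) {
        if (columns < 1 || rows < 1) {
            throw new IllegalArgumentException("columns and rows must be positive");
        }
        String row = "(" + String.join(", ", Collections.nCopies(columns, "?")) + ")";
        return "INSERT INTO TABLE " + table + " VALUES "
                + String.join(", ", Collections.nCopies(rows, row));
    }

    // 1-based JDBC parameter index for a 0-based (row, column) position
    static int paramIndex(int row, int column, int columns) {
        return row * columns + column + 1;
    }

    public static void main(String[] args) {
        System.out.println(multiRowInsert("mytable", 2, 3));
        // INSERT INTO TABLE mytable VALUES (?, ?), (?, ?), (?, ?)
    }
}
```

With such a statement, the values for row `r` would be bound with `pstmt.setInt(paramIndex(r, 0, 2), id)` and `pstmt.setString(paramIndex(r, 1, 2), name)` before a single `executeUpdate()`.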

Summary

Hadoop and its related components provide a rich set of Java APIs for working with large datasets. These components are:

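  1. HDFS: a distributed file system for storing large-scale data.
  2. MapReduce: a distributed computing framework for processing large-scale data.
  3. YARN: a resource management system that schedules and manages cluster resources.
  4. HBase: a NoSQL database for storing massive amounts of data.
  5. Hive: a data warehouse tool that provides a SQL interface for querying data stored in Hadoop.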

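With these Java APIs you can manage and process large datasets programmatically. The components are designed to work together: MapReduce jobs read from and write to HDFS and run on YARN, HBase stores its data in HDFS, and Hive compiles queries into jobs that run on the cluster. Knowing these APIs makes it easier to build reliable, high-performance big data systems on the Hadoop ecosystem.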


http://www.kler.cn/a/351025.html
