当前位置: 首页 > article >正文

HDFS Java 客户端 API

一、基本调用

Configuration 配置对象类,用于加载或设置参数属性

FileSystem 文件系统对象基类。针对不同文件系统有不同具体实现。该类封装了文件系统的相关操作方法。

1. maven依赖pom.xml文件

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
        </dependency>

2. 创建跟hdfs文件系统的连接和关闭

package com.yifeng.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;


/**
 * @Author: EstellaQ
 * @Date: 2025/2/21 18:00
 * @Description: 测试类
 **/
public class HDFSClientTest {

    private static Configuration conf = null;
    private static FileSystem fs = null;


    /**
     * 初始化方法,用于和hdfs集群建立连接
     * @throws IOException
     */
    @Before
    public void connect2HDFS() throws IOException {
        // 创建配置对象实例
        conf = new Configuration();
        //设置操作的文件系统是HDFS,并且指定HDFS操作地址
        conf.set("fs.defaultFS", "hdfs://node1.yifeng.cn:8020");
        // 创建FileSystem对象实例
        fs = FileSystem.get(conf);
    }

    /**
     * 关闭客户端和hdfs连接
     * @throws IOException 
     */
    @After
    public void close() {
        // 首先判断文件系统实例是否为null,如果不为null,进行关闭
        if (fs != null) {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
    }
}

3. 权限验证

直接调用api呢,会报错:Permission denied: user=hp, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x

原因:以Windows登录用户访问HDFS,而该用户不具备在HDFS操作权限

解决     

1、修改HDFS文件系统权限     

2、或者设置客户端身份,该身份具备在HDFS操作权限

//设置客户端身份,以具备权限在hdfs上进行操作
System.setProperty("HADOOP_USER_NAME", "root");

4. 创建文件夹

/**
     * 创建文件夹操作
     */
    @Test
    public void mkdir() throws IOException {
        // 判断文件夹是否存在,如果不存在再创建
        if (!fs.exists(new Path("/yifeng"))) {
            //创建文件夹
            fs.mkdirs(new Path("/yifeng"));
        }
    }

5. 上传文件

/**
     * 从本地文件系统上传文件到HDFS
     * Q:本地文件系统指的是?  客户端所在集群的文件系统 此处指的是Windows文件系统
     */
    @Test
    public void  putFile2HDFS() throws IOException {
        //本地文件路径
        Path src = new Path("E:\\appdata\\hdfs\\helloworld.txt");
        //hdfs目标路径
        Path dst = new Path("/yifeng");
        //上传文件 参数(源路径,目标路径)
        fs.copyFromLocalFile(src,dst);
    }

6. 下载文件

/**
     * 从HDFS下载文件到本地文件系统
     */
    @Test
    public void getFile2Local() throws IOException {
        //本地文件路径
        Path src = new Path("/yifeng/helloworld.txt");
        //hdfs目标路径
        Path dst = new Path("E:\\appdata\\helloworld.txt");
        //上传文件 参数(源路径,目标路径)
        fs.copyToLocalFile(src,dst);
    }

 此时会报错:

  • 原因:     
  1. Hadoop访问windows本地文件系统,要求Windows上的本地库能正常工作。
  2. 其中Hadoop使用某些Windows API来实现类似posix的文件访问权限。   
  3. 上述功能需要在hadoop.dll和winutils.exe来实现。
  • 解决:
  1. 下载Hadoop源码在windows平台编译,编译出windows本地库。然后配置Hadoop环境变量。
  2. 配置好之后重启IDEA,有些机器需要右键管理员权限运行IDEA。

HADOOP_HOME=E:\tools\hadoop\hadoop-3.1.4     

path=;%HADOOP_HOME%\bin   

二、Log4j使用

1. 介绍

        Log4J 是 Apache 的一个开源项目。通过在项目中使用 Log4J,我们可以控制日志信息输出到控制台、文件、GUI 组件、甚至是数据库中。我们可以控制每一条日志的输出格式,通过定义日志的输出级别,可以更灵活的控制日志的输出过程。方便项目的调试。

  • 官网: https://logging.apache.org/log4j/2.x/

2. 三大组件

  • Log4J主要由Logger(日志记录器)、Appender(输出端)和 Layout(日志格式化器)组成。
  • Logger控制日志的输出级别与日志是否输出;
  • Appender指定日志的输出方式(ConsoleAppender控制台、FileAppender文件、JDBCAppender等);
  • Layout控制日志信息的输出格式(simple格式、HTML格式、PatternLayout自定义格式)。

3. 日志的级别

  • Log4J 在 org.apache.log4j.Level 类中定义了OFF、FATAL、ERROR、WARN、INFO、DEBUG、TRACE、ALL八种日志级别。
  • 一般只使用4个级别,优先级从高到低为 ERROR > WARN > INFO > DEBUG。

4. 项目中应用

1)项目中引入log4j的jar包

hadoop中已经集成好log4j的jar包了。

2)添加配置文件log4j.properties

  • 只需要在项目路径下新建log4j.properties配置文件,并配置日志的输出格式等信息,Log4J框架会自动的加载配置文件,并将配置信息设置到Logger中。
# 控制台输出配置
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d [%t] %p [%c] - %m%n
#指定日志的输出级别与输出端
log4j.rootLogger=DEBUG,Console


#日志输出格式
%p: 输出日志信息优先级,即DEBUG,INFO,WARN,ERROR,FATAL, 
%d: 输出日志时间点的日期或时间,默认格式为ISO8601,也可以在其后指定格式,比如:%d{yyyy-MM-dd HH:mm:ss,SSS},输出类似:2011-10-18 22:10:28,921 
%r: 输出自应用启动到输出该log信息耗费的毫秒数 
%c: 输出日志信息所属的类目,通常就是所在类的全名 
%t: 输出产生该日志事件的线程名 
%l: 输出日志事件的发生位置,相当于%C.%M(%F:%L)的组合,包括类目名、发生的线程,以及在代码中的行数。 
%x: 输出和当前线程相关联的NDC(嵌套诊断环境),尤其用到像java servlets这样的多客户多线程的应用中。 
%%: 输出一个"%"字符 
%F: 输出日志消息产生时所在的文件名称 
%L: 输出代码中的行号 
%m: 输出代码中指定的消息,产生的日志具体信息 
%n: 输出一个回车换行符,Windows平台为"\r\n",Unix平台为"\n"输出日志信息换行

3)代码中使用

public class Log4Test {

    public static void main(String[] args) {
        Logger logger = Logger.getLogger(Log4Test.class);
        logger.debug("这是debug");
        logger.info("这是info");
        logger.warn("这是warn");
        logger.error("这是error");
        logger.fatal("这是fatal");
    }
}

三、Google Option使用

1. 介绍

  • Google-option是Bazel Project中的命令行参数解析器。
  • option程序包已拆分为一个单独的jar,可用于通用程序。
  • GitHub:https://github.com/pcj/google-options

2. 引入Google-option的jar包

<dependency>  
    <groupId>com.github.pcj</groupId>
    <artifactId>google-options</artifactId>
    <version>1.0.0</version>
</dependency> 

3. 首先创建参数实体类

  • 类应提供与预期的命令行选项相对应的public字段; 每个public字段都应使用@Option注释进行注释 。
  • Option注解参数说明

        name:选项名称

        defaultValue:默认值

        abbrev:选项的单字符缩写

        help:使用信息的帮助字符串

        category:描述此选项所属的类别

        allowMultiple:一个标志,指示是否应允许选项类型在单个选项列表中多次出现。

import com.google.devtools.common.options.Option;
import com.google.devtools.common.options.OptionsBase;

import java.util.List;

public class ServerOptions extends OptionsBase {

    @Option(
            name = "help",
            abbrev = 'h',
            help = "Prints usage info.",
            defaultValue = "true"
    )
    public boolean help;

    @Option(
            name = "host",
            abbrev = 'o',
            help = "The server host.",
            category = "startup",
            defaultValue = ""
    )
    public String host;

    @Option(
            name = "port",
            abbrev = 'p',
            help = "The server port.",
            category = "startup",
            defaultValue = "8080"
    )
    public int port;

    @Option(
            name = "dir",
            abbrev = 'd',
            help = "Name of directory to serve static files.",
            category = "startup",
            allowMultiple = true,
            defaultValue = ""
    )
    public List<String> dirs;

}

4. 在main方法中解析参数

import com.google.devtools.common.options.OptionsParser;

import java.util.Collections;

public class Server {
    public static void main(String[] args) {
        //创建一个参数Parser解析对象  解析规则为ServerOptions中定义
        OptionsParser parser = OptionsParser.newOptionsParser(ServerOptions.class);
        //解析输入参数数组args
        parser.parseAndExitUponError(args);
        //获得Option
        ServerOptions options = parser.getOptions(ServerOptions.class);

        //如果输入参数host为空 或 port<0(非法)或 dirs目录为空
        if (options.host.isEmpty() || options.port < 0 || options.dirs.isEmpty()) {
            //输出Usage使用方法
            printUsage(parser);
            return;
        }
        //输出Server运行的host port
        System.out.format("Starting server at %s:%d...\n", options.host, options.port);
        //输出每个dir的名字
        for (String dirname : options.dirs) {
            System.out.format("\\--> Serving static files at <%s>\n", dirname);
        }
    }

    private static void printUsage(OptionsParser parser) {
        System.out.println("Usage: java -jar server.jar OPTIONS");
        System.out.println(parser.describeOptions(Collections.<String, String>emptyMap(),
                OptionsParser.HelpVerbosity.LONG));
    }
}

四、舆情数据上报案例

  • 在HDFS上有一个目录,相当于企业中的数据中转站,会有源源不断的数据存储进去
  • 其中有些类型的数据是我们业务相关的舆情数据
  • 要求使用程序能够遍历数据源目录,找出符合需求的数据,上传移动到指定的HDFS目录下。

Step1:实现思路

  • 使用Google Option解析命令行参数。
  • 读取要采集的数据目录,生成上传任务,上传任务包含一个任务文件,该文件包含了要上传哪些文件到HDFS上。
  • 执行任务,读取要上传的任务文件,挨个将任务文件中的文件上传到HDFS。
  • 上传中、上传完毕需要给任务文件添加特别的标识(willdoing copy done)。

Step2:工程环境搭建

  • 添加maven依赖     
  • 构建代码包结构 

        arg: 处理命令行参数

        dfs: 存放操作HDFS的工具类

        task: 处理文件上传任务

Step3:开发舆情上报程序参数解析

同上,在main方法中解析参数

创建一个SentimentOptions类, 并从OptionsBase继承,定义以下几个参数

(1) 帮助,可以显示命令的帮助信息 help h 默认参数

(2) 要采集数据的位置 source s

(3) 生成待上传的临时目录 pending_dir p

(4) 生成要上传到的HDFS路径 output o

Step4:实现生成数据采集任务

task包下创建TaskMgr源文件。 先实现生成数据上报任务。

实现步骤:     

1.判断原始数据目录是否存在     

2.读取原始数据目录下的所有文件     

3.判断待上传目录是否存在,不存在则创建一个     

4.创建任务目录(目录名称:task_年月日时分秒_任务状态)     

5.遍历待上传的文件,在待上传目录生成一个willDoing文件     

6.将待移动的文件添加到willDoing文件中

Step5:实现执行数据上报任务

1. 读取待上传目录的willDoing任务文件,注意过滤COPY和DONE后的任务文件夹

2.遍历读取任务文件,开始上传     

        a)将任务文件修改为_COPY,表示正在处理中     

        b)获取任务的日期     

        c)判断HDFS目标上传目录是否存在,不存在则创建     

        d)读取任务文件     

        e)按照换行符切分     

        f)上传每一个文件,调用HDFSUtils进行数据文件上传     

        g)上传成功后,将_COPY后缀修改为_DONE

Step6:项目打包

shade插件可以将所有的jar打到一个jar包中。

将执行主类的项目路径复制到<mainClass></mainClass>中

然后打包:

这里打包好两个jar包,一个是原代码的jar包,下面那个是携带了依赖的jar包

Step7:结果测试

创建一个目录/root/sentiment_upload,使用以下shell脚本来驱动jar包执行。

#!/bin/bash
export SENTIMENT_HOME=/root/sentiment_upload
export JAVA_HOME=/export/server/jdk1.8.0_65
export JAVA_CMD="${JAVA_HOME}/bin/java"
export JAVA_OPS="-jar ${SENTIMENT_HOME}/hdfs-api-1.0-SNAPSHOT-jar-with-dependencies.jar"

SOURCE_DIR=$1
PENDING_DIR=$2
OUTPUT_DIR=$3

if [ ! $SOURCE_DIR ] || [ ! $OUTPUT_DIR ]; then
    ${JAVA_CMD} ${JAVA_OPS} -h
    exit;
fi

if [ ! $PENDING_DIR ] ; then
    ${JAVA_CMD} ${JAVA_OPS} -s $SOURCE_DIR -o $OUTPUT_DIR
    exit;
fi

${JAVA_CMD} ${JAVA_OPS} -s $SOURCE_DIR -p ${PENDING_DIR} -o $OUTPUT_DIR

将脚本和jar包都上传到服务器 /root/sentiment_upload 目录下

创建文件夹 source pending output

mkdir source

mkdir pending

mkdir output

 将数据集上传到source目录下(数据集在代码里边也有)

执行以下命令即可

sh sentiment_upload.sh /root/sentiment_upload/source /root/sentiment_upload/pending /root/sentiment_upload/out

项目地址:https://gitee.com/EstellaQ/hdfs-api.git


http://www.kler.cn/a/557526.html

相关文章:

  • js版本ES6、ES7、ES8、ES9、ES10、ES11、ES12、ES13、ES14[2023]新特性
  • 【多模态处理篇一】【 深度解析DeepSeek图文匹配:CLIP模型迁移实战——从原理到落地的保姆级教程】
  • 前端面试-网络协议篇
  • 【Node.js】express框架
  • 图论 之 迪斯科特拉算法求解最短路径
  • 代码随想录D52-53 图论 Python
  • DeepSeek 全面分析报告
  • Verilog define预处理命令
  • strcpy与strncpy作为复制函数的用法与区别
  • Java中的Stream API:从入门到实战
  • Scrum方法论指导下的Deepseek R1医疗AI部署开发
  • Python深度学习环境配置(Pytorch、CUDA、cuDNN),包括Anaconda搭配Pycharm的环境搭建以及基础使用教程(保姆级教程,适合小白、深度学习零基础入门)
  • Ubuntu 安装 Node.js 20.x
  • Promptic:Python 中的 LLM 应用开发利器
  • 配置haproxy实现MySQL服务器负载均衡
  • 【计算社会学】 多智能体建模 ABM Agent Based Modeling 笔记
  • JavaScript系列(81)--加密技术详解
  • JUC并发—9.并发安全集合四
  • TRL里面GRPOTrainer中grpo_train.py文件详解
  • CNN常用卷积核