当前位置: 首页 > article >正文

HDFS工具类

使用hdfs工具类,必须要引用hdfs依赖文件,以下是maven引入配置:

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>3.3.4</version>

        </dependency>

具体代码如下:

import org.apache.commons.lang3.StringUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.Arrays;

/**

  • ClassName: HdfsUtils

  • Package: com.cennavi.ps.track.util

  • Description: hdfs工具类

  • Datetime: 2024/8/23 10:07

  • Author: zhouxl

*/

public class HdfsUtils {

    private static String HADOOP_URL;

    private static String HADOOP_USERNAME;

    static Configuration config = new Configuration();

    public static void init(String hadoopUrl,String hadoopUser){

        HADOOP_URL = hadoopUrl;

        HADOOP_USERNAME = hadoopUser;

        config.set("fs.defalut.name", HADOOP_URL);

        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");

        config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

        config.set("dfs.client.use.datanode.hostname", "true");

        config.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

    }

/**

    * 创建目录

    * @param folder 目录路径

    * @throws IOException

    */

    public static void mkdirs(String folder) throws IOException, InterruptedException {

        Path path = new Path(folder);

        FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);

        if (!fs.exists(path)) {

            fs.mkdirs(path);

            System.out.println("Create: " + folder);

        }

        fs.close();

    }

    /**

    * 上传文件

    * @param localPath 本地文件路径

    * @param remotePath 远程文件路径

    */

    public static void upload(String localPath, String remotePath) throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);

        fs.copyFromLocalFile(new Path(localPath),new Path(remotePath));

        fs.close();

    }

    /**

    * 下载文件

    * @param remotePath 远程文件路径

    * @param localPath 本地文件路径

    */

    public static void download(String remotePath, String localPath) throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);

        fs.copyToLocalFile(new Path(remotePath),new Path(localPath));

        fs.close();

    }

    /**

    * 删除文件或文件夹

    * @param remotePath 远程文件路径

    */

    public static void delete(String remotePath) throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);

        fs.delete(new Path(remotePath),true);

        fs.close();

    }

    /**

    * 创建并写入文件

    * @param file 文件路径

    * @param content 文件内容

    * @throws IOException

    * @throws InterruptedException

    */

    public static void createFile(String file, byte[] content) throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);

        FSDataOutputStream os = null;

        try {

            os = fs.create(new Path(file));

            os.write(content);

        } finally {

            if (os != null){

                os.close();

            }

        }

        fs.close();

    }

/**

    * 追加文件内容

    * @param data 数据

    * @param path 文件路径

    * @throws Exception

    */

    public static void appendFile(String data, String path) throws Exception {

        byte[] byteData = data.getBytes();

        int formatLength = 1024 * 1024 * 8;

        int count = byteData.length / formatLength;

        for (int i = 0; i <= count; i++) {

            if (i == count) {

                byte[] end = Arrays.copyOfRange(byteData, formatLength*i, byteData.length);

                append(path, end, end.length);

            } else {

                append(path, Arrays.copyOfRange(byteData, formatLengthi, formatLength(i+1)), formatLength);

            }

        }

    }

    /**

    * 追加文件内容

    * @throws Exception

    */

    private static void append(String hdfsPath, byte[] content, int length) throws Exception {

        if (StringUtils.isNoneBlank(hdfsPath) && null != content) {

            FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);

            Path path = new Path(hdfsPath);

            if (fs.exists(path)) {

                FSDataOutputStream outputStream = fs.append(path);

                outputStream.write(content, 0, length);

                outputStream.close();

                fs.close();

            } else {

                createFile(hdfsPath, content);

            }

        }

    }

    /**

    * 查看文件内容

    * @param remotePath 文件路径

    * @param encode 编码

    * @return

    */

    public static String readFile(String remotePath, String encode) throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);

        FSDataInputStream os = fs.open(new Path(remotePath));

        return imputStramToString(os,encode);

    }

    /**

    * 把输入流转为指定字符

    * @param inputStream 输入流

    * @param encode 编码类型

    * @return

    */

    private static String imputStramToString(FSDataInputStream inputStream, String encode) {

        if(StringUtils.isBlank(encode)){

            encode = "utf-8";

        }

        try {

            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream,encode));

            StringBuilder builder = new StringBuilder();

            String str = "";

            while ((str=reader.readLine())!=null){

                builder.append(str);

            }

            return builder.toString();

        } catch (Exception e) {

            e.printStackTrace();

        }

        return null;

    }

public static void main(String[] args) {

        try {

            HdfsUtils.init("hdfs://127.0.0.1:9000","root");

            String s = HdfsUtils.readFile("/tmp/zxl/test1/test.json", null);

            System.out.println("==="+s);

        } catch (Exception e) {

            e.printStackTrace();

        }

        System.out.println("处理完成");

    }

}


http://www.kler.cn/a/293881.html

相关文章:

  • C++面试基础知识:排序算法 C++实现
  • Elasticsearch可视化工具Elasticvue插件用法
  • BERT配置详解1:构建强大的自然语言处理模型
  • scrapy爬取中信证券销售金融产品信息
  • 开源vs闭源:你更看好哪一方?
  • 【Spring】@Autowired与@Resource的区别
  • 高级架构师备考计划
  • Maven持续集成(Continuous integration,简称CI)版本友好管理
  • tailwindcss在vue2中安装配置流程
  • Kafka大厂面试14问(附答案)
  • windows下php+nginx的wordpress配置教程和问题解决
  • Python绘制嫦娥奔月
  • IP和品牌有什么区别?
  • 深度学习每周学习总结N9:transformer复现
  • 牵手西安,产业园区如何“玩转”数字媒体产业?
  • 产品经理就业
  • 深度学习(三)-反向传播
  • mac 安装brew并配置国内源
  • 前端框架介绍
  • 昆明理工大学MBA工商管理学费
  • 二、再识Django
  • 彩虹数字屏保时钟 芝麻时钟开启个性化的时代 屏保怎么能少它
  • 马来西亚参访团走进数字人企业世优科技,共鉴元宇宙数字创新成果
  • 【vue、UI】使用 Vue2 和 Element UI 封装 CSV 文件上传组件,实现csv回显
  • IIS中间件
  • Windows 11安装nvm教程