hdfs API操作 hadoop3.3.5
目录
- 1.原理说明
- 2.pom.xml
- 3.文件操作代码(创建+上传+下载+重命名+移动+删除+获取文件详细+遍历目录+获取目录及子目录文件详细)
1.原理说明
关于hdfs的原理请查看如下的博客
原理
2.pom.xml
项目使用maven进行打包管理,配置如下
pom.xml
```bash
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.wunaiieq</groupId>
<artifactId>hadoop01</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!--声明-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<!--具体配置-->
<configuration>
<archive>
<manifest>
<!--jar包的执行入口-->
<mainClass>org.wunaiieq.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<!--描述符,此处为预定义的,表示创建一个包含项目所有依赖的可执行 JAR 文件;
允许自定义生成jar文件内容-->
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<!--执行配置-->
<executions>
<execution>
<!--执行配置ID,可修改-->
<id>make-assembly</id>
<!--执行的生命周期-->
<phase>package</phase>
<goals>
<!--执行的目标,single表示创建一个分发包-->
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3.文件操作代码(创建+上传+下载+重命名+移动+删除+获取文件详细+遍历目录+获取目录及子目录文件详细)
FileOperation.java
我尽可能的将hdfs的文件操作封装到一起,以下是大部分的操作内容,直接用即可
package org.wunaiieq;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.util.Arrays;
public class FileOperation {
/**
* 创建目录
*
* @param path 创建的目录地址
*/
public void mkdir(String path, String nameNode) {
FileSystem fileSystem = null;
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", nameNode);
fileSystem = FileSystem.get(conf);
boolean result = fileSystem.mkdirs(new Path(path));
if (result) {
System.out.println("创建成功");
} else {
System.out.println("创建失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (fileSystem != null) {
fileSystem.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 上传文件
*
* @param file_path 文件地址
* @param upload_path 文件上传地址
*/
public void upload(String file_path, String upload_path, String nameNode) {
FileInputStream fileInputStream = null;
FSDataOutputStream fsDataOutputStream = null;
FileSystem fileSystem = null;
try {
//创建文件系统对象
Configuration conf = new Configuration();
//namenode地址
conf.set("fs.defaultFS", nameNode);
//设置block大小为1MB
conf.set("dfs.blocksize", "1048576");
//冗余度
conf.set("dfs.replication", "2");
fileSystem = FileSystem.get(conf);
//文件输入
fileInputStream = new FileInputStream(file_path);
//上传hdfs的地址
fsDataOutputStream = fileSystem.create(new Path(upload_path));
//每次最多写入1024个字节的数据,当数据量不足1024时,有多少写多少
byte[] data = new byte[1024];
int len = -1;
while ((len = fileInputStream.read(data)) != -1) {
fsDataOutputStream.write(data, 0, len);
}
System.out.println("上传成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (fileInputStream != null) {
fileInputStream.close();
}
if (fsDataOutputStream != null) {
fsDataOutputStream.flush();
fsDataOutputStream.close();
}
if (fileSystem != null) {
fileSystem.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 修改文件名+移动文件
*
* @param old_path 原路径以及名称
* @param new_path 新路径和名称
*/
public void rename(String old_path, String new_path, String nameNode) {
FileSystem fileSystem = null;
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", nameNode);
fileSystem = FileSystem.get(conf);
fileSystem.rename(new Path(old_path), new Path(new_path));
System.out.println("移动/重命名完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (fileSystem != null) {
fileSystem.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 文件从hdfs中下载到主机
*
* @param output_path 下载的位置
* @param hdfs_path hdfs中文件的位置
*/
public void download(String output_path, String hdfs_path, String nameNode) throws IOException {
FileSystem fileSystem = null;
try {
//Hadoop基本操作:NameNode设置,客户端创建
//指定NameNode地址
Configuration conf = new Configuration();
//这个在前面的分布模式中有相关设置,表示NameNode的具体地址
conf.set("fs.defaultFS", nameNode);
//创建一个HDFS的客户端DistributedFileSystem
fileSystem = FileSystem.get(conf);
//构造一个输出流指向本地
OutputStream output = new FileOutputStream(output_path);
//hdfs目录中的文件
InputStream input = fileSystem.open(new Path(hdfs_path));
// 使用IOUtils工具类简化文件复制过程
// IOUtils.copyBytes方法会自动从输入流读取数据并写入到输出流,直到输入流结束
// 这里的1024是缓冲区大小,表示每次从输入流读取的数据量
IOUtils.copyBytes(input, output, 1024);
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭客户端,释放与HDFS的连接
fileSystem.close();
}
}
/**
* 删除文件或目录
* <br>如果是文件recursive值不重要
*
* @param path 文件或目录的地址
* @param recursive 是否递归删除,true是
* <br>false:如果删除的目录非空,抛出异常
* <br>只能删除文件和非空目录
* <br>true:递归删除子文件和目录
* <br>删除目录和目录下所有内容,也可以删除单个文件
*/
public void delete(String path, boolean recursive, String nameNode) {
FileSystem fileSystem = null;
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", nameNode);
fileSystem = FileSystem.get(conf);
fileSystem.delete(new Path(path), recursive);
System.out.println("删除成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (fileSystem != null) {
fileSystem.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 获取指定文件信息
*
* @param path 文件地址
*/
public void getFileInfo(String path, String nameNode) {
FileSystem fileSystem = null;
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", nameNode);
fileSystem = FileSystem.get(conf);
//从文件系统对象中获取文件状态
FileStatus fileStatus = fileSystem.getFileStatus(new Path(path));
System.out.println("文件名" + fileStatus.getPath().getName());
System.out.println("文件所有者" + fileStatus.getOwner());
System.out.println("文件所属用户组" + fileStatus.getGroup());
System.out.println("文件大小" + fileStatus.getLen());
System.out.println("文件块大小" + fileStatus.getBlockSize());
System.out.println("文件权限" + fileStatus.getPermission());
System.out.println("文件副本数量" + fileStatus.getReplication());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (fileSystem != null) {
fileSystem.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 遍历目录下所有的内容,对遍历内容进行判断,判断是文件还是目录
*
* @param path 遍历的目录
*/
public void fileOrDir(String path, String nameNode) {
FileSystem fileSystem = null;
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", nameNode);
fileSystem = FileSystem.get(conf);
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(path));
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
System.out.println(fileStatus.getPath().getName() + "是文件");
} else if (fileStatus.isDirectory()) {
System.out.println(fileStatus.getPath().getName() + "是目录");
} else {
System.out.println("无法判断,既不是文件也不是目录");
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (fileSystem != null) {
fileSystem.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 列出Hdfs中指定路径下的文件和目录信息
*
* @param path 指定的目录
* @param recursive 是否递归
* <br>true:显示指定目录下,包含子目录所有文件信息
* <br>false:只显示当前目录,即指定的目录
*/
public void listFiles(String path, boolean recursive, String nameNode) {
FileSystem fileSystem = null;
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", nameNode);
fileSystem = FileSystem.get(conf);
//使用 listFiles 方法获取指定路径下的文件和目录迭代器。如果 recursive 为 true,则会递归地包含所有子目录的内容。
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path(path), recursive);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus fileStatus = locatedFileStatusRemoteIterator.next();
System.out.println("文件名" + fileStatus.getPath().getName());
System.out.println("文件所有者" + fileStatus.getOwner());
System.out.println("文件所属用户组" + fileStatus.getGroup());
System.out.println("文件大小" + fileStatus.getLen());
System.out.println("文件块大小" + fileStatus.getBlockSize());
System.out.println("文件权限" + fileStatus.getPermission());
System.out.println("文件副本数量" + fileStatus.getReplication());
//获取文件块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
System.out.println("偏移量" + blockLocation.getOffset());
System.out.println("名称" + Arrays.toString(blockLocation.getNames()));
String[] hosts = blockLocation.getHosts();
System.out.println("============host============");
for (String host : hosts) {
System.out.println(host);
}
System.out.println("============host end============");
}
System.out.println("======================================");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (fileSystem != null) {
fileSystem.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Main.java
主函数调用,这个没什么说的,自行调用修改吧
package org.wunaiieq;
public class Main {
public static void main(String[] args) throws Exception {
String nameNode = "hdfs://192.168.80.112:9000";
FileOperation fileOperation = new FileOperation();
// String file_path="/root/tools/data.txt";
// String upload_path="/20241015/data.txt";
// fileOperation.upload(file_path,upload_path,nameNode);
// String old_path="/20241015/data2.txt";
// String new_path="/data3.txt";
// fileOperation.rename(old_path,new_path,nameNode);
// String output_path="/root/tools/data5.txt";
// String hdfs_path="/data3.txt";
// fileOperation.download(output_path,hdfs_path,nameNode);
// String path="/data3.txt";
// boolean recursive=false;
// fileOperation.delete(path,recursive,nameNode);
// String path = "/20241015/data.txt";
// fileOperation.getFileInfo(path,nameNode);
String path = "/20241016";
// fileOperation.fileOrDir(path,nameNode);
//fileOperation.listFiles(path,false,nameNode);
fileOperation.mkdir(path, nameNode);
}
}