HDFS Utility Class
To use this HDFS utility class, you must add the Hadoop client dependency. The Maven configuration is as follows:
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.4</version>
</dependency>
The full code is as follows:
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
/**
* ClassName: HdfsUtils
* Package: com.cennavi.ps.track.util
* Description: HDFS utility class
* Datetime: 2024/8/23 10:07
* Author: zhouxl
*/
public class HdfsUtils {
private static String HADOOP_URL;
private static String HADOOP_USERNAME;
static Configuration config = new Configuration();
public static void init(String hadoopUrl,String hadoopUser){
HADOOP_URL = hadoopUrl;
HADOOP_USERNAME = hadoopUser;
config.set("fs.defalut.name", HADOOP_URL);
config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
config.set("dfs.client.use.datanode.hostname", "true");
config.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
}
/**
* Create a directory if it does not already exist
* @param folder directory path
* @throws IOException
*/
public static void mkdirs(String folder) throws IOException, InterruptedException {
Path path = new Path(folder);
FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);
if (!fs.exists(path)) {
fs.mkdirs(path);
System.out.println("Create: " + folder);
}
fs.close();
}
/**
* Upload a local file to HDFS
* @param localPath local file path
* @param remotePath remote (HDFS) file path
*/
public static void upload(String localPath, String remotePath) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);
fs.copyFromLocalFile(new Path(localPath),new Path(remotePath));
fs.close();
}
/**
* Download a file from HDFS
* @param remotePath remote (HDFS) file path
* @param localPath local file path
*/
public static void download(String remotePath, String localPath) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);
fs.copyToLocalFile(new Path(remotePath),new Path(localPath));
fs.close();
}
/**
* Delete a file or directory (recursive)
* @param remotePath remote (HDFS) path
*/
public static void delete(String remotePath) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config,HADOOP_USERNAME);
fs.delete(new Path(remotePath),true);
fs.close();
}
/**
* Create a file and write its content
* @param file file path
* @param content file content
* @throws IOException
* @throws InterruptedException
*/
public static void createFile(String file, byte[] content) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config, HADOOP_USERNAME);
// try-with-resources closes the stream; the FileSystem is closed even if the write fails
try (FSDataOutputStream os = fs.create(new Path(file))) {
os.write(content);
} finally {
fs.close();
}
}
/**
* Append content to a file, writing in 8 MB chunks
* @param data data to append
* @param path file path
* @throws Exception
*/
public static void appendFile(String data, String path) throws Exception {
byte[] byteData = data.getBytes();
int formatLength = 1024 * 1024 * 8;
int count = byteData.length / formatLength;
for (int i = 0; i <= count; i++) {
if (i == count) {
byte[] end = Arrays.copyOfRange(byteData, formatLength*i, byteData.length);
append(path, end, end.length);
} else {
append(path, Arrays.copyOfRange(byteData, formatLength * i, formatLength * (i + 1)), formatLength);
}
}
}
/**
* Append content to an existing file; the file is created if it does not exist
* @throws Exception
*/
private static void append(String hdfsPath, byte[] content, int length) throws Exception {
if (StringUtils.isNotBlank(hdfsPath) && null != content) {
FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config, HADOOP_USERNAME);
Path path = new Path(hdfsPath);
if (fs.exists(path)) {
// Append to the existing file
FSDataOutputStream outputStream = fs.append(path);
outputStream.write(content, 0, length);
outputStream.close();
fs.close();
} else {
// File does not exist yet, so create it with the content instead
fs.close();
createFile(hdfsPath, content);
}
}
}
/**
* Read the content of a file
* @param remotePath file path
* @param encode character encoding (UTF-8 when blank)
* @return file content as a string
*/
public static String readFile(String remotePath, String encode) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(new Path(HADOOP_URL).toUri(), config, HADOOP_USERNAME);
// Close both the input stream and the FileSystem once the content has been read
try (FSDataInputStream is = fs.open(new Path(remotePath))) {
return inputStreamToString(is, encode);
} finally {
fs.close();
}
}
/**
* Convert an input stream to a string using the given encoding
* @param inputStream input stream
* @param encode encoding (UTF-8 when blank)
* @return decoded content, or null if reading fails
*/
private static String inputStreamToString(FSDataInputStream inputStream, String encode) {
if (StringUtils.isBlank(encode)) {
encode = "utf-8";
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode))) {
StringBuilder builder = new StringBuilder();
String str;
while ((str = reader.readLine()) != null) {
builder.append(str);
}
return builder.toString();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
try {
HdfsUtils.init("hdfs://127.0.0.1:9000","root");
String s = HdfsUtils.readFile("/tmp/zxl/test1/test.json", null);
System.out.println("==="+s);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("处理完成");
}
}
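For reference, here is a minimal usage sketch of the class above. It assumes a reachable NameNode at hdfs://127.0.0.1:9000 and a user with write permission; the demo class name, paths, and file contents are placeholders, not part of the original code.

public class HdfsUtilsDemo {
    public static void main(String[] args) throws Exception {
        // Point the utility at the NameNode and choose the HDFS user (placeholder values)
        HdfsUtils.init("hdfs://127.0.0.1:9000", "root");

        // Create a directory, write a file into it, then append to it and read it back
        HdfsUtils.mkdirs("/tmp/demo");
        HdfsUtils.createFile("/tmp/demo/hello.txt", "hello".getBytes("UTF-8"));
        HdfsUtils.appendFile(" world", "/tmp/demo/hello.txt");
        System.out.println(HdfsUtils.readFile("/tmp/demo/hello.txt", "utf-8"));

        // Copy between the local filesystem and HDFS (uncomment and adjust paths), then clean up
        // HdfsUtils.upload("/local/path/data.json", "/tmp/demo/data.json");
        // HdfsUtils.download("/tmp/demo/data.json", "/local/path/data.json");
        HdfsUtils.delete("/tmp/demo");
    }
}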