将文件上传至hdfs(SpringBoot)
将文件上传至hdfs(SpringBoot)
1、依赖
-
<!-- Hadoop依赖--> <!-- Hadoop Client --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.3.1</version> </dependency>
2、配置文件
-
server: port: 8686 #HDFS配置 hdfs: path: hdfs://192.168.44.128:9000 user: hadoop
-
主函数
-
@SpringBootApplication public class TrafficMainCodeApplication { public static void main(String[] args) { System.setProperty("HADOOP_USER_NAME","hadoop"); SpringApplication.run(TrafficMainCodeApplication.class, args); } }
-
3、链接Hadoop工具类
-
package xyz.zzj.traffic_main_code.utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.security.UserGroupInformation; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class HadoopUtil { // 使用@Value注解读取application.yml文件中的hdfs.path属性值,并赋给path变量 @Value("${hdfs.path}") private String path; // 使用@Value注解读取application.yml文件中的hdfs.user属性值,并赋给username变量 @Value("${hdfs.user}") private String username; // 声明一个FileSystem类型的成员变量hdfs,用于存储HDFS文件系统的实例 private FileSystem hdfs; /** * 获取HDFS文件系统对象的方法 */ public synchronized FileSystem getFileSystem() throws Exception { // 判断hdfs是否为空,如果为空则进行初始化操作 if (hdfs == null) { // 创建一个新的Configuration对象conf,该对象用于配置Hadoop客户端的行为 Configuration conf = new Configuration(); // 设置默认文件系统URI为从配置文件读取的路径 conf.set("fs.defaultFS", path); // 设置使用DataNode主机名而不是IP地址来连接DataNode conf.set("dfs.client.use.datanode.hostname", "true"); // 禁用FileSystem缓存,确保每次获取的是新的FileSystem实例 conf.setBoolean("fs.hdfs.impl.disable.cache", true); // 根据用户名创建UserGroupInformation对象ugi,用于身份验证 UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username); // 使用配置信息conf创建一个新的FileSystem实例,并赋给hdfs成员变量 hdfs = FileSystem.newInstance(conf); } // 返回已初始化的hdfs文件系统对象 return hdfs; } // 关闭HDFS文件系统的方法 public void closeFileSystem() throws IOException { // 判断hdfs是否非空,如果是,则关闭hdfs文件系统 if (hdfs != null) { hdfs.close(); } } }
4、controller类
-
package xyz.zzj.traffic_main_code.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; import xyz.zzj.traffic_main_code.service.FileService; import java.io.File; import java.io.IOException; import java.util.List; @RestController @RequestMapping("/api/file") public class FileController { @Autowired private FileService fileUploadService; /** * 处理文件上传请求的方法 * * @param file 上传的文件对象 * @return ResponseEntity<String> 包含上传结果的状态码和消息 */ @PostMapping("/upload") public ResponseEntity<String> handleFileUpload(@RequestParam("file") MultipartFile file) { // 检查上传的文件是否为空 if (!file.isEmpty()) { try { // 构建临时文件路径,将文件保存到指定目录 String tempPath = "F:\\traffic_code\\traffic_main\\traffic_main_code\\src\\main\\resources\\static\\" + file.getOriginalFilename(); file.transferTo(new File(tempPath)); // 构建HDFS上的目标路径,并调用FileService将文件上传到HDFS String hdfsPath = "/" + file.getOriginalFilename(); boolean success = fileUploadService.uploadFileToHdfs(tempPath, hdfsPath); // 根据上传结果返回相应的响应实体 if (success) { return ResponseEntity.ok("File uploaded successfully."); } else { return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to upload file."); } } catch (IOException e) { // 捕获并打印异常信息,返回服务器错误状态码及消息 e.printStackTrace(); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error occurred during file upload."); } } else { // 如果上传的文件为空,则返回错误状态码及提示信息 return ResponseEntity.badRequest().body("Please select a file to upload."); } } }
5、service类
-
package xyz.zzj.traffic_main_code.service; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import xyz.zzj.traffic_main_code.utils.HadoopUtil; import java.nio.file.Files; import java.nio.file.Paths; @Service public class FileService { @Autowired private HadoopUtil hadoopUtil; /** * 将本地文件上传到HDFS的方法 * * @param localFilePath 本地文件路径 * @param remoteFilePath HDFS上的目标文件路径 * @return boolean 表示上传是否成功 */ public boolean uploadFileToHdfs(String localFilePath, String remoteFilePath) { try (FileSystem fs = hadoopUtil.getFileSystem(); // 获取HDFS文件系统对象 FSDataOutputStream outputStream = fs.create(new Path(remoteFilePath))) { // 在HDFS上创建输出流 Files.copy(Paths.get(localFilePath), outputStream); // 将本地文件复制到HDFS return true; // 返回true表示上传成功 } catch (Exception e) { e.printStackTrace(); // 捕获并打印异常信息 return false; // 返回false表示上传失败 } } }