antdesignvue + AWS-S3实现Minio大文件分片上传
一、后端:
1.引入pom
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.263</version>
</dependency>
2.配置application.yml
jeecg:
minio:
minioUrl: http://localhost:9000
minioName: minioadmin
minioPass: Aa123456@admin
bucketName: exam-bucket
3.aws配置
package com.ynfy.config;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmazonS3Config {
@Value(value = "${jeecg.minio.minioUrl}")
private String minioUrl;
@Value(value = "${jeecg.minio.minioName}")
private String minioName;
@Value(value = "${jeecg.minio.minioPass}")
private String minioPass;
@Bean(name = "amazonS3Client")
public AmazonS3 amazonS3Client() {
//设置连接时的参数
ClientConfiguration config = new ClientConfiguration();
//设置连接方式为HTTP,可选参数为HTTP和HTTPS
config.setProtocol(Protocol.HTTP);
//设置网络访问超时时间
config.setConnectionTimeout(5000);
config.setUseExpectContinue(true);
AWSCredentials credentials = new BasicAWSCredentials(minioName, minioPass);
//设置Endpoint
AwsClientBuilder.EndpointConfiguration endPoint = new AwsClientBuilder.EndpointConfiguration(minioUrl, Regions.US_EAST_1.name());
AmazonS3 amazonS3 = AmazonS3ClientBuilder.standard()
.withClientConfiguration(config)
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.withEndpointConfiguration(endPoint)
.withPathStyleAccessEnabled(true).build();
return amazonS3;
}
}
4.新建数据库表aws_s3_upload
用于记录上传的文件信息:md5值,对象key等信息。
5.接口IAwsS3UploadService:
package com.ynfy.buss.awss3upload.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ynfy.buss.awss3upload.entity.AwsS3Upload;
import com.ynfy.buss.awss3upload.entity.TaskParam;
import com.ynfy.buss.awss3upload.entity.dto.TaskInfoDTO;
import java.util.Map;
/**
* @Description: AWS.S3 大文件分片上传
* @Author: jeecg-boot
* @Date: 2024-10-31
* @Version: V1.0
*/
public interface IAwsS3UploadService extends IService<AwsS3Upload> {
/**
* 根据md5标识获取分片上传任务
*
* @param identifier
* @return
*/
AwsS3Upload getByIdentifier(String identifier);
/**
* 初始化一个任务
*/
TaskInfoDTO initTask(TaskParam param);
/**
* 获取上传进度
*
* @param identifier
* @return
*/
TaskInfoDTO getTaskInfo(String identifier);
/**
* 生成预签名上传url
*
* @param bucket 桶名
* @param objectKey 对象的key
* @param params 额外的参数
* @return
*/
String genPreSignUploadUrl(String bucket, String objectKey, Map<String, String> params);
/**
* 合并分片
*
* @param identifier
*/
void merge(String identifier);
}
6.接口实现类AwsS3UploadServiceImpl:
package com.ynfy.buss.awss3upload.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.amazonaws.HttpMethod;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ynfy.buss.awss3upload.constant.MinioConstant;
import com.ynfy.buss.awss3upload.entity.AwsS3Upload;
import com.ynfy.buss.awss3upload.entity.TaskParam;
import com.ynfy.buss.awss3upload.entity.dto.TaskInfoDTO;
import com.ynfy.buss.awss3upload.entity.dto.TaskRecordDTO;
import com.ynfy.buss.awss3upload.mapper.AwsS3UploadMapper;
import com.ynfy.buss.awss3upload.service.IAwsS3UploadService;
import org.jeecg.common.exception.JeecgBootException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.http.MediaTypeFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.net.URL;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @Description: AWS.S3 大文件分片上传
* @Author: jeecg-boot
* @Date: 2024-10-31
* @Version: V1.0
*/
@Service
public class AwsS3UploadServiceImpl extends ServiceImpl<AwsS3UploadMapper, AwsS3Upload> implements IAwsS3UploadService {
@Value(value = "${jeecg.minio.minioUrl}")
private String minioUrl;
@Value(value = "${jeecg.minio.minioName}")
private String minioName;
@Value(value = "${jeecg.minio.minioPass}")
private String minioPass;
@Value(value = "${jeecg.minio.bucketName}")
private String bucketName;
@Resource
private AmazonS3 amazonS3;
/**
* 根据md5标识获取分片上传任务
*
* @param identifier
* @return
*/
@Override
public AwsS3Upload getByIdentifier(String identifier) {
return this.getOne(new QueryWrapper<AwsS3Upload>().lambda().eq(AwsS3Upload::getFileIdentifier, identifier));
}
/**
* 初始化一个任务
*/
@Override
public TaskInfoDTO initTask(TaskParam param) {
Date currentDate = new Date();
String fileName = param.getFileName();
String suffix = fileName.substring(fileName.lastIndexOf(".") + 1, fileName.length());
String key = StrUtil.format("{}/{}.{}", DateUtil.format(currentDate, "YYYY/MM/dd"),
fileName.substring(0, fileName.lastIndexOf(".")) + "_" + System.currentTimeMillis(), suffix);
String contentType = MediaTypeFactory.getMediaType(key).orElse(MediaType.APPLICATION_OCTET_STREAM).toString();
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentType(contentType);
InitiateMultipartUploadResult initiateMultipartUploadResult = amazonS3
.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key).withObjectMetadata(objectMetadata));
String uploadId = initiateMultipartUploadResult.getUploadId();
AwsS3Upload task = new AwsS3Upload();
int chunkNum = (int) Math.ceil(param.getTotalSize() * 1.0 / param.getChunkSize());
task.setBucketName(bucketName);
task.setChunkNum(chunkNum);
task.setChunkSize(param.getChunkSize());
task.setTotalSize(param.getTotalSize());
task.setFileIdentifier(param.getIdentifier());
task.setFileName(param.getFileName());
task.setObjectKey(key);
task.setUploadId(uploadId);
save(task);
return new TaskInfoDTO().setFinished(false).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(bucketName, key));
}
public String getPath(String bucket, String objectKey) {
return StrUtil.format("{}/{}/{}", minioUrl, bucket, objectKey);
}
/**
* 获取上传进度
*
* @param identifier
* @return
*/
@Override
public TaskInfoDTO getTaskInfo(String identifier) {
AwsS3Upload task = getByIdentifier(identifier);
if (Objects.isNull(task)) {
return null;
}
TaskInfoDTO result = new TaskInfoDTO().setFinished(true).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(task.getBucketName(), task.getObjectKey()));
boolean doesObjectExist = amazonS3.doesObjectExist(task.getBucketName(), task.getObjectKey());
if (!doesObjectExist) {
// 未上传完,返回已上传的分片
ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());
PartListing partListing = amazonS3.listParts(listPartsRequest);
result.setFinished(false).getTaskRecord().setExitPartList(partListing.getParts());
}
return result;
}
@Override
public String genPreSignUploadUrl(String bucket, String objectKey, Map<String, String> params) {
Date currentDate = new Date();
Date expireDate = DateUtil.offsetMillisecond(currentDate, MinioConstant.PRE_SIGN_URL_EXPIRE.intValue());
GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucket, objectKey)
.withExpiration(expireDate).withMethod(HttpMethod.PUT);
if (!Objects.isNull(params)) {
params.forEach((key, val) -> request.addRequestParameter(key, val));
}
URL preSignedUrl = amazonS3.generatePresignedUrl(request);
return preSignedUrl.toString();
}
@Override
public void merge(String identifier) {
AwsS3Upload task = getByIdentifier(identifier);
if (Objects.isNull(task)) {
throw new JeecgBootException("分片任务不存");
}
ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());
PartListing partListing = amazonS3.listParts(listPartsRequest);
List<PartSummary> parts = partListing.getParts();
if (!task.getChunkNum().equals(parts.size())) {
// 已上传分块数量与记录中的数量不对应,不能合并分块
throw new JeecgBootException("分片缺失,请重新上传");
}
CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest()
.withUploadId(task.getUploadId())
.withKey(task.getObjectKey())
.withBucketName(task.getBucketName())
.withPartETags(parts.stream().map(partSummary -> new PartETag(partSummary.getPartNumber(), partSummary.getETag())).collect(Collectors.toList()));
CompleteMultipartUploadResult result = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);
}
}
7.接口controller类AwsS3UploadController:
package com.ynfy.buss.awss3upload.controller;
import com.ynfy.buss.awss3upload.entity.AwsS3Upload;
import com.ynfy.buss.awss3upload.entity.TaskParam;
import com.ynfy.buss.awss3upload.entity.dto.TaskInfoDTO;
import com.ynfy.buss.awss3upload.service.IAwsS3UploadService;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.base.controller.JeecgController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* @Description: AWS.S3 大文件分片上传
* @Author: jeecg-boot
* @Date: 2024-10-31
* @Version: V1.0
*/
@Api(tags = "AWS.S3 大文件分片上传")
@RestController
@RequestMapping("/awsS3Upload")
@Slf4j
public class AwsS3UploadController extends JeecgController<AwsS3Upload, IAwsS3UploadService> {
@Autowired
private IAwsS3UploadService awsS3UploadService;
/**
* 获取上传进度
*
* @param identifier 文件md5
* @return
*/
@GetMapping("/task/{identifier}")
public Result<TaskInfoDTO> taskInfo(@PathVariable("identifier") String identifier) {
return Result.ok(awsS3UploadService.getTaskInfo(identifier));
}
/**
* 创建一个上传任务
*
* @return
*/
@PostMapping(value = "/task/init")
public Result<TaskInfoDTO> initTask(@Valid @RequestBody TaskParam param, BindingResult bindingResult) {
if (bindingResult.hasErrors()) {
return Result.error(bindingResult.getFieldError().getDefaultMessage());
}
return Result.OK(awsS3UploadService.initTask(param));
}
/**
* 获取每个分片的预签名上传地址
*
* @param identifier
* @param partNumber
* @return
*/
@GetMapping("/task/{identifier}/{partNumber}")
public Result<?> preSignUploadUrl(@PathVariable("identifier") String identifier, @PathVariable("partNumber") Integer partNumber) {
AwsS3Upload task = awsS3UploadService.getByIdentifier(identifier);
if (Objects.isNull(task)) {
return Result.error("分片任务不存在");
}
Map<String, String> params = new HashMap<>();
params.put("partNumber", partNumber.toString());
params.put("uploadId", task.getUploadId());
return Result.OK("", awsS3UploadService.genPreSignUploadUrl(task.getBucketName(), task.getObjectKey(), params));
}
/**
* 合并分片
*
* @param identifier
* @return
*/
@PostMapping("/task/merge/{identifier}")
public Result<?> merge(@PathVariable("identifier") String identifier) {
awsS3UploadService.merge(identifier);
return Result.OK();
}
}
入参类TaskParam:
package com.ynfy.buss.awss3upload.entity;
import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@Data
@ToString
@Accessors(chain = true)
public class TaskParam {
/**
* 文件唯一标识(MD5)
*/
@NotBlank(message = "文件标识不能为空")
private String identifier;
/**
* 文件大小(byte)
*/
@NotNull(message = "文件大小不能为空")
private Long totalSize;
/**
* 分片大小(byte)
*/
@NotNull(message = "分片大小不能为空")
private Long chunkSize;
/**
* 文件名称
*/
@NotBlank(message = "文件名称不能为空")
private String fileName;
}
dto
package com.ynfy.buss.awss3upload.entity.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.Accessors;
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class TaskInfoDTO {
/**
* 是否完成上传(是否已经合并分片)
*/
private boolean finished;
/**
* 文件地址
*/
private String path;
/**
* 上传记录
*/
private TaskRecordDTO taskRecord;
}
package com.ynfy.buss.awss3upload.entity.dto;
import cn.hutool.core.bean.BeanUtil;
import com.amazonaws.services.s3.model.PartSummary;
import com.ynfy.buss.awss3upload.entity.AwsS3Upload;
import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@ToString
@Accessors(chain = true)
public class TaskRecordDTO extends AwsS3Upload {
/**
* 已上传完的分片
*/
private List<PartSummary> exitPartList;
public static TaskRecordDTO convertFromEntity(AwsS3Upload task) {
TaskRecordDTO dto = new TaskRecordDTO();
BeanUtil.copyProperties(task, dto);
return dto;
}
}
常量:
package com.ynfy.buss.awss3upload.constant;
public interface MinioConstant {
// 分块大小
int DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024;
// 预签名url过期时间(ms)
Long PRE_SIGN_URL_EXPIRE = 60 * 10 * 1000L;
}
二、前端
1.上传组件封装 AwsS3Upload.vue
<template>
<a-upload
:maxCount="1"
:fileList="fileList"
:progress="progress"
:custom-request="handleHttpRequest"
accept="video/*"
@change="handleChange"
:on-remove="handleRemoveFile">
<a-button :disabled="fileList.length == 1">
<upload-outlined></upload-outlined>
上传
</a-button>
</a-upload>
</template>
<script lang="ts" setup>
import { UploadOutlined } from "@ant-design/icons-vue";
import md5 from "/@/utils/lib/md5";
import Queue from "promise-queue-plus";
import { ref, watch } from "vue";
import { useMessage } from "@/hooks/web/useMessage";
import { initTask, merge, preSignUrl, taskInfo } from "/@/api/awss3upload/awss3";
import axios from "axios";
import { getFileAccessHttpUrl } from "@/utils/common/compUtils";
import type { UploadProps } from "ant-design-vue";
const { createMessage } = useMessage();
const emit = defineEmits(["update:modelValue"]);
const fileList = ref<any>([]);
const props = defineProps({
modelValue: String
});
watch(
() => props.modelValue,
(val) => {
parsePathsValue(val);
},
{ immediate: true }
);
// 解析数据库存储的逗号分割
function parsePathsValue(paths) {
if (!paths || paths.length == 0) {
fileList.value = [];
return;
}
let list: any[] = [];
for (const item of paths.split(",")) {
let url = getFileAccessHttpUrl(item);
list.push({
uid: uidGenerator(),
name: getFileName(item),
status: "done",
url: url,
response: { status: "history", message: item }
});
}
fileList.value = list;
}
function getFileName(path) {
if (path.lastIndexOf("\\") >= 0) {
let reg = new RegExp("\\\\", "g");
path = path.replace(reg, "/");
}
return path.substring(path.lastIndexOf("/") + 1);
}
function uidGenerator() {
return "-" + parseInt(Math.random() * 10000 + 1, 10);
}
const progress: UploadProps["progress"] = {
strokeColor: {
"0%": "#108ee9",
"100%": "#87d068"
},
style: {
margin: "15px 0"
},
strokeWidth: 3,
format: percent => `${percent}%`
};
// 文件上传分块任务的队列(用于移除文件时,停止该文件的上传队列) key:fileUid value: queue object
const fileUploadChunkQueue = ref({}).value;
/**
* 获取一个上传任务,没有则初始化一个
*/
const getTaskInfo = async (file) => {
let task;
const identifier = await md5(file);
task = await taskInfo(identifier);
if (!task) {
const initTaskData = {
identifier,
fileName: file.name,
totalSize: file.size,
chunkSize: 5 * 1024 * 1024
};
task = await initTask(initTaskData);
}
return task;
};
/**
* 上传逻辑处理,如果文件已经上传完成(完成分块合并操作),则不会进入到此方法中
*/
const handleUpload = (file, taskRecord, options) => {
let lastUploadedSize = 0; // 上次断点续传时上传的总大小
let uploadedSize = 0; // 已上传的大小
const totalSize = file.size || 0; // 文件总大小
let startMs = new Date().getTime(); // 开始上传的时间
const { exitPartList, chunkSize, chunkNum, fileIdentifier } = taskRecord;
// 获取从开始上传到现在的平均速度(byte/s)
const getSpeed = () => {
// 已上传的总大小 - 上次上传的总大小(断点续传)= 本次上传的总大小(byte)
const intervalSize = uploadedSize - lastUploadedSize;
const nowMs = new Date().getTime();
// 时间间隔(s)
const intervalTime = (nowMs - startMs) / 1000;
return intervalSize / intervalTime;
};
const uploadNext = async (partNumber: number) => {
const start = chunkSize * (partNumber - 1);
const end = start + chunkSize;
const blob = file.slice(start, end);
const preRes = await preSignUrl({
identifier: fileIdentifier,
partNumber: partNumber
});
if (preRes) {
await axios.request({
url: preRes,
method: "PUT",
data: blob,
headers: { "Content-Type": "application/octet-stream" }
});
return Promise.resolve({ partNumber: partNumber, uploadedSize: blob.size });
}
return Promise.reject(`分片${partNumber}, 获取上传地址失败`);
};
/**
* 更新上传进度
* @param increment 为已上传的进度增加的字节量
*/
const updateProcess = (increment: number) => {
const { onProgress } = options;
let factor = 1000; // 每次增加1000 byte
let from = 0;
// 通过循环一点一点的增加进度
while (from <= increment) {
from += factor;
uploadedSize += factor;
const percent = Math.round(uploadedSize / totalSize * 100).toFixed(2);
onProgress({ percent: percent });
}
const speed = getSpeed();
const remainingTime = speed != 0 ? Math.ceil((totalSize - uploadedSize) / speed) + "s" : "未知";
console.log("剩余大小:", (totalSize - uploadedSize) / 1024 / 1024, "mb");
console.log("当前速度:", (speed / 1024 / 1024).toFixed(2), "mbps");
console.log("预计完成:", remainingTime);
};
return new Promise(resolve => {
const failArr: any = [];
const queue = Queue(5, {
"retry": 3, //Number of retries
"retryIsJump": false, //retry now?
"workReject": function(reason, queue) {
failArr.push(reason);
},
"queueEnd": function(queue) {
resolve(failArr);
}
});
fileUploadChunkQueue[file.uid] = queue;
for (let partNumber = 1; partNumber <= chunkNum; partNumber++) {
const exitPart = (exitPartList || []).find(exitPart => exitPart.partNumber == partNumber);
if (exitPart) {
// 分片已上传完成,累计到上传完成的总额中,同时记录一下上次断点上传的大小,用于计算上传速度
lastUploadedSize += exitPart.size;
updateProcess(exitPart.size);
} else {
queue.push(() => uploadNext(partNumber).then(res => {
// 单片文件上传完成再更新上传进度
updateProcess(res.uploadedSize);
}));
}
}
if (queue.getLength() == 0) {
// 所有分片都上传完,但未合并,直接return出去,进行合并操作
resolve(failArr);
return;
}
queue.start();
});
};
/**
* el-upload 自定义上传方法入口
*/
const handleHttpRequest = async (options) => {
const { onSuccess } = options;
const file = options.file;
const task = await getTaskInfo(file);
if (task) {
const { finished, taskRecord } = task;
const { fileIdentifier: identifier } = taskRecord;
if (finished) {
emit("update:modelValue", taskRecord.objectKey);
onSuccess(null, file);
} else {
const errorList: any = await handleUpload(file, taskRecord, options);
if (errorList.length > 0) {
createMessage.error("部分分片上次失败,请尝试重新上传文件");
return;
}
await merge(identifier);
emit("update:modelValue", taskRecord.objectKey);
onSuccess(errorList, file);
}
} else {
createMessage.error("文件上传错误");
}
};
/**
* 移除文件列表中的文件
* 如果文件存在上传队列任务对象,则停止该队列的任务
*/
const handleRemoveFile = (uploadFile, uploadFiles) => {
fileList.value = fileList.value.filter(item => item.uid != uploadFile.uid);
const queueObject = fileUploadChunkQueue[uploadFile.uid];
if (queueObject) {
queueObject.stop();
fileUploadChunkQueue[uploadFile.uid] = undefined;
}
emit("update:modelValue", "");
};
const handleChange = (info) => {
fileList.value = info.fileList;
};
</script>
<style scoped>
</style>
2.接口 awss3.ts
import { defHttp } from "/@/utils/http/axios";
/**
* 根据文件的md5获取未上传完的任务
* @param identifier 文件md5
*/
export const taskInfo = (identifier) => {
return defHttp.get({ url: `/awsS3Upload/task/${identifier}` });
};
/**
* 初始化一个分片上传任务
* @param identifier 文件md5
* @param fileName 文件名称
* @param totalSize 文件大小
* @param chunkSize 分块大小
*/
export const initTask = (params) => {
return defHttp.post({ url: "/awsS3Upload/task/init", params });
};
/**
* 获取预签名分片上传地址
* @param identifier 文件md5
* @param partNumber 分片编号
*/
export const preSignUrl = ({ identifier, partNumber }) => {
return defHttp.get({ url: `/awsS3Upload/task/${identifier}/${partNumber}` });
};
/**
* 合并分片
* @param identifier
*/
export const merge = (identifier) => {
return defHttp.post({ url: `/awsS3Upload/task/merge/${identifier}` });
};
3.使用
<AwsS3Upload v-model:modelValue="form.videoId"
@update:modelValue="(value)=>{ form.videoId = value }" />