[学成在线]06-视频分片上传
上传视频
需求分析
- 教学机构人员进入媒资管理列表查询自己上传的媒资文件。
点击“媒资管理”
进入媒资管理列表页面查询本机构上传的媒资文件。
- 教育机构用户在"媒资管理"页面中点击 "上传视频" 按钮。
点击“上传视频”打开上传页面
- 选择要上传的文件,自动执行文件上传。
- 视频上传成功会自动处理,处理完成可以预览视频。
断点续传
概念介绍
需求背景
通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。
什么是断点续传
引用百度百科:断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。
断点续传流程如下图:
流程如下:
1、前端上传前先把文件分成块
2、一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传
3、各分块上传完成最后在服务端合并文件
分块与合并原理
为了更好的理解文件分块上传的原理,下边用java代码测试文件的分块与合并。
文件分块的流程如下:
1、获取源文件长度
2、根据设定的分块文件的大小计算出块数
3、从源文件读数据依次向每一个块文件写数据。
package com.xuecheng.media;
/**
* @author Mr.M
* @version 1.0
* @description 大文件处理测试
* @date 2022/9/13 9:21
*/
public class BigFileTest {
//测试文件分块方法
@Test
public void testChunk() throws IOException {
File sourceFile = new File("C:\\Users\\Lenovo\\Desktop\\学成在线项目—视频\\day06\\Day6-01.上传视频-什么是断点续传.mp4"); // 原始文件
String chunkPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\"; // 分块文件存储目录
File chunkFolder = new File(chunkPath);
if (!chunkFolder.exists()) {
chunkFolder.mkdirs();
}
//分块大小
long chunkSize = 1024 * 1024 * 1;
//分块数量
long chunkNum = (long) Math.ceil(sourceFile.length() * 1.0 / chunkSize);
System.out.println("分块总数:"+chunkNum);
//缓冲区大小
byte[] b = new byte[1024];
//使用RandomAccessFile访问文件(变化流), "r"表示读取流, "rw"表示写入流
RandomAccessFile raf_read = new RandomAccessFile(sourceFile, "r");
//分块
for (int i = 0; i < chunkNum; i++) {
//创建分块文件
File file = new File(chunkPath + i);
if(file.exists()){
file.delete();
}
boolean newFile = file.createNewFile();
if (newFile) {
//向分块文件中写数据
RandomAccessFile raf_write = new RandomAccessFile(file, "rw");
int len = -1;
while ((len = raf_read.read(b)) != -1) {
raf_write.write(b, 0, len);
if (file.length() >= chunkSize) {
break;
}
}
raf_write.close();
System.out.println("完成分块"+i);
}
}
raf_read.close();
}
}
执行结果: 目标文件被分片的储存在文件夹中
]文件合并流程:
1、找到要合并的文件并按文件合并的先后进行排序。
2、创建合并文件
3、依次从合并的文件中读取数据向合并文件写入数
package com.xuecheng.media;
/**
* @author Mr.M
* @version 1.0
* @description 大文件处理测试
* @date 2022/9/13 9:21
*/
public class BigFileTest {
//测试文件合并方法
@Test
public void testMerge() throws IOException {
//块文件目录
File chunkFolder = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\");
//原始文件
File originalFile = new File("C:\\Users\\Lenovo\\Desktop\\学成在线项目—视频\\day06\\Day6-01.上传视频-什么是断点续传.mp4");
//合并文件
File mergeFile = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\Day6-01.上传视频-什么是断点续传.mp4");
if (mergeFile.exists()) {
mergeFile.delete();
}
//创建新的合并文件
mergeFile.createNewFile();
//用于写文件
RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");
//指针指向文件顶端
raf_write.seek(0);
//缓冲区
byte[] b = new byte[1024];
//分块列表
File[] fileArray = chunkFolder.listFiles();
// 转成集合,便于排序
List<File> fileList = Arrays.asList(fileArray);
// 从小到大排序
Collections.sort(fileList, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());
}
});
//开始合并文件
for (File chunkFile : fileList) {
RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "rw");
int len = -1;
while ((len = raf_read.read(b)) != -1) {
raf_write.write(b, 0, len);
}
raf_read.close();
}
raf_write.close();
//校验文件
try (
FileInputStream fileInputStream = new FileInputStream(originalFile);
FileInputStream mergeFileStream = new FileInputStream(mergeFile);
) {
//取出原始文件的md5
String originalMd5 = DigestUtils.md5Hex(fileInputStream);
//取出合并文件的md5进行比较
String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream);
if (originalMd5.equals(mergeFileMd5)) {
System.out.println("合并文件成功");
} else {
System.out.println("合并文件失败");
}
}
}
}
执行结果: 分片文件被合并为正常文件
视频上传流程
下图是上传视频的整体流程:
1、前端对文件进行分块。
2、前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存在则不再上传。
3、如果分块文件不存在则前端开始上传
4、前端请求媒资服务上传分块。
5、媒资服务将分块上传至MinIO。
6、前端将分块上传完毕请求媒资服务合并分块。
7、媒资服务判断分块上传完成则请求MinIO合并文件。
8、合并完成校验合并后的文件是否完整,如果完整则上传完成,否则删除文件。
minio合并文件测试
1、将分块文件上传至minio, minio限制每个分片文件不小于5M
/**
* @description 测试MinIO
*/
public class MinioTest {
static MinioClient minioClient =
MinioClient.builder()
.endpoint("http://192.168.101.65:9000")
.credentials("minioadmin", "minioadmin")
.build();
// 将分块文件上传至minio
@Test
public void uploadChunk() {
String chunkFolderPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\";
File chunkFolder = new File(chunkFolderPath);
//分块文件
File[] files = chunkFolder.listFiles();
//将分块文件上传至minio
for (int i = 0; i < files.length; i++) {
try {
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder().bucket("testbucket").object("chunk/" + i).filename(files[i].getAbsolutePath()).build();
minioClient.uploadObject(uploadObjectArgs);
System.out.println("上传分块成功" + i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
2、通过minio的合并文件
/**
* @description 测试MinIO
*/
public class MinioTest {
static MinioClient minioClient =
MinioClient.builder()
.endpoint("http://192.168.101.65:9000")
.credentials("minioadmin", "minioadmin")
.build();
//合并文件,要求分块文件最小5M
@Test
public void test_merge() throws Exception {
// 分块文件集合(传统方式)
// List<ComposeSource> sources = new ArrayList<>();
// for (int i = 0; i < 10; i++) {
// // 构建文件信息
// ComposeSource composeSource = ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build();
// sources.add(composeSource);
// }
// 分块文件集合(steam流)
List<ComposeSource> sources = Stream.iterate(0, i -> ++i)
.limit(10)
.map(i -> ComposeSource.builder()
.bucket("testbucket")
.object("chunk/".concat(Integer.toString(i)))
.build())
.collect(Collectors.toList());
// 指定合并后的文件名
// 通过sources指定源文件
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket("testbucket").object("01.上传视频-什么是断点续传.mp4").sources(sources).build();
// 合并文件
minioClient.composeObject(composeObjectArgs);
}
}
3、分块文件使用后就没用了, 清除分块文件
/**
* @description 测试MinIO
*/
public class MinioTest {
static MinioClient minioClient =
MinioClient.builder()
.endpoint("http://192.168.101.65:9000")
.credentials("minioadmin", "minioadmin")
.build();
//清除分块文件
@Test
public void test_removeObjects() {
//合并分块完成将分块文件清除
List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i)
.limit(10)
.map(i -> new DeleteObject("chunk/".concat(Integer.toString(i))))
.collect(Collectors.toList());
//构建参数
RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("testbucket").objects(deleteObjects).build();
//执行删除
Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
results.forEach(r -> {
DeleteError deleteError = null;
try {
deleteError = r.get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
接口定义
根据上传视频流程,定义接口,与前端的约定是操作成功返回{code:0}否则返回{code:-1}
从课程资料中拷贝RestResponse.java类到base工程下的model包下。
/**
* @author Mr.M
* @version 1.0
* @description 通用结果类型
* @date 2022/9/13 14:44
*/
@Data
@ToString
public class RestResponse<T> {
/**
* 响应编码,0为正常,-1错误
*/
private int code;
/**
* 响应提示信息
*/
private String msg;
/**
* 响应内容
*/
private T result;
public RestResponse() {
this(0, "success");
}
public RestResponse(int code, String msg) {
this.code = code;
this.msg = msg;
}
/**
* 错误信息的封装
*
* @param msg
* @param <T>
* @return
*/
public static <T> RestResponse<T> validfail(String msg) {
RestResponse<T> response = new RestResponse<T>();
response.setCode(-1);
response.setMsg(msg);
return response;
}
public static <T> RestResponse<T> validfail(T result, String msg) {
RestResponse<T> response = new RestResponse<T>();
response.setCode(-1);
response.setResult(result);
response.setMsg(msg);
return response;
}
/**
* 添加正常响应数据(包含响应内容)
*
* @return RestResponse Rest服务封装相应数据
*/
public static <T> RestResponse<T> success(T result) {
RestResponse<T> response = new RestResponse<T>();
response.setResult(result);
return response;
}
public static <T> RestResponse<T> success(T result, String msg) {
RestResponse<T> response = new RestResponse<T>();
response.setResult(result);
response.setMsg(msg);
return response;
}
/**
* 添加正常响应数据(不包含响应内容)
*
* @return RestResponse Rest服务封装相应数据
*/
public static <T> RestResponse<T> success() {
return new RestResponse<T>();
}
public Boolean isSuccessful() {
return this.code == 0;
}
}
定义接口如下:
/**
* @author Mr.M
* @version 1.0
* @description 大文件上传接口
* @date 2022/9/6 11:29
*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {
@ApiOperation(value = "文件上传前检查文件")
@PostMapping("/upload/checkfile")
public RestResponse<Boolean> checkfile(
@RequestParam("fileMd5") String fileMd5
) throws Exception {
return null;
}
@ApiOperation(value = "分块文件上传前的检测")
@PostMapping("/upload/checkchunk")
public RestResponse<Boolean> checkchunk(
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
return null;
}
@ApiOperation(value = "上传分块文件")
@PostMapping("/upload/uploadchunk")
public RestResponse uploadchunk(
@RequestParam("file") MultipartFile file,
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
return null;
}
@ApiOperation(value = "合并文件")
@PostMapping("/upload/mergechunks")
public RestResponse mergechunks(
@RequestParam("fileMd5") String fileMd5,
@RequestParam("fileName") String fileName,
@RequestParam("chunkTotal") int chunkTotal) throws Exception {
return null;
}
}
service开发
校验方法
首先实现检查文件方法和检查分块方法, 在MediaFileService中定义service接口如下
/**
* @author Mr.M
* @version 1.0
* @description 媒资文件管理业务类
* @date 2022/9/10 8:55
*/
public interface MediaFileService {
/**
* @description 检查文件是否存在
* @param fileMd5 文件的md5
* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在
* @author Mr.M
* @date 2022/9/13 15:38
*/
public RestResponse<Boolean> checkFile(String fileMd5);
/**
* @description 检查分块是否存在
* @param fileMd5 文件的md5
* @param chunkIndex 分块序号
* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在
* @author Mr.M
* @date 2022/9/13 15:39
*/
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);
}
service接口实现方法
package com.xuecheng.media.api;
import com.xuecheng.base.model.RestResponse;
import com.xuecheng.media.mapper.MediaFilesMapper;
import com.xuecheng.media.service.MediaFileService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
/**
* @author Mr.M
* @version 1.0
* @description 大文件上传接口
* @date 2022/9/6 11:29
*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {
@Autowired
MediaFileService mediaFileService;
@ApiOperation(value = "文件上传前检查文件")
@PostMapping("/upload/checkfile")
public RestResponse<Boolean> checkfile(
@RequestParam("fileMd5") String fileMd5
) throws Exception {
return mediaFileService.checkFile(fileMd5);
}
@ApiOperation(value = "分块文件上传前的检测")
@PostMapping("/upload/checkchunk")
public RestResponse<Boolean> checkchunk(
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);
return booleanRestResponse;
}
@ApiOperation(value = "上传分块文件")
@PostMapping("/upload/uploadchunk")
public RestResponse uploadchunk(
@RequestParam("file") MultipartFile file,
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
return null;
}
@ApiOperation(value = "合并文件")
@PostMapping("/upload/mergechunks")
public RestResponse mergechunks(
@RequestParam("fileMd5") String fileMd5,
@RequestParam("fileName") String fileName,
@RequestParam("chunkTotal") int chunkTotal) throws Exception {
return null;
}
}
在接口中调用service提供的检查文件方法和检查分块方法
/**
* @author Mr.M
* @version 1.0
* @description 大文件上传接口
* @date 2022/9/6 11:29
*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {
@Autowired
MediaFileService mediaFileService;
@ApiOperation(value = "文件上传前检查文件")
@PostMapping("/upload/checkfile")
public RestResponse<Boolean> checkfile(
@RequestParam("fileMd5") String fileMd5
) throws Exception {
return mediaFileService.checkFile(fileMd5);
}
@ApiOperation(value = "分块文件上传前的检测")
@PostMapping("/upload/checkchunk")
public RestResponse<Boolean> checkchunk(
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);
return booleanRestResponse;
}
}
上传方法
定义service接口
/**
* @author Mr.M
* @version 1.0
* @description 媒资文件管理业务类
* @date 2022/9/10 8:55
*/
public interface MediaFileService {
/**
* @description 上传分块
* @param fileMd5 文件md5
* @param chunk 分块序号
* @param localChunkFilePath 分块文件本地路径
* @return com.xuecheng.base.model.RestResponse
* @author Mr.M
* @date 2022/9/13 15:50
*/
public RestResponse uploadChunk(String fileMd5,int chunk,String localChunkFilePath);
}
接口实现:
/**
* @author Mr.M
* @version 1.0
* @description TODO
* @date 2022/9/10 8:58
*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {
@Autowired
MediaFilesMapper mediaFilesMapper;
@Autowired
MinioClient minioClient;
//普通文件桶
@Value("${minio.bucket.files}")
private String bucket_mediafiles;
//视频文件桶
@Value("${minio.bucket.videofiles}")
private String bucket_video;
/**
* @param localFilePath 文件地址
* @param bucket 桶
* @param objectName 对象名称
* @return void
* @description 将文件写入minIO
* @author Mr.M
* @date 2022/10/12 21:22
*/
public boolean addMediaFilesToMinIO(String localFilePath, String mimeType, String bucket, String objectName) {
try {
// 构建文件参数
UploadObjectArgs testbucket = UploadObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.filename(localFilePath)
.contentType(mimeType)
.build();
// 执行上传操作
minioClient.uploadObject(testbucket);
log.debug("上传文件到minio成功,bucket:{},objectName:{}", bucket, objectName);
System.out.println("上传成功");
return true;
} catch (Exception e) {
e.printStackTrace();
log.error("上传文件到minio出错,bucket:{},objectName:{},错误原因:{}", bucket, objectName, e.getMessage(), e);
XueChengPlusException.cast("上传文件到文件系统失败");
}
return false;
}
// 根据文件扩展名取出mimeType
private String getMimeType(String extension) {
if (extension == null)
extension = "";
//根据扩展名取出mimeType
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);
//通用mimeType,字节流
String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;
if (extensionMatch != null) {
mimeType = extensionMatch.getMimeType();
}
return mimeType;
}
//得到分块文件的目录
private String getChunkFileFolderPath(String fileMd5) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
}
@Override
public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {
//得到分块文件的目录路径
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//得到分块文件的路径
String chunkFilePath = chunkFileFolderPath + chunk;
//mimeType
String mimeType = getMimeType(null);
//将文件存储至minIO
boolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucket_video, chunkFilePath);
if (!b) {
log.debug("上传分块文件失败:{}", chunkFilePath);
return RestResponse.validfail(false, "上传分块失败");
}
log.debug("上传分块文件成功:{}",chunkFilePath);
return RestResponse.success(true);
}
}
完善接口
/**
* @author Mr.M
* @version 1.0
* @description 大文件上传接口
* @date 2022/9/6 11:29
*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {
@Autowired
MediaFileService mediaFileService;
@ApiOperation(value = "上传分块文件")
@PostMapping("/upload/uploadchunk")
public RestResponse uploadchunk(
@RequestParam("file") MultipartFile file,
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
// 创建一个临时文件
File tempFile = File.createTempFile("minio", "temp");
file.transferTo(tempFile);
// 获取文件路径
String localFilePath = tempFile.getAbsolutePath();
RestResponse restResponse = mediaFileService.uploadChunk(fileMd5, chunk, localFilePath);
return restResponse;
}
}
接口测试
- 更新前端文件
- 将uploadtools.ts文件覆盖前端工程src/utils 目录下的同名文件,
- 把前端切换文件增大到10M
- 将 media-add-dialog.vue文件覆盖前端工程src\module-organization\pages\media-manage\components目录下的同名文件
- 修改后端配置
- 前端对文件分块的大小为5MB,SpringBoot web默认上传文件的大小限制为1MB,这里需要在media-api工程修改配置如下:
spring:
servlet:
multipart:
max-file-size: 50MB
max-request-size: 50MB
- max-file-size: 单个文件的大小限制
- Max-request-size: 单次请求的大小限制
- 启动前后端服务, 联调
合并方法
定义service接口
/**
* @author Mr.M
* @version 1.0
* @description 媒资文件管理业务类
* @date 2022/9/10 8:55
*/
public interface MediaFileService {
/**
* @description 合并分块
* @param companyId 机构id
* @param fileMd5 文件md5
* @param chunkTotal 分块总和
* @param uploadFileParamsDto 文件信息
* @return com.xuecheng.base.model.RestResponse
* @author Mr.M
* @date 2022/9/13 15:56
*/
public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);
}
接口实现:
/**
* @author Mr.M
* @version 1.0
* @description TODO
* @date 2022/9/10 8:58
*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {
@Autowired
MediaFilesMapper mediaFilesMapper;
@Autowired
MinioClient minioClient;
//普通文件桶
@Value("${minio.bucket.files}")
private String bucket_mediafiles;
//视频文件桶
@Value("${minio.bucket.videofiles}")
private String bucket_video;
@Autowired
MediaFileService currentProxy;
/**
* 合并分块
*
* @param companyId 机构id
* @param fileMd5 文件md5
* @param chunkTotal 分块总和
* @param uploadFileParamsDto 文件信息
* @return
*/
@Override
public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
// 1.找到分块文件, 调用minio的sdk进行文件合并
// 分块文件目录
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
// 1.1分块文件集合(steam流)
List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> ComposeSource.builder()
.bucket(bucket_video)
.object(chunkFileFolderPath.concat(Integer.toString(i))).build())
.collect(Collectors.toList());
//源文件名称
String fileName = uploadFileParamsDto.getFilename();
//文件扩展名
String extension = fileName.substring(fileName.lastIndexOf("."));
//合并后文件的objectName
String objectName = getFilePathByMd5(fileMd5, extension);
// 1.2指定合并后的文件信息
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
.bucket(bucket_video)
.object(objectName) // 合并后的文件objectName
.sources(sources) // 通过sources指定源文件
.build();
// 1.3合并文件
try {
minioClient.composeObject(composeObjectArgs);
} catch (Exception e) {
e.printStackTrace();
log.debug("合并文件失败,fileMd5:{},异常:{}", fileMd5, e.getMessage(), e);
return RestResponse.validfail(false, "合并文件失败。");
}
// 2.校验合并后的文件和源文件是否一致
// 先下载合并后的文件
File file = downloadFileFromMinIO(bucket_video, objectName);
try (FileInputStream fileInputStream = new FileInputStream(file)) {
//计算合并后文件的md5值
String mergeFile_md5 = DigestUtils.md5Hex(fileInputStream);
//比较原始文件和合并后文件的MD5值
if (!fileMd5.equals(mergeFile_md5)) {
log.error("校验合并文件md5值不一致,原始文件:{},合并文件:{}", fileMd5, mergeFile_md5);
return RestResponse.validfail(false, "文件合并校验失败");
}
//保存文件大小
uploadFileParamsDto.setFileSize(file.length());
} catch (Exception e) {
e.printStackTrace();
return RestResponse.validfail(false, "文件合并校验失败");
}
// 3.将文件信息入库
MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, objectName);
if (mediaFiles == null) {
return RestResponse.validfail(false, "文件入库失败");
}
// 4.清理分块文件
clearChunkFiles(chunkFileFolderPath, chunkTotal);
return RestResponse.success(true);
}
//得到分块文件的目录
private String getChunkFileFolderPath(String fileMd5) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
}
/**
* 得到合并后的文件的地址
*
* @param fileMd5 文件id即md5值
* @param fileExt 文件扩展名
* @return
*/
private String getFilePathByMd5(String fileMd5, String fileExt) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;
}
/**
* 从minio下载文件
*
* @param bucket 桶
* @param objectName 对象名称
* @return 下载后的文件
*/
public File downloadFileFromMinIO(String bucket, String objectName) {
//临时文件
File minioFile = null;
FileOutputStream outputStream = null;
try {
InputStream stream = minioClient.getObject(GetObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.build());
//创建临时文件
minioFile = File.createTempFile("minio", ".merge");
outputStream = new FileOutputStream(minioFile);
IOUtils.copy(stream, outputStream);
return minioFile;
} catch (Exception e) {
e.printStackTrace();
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return null;
}
/**
* 清除分块文件
*
* @param chunkFileFolderPath 分块文件路径
* @param chunkTotal 分块文件总数
*/
private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal) {
try {
//待删除分块文件列表
List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i)
.limit(chunkTotal)
.map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i))))
.collect(Collectors.toList());
// 分块文件信息
RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();
// 清除分块文件
Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
// 真正删除分块文件
results.forEach(r -> {
DeleteError deleteError = null;
try {
deleteError = r.get();
} catch (Exception e) {
e.printStackTrace();
log.error("清除分块文件失败,objectname:{}", deleteError.objectName(), e);
}
});
} catch (Exception e) {
e.printStackTrace();
log.error("清除分块文件失败,chunkFileFolderPath:{}", chunkFileFolderPath, e);
}
}
}
controller完善
/**
* @author Mr.M
* @version 1.0
* @description 大文件上传接口
* @date 2022/9/6 11:29
*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {
@Autowired
MediaFileService mediaFileService;
@ApiOperation(value = "合并文件")
@PostMapping("/upload/mergechunks")
public RestResponse mergechunks(
@RequestParam("fileMd5") String fileMd5,
@RequestParam("fileName") String fileName,
@RequestParam("chunkTotal") int chunkTotal) throws Exception {
// todo: 机构id
Long companyId = 1232141425L;
UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();
uploadFileParamsDto.setFileType("001002");
uploadFileParamsDto.setTags("课程视频");
uploadFileParamsDto.setRemark("");
uploadFileParamsDto.setFilename(fileName);
return mediaFileService.mergechunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);
}
}
功能测试: 下边进行前后端联调
- 上传一个视频测试合并分块的执行逻辑
进入service方法逐行跟踪。
- 断点续传测试
上传一部分后,停止刷新浏览器再重新上传,通过浏览器日志发现已经上传过的分块不再重新上传
- 文件分片上传后合并分片, 合并完成后删除分片文件
update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?
多个线程同时执行上边的sql只会有一个线程执行成功。
- 什么是乐观锁、悲观锁?
- synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。
- 乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。
定义mapper
package com.xuecheng.media.mapper;
/**
* <p>
* Mapper 接口
* </p>
*
* @author itcast
*/
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {
/**
* 开启一个任务
* @param id 任务id
* @return 更新记录数
*/
@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")
int startTask(@Param("id") long id);
}
service方法
package com.xuecheng.media.service;
/**
* @author Mr.M
* @version 1.0
* @description 媒资文件处理业务方法
* @date 2022/9/10 8:55
*/
public interface MediaFileProcessService {
/**
* 开启一个任务
* @param id 任务id
* @return true开启任务成功,false开启任务失败
*/
public boolean startTask(long id);
}
package com.xuecheng.media.service.impl;
/**
* @author Mr.M
* @version 1.0
* @description TODO
* @date 2022/9/14 14:41
*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {
@Autowired
MediaFilesMapper mediaFilesMapper;
@Autowired
MediaProcessMapper mediaProcessMapper;
//实现如下
public boolean startTask(long id) {
int result = mediaProcessMapper.startTask(id);
return result<=0?false:true;
}
}
更新任务状态
任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。
在MediaFileProcessService接口添加方法
package com.xuecheng.media.service;
/**
* @author Mr.M
* @version 1.0
* @description 媒资文件处理业务方法
* @date 2022/9/10 8:55
*/
public interface MediaFileProcessService {
/**
* @description 保存任务结果
* @param taskId 任务id
* @param status 任务状态
* @param fileId 文件id
* @param url url
* @param errorMsg 错误信息
* @return void
* @author Mr.M
* @date 2022/10/15 11:29
*/
void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);
}
service接口方法实现如下:
package com.xuecheng.media.service.impl;
/**
* @author Mr.M
* @version 1.0
* @description TODO
* @date 2022/9/14 14:41
*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {
@Autowired
MediaFilesMapper mediaFilesMapper;
@Autowired
MediaProcessMapper mediaProcessMapper;
@Autowired
MediaProcessHistoryMapper mediaProcessHistoryMapper;
@Transactional
@Override
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {
//查出任务,如果不存在则直接返回
MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);
if(mediaProcess == null){
return ;
}
//处理失败,更新任务处理结果
LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);
//处理失败
if(status.equals("3")){
MediaProcess mediaProcess_u = new MediaProcess();
mediaProcess_u.setStatus("3");
mediaProcess_u.setErrormsg(errorMsg);
mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);
mediaProcessMapper.update(mediaProcess_u,queryWrapperById);
log.debug("更新任务处理状态为失败,任务信息:{}",mediaProcess_u);
return ;
}
//任务处理成功
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);
if(mediaFiles!=null){
//更新媒资文件中的访问url
mediaFiles.setUrl(url);
mediaFilesMapper.updateById(mediaFiles);
}
//处理成功,更新url和状态
mediaProcess.setUrl(url);
mediaProcess.setStatus("2");
mediaProcess.setFinishDate(LocalDateTime.now());
mediaProcessMapper.updateById(mediaProcess);
//添加到历史记录
MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();
BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);
mediaProcessHistoryMapper.insert(mediaProcessHistory);
//删除mediaProcess
mediaProcessMapper.deleteById(mediaProcess.getId());
}
}
视频处理
视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。
所有视频处理完成结束本次执行,为防止代码异常出现无限期等待则添加超时设置,到达超时时间还没有处理完成仍结束任务。
定义任务类VideoTask 如下:
package com.xuecheng.media.jobhandler;
/**
* 视频处理任务类
*
* @author Mr.M
* @version 1.0
* @description TODO
* @date 2022/10/15 11:58
*/
@Slf4j
@Component
public class VideoTask {
@Autowired
MediaFileProcessService mediaFileProcessService;
@Autowired
MediaFileService mediaFileService;
@Value("${videoprocess.ffmpegpath}")
private String ffmpeg_path;
@XxlJob("videoJobHandler")
public void videoJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex(); // 执行器序号,从0开始
int shardTotal = XxlJobHelper.getShardTotal(); // 执行器总数
//取出cpu核心数作为一次处理数据的条数
int processors = Runtime.getRuntime().availableProcessors();
//查询待处理的任务
List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);
//实际查到的任务数
int size = mediaProcessList.size();
log.debug("取到视频处理任务数:" + size);
if (size <= 0) {
return;
}
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(size);
// 使用的计数器
CountDownLatch countDownLatch = new CountDownLatch(size);
mediaProcessList.forEach(mediaProcess -> {
// 将任务加入线程池
executorService.execute(() -> {
try {
Long taskId = mediaProcess.getId(); // 任务id
// 开启任务
boolean b = mediaFileProcessService.startTask(taskId);
if (!b) {
log.debug("抢占任务失败, 任务id: {}", taskId);
return;
}
// 准备参数
String bucket = mediaProcess.getBucket(); //桶
String filePath = mediaProcess.getFilePath(); //存储路径
String fileId = mediaProcess.getFileId(); //原始视频的md5值
//将要处理的文件下载到服务器上
File originalFile = mediaFileService.downloadFileFromMinIO(bucket, filePath);
if (originalFile == null) {
log.debug("下载待处理文件失败,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下载待处理文件失败");
return;
}
//处理结束的视频文件
File mp4File = null;
try {
mp4File = File.createTempFile("mp4", ".mp4");
} catch (IOException e) {
log.error("创建mp4临时文件失败");
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "创建mp4临时文件失败");
return;
}
//视频处理结果
String result = "";
try {
//开始处理视频
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());
//开始视频转换,成功将返回success
result = videoUtil.generateMp4();
} catch (Exception e) {
e.printStackTrace();
log.error("处理视频文件:{},出错:{}", mediaProcess.getFilePath(), e.getMessage());
}
if (!result.equals("success")) {
//记录错误信息
log.error("处理视频失败,视频地址:{},错误信息:{}", bucket + filePath, result);
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);
return;
}
//将mp4上传至minio
//mp4在minio的存储路径
String objectName = getFilePath(fileId, ".mp4");
//访问url
String url = "/" + bucket + "/" + objectName;
try {
mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);
//将url存储至数据,并更新状态为成功,并将待处理视频记录删除存入历史
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);
} catch (Exception e) {
log.error("上传视频失败或入库失败,视频地址:{},错误信息:{}", bucket + objectName, e.getMessage());
//最终还是失败了
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "处理后视频上传或入库失败");
}
} finally {
// 计数器减1
countDownLatch.countDown();
}
});
});
//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
countDownLatch.await(30, TimeUnit.MINUTES);
}
private String getFilePath(String fileMd5, String fileExt) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;
}
}
测试
基本测试
进入xxl-job调度中心添加执行器和视频处理任务
- 添加执行器
- 视频处理任务
- 在xxl-job配置任务调度策略:
-
- 1)配置阻塞处理策略为:丢弃后续调度。
- 2)配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢弃调度请求。
- 配置完成开始测试视频处理:
- 首先上传至少4个视频,非mp4格式。
- 在xxl-job启动视频处理任务
- 观察媒资管理服务后台日志
失败测试
1、先停止调度中心的视频处理任务。
2、上传视频,手动修改待处理任务表中file_path字段为一个不存在的文件地址
3、启动任务
观察任务处理失败后是否会重试,并记录失败次数。
抢占任务测试
1、修改调度中心中视频处理任务的阻塞处理策略为“覆盖之间的调度”
2、在抢占任务代码处打断点并选择支持多线程方式
3、在抢占任务代码处的下边两行代码分别打上断点,避免观察时代码继续执行。
4、启动任务
此时多个线程执行都停留在断点处
依次放行,观察同一个任务只会被一个线程抢占成功。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/612416.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!