当前位置: 首页 > article >正文

Reactor编程模型中,阻塞上传文件FilePart的3中方式

1. 使用非阻塞的方式处理数据

将 Flux 转换为 Mono,并确保整个链式调用是非阻塞的。你可以使用 Mono.fromCallable 或其他非阻塞的方式来处理资源转换。

public Mono<Void> addDocumentsByFile(FilePart file) {
    String suffix = FileNameUtil.getSuffix(file.filename());
    
    return file.content()
        .reduce(DataBuffer::write)
        .map(DataBuffer::toByteBuffer)
        .map(ByteBuffer::array)
        .map(ByteArrayResource::new)
        .flatMap(resource -> {
            TikaDocumentReader tikaDocumentReader = new TikaDocumentReader(resource);
            List<Document> splitDocuments;
            if ("csv".equalsIgnoreCase(suffix)) {
                splitDocuments = new CsvTextSplitter().apply(tikaDocumentReader.read());
            } else {
                splitDocuments = new TokenTextSplitter().apply(tikaDocumentReader.read());
            }
            // 将文档存入向量数据库
            return Mono.fromRunnable(() -> {
                for (Document doc : splitDocuments) {
                    vectorStore.add(doc); // 假设vectorStore是一个异步接口
                }
            });
        })
        .then();
}

2. 使用 publishOn 切换到阻塞线程池

如果你确实需要使用阻塞操作,可以使用 publishOn 将操作切换到一个允许阻塞的线程池(例如 (Schedulers.boundedElastic()))。这会确保阻塞操作不会影响到非阻塞线程。

public Mono<Void> addDocumentsByFile(FilePart file) {
    String suffix = FileNameUtil.getSuffix(file.filename());

    return file.content()
        .reduce(DataBuffer::write)
        .map(DataBuffer::toByteBuffer)
        .map(ByteBuffer::array)
        .map(ByteArrayResource::new)
        .publishOn(Schedulers.boundedElastic())
        .flatMap(resource -> {
            TikaDocumentReader tikaDocumentReader = new TikaDocumentReader(resource);
            List<Document> splitDocuments;
            if ("csv".equalsIgnoreCase(suffix)) {
                splitDocuments = new CsvTextSplitter().apply(tikaDocumentReader.read());
            } else {
                splitDocuments = new TokenTextSplitter().apply(tikaDocumentReader.read());
            }
            // 将文档存入向量数据库
            for (Document doc : splitDocuments) {
                vectorStore.add(doc); // 假设vectorStore是一个同步接口
            }
            return Mono.empty();
        });
}

3. 使用 Mono.fromCallable 和 subscribeOn

如果你需要在一个特定的线程上执行阻塞操作,可以使用 Mono.fromCallable 并结合 subscribeOn 来确保操作在正确的线程上执行。

public Mono<Void> addDocumentsByFile(FilePart file) {
    String suffix = FileNameUtil.getSuffix(file.filename());

    return file.content()
        .reduce(DataBuffer::write)
        .map(DataBuffer::toByteBuffer)
        .map(ByteBuffer::array)
        .map(ByteArrayResource::new)
        .flatMap(resource -> Mono.fromCallable(() -> {
            TikaDocumentReader tikaDocumentReader = new TikaDocumentReader(resource);
            List<Document> splitDocuments;
            if ("csv".equalsIgnoreCase(suffix)) {
                splitDocuments = new CsvTextSplitter().apply(tikaDocumentReader.read());
            } else {
                splitDocuments = new TokenTextSplitter().apply(tikaDocumentReader.read());
            }
            // 将文档存入向量数据库
            for (Document doc : splitDocuments) {
                vectorStore.add(doc); // 假设vectorStore是一个同步接口
            }
            return null;
        }).subscribeOn(Schedulers.boundedElastic()))
        .then();
}

总结

为了避免阻塞操作导致的问题,建议尽量使用非阻塞的方式处理数据流。如果必须使用阻塞操作,可以通过 publishOn 或 subscribeOn 将操作切换到允许阻塞的线程池。这样可以确保你的应用程序在非阻塞环境中正常工作。


http://www.kler.cn/a/508466.html

相关文章:

  • 深入了解卷积神经网络(CNN):图像处理与深度学习的革命性技术
  • Linux操作命令之云计算基础命令
  • VSCode代理配置导致的SSL证书验证错误及解决方案
  • 【React】静态组件动态组件
  • 软件设计大致步骤
  • YOLOv10-1.1部分代码阅读笔记-build.py
  • 蓝桥杯3527阶乘的和 | 组合数学
  • 从零搭建SpringBoot3+Vue3前后端分离项目基座,中小项目可用
  • PCL K4PCS算法实现点云粗配准【2025最新版】
  • 软件工程3.0时代,AI落地研效成熟时
  • idea创建SpringBoot自动创建Lombok无效果(解决)
  • 力扣刷题汇总
  • vue3 移动端h5 加载高德地图 封装地图组件
  • java springboot3.x jwt+spring security6.x实现用户登录认证
  • 宝塔自动备份数据库到阿里云 OSS
  • 基于docker微服务日志ELK+Kafka搭建
  • Vue.js组件开发-如何处理跨域请求
  • 项目开发实践——基于SpringBoot+Vue3实现的在线考试系统(六)
  • 审计文件标识作为水印打印在pdf页面边角
  • 【开源宝藏】Jeepay VUE和React构建WebSocket通用模板
  • 【PyQt】图像处理系统
  • phpstudy靶场搭建问题
  • 【深度学习项目】语义分割-FCN网络(原理、网络架构、基于Pytorch实现FCN网络)
  • 物联网时代,知识库管理系统的拓展与创新
  • npm pack 手动下载非本机平台的依赖包
  • SDL2:Android APP编译使用 -- SDL2多媒体库使用音频实例