当前位置: 首页 > article >正文

将大模型生成数据存入Excel,并用增量的方式存入Excel

将大模型生成数据存入Excel,并用增量的方式存入Excel

  • 1. 需求与要解决的问题
  • 2. 代码
  • 3. 部分代码分析

1. 需求与要解决的问题

首先就是大模型对话特别耗时,所以通过需要异步执行。
其次是中间对话会有终端或像死锁的那种情况,循环不再继续,所以要增量存到Excel。
最后就是多线程并发,与并发线程数量的控制,因为模型不支持太多线程并发执行。

2. 代码

@Async
    @Override
    public void generateMilitaryDescription() {
        log.info("开始执行 generateMilitaryDescription 方法");

        String outputPath = "/Users/fanzhen/Documents/tuijian-java-copy/src/main/java/com/landinn/common/client/military_descriptions.xlsx";
        XSSFWorkbook workbook;
        XSSFSheet sheet;
        int[] rowIndex = {1}; // 行索引初始化

        // 用于存储已存在的 golaxy_node_id 和 golaxy_vocab_id
        Set<String> existingVocabIds = new HashSet<>();

        // 检查文件是否存在
        File file = new File(outputPath);
        if (file.exists()) {
            try (FileInputStream fileIn = new FileInputStream(file)) {
                workbook = new XSSFWorkbook(fileIn); // 读取现有文件
                sheet = workbook.getSheetAt(0); // 获取第一个工作表
                rowIndex[0] = sheet.getLastRowNum() + 1; // 计算新数据的起始行
                log.info("检测到现有文件,从第 {} 行开始追加", rowIndex[0]);

                // 将现有的golaxy_vocab_id 存入集合
                for (int i = 1; i <= sheet.getLastRowNum(); i++) { // 从第 1 行开始读取数据
                    Row row = sheet.getRow(i);
                    if (row != null) {
                        Cell vocabIdCell = row.getCell(1); // 第 2 列为 golaxy_vocab_id
                        if (vocabIdCell != null && vocabIdCell.getCellType() == CellType.STRING) {
                            existingVocabIds.add(vocabIdCell.getStringCellValue());
                        }
                    }
                }
            } catch (IOException e) {
                log.error("读取现有文件时出错: {}", e.getMessage(), e);
                return;
            }
        } else {
            // 如果文件不存在,则创建新的文件和工作表
            workbook = new XSSFWorkbook();
            sheet = workbook.createSheet("军事描述");

            // 创建标题行
            Row headerRow = sheet.createRow(0);
            headerRow.createCell(0).setCellValue("golaxy_node_id");
            headerRow.createCell(1).setCellValue("golaxy_vocab_id");
            headerRow.createCell(2).setCellValue("node_name");
            headerRow.createCell(3).setCellValue("description");
            headerRow.createCell(4).setCellValue("created_at");
            headerRow.createCell(5).setCellValue("updated_at");
            log.info("未检测到文件,创建新文件");
        }

        // 查询未删除的节点
        List<TechniqueTreeNode> techniqueTreeNodes = techniqueTreeNodeMapper.selectAll();
        log.info("共检索到 {} 条 TechniqueTreeNode 数据", techniqueTreeNodes.size());

        // 创建线程池,限制最高并发为 2
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,  // 核心线程数
                2,  // 最大线程数
                90,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(), // 不允许任务排队,任务必须直接交给线程处理
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝的任务由调用线程执行
        );

        Semaphore semaphore = new Semaphore(2); // 限制并发任务数

        for (TechniqueTreeNode node : techniqueTreeNodes) {
            // 将 golaxy_node_id 和 golaxy_vocab_id 转为字符串
            String golaxyNodeId = String.valueOf(node.getGolaxyNodeId());
            String golaxyVocabId = String.valueOf(node.getGolaxyVocabId());

            // 检查是否已存在
            if (existingVocabIds.contains(golaxyVocabId)) {
                log.info("golaxy_vocab_id '{}' 已存在,跳过该节点", golaxyVocabId);
                continue;
            }

            executor.submit(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                try {
                    String nodeName = node.getNodeName();
                    log.info("正在处理节点名称:{}", nodeName);

                    String threadName = Thread.currentThread().getName();
                    log.info("线程 {} 开始处理节点 '{}'", threadName, nodeName);

                    long startTime = System.currentTimeMillis();
                    String resultRes = chatModelClient.chatModel(nodeName + "的发展,对军事装备会带来哪些影响?请分别按照军事应用场景列出并详细描述。");
                    String result = "\n # 军事领域应用前景 \n" + resultRes;

                    long endTime = System.currentTimeMillis();

                    log.info("线程 {} 调用模型生成对话完成,耗时:{} 秒", threadName, (endTime - startTime) / 1000.0);
                    log.info("节点 '{}' 的描述结果:{}", nodeName, result);

                    // 写入 Excel 数据并立即保存到文件
                    synchronized (sheet) {
                        Row row = sheet.createRow(rowIndex[0]++);

                        // 创建每个单元格并设置为文本格式
                        Cell cell0 = row.createCell(0);
                        cell0.setCellValue(golaxyNodeId);

                        Cell cell1 = row.createCell(1);
                        cell1.setCellValue(golaxyVocabId);

                        Cell cell2 = row.createCell(2);
                        cell2.setCellValue(nodeName);

                        Cell cell3 = row.createCell(3);
                        cell3.setCellValue(result);

                        // 设置 created_at 和 updated_at 为文本格式
                        Cell cell4 = row.createCell(4);
                        cell4.setCellValue(DateUtil.format(node.getCreatedAt(), "yyyy-MM-dd HH:mm:ss"));

                        Cell cell5 = row.createCell(5);
                        cell5.setCellValue(DateUtil.format(node.getUpdatedAt(), "yyyy-MM-dd HH:mm:ss"));

                        // 增量保存文件
                        try (FileOutputStream fileOut = new FileOutputStream(outputPath)) {
                            workbook.write(fileOut);
                            log.info("Excel 文件已增量更新至 {}", outputPath);
                        } catch (IOException e) {
                            log.error("增量更新文件时出错:{}", e.getMessage(), e);
                        }
                    }

                    log.info("节点 '{}' 处理成功", nodeName);
                } catch (Exception e) {
                    log.error("处理节点 '{}' 时发生错误:{}", node.getNodeName(), e.getMessage(), e);
                } finally {
                    semaphore.release(); // 释放许可
                }
            });
        }

        executor.shutdown();
        try {
            if (!executor.awaitTermination(90, TimeUnit.SECONDS)) {
                log.warn("线程池未在超时时间内关闭");
            }
        } catch (InterruptedException e) {
            log.error("等待线程池关闭时发生错误:{}", e.getMessage(), e);
            Thread.currentThread().interrupt();
        }

        log.info("generateMilitaryDescription 方法执行完成");
    }

3. 部分代码分析

这段代码防止重复生成数据存到Excel,用Set<String> existingVocabIds存储已在Excel中的数据(golaxy_vocab_id是唯一id)。

		// 用于存储已存在的golaxy_vocab_id
        Set<String> existingVocabIds = new HashSet<>();

        // 检查文件是否存在
        File file = new File(outputPath);
        if (file.exists()) {
            try (FileInputStream fileIn = new FileInputStream(file)) {
                workbook = new XSSFWorkbook(fileIn); // 读取现有文件
                sheet = workbook.getSheetAt(0); // 获取第一个工作表
                rowIndex[0] = sheet.getLastRowNum() + 1; // 计算新数据的起始行
                log.info("检测到现有文件,从第 {} 行开始追加", rowIndex[0]);

                // 将现有的 golaxy_node_id 和 golaxy_vocab_id 存入集合
                for (int i = 1; i <= sheet.getLastRowNum(); i++) { // 从第 1 行开始读取数据
                    Row row = sheet.getRow(i);
                    if (row != null) {
                        Cell vocabIdCell = row.getCell(1); // 第 2 列为 golaxy_vocab_id
                        if (vocabIdCell != null && vocabIdCell.getCellType() == CellType.STRING) {
                            existingVocabIds.add(vocabIdCell.getStringCellValue());
                        }
                    }
                }
            } catch (IOException e) {
                log.error("读取现有文件时出错: {}", e.getMessage(), e);
                return;
            }
        }

线程的并发限制

		// 创建线程池,限制最高并发为 2
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,  // 核心线程数
                2,  // 最大线程数
                90,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(), // 不允许任务排队,任务必须直接交给线程处理
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝的任务由调用线程执行
        );

        Semaphore semaphore = new Semaphore(2); // 限制并发任务数
		
		 semaphore.acquire(); // 获取许可
		semaphore.release(); // 释放许可

http://www.kler.cn/a/402204.html

相关文章:

  • 算法和程序的区别
  • Leetcode 快乐数
  • Vscode 远程切换Python虚拟环境
  • SpringCloud处理Websocket消息过长自动断开连接
  • 麒麟时间同步搭建chrony服务器
  • 4-7-1.C# 数据容器 - LinkedList(LinkedList 的定义、LinkedList 结点的遍历、LinkedList 的常用方法)
  • Linux全局替换配置文件的IP
  • 【PyTorch][chapter 28] 揭秘 Transformer:缩放定律指南
  • 第十五章 Spring之假如让你来写AOP——Joinpoint(连接点)篇
  • flex布局样式 类名化scss(sass)
  • 在centos7中安装SqlDeveloper的Oracle可视化工具
  • 网络安全领域的最新动态和漏洞信息
  • 解决docker mysql命令行无法输入中文
  • java设计模式 - 装饰者模式
  • go-zero(三) 数据库操作
  • 集群聊天服务器(8)用户登录业务
  • 游戏引擎学习第19天
  • 网络安全web之信息泄露
  • 语义分割(semantic segmentation)
  • 《生成式 AI》课程 第3講 CODE TASK执行文章摘要的机器人
  • 联通光猫(烽火通信设备)改桥接教程
  • 共建智能软件开发联合实验室,怿星科技助力东风柳汽加速智能化技术创新
  • 【学习心得】算力云平台上的大模型部署并实现远程调用
  • RabbitMQ消息可靠性保证机制4--消费端限流
  • EtherNet/IP从站转ModbusTCP主网关是一款 ETHERNET/IP 从站功能的通讯网关
  • python蓝桥杯刷题2