当前位置: 首页 > article >正文

智能BI项目第五期

本期主要内容

  1. 系统问题分析
  2. 异步化业务流程分析
  3. 线程池讲解(入门 + 原理 + 实战)
  4. 系统异步化改造开发

1.系统问题分析

当系统面临大量用户请求时,我们后端的 AI 处理能力有限,例如服务器的内存、CPU、网络带宽等资源有限,这可能导致用户处在一个长时间的等待状态。为了确保平台的安全性,我们可能会限制用户的访问频率,即每秒或每几秒用户只能访问一次或几次。特别是在许多用户同时提交请求的情况下,服务器可能需要较长的时间来处理。

解决:在处理繁重任务的时候,我们应该考虑采用异步处理,避免让用户长时间等待。因此,本期的主题就是如何实现异步化,以有效解决这个问题。

异步化

同步:一件事情做完,再做另外一件事情(烧水后才能处理工作)。

异步:在处理一件事情的同时,可以处理另一件事情。

通常,如果想将同步变为异步,必须知道何时任务已经完成。因此,需要一个通知机制。

本项目流程

用户在点击提交后就不需要在当前界面等待,他们可以直接回到主界面,或者继续填写下一个需要生成或分析的数据。提交完成后,他们回到主页,在主页上就可以看到图表的生成状态。

如果图表已经生成好,那么我们的系统可以在界面的右上角添加一个消息通知功能,用户可以在那里看到相关信息,大致就是这样的一个流程。

标准异步化流程

1.当用户要进行耗时很长的操作时,点击提交后,不需要在界面空等,而是应该把这个任务保存到数据库中记录下来

2.用户要执行新任务时:

        任务提交成功

                若程序存在空闲线程,可以立即执行此任务 若所有线程均繁忙,任务将入队列等待处理

        任务提交失败

                比如所有线程都在忙碌且任务队列满了 选择拒绝此任务,不再执行

3.通过查阅数据库记录,发现提交失败的任务,并在程序空闲时将这些任务取出执行 程序(线程)

4.从任务队列中取出任务依次执行,每完成一项任务,就更新任务状态。

5.用户可以查询任务的执行状态,或者在任务执行成功或失败时接收通知(例如:发邮件、系统消息提示或短信),从而优化体验

6.对于复杂且包含多个环节的任务,在每个小任务完成时,要在程序(数据库中))记录任务的执行状态(进度)。

系统的业务流程总结

  1. 用户点击智能分析页的提交按钮时,先把图表立刻保存到数据库中(作为一个任务)。
  2. 用户可以在图表管理页面查看所有图表(已生成的、生成中的、生成失败)的信息和状态。
  3. 用户可以修改生成失败的图表信息,点击重新生成,以尝试再次创建图表。

原始架构图:

改造后的架构图

 存在的问题

  1. 任务队列的最大容量应该设置为多少?
  2. 程序怎么从任务队列中取出任务去执行?这个任务队列的流程怎么实现?怎么保证程序最多同时执行多少个任务?

线程池的讲解

为什么需要线程池?

  1. 线程的管理比较复杂(比如什么时候新增线程、什么时候减少空闲线程)
  2. 任务存取比较复杂(什么时候接受任务、什么时候拒绝任务、怎么保证大家不抢到同一个任务)

线程池的作用:帮助你轻松管理线程、协调任务的执行过程。 扩充:可以向线程池表达你的需求,比如最多只允许四个人同时执行任务。线程池就能自动为你进行管理。在任务紧急时,它会帮你将任务放入队列。而在任务不紧急或者还有线程空闲时,它会直接将任务交给空闲的线程,而不是放入队列。

线程池的实现

  • 在 Spring 中,我们可以利用 ThreadPoolTaskExecutor 配合 @Async 注解来实现线程池(不太建议)。

ps.虽然 Spring 框架提供了线程池的实现,但并不特别推荐使用。因为 Spring 毕竟是一个框架,它进行了一定程度的封装,可能隐藏了一些细节。更推荐大家直接使用 Java 并发包中的线程池,请注意,这并不是绝对不使用 Spring 的线程池,对其使用有一定的保留意见。

  • 在 Java 中,可以使用JUC并发编程包中的 ThreadPoolExecutor,来实现非常灵活地自定义线程池。

ps.建议学完 SpringBoot 并能够实现一个项目,以及学完 Redis 之后,再系统学习 Java 并发编程(JUC)。这样可以避免过早的压力和困扰,在具备一定实践基础的情况下,更好地理解并发编程的概念和应用。

1.创建配置类

在config目录下创建ThreadPoolExecutorConfig,并声明@Configuration

@Configuration 是 Spring 框架中的一个注解,它用于类级别,表明这个类是一个配置类,其目的是允许在应用程序上下文中定义额外的 bean。

package com.yupi.springbootinit.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class ThreadPoolExecutorConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();
        return threadPoolExecutor;
    }
}

2. 线程池参数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
/**
该构造函数用于创建一个线程池:
设置核心线程数(corePoolSize);
设置最大线程数(maximumPoolSize);
设置空闲线程存活时间(keepAliveTime)及时间单位;
设置任务队列(workQueue)用于存放待执行任务;
设置线程工厂(ThreadFactory)用于创建新线程。
**/

回归到我们的业务,要考虑系统最脆弱的环节(系统的瓶颈)在哪里? 现有条件:比如 AI 生成能力的并发是只允许 4 个任务同时去执行,AI 能力允许 20 个任务排队。

corePoolSize (核心线程数 => 正式员工数):正常情况下,我们的系统可以同时工作的线程数(随时就绪的状态)

maximumPoolSize (最大线程数 => 哪怕任务再多,你也最多招这些人):极限情况下,线程池最多可以拥有多少个线程?

keepAliveTime (空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除(理解为开除临时工),从而释放无用的线程资源。

TimeUnit unit (空闲线程存活时间的单位):分钟、秒 workQueue (工作队列):用于存放给线程执行的任务,存在一个队列的长度(一定要设置,不要说队列长度无限,因为也会占用资源) threadFactory (线程工厂):控制每个线程的生成、线程的属性(比如线程名) RejectedExecutionHandler (拒绝策略):任务队列满的时候,我们采取什么措施,比如抛异常、不抛异常、自定义策略

资源隔离策略:比如重要的任务(VIP 任务)一个队列,普通任务一个队列,保证这两个队列互不干扰。

3.线程工作流程

比如corePoolSize = 2,maximumPoolSize = 4,workQueue.size = 2

先来了两个请求,corePoolSize可以承受,当继续来请求,并且corePoolSize的两个请求还没有结束,那么就会先放到workQueue,如果workQueue满了,并且corePoolSize也没处理完,那么就会创建新的进程处理请求,但是所有线程数不能超过maximumPoolSize,如何超过了,会触发RejectedExecutionHandler拒绝策略。若是线程处理完请求后处于空闲状态,那么经过keepAliveTime unit 的时刻后,这个线程就会销毁

4.IO/CPU密集型

一般情况下,任务分为 IO 密集型和计算密集型两种。 计算密集型:吃 CPU,比如音视频处理、图像处理、数学计算等,一般是设置 corePoolSize 为 CPU 的核数 + 1(空余线程),可以让每个线程都能利用好 CPU 的每个核,而且线程之间不用频繁切换(减少打架、减少开销) IO 密集型:吃带宽/内存/硬盘的读写资源,corePoolSize 可以设置大一点,一般经验值是 2n 左右,但是建议以 IO 的能力为主。

5.线程池开发

基础配置类代码
package com.yupi.springbootinit.config;

import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class ThreadPoolExecutorConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        // 创建一个线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            // 初始化线程数为 1
            private int count = 1;

            @Override
            // 每当线程池需要创建新线程时,就会调用newThread方法
            // @NotNull Runnable r 表示方法参数 r 应该永远不为null,
            // 如果这个方法被调用的时候传递了一个null参数,就会报错
            public Thread newThread(@NotNull Runnable r) {
                // 创建一个新的线程,记得要把参数传进去
                Thread thread = new Thread(r);
                // 给新线程设置一个名称,名称中包含线程数的当前值
                thread.setName("线程" + count);
                // 线程数递增
                count++;
                // 返回新创建的线程
                return thread;
            }
        };
        // 创建一个新的线程池,线程池核心大小为2,最大线程数为4,
        // 非核心线程空闲时间为100秒,任务队列为阻塞队列,长度为4,使用自定义的线程工厂创建线程
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4), threadFactory);
        // 返回创建的线程池
        return threadPoolExecutor;
    }
}

为了方便测试效果,新建一个controller(控制器),以接口的方式实时控制这个线程池,查看线程池的参数;

复制controller目录下的ChartController,并重命名为QueueController,修改一下,删除多余的内容。

测试类代码
package com.yupi.springbootinit.controller;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 队列测试
 *
 * @author <a href="https://github.com/liyupi">程序员鱼皮</a>
 * @from <a href="https://yupi.icu">编程导航知识星球</a>
 */
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"})
//指定只能开发、本地环境生效;正式上线前要把测试去掉,不要把测试暴露出去。
public class QueueController {

    @Resource
    // 自动注入一个线程池的实例
    private ThreadPoolExecutor threadPoolExecutor;

    @GetMapping("/add")
    // 接收一个参数name,然后将任务添加到线程池中
    public void add(String name) {
        // 使用CompletableFuture运行一个异步任务
        CompletableFuture.runAsync(() -> {
            // 打印一条日志信息,包括任务名称和执行线程的名称
            log.info("任务执行中:" + name + ",执行人:" + Thread.currentThread().getName());
            try {
                // 让线程休眠10分钟,模拟长时间运行的任务
                Thread.sleep(600000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        // 异步任务在threadPoolExecutor中执行
        }, threadPoolExecutor);
    }

    @GetMapping("/get")
    // 该方法返回线程池的状态信息
    public String get() {
        // 创建一个HashMap存储线程池的状态信息
        Map<String, Object> map = new HashMap<>();
        // 获取线程池的队列长度
        int size = threadPoolExecutor.getQueue().size();
        // 将队列长度放入map中
        map.put("队列长度", size);
        // 获取线程池已接收的任务总数
        long taskCount = threadPoolExecutor.getTaskCount();
        // 将任务总数放入map中
        map.put("任务总数", taskCount);
        // 获取线程池已完成的任务数
        long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
        // 将已完成的任务数放入map中
        map.put("已完成任务数", completedTaskCount);
        // 获取线程池中正在执行任务的线程数
        int activeCount = threadPoolExecutor.getActiveCount();
        // 将正在工作的线程数放入map中
        map.put("正在工作的线程数", activeCount);
        // 将map转换为JSON字符串并返回
        return JSONUtil.toJsonStr(map);
    }
}

当核心线程满了,等待队列也满了,会创建新线程来处理,这时候先处理新的请求,而不是请求等待队列里的请求,当达到最大线程容量时,继续发送请求会报错

1、2由核心队列处理,3、4、5、6全部进入等待队列,7、8由最大线程数处理、9以后报错

前后端异步化改造

 实现工作流程

  1. 给 chart 表新增任务状态字段(比如排队中、执行中、已完成、失败),任务执行信息字段(用于记录任务执行中、或者失败的一些信息)
  2. 用户点击智能分析页的提交按钮时,先把图表立刻保存到数据库中,然后提交任务
  3. 任务:先修改图表任务状态为 “执行中”。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。
  4. 用户可以在图表管理页面查看所有图表(已生成的、生成中的、生成失败)的信息和状态 → (优化点)
  5. 用户可以修改生成失败的图表信息,点击重新生成 → (优化点)

1.新增任务字段 

先对sql进行归档(修改建表语句)

再去修改图表信息,记得到实体类补充这两个字段

2.任务执行逻辑

之前咱们的业务流程是校验 → 限流 → 构造用户输入、调用 AI;

现在可以把调用 AI 变成提交任务。

先修改图表任务状态为 “执行中”,减少重复执行的风险、同时让用户知道执行状态。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。

智能分析将会被拆分为同步和异步操作。由于异步操作没有返回值信息,我们无法解析并返回图表信息,这导致可视化图表和分析结果无法被展示。因此,我们保留之前的智能分析结果;

先注入刚刚的线程池实例到控制层中

    @Resource
    private ThreadPoolExecutor threadPoolExecutor;


/**
 * 智能分析(异步)
 *
 * @param multipartFile
 * @param genChartByAiRequest
 * @param request
 * @return
 */
@PostMapping("/gen/async")
public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile,
                                                  GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {
    String name = genChartByAiRequest.getName();
    String goal = genChartByAiRequest.getGoal();
    String chartType = genChartByAiRequest.getChartType();

    // 校验
    ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目标为空");
    ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名称过长");

    // 校验文件
    long size = multipartFile.getSize();
    String originalFilename = multipartFile.getOriginalFilename();

    // 校验文件大小
    final long ONE_MB = 1024 * 1024L;
    ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超过 1M");

    // 校验文件大小缀 aaa.png
    String suffix = FileUtil.getSuffix(originalFilename);
    final List<String> validFileSuffixList = Arrays.asList("xlsx", "xls");
    ThrowUtils.throwIf(!validFileSuffixList.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后缀非法");

    User loginUser = userService.getLoginUser(request);

    // 限流判断,每个用户一个限流器
    redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());

    // 指定一个模型id(把id写死,也可以定义成一个常量)
    long biModelId = 1659171950288818178L;
    // 分析需求:
    // 分析网站用户的增长情况
    // 原始数据:
    // 日期,用户数
    // 1号,10
    // 2号,20
    // 3号,30

    // 构造用户输入
    StringBuilder userInput = new StringBuilder();
    userInput.append("分析需求:").append("\n");

    // 拼接分析目标
    String userGoal = goal;
    if (StringUtils.isNotBlank(chartType)) {
        userGoal += ",请使用" + chartType;
    }
    userInput.append(userGoal).append("\n");
    userInput.append("原始数据:").append("\n");
    // 压缩后的数据
    String csvData = ExcelUtils.excelToCsv(multipartFile);
    userInput.append(csvData).append("\n");

    // 先把图表保存到数据库中
    Chart chart = new Chart();
    chart.setName(name);
    chart.setGoal(goal);
    chart.setChartData(csvData);
    chart.setChartType(chartType);
    // 插入数据库时,还没生成结束,把生成结果都去掉
//        chart.setGenChart(genChart);
//        chart.setGenResult(genResult);
    // 设置任务状态为排队中
    chart.setStatus("wait");
    chart.setUserId(loginUser.getId());
    boolean saveResult = chartService.save(chart);
    ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");

    // 在最终的返回结果前提交一个任务
    // todo 建议处理任务队列满了后,抛异常的情况(因为提交任务报错了,前端会返回异常)
    CompletableFuture.runAsync(() -> {
        // 先修改图表任务状态为 “执行中”。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。(为了防止同一个任务被多次执行)
        Chart updateChart = new Chart();
        updateChart.setId(chart.getId());
        // 把任务状态改为执行中
        updateChart.setStatus("running");
        boolean b = chartService.updateById(updateChart);
        // 如果提交失败(一般情况下,更新失败可能意味着你的数据库出问题了)
        if (!b) {
            handleChartUpdateError(chart.getId(), "更新图表执行中状态失败");
            return;
        }

        // 调用 AI
        String result = aiManager.doChat(biModelId, userInput.toString());
        String[] splits = result.split("-----");
        if (splits.length < 3) {
            handleChartUpdateError(chart.getId(), "AI 生成错误");
            return;
        }
        String genChart = splits[1].trim();
        String genResult = splits[2].trim();
        // 调用AI得到结果之后,再更新一次
        Chart updateChartResult = new Chart();
        updateChartResult.setId(chart.getId());
        updateChartResult.setGenChart(genChart);
        updateChartResult.setGenResult(genResult);
        updateChartResult.setStatus("succeed");
        boolean updateResult = chartService.updateById(updateChartResult);
        if (!updateResult) {
            handleChartUpdateError(chart.getId(), "更新图表成功状态失败");
        }
    },threadPoolExecutor);

    BiResponse biResponse = new BiResponse();
//        biResponse.setGenChart(genChart);
//        biResponse.setGenResult(genResult);
    biResponse.setChartId(chart.getId());
    return ResultUtils.success(biResponse);
}
// 上面的接口很多用到异常,直接定义一个工具类
private void handleChartUpdateError(long chartId, String execMessage) {
    Chart updateChartResult = new Chart();
    updateChartResult.setId(chartId);
    updateChartResult.setStatus("failed");
    updateChartResult.setExecMessage(execMessage);
    boolean updateResult = chartService.updateById(updateChartResult);
    if (!updateResult) {
        log.error("更新图表失败状态失败" + chartId + "," + execMessage);
    }
}

如果为了给面试官看,建议把这两种处理方式都保留,你可以向面试官解释为什么要引入异步流程,异步的优点,以及如何实现异步。只要谈到线程池,有超过 50% 的可能性面试官会问你关于线程池的核心参数。这时你可以把本期中介绍的内容全都讲出来,相信面试官会认为你的思路非常清晰

3.前端开发

来到前端,在routes.ts内新增智能分析(异步)的路由。

复制page目录下的AddChart目录,粘贴至page目录下,并重命名为AddChartAsync 

修改智能分析(异步)页面:

修改注释和页面名称。

修改类名为add-chart-async

然后把一些没用到的东西删掉,再修改提示词

如果分析成功,就把表单清空,先定义一个 form,然后在表单引用。

 引用 form 之后,如果说分析成功就清空表单内容。

最终代码 

import { genChartByAiUsingPOST } from '@/services/yubi/chartController';
import { UploadOutlined } from '@ant-design/icons';
import { Button, Card, Form, Input, message, Select, Space, Upload } from 'antd';
import TextArea from 'antd/es/input/TextArea';
import React, { useState } from 'react';
import {useForm} from "antd/es/form/Form";

/**
 * 添加图表(异步)页面
 * @constructor
 */
const AddChartAsync: React.FC = () => {
  // useForm:and design操作表单的语法
  const [form] = useForm();
  // 提交中的状态,默认未提交
  const [submitting, setSubmitting] = useState<boolean>(false);

  /**
   * 提交
   * @param values
   */
    const onFinish = async (values: any) => {
      // 如果已经是提交中的状态(还在加载),直接返回,避免重复提交
    if (submitting) {
      return;
    }
    // 当开始提交,把submitting设置为true
    setSubmitting(true);

      // 对接后端,上传数据
      const params = {
        ...values,
        file: undefined,
      };
      try {
        // 需要取到上传的原始数据file→file→originFileObj(原始数据)
        const res = await genChartByAiUsingPOST(params, {}, values.file.file.originFileObj);
        // 正常情况下,如果没有返回值就分析失败,有,就分析成功
        if (!res?.data) {
          message.error('分析失败');
        } else {
          message.success('分析任务提交成功,稍后请在我的图表页面查看');  
          // 重置所有字段
          form.resetFields();
        }  
      // 异常情况下,提示分析失败+具体失败原因
      } catch (e: any) {
        message.error('分析失败,' + e.message);
      }
      // 当结束提交,把submitting设置为false
      setSubmitting(false);
    };  

  return (
    <div className="add-chart-async">
      <Card title="智能分析">
            <Form
            form={form}
            name="addChart"
            labelAlign="left" 
            labelCol={{ span: 4 }}
            wrapperCol={{ span: 16 }}
            onFinish={onFinish}
            initialValues={{  }}
            >
            <Form.Item name="goal" label="分析目标" rules={[{ required: true, message: '请输入分析目标!' }]}>
                <TextArea placeholder="请输入你的分析需求,比如:分析网站用户的增长情况"/>
            </Form.Item>

            <Form.Item name="name" label="图表名称">
                <Input placeholder="请输入图表名称" />
            </Form.Item>

            <Form.Item
              name="chartType"
              label="图表类型"
              >
              <Select
              options={[
                { value: '折线图', label: '折线图' },
                { value: '柱状图', label: '柱状图' },
                { value: '堆叠图', label: '堆叠图' },
                { value: '饼图', label: '饼图' },
                { value: '雷达图', label: '雷达图' },
              ]}
              />
            </Form.Item>

            <Form.Item
                name="file"
                label="原始数据"
              >
                <Upload name="file" maxCount={1}>
                  <Button icon={<UploadOutlined />}>上传 CSV 文件</Button>
                </Upload>
              </Form.Item>
              <Form.Item wrapperCol={{ span: 16, offset: 4 }}>
                <Space>
                  <Button type="primary" htmlType="submit" loading={submitting} disabled={submitting}>
                    提交
                  </Button>
                  <Button htmlType="reset">重置</Button>
                </Space>
              </Form.Item>
            </Form>
          </Card>
    </div>
  );
};
export default AddChartAsync;

如果图表生成成功就展示图表,没有生成成功就展示现在的状态(进度条、转圈圈);

访问 ant.design 组件库 找一下有没有失败的组件

修改我的图表页 

 然后在左下图的内容中加一层判断。

最终代码

import { listMyChartByPageUsingPOST } from '@/services/yubi/chartController';
import { useModel } from '@@/exports';
import {Avatar, Card, List, message, Result} from 'antd';
import ReactECharts from 'echarts-for-react';
import React, { useEffect, useState } from 'react';
import Search from "antd/es/input/Search";

/**
 * 我的图表页面
 * @constructor
 */
const MyChartPage: React.FC = () => {
  const initSearchParams = {
    // 默认第一页
    current: 1,
    // 每页展示4条数据
    pageSize: 4,
    sortField: 'createTime',
    sortOrder: 'desc',
  };

  const [searchParams, setSearchParams] = useState<API.ChartQueryRequest>({ ...initSearchParams });
  // 从全局状态中获取到当前登录的用户信息
  const { initialState } = useModel('@@initialState');
  const { currentUser } = initialState ?? {};
  const [chartList, setChartList] = useState<API.Chart[]>();
  const [total, setTotal] = useState<number>(0);
  // 用来控制页面是否加载
  const [loading, setLoading] = useState<boolean>(true);

  const loadData = async () => {
    // 当触发搜索,把loading设置为true
    setLoading(true);
    try {
      const res = await listMyChartByPageUsingPOST(searchParams);
      if (res.data) {
        setChartList(res.data.records ?? []);
        setTotal(res.data.total ?? 0);
        // 有些图表有标题,有些没有,直接把标题全部去掉
        if (res.data.records) {
          res.data.records.forEach(data => {
            // 要把后端返回的图表字符串改为对象数组,如果后端返回空字符串,就返回'{}'
            const chartOption = JSON.parse(data.genChart ?? '{}');
            // 把标题设为undefined
            chartOption.title = undefined;
            // 然后把修改后的数据转换为json设置回去
            data.genChart = JSON.stringify(chartOption);
          })
        }
      } else {
        message.error('获取我的图表失败');
      }
    } catch (e: any) {
      message.error('获取我的图表失败,' + e.message);
    }
    // 搜索结束设置为false
    setLoading(false);
  };

  useEffect(() => {
    loadData();
  }, [searchParams]);

  return (
    <div className="my-chart-page">
      {/* 引入搜索框 */}
      <div>
        {/* 
          当用户点击搜索按钮触发 一定要把新设置的搜索条件初始化,要把页面切回到第一页;
          如果用户在第二页,输入了一个新的搜索关键词,应该重新展示第一页,而不是还在搜第二页的内容
        */}
        <Search placeholder="请输入图表名称" enterButton loading={loading} onSearch={(value) => {
          // 设置搜索条件
          setSearchParams({
            // 原始搜索条件
            ...initSearchParams,
            // 搜索词
            name: value,
          })
        }}/>
      </div>
      <div className="margin-16" />
      <List
        /*
          栅格间隔16像素;xs屏幕<576px,栅格数1;
          sm屏幕≥576px,栅格数1;md屏幕≥768px,栅格数1;
          lg屏幕≥992px,栅格数2;xl屏幕≥1200px,栅格数2;
          xxl屏幕≥1600px,栅格数2
        */
        grid={{
          gutter: 16,
          xs: 1,
          sm: 1,
          md: 1,
          lg: 2,
          xl: 2,
          xxl: 2,
        }}
        pagination={{
          /*
            page第几页,pageSize每页显示多少条;
            当用户点击这个分页组件,切换分页时,这个组件就会去触发onChange方法,会改变咱们现在这个页面的搜索条件
          */
          onChange: (page, pageSize) => {
            // 当切换分页,在当前搜索条件的基础上,把页数调整为当前的页数
            setSearchParams({
              ...searchParams,
              current: page,
              pageSize,
            })
          },
          // 显示当前页数
          current: searchParams.current,
          // 页面参数改成自己的
          pageSize: searchParams.pageSize,
          // 总数设置成自己的
          total: total,
        }}
        loading={loading}
        dataSource={chartList}
        renderItem={(item) => (
          <List.Item key={item.id}>
            <Card style={{ width: '100%' }}>
              <List.Item.Meta
                // 把当前登录用户信息的头像展示出来
                avatar={<Avatar src={currentUser && currentUser.userAvatar} />}
                title={item.name}
                description={item.chartType ? '图表类型:' + item.chartType : undefined}
              />
              <>
                {
                  // 当状态(item.status)为'wait'时,显示待生成的结果组件
                  item.status === 'wait' && <>
                    <Result
                      // 状态为警告
                      status="warning"
                      title="待生成"
                       // 子标题显示执行消息,如果执行消息为空,则显示'当前图表生成队列繁忙,请耐心等候'
                      subTitle={item.execMessage ?? '当前图表生成队列繁忙,请耐心等候'}
                    />
                  </>
                }
                {
                  item.status === 'running' && <>
                    <Result
                      // 状态为信息
                      status="info"
                      title="图表生成中"
                       // 子标题显示执行消息
                      subTitle={item.execMessage}
                    />
                  </>
                }
                {
                  // 当状态(item.status)为'succeed'时,显示生成的图表
                  item.status === 'succeed' && <>
                    <div style={{ marginBottom: 16 }} />
                    <p>{'分析目标:' + item.goal}</p>
                    <div style={{ marginBottom: 16 }} />
                    <ReactECharts option={item.genChart && JSON.parse(item.genChart)} />
                  </>
                }
                {
                  // 当状态(item.status)为'failed'时,显示生成失败的结果组件
                  item.status === 'failed' && <>
                    <Result
                      status="error"
                      title="图表生成失败"
                      subTitle={item.execMessage}
                    />
                  </>
                }
              </>
            </Card>
          </List.Item>
        )}
      />
    </div>
  );
};
export default MyChartPage;

最终情况:

此时发起请求后可以在这里等待AI处理完成,返回结果

过一段时间再来看就已经生成了

优化思路

肯定有些同学会倾向于同步方式,因为他们可以随时查看结果,即使可能需要等待十几秒或者 20 秒。然而,另一些同学可能会觉得如果需要等待一分钟或者五分钟的话,异步方式可能会更合适。实际上,你也可以选择实时更新,比如每隔几秒刷新一下页面,自动获取新结果。批量异步也是一种可行的方式。

另外,还有一种策略。你可以根据系统当前的负载动态地调整用户查询的处理方式。比如,如果系统当前状态良好,就可以选择同步返回结果。而如果用户提交请求后发现系统非常繁忙,预计需要等待很长时间,那么就可以选择异步处理方式。这种思考方式在实际的企业项目开发中也是很常见的。

除了刚刚提到的一些点,我们还可以使用定时任务来处理失败的图表,添加重试机制。此外,我们也可以更精确地预见AI生成错误,并在后端进行异常处理,如提取正确的字符串。例如,AI 说一些多余的话,我们就需要提取出正确的信息。同时,如果任务没有提交,我们可以使用定时任务将其提取出来。我们还可以为任务增加一个超时时间。如果超时,任务将自动标记为失败,这就是超时控制,这一点非常重要。对于 AI 生成的脏数据,会导致最后出现错误,因此前端也需要进行异常处理,不能仅仅依赖于后端。

🐟之前写过一篇关于反向压力的文章,可以看一下。刚刚提到了一点,那就是在系统压力大的时候,使用异步,而在系统压力小的时候,使用同步,这就是反向压力的概念。

进一步扩展一下,关于我们的线程池,现在的核心参数不是设定为二嘛。实际上,如果 AI 最多允许四个任务同时执行,我们是否可以提前确认 AI 当前的业务是否繁忙,即我们调用的第三方 API 是否还有多余的资源给我们使用。如果他表示资源已经耗尽,我们为了保证系统的稳定性,是否可以将核心线程数调小一些。反之,如果我们询问 AI 第三方并发现它的状态是空闲,我们是否可以将核心线程数增加,以此来提高系统性能。这种通过下游服务来调整你的业务以及核心线程池参数,进而改变你的系统策略的方式就是反向压力。

例如,你发现当前 AI 服务的任务队列中没有任何人提交任务,那么你是否可以提高使用率。这其实是一个很好的点,如果你能在简历上写到反向压力,将会是一个很大的加分项。反向压力其实是我们在做大数据系统中,特别是在做实时数据流系统时经常会用到的一个术语。我们是不是可以在任务执行成功或失败后,给用户发送消息通知。比如说,在图表页面增加一个刷新或者定时自动刷新的按钮,以保证用户能够获取到图表的最新状态。这就是前端轮询的技术。还有就是在任务执行过程中,我们可以向用户发送消息通知,虽然这可能比较复杂。这东西后面🐟可能会带大家做这个系统。但是在短期内,大家可以自己尝试实现,如通过数据库记录消息,这是最简单的方式。当然还有其他的方式,如 websocket 实时通知或者 server side event,这都是实时的。

优化点

  1. guava Retrying 重试机制
  2. 提前考虑到 AI 生成错误的情况,在后端进行异常处理(比如 AI 说了多余的话,提取正确的字符串)
  3. 如果说任务根本没提交到队列中(或者队列满了),是不是可以用定时任务把失败状态的图表放到队列中(补偿)
  4. 建议给任务的执行增加一个超时时间,超时自动标记为失败(超时控制)
  5. 反向压力:https://zhuanlan.zhihu.com/p/404993753,通过调用的服务状态来选择当前系统的策略(比如根据 AI 服务的当前任务队列数来控制咱们系统的核心线程数),从而最大化利用系统资源
  6. 我的图表页面增加一个刷新、定时自动刷新的按钮,保证获取到图表的最新状态(前端轮询)
  7. 任务执行成功或失败,给用户发送实时消息通知(实时:websocket、server side event)

下期:RabbitMQ 分布式消息队列(消息持久化、多机共享、可扩展性)


http://www.kler.cn/a/312822.html

相关文章:

  • ElasticSearch备考 -- 集群配置常见问题
  • Python——数列1/2,2/3,3/4,···,n/(n+1)···的一般项为Xn=n/(n+1),当n—>∞时,判断数列{Xn}是否收敛
  • ffmpeg 视频滤镜:屏蔽边框杂色- fillborders
  • 抓包工具WireShark使用记录
  • .NET使用SqlSugar实现单列批量更新的几种实现和对比
  • Ascend Extension for PyTorch是个what?
  • http免费升级https教程
  • 极狐GitLab CI/CD 功能合集(超详细教程)
  • rtmp推流
  • Linux基础命令——账户简单管理
  • 英集芯IP5902:集成电压可调异步升压转换充电管理功能的8位MCU芯片
  • uniapp使用uview2上传图片功能
  • 通威股份半年报业绩巨降:销售费用大增,近一年股价跌四成
  • 算法-分治和逆序
  • 操作系统笔记三
  • Jboss 低版本JMX Console未授权
  • 828华为云征文|华为Flexus云服务器打造FastBee物联网平台
  • Linux Inode 概念、查看、引发的问题及常见解决方案
  • Unity多语言插件I2 Localization国际化应用
  • JAIN SLEE 中Container Managed Persistent (CMP)
  • 使用 Spring Boot + Redis + Vue 实现动态路由加载页面
  • centos 安装VNC,实现远程连接
  • Unity3d开发的C#编码规范
  • 【自然语言处理】补充:布尔模型
  • VMware Fusion虚拟机Mac版 安装Win10系统教程
  • 如何在Windows上安装Docker