当前位置: 首页 > article >正文

Flask创建流式返回的mock脚本

需求:对接口api/mocktest 接口进行流式返回的mock,其中要求接口请求方式为:api/mocktest/{first_time}/{total_time},first_time为首token耗时、total_time为流式返回总耗时

import sys
from flask import Flask, Response, request
import json
import time

app = Flask(__name__)

@app.route("/api/mocktest/<int:first_time>/<int:total_time>", methods=["POST"])
def process(first_time, total_time):
    if first_time >= total_time:
        return Response(json.dumps({"error": "First time is greater than total time"}), status=400)

    """
    /api接口定义,动态获取first token time和complete response time
    """
    stream = request.json.get("stream")
    stream_num = 10
    # 根据请求参数计算平均时间
    avg_time = (total_time - first_time) / (stream_num - 1)
    # 初始化流式数据
    mock_stream_data = [[json.dumps({"data": "this is a stream mock1", "status": 0}), first_time]]
    for i in range(2, stream_num + 1):
        res = [json.dumps({"data": "this is a stream mock" + str(i), "status": 0}), avg_time]
        mock_stream_data.append(res)

    # 初始化非流式数据
    no_stream_time = first_time
    mock_nostream_data = [
        [json.dumps({"data": "this is a nostream mock", "status": 0}), no_stream_time]
    ]

    def generate_stream():
        """
        流式返回数据
        """
        for each in mock_stream_data:
            v = each[0]
            t = each[1]
            if t > 0:
                time.sleep(t)  # 等待指定的时间后再发送下一条数据
            yield v + "\r\n"
            sys.stdout.flush()

    def generate_nostream():
        """
        非流式返回数据
        """
        v = mock_nostream_data[0][0]
        t = mock_nostream_data[0][1]
        if t > 0:
            time.sleep(t)
        yield v + "\r\n"

    if stream:
        return Response(generate_stream(), mimetype="text/plain")
    else:
        return Response(generate_nostream(), mimetype="text/plain")

app.run()

python运行后通过curl命令访问地址:
流式请求:

curl --location '127.0.0.1:5000/api/mocktest/3/10' \
--header 'Content-Type: application/json' \
--data '{
    "messages": [{"role": "user", "content": "发起请求"}],
    "stream": true
}'

结果:
在这里插入图片描述
非流式请求:

curl --location '127.0.0.1:5000/api/mocktest/3/10' \
--header 'Content-Type: application/json' \
--data '{
    "messages": [{"role": "user", "content": "发起请求"}],
    "stream": false
}'

结果:
在这里插入图片描述


http://www.kler.cn/a/354464.html

相关文章:

  • 数据结构——栈的实现
  • 【Optional 的 orElseGet 和 orElse区别】
  • 成为LabVIEW自由开发者
  • 什么是 ES6 “模板语法” ?
  • vue el table 不出滚动条样式显示 is_scrolling-none,如何修改?
  • 信息系统管理师试题-人力资源
  • Linux 重置 root 密码
  • Flume面试整理-Flume的基本架构
  • 限流是什么?如何限流?怎么限流?
  • 如何轻松使用pip安装Git仓库中的私有Python模块(使用pip和Git仓库发布和安装私有Python模块)
  • 解决ffmpeg通过srt文件给视频添加字幕时乱码问题
  • 【2024最新版】Win10下 Java环境变量配置----适合入门小白
  • RTThread-Nano学习二-RT-Thread启动流程
  • C会赢的!(牛客周赛 Round 58)
  • 力反馈手套如何在VR培训解决方案中为用户提供沉浸式体验?
  • c++链式调用
  • 【css-在一个元素中设置font-size和实际渲染字体大小不一致】
  • CAT(Card Application Toolkit)- LSI
  • Jenkins整合Docker实现CICD自动化部署(若依项目)
  • ESP32-IDF USART 专题
  • 如何在Android中进行日志打印和调试?
  • 即时通讯增加kafka渠道
  • 基于workbox实现PWA预缓存能力
  • 11.9K Star!强大的 Web 爬虫工具 FireCrawl:为 AI 训练与数据提取提供全面支持
  • 【Linux】解读信号的本质&相关函数及指令的介绍
  • UI自动化测试 —— web端元素获取元素等待实践!