当前位置: 首页 > article >正文

【最后203篇系列】025 FastAPI+Celery

我发现我纠结于这个问题好几年了,在再次改版之前回顾一下:

1 Python 全栈系列22 - 分布式消息管理 celery

这个是2020-09的,当时准备开写,没有完成。里面有引用几个帖子

使用Celery踩过的坑

这个帖子说了启动的问题,多个q,worker使用次数的问题。

celer介绍

这个相当于已经给了一个精简的使用版本
在这里插入图片描述

2 建模杂谈系列36- celery + rabbiqmq进行分布式计算

这个是2020-10,只写了一些基本的想法

1 执行的任务是I/O问题还是计算问题。如果是IO并行,那么一个cpu也可以(协程)。
2 如果任务是类似矩阵计算之类的,那么单核利用已经达到最高。(如果是for的话其实本身可以考虑用矩阵提升)
3 如果任务的网络结构是比较扁平(而非纵深)的,那么适合分布执行。

3 建模杂谈系列38- 基于celery、rabbitmq、redis和asyncio的分布并行处理(概述)

2020-10的晚些时候,我终于发了第一篇帖子

当时是考虑用celery worker直接做计算负载;现在看来定位不太合适。不同的任务所需要的环境不同,最后的情况是逻辑跟着容器走。

4 建模杂谈系列39- celery + redis的单机异步并行

2020-10 试着用celery的worker进行etl类的工作,用pandas处理。其实这种计算密集的任务,用celery的意义不大,只是调度而已。如果真的计算量大,其实可以起任何一个容器,共同消耗任务队列就可以了。(或者说更具弹性,不需要考虑其他因素,只是队列)

5 Python 全栈系列48 - celery + flask 异步调用任务

2020-11 还是讨论了把逻辑放到task里,然后还额外设计了冗余分发的策略。也讨论了使用rabbitmq作为消息队列。

总结到这里,我也蛮奇怪celery和 rabbitmq是我拿起又放下好几次的工具,问题到底在哪里?

首先,在部署方式上总是把celery作为本地服务部署(systemd)
其次,希望celery做通用的计算任务
最后,rabbitmq总是作为简单队列被使用(高级功能没有真正发挥作用)

是不是可以得出这样的结论:celery不要直接在宿主机上,不要期待它能做通用任务。而rabbitmq的真正使用场景并不是这个(简单任务队列)。

6 Python 全栈系列194 Flask+Celery魔改版:准实时请求

2022-10 这是过了2年,然后我又想起来用Celery了,这时候结合了Flask。应该说,这时候的形态有点成熟了,和最新的设计想法在某方面很相似。

  • 1 微服务传入请求。发布任务时不是直接通过celery,而是通过Flask。
  • 2 伪实时请求。背后使用异步服务,而让前端感知为实时服务。

从现在的视角来看,当时的很多地方还是不够好,没用起来是正常的。

7 Python 全栈系列243 S2S flask_celery

2024-5 只是又过了快2年,继续在尝试。当时这个概念Stream To Stream,到当前版本是打算采用的,不过此时队列换成了Kafka。Kafka的特性可以让件事成功,这算是工具的问题了。
在这里插入图片描述
卖糕的,我觉得这个celery是现在还在运行的。任务还不少。这种参数化的执行是我现在也希望这么定义的。

任务通过ORM存在Mongo中,这个可以替换成我新的AMalchemy对象。

在找到项目文件夹了,写的非常完整,当时定义了三类任务:

  • 1 s2s: 从stream到stream
  • 2 s2ch: 从stream到clickhouse
  • 3 exe_sh:执行脚本

当时只用了一个server.py文件就完成了,还非常精简。

因为是systemd执行的,我真的没发现…
在这里插入图片描述
所以,除了一些特别特别基础的服务,真的不要systemd,会忘记…

另外,当时这个项目应该被打断了,去干了一些别的占用精力的项目,所以也没有继续去完善。然后在去年我决定再开一个fastapi框架,所以这个就被扔到一边了。不过看起来这个应该还是在稳定执行的。

8 Python 全栈系列253 再梳理flask-celery的搭建

2024-6 在上次的基础上发现了一些问题
在这里插入图片描述
这大概也是celery在我这里突然重要性下降的原因:大量的任务实际上是IO任务,如果都是线程同步占用就毫无意义了。这时候还是在考虑使用systemd来创建服务。


然后为啥又要搞celery?

我记得后来我有看了一些工具,airflow, prefect,发现这些工具似乎背后还是celery,那我问题就不在celery,而是我的认知可能有偏差。然后随着我对异步服务这块做了不少实践,再加上大模型的知识,我觉得似乎可以再搞一次。

这次目标非常明确:解决IO并发任务。

上面的目标就会要求:

  • 1 采用异步/协程方式
  • 2 方便进行定时控制

然后我也既不希望烂尾,也不希望失控(之前那个默默运行中的celery…)。
所以会采用一种微服务+ORM的方式进行。这被证明是黄金搭档,目前这种方式的服务都运行且管理良好。

最后,大约还需要一个表格状的前端来展示当前任务的状态。并能够通过按钮来进行任务状态的改变。

从应用上:

  • 1 一些定期需要通过爬取或者api获得数据的程序集中运行【数据获取】
  • 2 复杂任务,需要通过api调用完成,比如大模型api 【计算负载】

对于celery而言,本身不承担任何计算负载,只是完成api调用,这一定是可以做成异步的。

这样从功能上和应用上就能满足我的要求了。

新的架构

这是我最关心的基础:celery的worker可以以协程方式工作。
在这里插入图片描述

最后的产出是一个微服务:celery_fastapi_apscheduler_24165_24166
一个ORM:WCelery

先来细细看一下:

1 我所关心的协程模式

--loglevel=debug 可以看到比较详细的启动信息(嗯,其实gevent的信息用info级别也有)

(base) root@a93c876766f4:/workspace# celery -A celery_app.celery_app worker --loglevel=debug --pool=gevent --concurrency=100
[2025-03-28 23:23:10,249: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2025-03-28 23:23:10,251: DEBUG/MainProcess] | Worker: Building graph...
[2025-03-28 23:23:10,251: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, Beat, StateDB, Consumer}
[2025-03-28 23:23:10,258: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2025-03-28 23:23:10,258: DEBUG/MainProcess] | Consumer: Building graph...
[2025-03-28 23:23:10,316: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Agent, Events, Mingle, Gossip, Tasks, Control, Heart, event loop}

 -------------- celery@a93c876766f4 v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-5.15.0-60-generic-x86_64-with-glibc2.36 2025-03-28 23:23:10
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x7f1e5c928b90
- ** ---------- .> transport:   redis://:**@172.17.0.1:24008/1
- ** ---------- .> results:     redis://:**@172.17.0.1:24008/1
- *** --- * --- .> concurrency: 100 (gevent)  <------ 这里说明用gevent启动协程了
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . celery_app.check_gevent
  . celery_app.process_task
  . celery_app.sniff_mongo_change

[2025-03-28 23:23:10,397: DEBUG/MainProcess] | Worker: Starting Pool
[2025-03-28 23:23:10,397: DEBUG/MainProcess] ^-- substep ok
[2025-03-28 23:23:10,398: DEBUG/MainProcess] | Worker: Starting Consumer
[2025-03-28 23:23:10,398: DEBUG/MainProcess] | Consumer: Starting Connection
[2025-03-28 23:23:10,411: INFO/MainProcess] Connected to redis://:**@172.17.0.1:24008/1
[2025-03-28 23:23:10,412: DEBUG/MainProcess] ^-- substep ok
[2025-03-28 23:23:10,412: DEBUG/MainProcess] | Consumer: Starting Events
[2025-03-28 23:23:10,415: DEBUG/MainProcess] ^-- substep ok
[2025-03-28 23:23:10,415: DEBUG/MainProcess] | Consumer: Starting Mingle
[2025-03-28 23:23:10,416: INFO/MainProcess] mingle: searching for neighbors
[2025-03-28 23:23:11,440: INFO/MainProcess] mingle: all alone
[2025-03-28 23:23:11,440: DEBUG/MainProcess] ^-- substep ok
[2025-03-28 23:23:11,440: DEBUG/MainProcess] | Consumer: Starting Gossip
[2025-03-28 23:23:11,449: DEBUG/MainProcess] ^-- substep ok
[2025-03-28 23:23:11,449: DEBUG/MainProcess] | Consumer: Starting Tasks
[2025-03-28 23:23:11,455: DEBUG/MainProcess] ^-- substep ok
[2025-03-28 23:23:11,455: DEBUG/MainProcess] | Consumer: Starting Control
[2025-03-28 23:23:11,455: DEBUG/MainProcess] ^-- substep ok
[2025-03-28 23:23:11,455: DEBUG/MainProcess] | Consumer: Starting Heart
[2025-03-28 23:23:11,461: INFO/MainProcess] pidbox: Connected to redis://:**@172.17.0.1:24008/1.
[2025-03-28 23:23:11,463: DEBUG/MainProcess] ^-- substep ok
[2025-03-28 23:23:11,463: DEBUG/MainProcess] | Consumer: Starting event loop
[2025-03-28 23:23:11,463: INFO/MainProcess] celery@a93c876766f4 ready.
[2025-03-28 23:23:11,463: DEBUG/MainProcess] basic.qos: prefetch_count->400
# 终端1
from celery_app import check_gevent
check_gevent.delay()

# 服务终端打印
[2025-03-28 23:36:24,139: WARNING/MainProcess] Running in greenlet: <Greenlet at 0x7fedb8f013a0: apply_target(<function fast_trace_task at 0x7fede5211c60>, ('celery_app.check_gevent', 'd95278c2-5324-4287-b1, {}, <bound method create_request_cls.<locals>.Request., <bound method Request.on_accepted of <Request: cel, timeout=None, timeout_callback=<bound method Request.on_timeout of <Request: cele)>

并发测试,服务开100并发,这么执行没问题

In [3]: from celery_app import process_task  # 替换为你的任务
   ...: import time
   ...:
   ...: def test_concurrency(n=100):
   ...:     start = time.time()
   ...:     results = [process_task.delay(f"task-{i}") for i in range(n)]
   ...:     print(f"已发送 {n} 个任务,耗时: {time.time() - start:.2f}s")
   ...:
   ...:     # 可选:等待所有任务完成(同步阻塞)
   ...:     for r in results:
   ...:         r.get(timeout=30)  # 设置超时避免卡死
   ...:     print(f"所有任务完成,总耗时: {time.time() - start:.2f}s")
   ...:
   ...: if __name__ == "__main__":
   ...:     test_concurrency(500)  # 测试 500 并发
   ...:
已发送 500 个任务,耗时: 0.60s
所有任务完成,总耗时: 53.98s

当服务开500并发,然后就出错了,正好单个任务的时间超过了之前设定的30s,重新改60s,问题解决。【并发大不一定有用】500和100相比,服务多了5倍,而实际的能力大约只能提升2倍。

In [7]: from celery_app import process_task  # 替换为你的任务
   ...: import time
   ...:
   ...: def test_concurrency(n=100):
   ...:     start = time.time()
   ...:     results = [process_task.delay(f"task-{i}") for i in range(n)]
   ...:     print(f"已发送 {n} 个任务,耗时: {time.time() - start:.2f}s")
   ...:
   ...:     # 可选:等待所有任务完成(同步阻塞)
   ...:     for r in results:
   ...:         r.get(timeout=60)  # 设置超时避免卡死
   ...:     print(f"所有任务完成,总耗时: {time.time() - start:.2f}s")
   ...:
   ...: if __name__ == "__main__":
   ...:     test_concurrency(500)  # 测试 500 并发
   ...:
已发送 500 个任务,耗时: 0.59s
所有任务完成,总耗时: 34.24s

然后我又试了下线程,开到100时还行,速度甚至比协程500还快;但是并发开到500就直接bbq了, 看到任务有执行,但是前端没有收到结果。
在这里插入图片描述
嗯,最后跑完了,没有超时,但是效果大幅降低了。看来是线程之间切换的问题。另外就是线程的确比协程要更消耗资源,占的内存更多。整体上感觉,线程不宜太多。

综上:

  • 1 协程验证没问题,并发可以至少设置100
  • 2 我大部分应该都是采用微服务执行复杂任务,所以一般采用协程。
  • 3 当需要进行密集计算时 ,可能还是采用进程

在这里插入图片描述

2 部署

首先,得搞清楚Celery服务的定位。
在这里插入图片描述
在这里插入图片描述
也就是说,celery的负载主要在运行服务的位置,和调用的位置无关。
如果是在本地,可以这样统一去调用

from celery_app import process_task,sniff_mongo_change

result = process_task.delay("test_value")
print(result.get())  # 获取任务结果

result = sniff_mongo_change.delay(db_server='xxx', tier1='xxx', tier2='xxx',query_col='xxx')
print(result.get())  # 获取任务结果

我上午在想,那么直接用celery+redis是不是就够了。例如,有一个celery worker,可以动态检测任务文件夹,然后开发端就可以动态的增加各种任务,然后通过这个worker执行…

结论是No。

没有一种方案能解决所有的问题,我们应该是根据不同类型的问题选择不同的方案。这里不同类型并不是无限枚举,而是基于同步、异步这样大的MECE框架分的。

从整个数据处理流程上,数据的IO和计算显然是两块完全不同的场景。所以Celery应该专注于解决IO问题,比如说获取数据,流转数据等。在大模型时代,这块工作的重要性被提高了:一般企业不可能部署大模型,因此只能向大厂调用。此时,原本繁重的计算问题,变成了IO问题 (prompt + 数据) -> 结果。而重计算的框架,则交给ray和dask这样的计算框架。
在这里插入图片描述
定义一个通用的http任务:

import httpx
from httpx import Timeout
from pydantic import BaseModel
@celery_app.task(
    name="http_request",
    autoretry_for=(httpx.RequestError,),
    retry_kwargs={'max_retries': 3, 'countdown': 2},
    time_limit=200
)
def http_request(
    url: str, 
    json_data: dict, 
    timeout: float = 10.0,
    headers: dict = None
):
    """
    同步HTTP POST请求
    :param url: 请求地址
    :param json_data: POST的JSON数据
    :param timeout: 超时时间(秒)
    :param headers: 可选请求头
    """
    with httpx.Client() as client:
        resp = client.post(
            url,
            json=json_data,
            timeout=Timeout(timeout),
            headers=headers or {"Content-Type": "application/json"}
        )
        resp.raise_for_status()
        return resp.json()

3 几种调用方法

同步(阻塞)调用。这个比较简单,适合用来做测试。

task = http_request.delay(
    url=test_url,
    json_data={"key": "value"},
    timeout=5.0
)

try:
    # 阻塞等待结果(超时时间建议大于Celery的time_limit)
    result = task.get(timeout=30)  # 最多等待30秒
    print("Result:", result)
except Exception as e:
    print("Task failed:", str(e))

异步分为两种,第一种是for循环加asleep,这种相对简单(我现在做的伪实时服务就是这样)

import asyncio
from celery.result import AsyncResult

async def async_wait_for_task(task_id: str, timeout: int = 30):
    """异步等待Celery任务完成"""
    result = AsyncResult(task_id)
    
    for _ in range(timeout):
        if result.ready():
            return result.result
        await asyncio.sleep(1)  # 非阻塞等待
    
    raise TimeoutError(f"Task {task_id} timeout after {timeout}s")

第二种是用异步服务封装,这就是关于【FastAPI】的部分

# 示例:FastAPI路由
@app.post("/run-task")
async def run_task():
    task = http_request.delay(
        url=test_url,
        json_data={"key": "value"},
        timeout=5.0
    )
    
    try:
        result = await async_wait_for_task(task.id)
        return {"status": "success", "result": result}
    except TimeoutError as e:
        return {"status": "error", "message": str(e)}

另一大类则是基于事件的方式。
在这里插入图片描述

第一种,应该来说也是最合适在内部系统里流转的方式。事件的callback。

from celery.signals import task_success

@task_success.connect(sender='http_request')
def handle_task_result(sender=None, result=None, **kwargs):
    """任务完成时自动触发"""
    print(f"Task {sender.request.id} completed! Result: {result}")

# 调用任务(无需主动等待)
http_request.delay(url=test_url, json_data={"key": "value"})

后来我试了下,发现有点不靠谱,我也懒得调,和deepseek商量了下,用webhook来替代这个动作。

修改后的celery_app服务代码如下:

import httpx
from httpx import Timeout
from pydantic import BaseModel
import time 
@celery_app.task(
    name="celery_app.http_request",
    autoretry_for=(httpx.RequestError,),
    retry_kwargs={'max_retries': 3, 'countdown': 3},
    time_limit=200
)
def http_request(
    url: str, 
    json_data: dict, 
    timeout: float = 10.0,
    headers: dict = None,
    webhook_url: str = None  # 新增webhook参数
):
    """
    同步HTTP POST请求
    :param url: 请求地址
    :param json_data: POST的JSON数据
    :param timeout: 超时时间(秒)
    :param headers: 可选请求头
    :webhook_url: 回调通知地址(可选)
    """
    # 设置默认headers
    final_headers = {"Content-Type": "application/json"}
    if headers:
        final_headers.update(headers)

    try:
        # 1. 执行主请求
        with httpx.Client() as client:
            resp = client.post(
                url,
                json=json_data,
                timeout=Timeout(timeout),
                headers=headers or {"Content-Type": "application/json"}
            )
            resp.raise_for_status()
            result = resp.json()
        
        # 2. 如果配置了webhook则回调
        if webhook_url:
            # 【主请求】
            webhook_payload = {
                "status": "success",
                "task_result": result,
                "task_for": json_data.get("task_for", "tem"),
                "function": json_data.get("function", "tem"),
                "rec_id": json_data.get("rec_id", str(int(time.time()*1e6))),
                "metadata": {
                    "source_url": url,
                    "timestamp": get_time_str1()
                }
            }
            try:
                with httpx.Client() as client:
                    callback_resp = client.post(
                        webhook_url,
                        json=webhook_payload,
                        timeout=5.0
                    )
                    callback_resp.raise_for_status()
            except Exception as e:
                print(f"Webhook回调失败: {str(e)}")
                # 这里不建议raise,因为主请求已成功
            
        return result

    except httpx.HTTPStatusError as e:
        # 处理HTTP状态码错误
        error_result = {
            "error": str(e),
            "status_code": e.response.status_code,
            "response": e.response.text
        }
        if webhook_url:
            # 即使主请求失败也尝试发送webhook
            try:
                with httpx.Client() as client:
                    client.post(
                        webhook_url,
                        json={
                            "status": "error",
                            "error": error_result,
                            "metadata": {
                                "source_url": url,
                                "timestamp": get_time_str1()
                            }
                        },
                        timeout=5.0
                    )
            except Exception:
                pass
        return error_result

主要改进点:

  • 1.错误处理更完善:

    • 1 保留了resp.raise_for_status()以确保HTTP状态码正确
    • 2 对主请求和webhook请求分别处理异常
  • 2.headers处理更规范:

    • 1 先设置默认headers再合并自定义headers
  • 3.webhook逻辑优化:

    • 1 即使主请求失败也会尝试发送错误通知
    • 2 webhook失败不再影响主任务结果
  • 4.代码结构更清晰:

    • 1 使用嵌套的try-except块区分不同层级的错误
    • 2 添加了更详细的注释和文档字符串
  1. 资源管理:
    • 1 为webhook请求也使用了Client上下文管理器

调用如下:不带webhook的模式

# 获取等待
from celery_app import http_request
test_url ="https://httpbin.org/post"

task = http_request.delay(**{
    "url": test_url,
    "json_data": {"key": "value"},
    "timeout": 5.0
})
try:
    # 阻塞等待结果(超时时间建议大于Celery的time_limit)
    result = task.get(timeout=30)  # 最多等待30秒
    print("Result:", result)
except Exception as e:
    print("Task failed:", str(e))


Result: {'args': {}, 'data': '{"key": "value"}', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Content-Length': '16', 'Content-Type': 'application/json', 'Host': 'httpbin.org', 'User-Agent': 'python-httpx/0.27.2', 'X-Amzn-Trace-Id': 'Root=1-67e811ed-257a775a7be6f5f56666bbcc'}, 'json': {'key': 'value'}, 'origin': '45.126.120.54', 'url': 'https://httpbin.org/post'}

带webhook的模式:

webhook_url="https://your-webhook.example.com/notify"
from celery_app import http_request
test_url ="https://httpbin.org/post"
task = http_request.delay(**{
    "url": test_url,
    "json_data": {"key": "value"},
    "timeout": 5.0,
    'webhook_url':webhook_url
})
try:
    # 阻塞等待结果(超时时间建议大于Celery的time_limit)
    result = task.get(timeout=30)  # 最多等待30秒
    print("Result:", result)
except Exception as e:
    print("Task failed:", str(e))


Result: {'args': {}, 'data': '{"key": "value"}', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Content-Length': '16', 'Content-Type': 'application/json', 'Host': 'httpbin.org', 'User-Agent': 'python-httpx/0.27.2', 'X-Amzn-Trace-Id': 'Root=1-67e8127d-4cd39ca9145a9e095d0ba9f5'}, 'json': {'key': 'value'}, 'origin': '45.126.120.54', 'url': 'https://httpbin.org/post'}

对应的服务器信息:可以看到虚拟的webhook有问题,但是不影响返回。之后我做好一个专门的微服务来处理就可以了。

[2025-03-29 23:32:14,774: WARNING/MainProcess] Webhook回调失败: [Errno -2] Name or service not known
[2025-03-29 23:32:14,853: INFO/MainProcess] Task celery_app.http_request[8a5bb464-6a7e-49e2-bd04-3a5658f8bf91] succeeded in 2.476185337989591s: {'args': {}, 'data': '{"key": "value"}', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Content-Length': '16', 'Content-Type': 'application/json', 'Host': 'httpbin.org', 'User-Agent': 'python-httpx/0.27.2', 'X-Amzn-Trace-Id': 'Root=1-67e8127d-4cd39ca9145a9e095d0ba9f5'}, 'json': {'key': 'value'}, 'origin': '45.126.120.54', 'url': 'https://httpbin.org/post'}

callback 和 webhook,我的理解是一样的东西,实现的机制不同。webhook出现的更晚,应该是更好一些的。callback的问题是耦合太紧了,webhook比较灵活。
在这里插入图片描述
在这里插入图片描述
还有另外两种返回的方法 SSE、WebSocket都需要借助Web服务器实现【FastAPI】。

Client Server Celery 发起任务 (POST /run-task) 提交任务 返回task_id 返回task_id (HTTP 202) 建立SSE连接 (GET /task-events/{task_id}) 定期检查任务状态 任务状态更新 通过SSE推送事件 Client Server Celery

SSE的样例

server.py 服务器端代码

from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse
from celery.result import AsyncResult

app = FastAPI()

@app.post("/run-task")
async def run_task():
    # 提交Celery任务
    task = http_request.delay(url="https://api.example.com", json_data={"key": "value"})
    return {"task_id": task.id}  # 立即返回task_id

@app.get("/task-events/{task_id}")
async def stream_task_events(task_id: str):
    """SSE事件流端点"""
    async def event_generator():
        result = AsyncResult(task_id)
        
        while not result.ready():
            # 推送等待状态
            yield {
                "event": "status",
                "data": {"status": "pending", "task_id": task_id}
            }
            await asyncio.sleep(1)  # 检查间隔
        
        # 任务完成时推送结果
        yield {
            "event": "result",
            "data": {
                "status": "completed",
                "result": result.result,
                "task_id": task_id
            }
        }
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"}
    )

js 前端代码

// 1. 首先提交任务
const startTask = async () => {
  const res = await fetch('/run-task', {method: 'POST'});
  const {task_id} = await res.json();
  listenToTaskEvents(task_id);
};

// 2. 监听SSE事件
const listenToTaskEvents = (task_id) => {
  const eventSource = new EventSource(`/task-events/${task_id}`);
  
  eventSource.addEventListener('status', (e) => {
    console.log('Status update:', JSON.parse(e.data));
  });
  
  eventSource.addEventListener('result', (e) => {
    console.log('Final result:', JSON.parse(e.data));
    eventSource.close();  // 关闭连接
  });
  
  eventSource.onerror = () => {
    console.error('SSE error');
    eventSource.close();
  };
};

// 触发任务
startTask();

websocket方式

server.py

# 示例:FastAPI + WebSocket
@app.websocket("/task-status/{task_id}")
async def websocket_task_status(websocket: WebSocket, task_id: str):
    await websocket.accept()
    result = AsyncResult(task_id)
    
    while not result.ready():
        await asyncio.sleep(0.5)
    
    await websocket.send_json({
        "status": "completed",
        "result": result.result
    })

SSE和WebSocket方式我还不那么熟,回头可以试试看。SSE看起来是Stream的方式,这个和大模型实时返回有点像,websocket主要是用了新的协议,是可以在连接中双向交互的,适合实时对话的场景。


到这里写的有点太长了,我自己也没想到。还有一部分是关于FastAPI和Celery融合起来的(可以看到,如果想通过接口使用,还是要微服务);然后再写一个ORM来进行简便控制。这篇看明天能不能写出来吧。


http://www.kler.cn/a/614883.html

相关文章:

  • 企业级知识库建设:自建与开源产品集成的全景解析 —— 产品经理、CTO 与 CDO 的深度对话
  • python纯终端实现图片查看器(全彩)(windows)
  • 数据化管理(一)---什么是数据化管理
  • 【Linux】常见信号 + 进程和作业
  • 【弹性计算】异构计算云服务和 AI 加速器(五):EAIS AI 加速器
  • Apache Iceberg 解析,一文了解Iceberg定义、应用及未来发展
  • 【SDMs分析1】基于ENMTools R包的生态位分化分析和图像绘制(identity.test())
  • 全局思维与系统思考
  • Vue学习笔记集--computed
  • Jenkins插件安装失败如何解决
  • Spring Cloud Consul使用指南
  • 【现代深度学习技术】现代卷积神经网络04:含并行连接的网络(GoogLeNet)
  • 指针和引用
  • 通过一个led点灯的demo来熟悉openharmony驱动编写的过程(附带hdf详细调用过程)
  • 聚合根的特性
  • JavaScript基础-navigator 对象
  • Docker 的实质作用是什么
  • 蓝桥杯省模拟赛 质因数之和
  • 视频AI赋能水利行业生态治理,水电站大坝漂浮物实时监测与智能预警方案
  • 【C++标准IO库】文件的输入输出