当前位置: 首页 > article >正文

基于celery的任务管理,本文主要是处理自己的算法,暴露API,管理任务并发,多线程

基于celery的任务管理,本文主要是处理自己的算法,暴露API,管理任务并发,多线程

  • 基本需求描述
  • 潜在问题
  • 主函数
  • 配置文件

基本需求描述

  • 暴露API,供其他人调用算法。
  • 方便查看任务状态。
  • 因为服务器资源有限,控制并发数量。
  • 多任务并发加快处理速度。这里需要说明的是python本身是可以做多线程的,但是(1)直接使用threading,GIL的存在导致并不是多线程处理,实际上并发还是一个CPU核在处理;(2)可以使用multiprocessing来实现多线程,但是这里有个问题就是自己如果按照我之前的博客来通过数组来实现一个任务队列的话,必然会涉及到全局变量问题,这里全局变量是可以实现的,但是不够优雅,因为有很多现成的库具备更加强大的功能,所以如果不是离线环境等受多种客观环境限制,建议还是使用现成的即可,本文这里所以使用了celery+redis的方式来管理任务队列。
  • 管理任务队列,某个任务失败后自动再次尝试,如果超过尝试次数阈值则该任务将认为失败,不会再尝试处理。

潜在问题

  • 因为是针对我自己的需求,CPU每个核处理一个任务。这意味着如果你想几个任务多个CPU Core来处理的话,可以考虑算法中multiprocessing或者其他更优雅的方式,只是我在本项目中不需要考虑这个问题。

主函数

import time
import json
import redis
import logging
from flask import Flask, request, jsonify
from flask_cors import CORS
from celery import Celery
from config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, REDIS_HOST, REDIS_PORT, REDIS_DB, OUTPUT_DIR, CELERY_WORKER_CONCURRENCY, CELERY_TASK_MAX_RETRIES, CELERY_TASK_RETRY_DELAY

# your algorithms
from simulator import rainfall_simulation_gpu, rainfall_simulation_cpu

# Flask
app = Flask(__name__)
CORS(app)

app.config.update(
    CELERY_BROKER_URL=CELERY_BROKER_URL,
    CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,
    JSON_AS_ASCII=False,
)

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(result_backend=app.config['CELERY_RESULT_BACKEND'], worker_concurrency=CELERY_WORKER_CONCURRENCY)

redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@celery.task(bind=True, max_retries=CELERY_TASK_MAX_RETRIES, default_retry_delay=CELERY_TASK_RETRY_DELAY)
def long_running_task(self, task_info):
    unique_key = f"task:{hash(str(task_info))}"
    try:
        if not redis_client.set(unique_key, self.request.id, nx=True, ex=3600):
            return {'status': 'Duplicate task, skipping execution'}
        logger.info(f"Processing task: {task_info}")
        if task_info.get('GPU') == 1:
            simulation_status, save_path = rainfall_simulation_gpu(task_info, ngpus=1, output_dir=OUTPUT_DIR)
        else:
            simulation_status, save_path = rainfall_simulation_cpu(task_info, output_dir=OUTPUT_DIR)
        if simulation_status:
            return {'status': 'Task completed!', 'save_path': save_path}
        raise Exception('Simulation failed')
    except Exception as exc:
        logger.error(f"Error in task: {exc}")
        if self.request.retries >= self.max_retries:
            redis_client.delete(unique_key)
            raise self.update_state(state='FAILURE', meta={'exc': str(exc)})
        raise self.retry(exc=exc)

@app.route('/new_task_rainfall_simulation/', methods=['POST'])
def new_task_rainfall_simulation():
    task_info = json.loads(request.data.decode('utf-8'))
    task = long_running_task.apply_async(args=[task_info])
    return jsonify({'task_id': task.id}), 202

@app.route('/task-status/<task_id>', methods=['GET'])
def task_status(task_id):
    task = long_running_task.AsyncResult(task_id)
    response = {'state': task.state, 'result': task.info if task.state in ['SUCCESS', 'FAILURE'] else None}
    return jsonify(response)

if __name__ == '__main__':
    app.run(host='your ip', port=5000) // set your server IP and port

配置文件

config.py

# Redis configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0

# Celery configuration
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'

# General application settings
JSON_AS_ASCII = False

# Simulation configuration
OUTPUT_DIR = 'xxx' # Save simulation results

# Celery worker configuration
CELERY_WORKER_CONCURRENCY = 6 # concurrent users, depends on your hardware and algorithms
CELERY_TASK_MAX_RETRIES = 2 # define retry times for each task
CELERY_TASK_RETRY_DELAY = 10 # retry the task after n seconds


http://www.kler.cn/a/503994.html

相关文章:

  • 网络基础知识指南|1-20个
  • 对MySQL滴MVCC理解(超详细)
  • 【React】新建React项目
  • Maven 在尝试连接到 Maven Central 仓库超时的解决方案和排查步骤
  • 小结:华为路由器常用的操作指令
  • 【Python】数据容器:列表,元组,字符串,集合字典及通用操作
  • LeetCode 2657. Find the Prefix Common Array of Two Arrays
  • SCDN跟高防IP相比哪个更好
  • 计算机视觉算法实战——实时车辆检测和分类(主页有相关源码)
  • 大语言模型训练的基本步骤解析
  • llama.cpp 模型可视化工具 GGUF Visualizer
  • 提高互联网Web安全性:避免越权漏洞的技术方案
  • 在Visual Studio中编译.c文件和.cpp文件主要有哪些不同
  • 第三篇 Avaya IP Office的架构及其服务组成
  • Mysql--运维篇--安全性(数据库访问控制,最小权限原则,表空间加密,TLS加密,证书签发,SQL注入及防范等)
  • centos 8 中安装Docker
  • [读书日志]8051软核处理器设计实战(基于FPGA)第七篇:8051软核处理器的测试(verilog+C)
  • 多商家入驻商城系统架构与功能分析
  • 《鸿蒙Next旅游应用:人工智能赋能个性化与智能导览新体验》
  • workloadSelector 是一种在服务网格(如Istio)中用于选择特定工作负载实例的机制。
  • Kafka权威指南(第2版)读书笔记
  • 【如何从0到1设计测试用例使用Fiddler完成弱网测试】
  • Android 调用系统服务接口获取屏幕投影(需要android.uid.system)
  • 数据结构之顺序结构二叉树(超详解)
  • Codeforces Round 976 (Div. 2) and Divide By Zero 9.0(A-E)
  • 接口测试总结(http与rpc)