基于celery的任务管理,本文主要是处理自己的算法,暴露API,管理任务并发,多线程
基于celery的任务管理,本文主要是处理自己的算法,暴露API,管理任务并发,多线程
- 基本需求描述
- 潜在问题
- 主函数
- 配置文件
基本需求描述
- 暴露API,供其他人调用算法。
- 方便查看任务状态。
- 因为服务器资源有限,控制并发数量。
- 多任务并发加快处理速度。这里需要说明的是python本身是可以做多线程的,但是(1)直接使用threading,GIL的存在导致并不是多线程处理,实际上并发还是一个CPU核在处理;(2)可以使用multiprocessing来实现多线程,但是这里有个问题就是自己如果按照我之前的博客来通过数组来实现一个任务队列的话,必然会涉及到全局变量问题,这里全局变量是可以实现的,但是不够优雅,因为有很多现成的库具备更加强大的功能,所以如果不是离线环境等受多种客观环境限制,建议还是使用现成的即可,本文这里所以使用了celery+redis的方式来管理任务队列。
- 管理任务队列,某个任务失败后自动再次尝试,如果超过尝试次数阈值则该任务将认为失败,不会再尝试处理。
潜在问题
- 因为是针对我自己的需求,CPU每个核处理一个任务。这意味着如果你想几个任务多个CPU Core来处理的话,可以考虑算法中multiprocessing或者其他更优雅的方式,只是我在本项目中不需要考虑这个问题。
主函数
import time
import json
import redis
import logging
from flask import Flask, request, jsonify
from flask_cors import CORS
from celery import Celery
from config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, REDIS_HOST, REDIS_PORT, REDIS_DB, OUTPUT_DIR, CELERY_WORKER_CONCURRENCY, CELERY_TASK_MAX_RETRIES, CELERY_TASK_RETRY_DELAY
# your algorithms
from simulator import rainfall_simulation_gpu, rainfall_simulation_cpu
# Flask
app = Flask(__name__)
CORS(app)
app.config.update(
CELERY_BROKER_URL=CELERY_BROKER_URL,
CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,
JSON_AS_ASCII=False,
)
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(result_backend=app.config['CELERY_RESULT_BACKEND'], worker_concurrency=CELERY_WORKER_CONCURRENCY)
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@celery.task(bind=True, max_retries=CELERY_TASK_MAX_RETRIES, default_retry_delay=CELERY_TASK_RETRY_DELAY)
def long_running_task(self, task_info):
unique_key = f"task:{hash(str(task_info))}"
try:
if not redis_client.set(unique_key, self.request.id, nx=True, ex=3600):
return {'status': 'Duplicate task, skipping execution'}
logger.info(f"Processing task: {task_info}")
if task_info.get('GPU') == 1:
simulation_status, save_path = rainfall_simulation_gpu(task_info, ngpus=1, output_dir=OUTPUT_DIR)
else:
simulation_status, save_path = rainfall_simulation_cpu(task_info, output_dir=OUTPUT_DIR)
if simulation_status:
return {'status': 'Task completed!', 'save_path': save_path}
raise Exception('Simulation failed')
except Exception as exc:
logger.error(f"Error in task: {exc}")
if self.request.retries >= self.max_retries:
redis_client.delete(unique_key)
raise self.update_state(state='FAILURE', meta={'exc': str(exc)})
raise self.retry(exc=exc)
@app.route('/new_task_rainfall_simulation/', methods=['POST'])
def new_task_rainfall_simulation():
task_info = json.loads(request.data.decode('utf-8'))
task = long_running_task.apply_async(args=[task_info])
return jsonify({'task_id': task.id}), 202
@app.route('/task-status/<task_id>', methods=['GET'])
def task_status(task_id):
task = long_running_task.AsyncResult(task_id)
response = {'state': task.state, 'result': task.info if task.state in ['SUCCESS', 'FAILURE'] else None}
return jsonify(response)
if __name__ == '__main__':
app.run(host='your ip', port=5000) // set your server IP and port
配置文件
config.py
# Redis configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
# Celery configuration
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
# General application settings
JSON_AS_ASCII = False
# Simulation configuration
OUTPUT_DIR = 'xxx' # Save simulation results
# Celery worker configuration
CELERY_WORKER_CONCURRENCY = 6 # concurrent users, depends on your hardware and algorithms
CELERY_TASK_MAX_RETRIES = 2 # define retry times for each task
CELERY_TASK_RETRY_DELAY = 10 # retry the task after n seconds