celery在django项目中实现并发任务和定时任务
创建一个django项目
django-admin startproject celeryDemo
进入项目目录
cd celeryDemo
在你的 Django 项目中,创建一个 celery_.py
文件,通常放在项目的根目录(与 settings.py
同级):
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# 设置 Django 的默认配置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryDemo.settings')
app = Celery('celeryDemo')
# 从 Django 的 settings.py 中加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务模块
# app.autodiscover_tasks()
# 列出所有需要的应用
app.autodiscover_tasks(['home'])
在 settings.py
中配置 Celery 添加一些基本配置,这里使用 Redis 作为消息代理:
# celery_config
CELERY_BROKER_URL = 'redis://localhost:6379/7'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/8'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
创建任务 在你的 Django 应用中创建任务,app.tasks.py
文件中 app是你创建的app名 我这里是home
from celery import shared_task
# 定时任务
@shared_task
def my_task():
# 任务逻辑
print("Task is running!")
return "Task done!"
# 并发任务
@shared_task
def add(x, y):
print(x + y)
return x + y
在app.views.py里添加视图函数
from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from home.tasks import add
def trigger_tasks(request):
for i in range(10):
add.delay(i, i + 1) # 异步调用任务
return HttpResponse("Tasks triggered!")
def trigger_task(request):
# 调用并发任务
result = add.delay(4, 6) # 异步调用任务
# 获取任务 ID
task_id = result.id
return JsonResponse({'task_id': task_id})
def get_result(request, task_id):
from celery.result import AsyncResult
result = AsyncResult(task_id)
if result.ready(): # 检查任务是否完成
response = {
'task_id': task_id,
'status': result.status,
'result': result.result, # 获取任务结果
}
else:
response = {
'task_id': task_id,
'status': result.status,
'result': None,
}
return JsonResponse(response)
设置定时任务 使用 Celery Beat 来设定定时任务。在你的 settings.py
中添加
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'run-my-task-every-midnight': {
'task': 'home.tasks.my_task',
# 'schedule': crontab(minute=0, hour=0), # 每天的 0 点 0 分执行
'schedule': crontab('*/1'), # 每1分钟执行一次
},
}
创建tasks.py
from celery import shared_task
# 定时任务
@shared_task
def my_task():
# 任务逻辑
print("Task is running!")
return "Task done!"
# 并发任务
@shared_task
def add(x, y):
print(x + y)
return x + y
设置 URL 路由 在你的 urls.py
中添加相应的 URL 路由,以便可以访问触发任务和获取结果的视图:
# home/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('trigger_tasks/', views.trigger_tasks, name='trigger_tasks'),
path('trigger-task/', views.trigger_task, name='trigger_task'),
path('get-result/<str:task_id>/', views.get_result, name='get_result'),
]
# urls.py
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('home/', include('home.urls')),
]
启动 Celery Worker 和 Beat 在命令行中,启动 Celery Worker 和 Beat
celery -A celeryDemo worker --loglevel=info --concurrency=4 -P thread
celery -A celeryDemo beat --loglevel=info
启动django项目
python manage.py runserver 8002
监控和调试
确保你能看到 Worker 的日志输出,以验证任务是否成功执行。你可以使用 Flower 来监控 Celery 任务的执行情况:
pip install flower
celery -A celeryDemo flower
然后访问 http://localhost:5555
以查看任务的状态。
调用异步任务返回id
![调用异步任务返回id
获取异步任务结果
查看记录