当前位置: 首页 > article >正文

celery在django项目中实现并发任务和定时任务

创建一个django项目

django-admin startproject celeryDemo

进入项目目录

cd celeryDemo

在你的 Django 项目中,创建一个 celery_.py 文件,通常放在项目的根目录(与 settings.py 同级):

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 设置 Django 的默认配置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryDemo.settings')

app = Celery('celeryDemo')

# 从 Django 的 settings.py 中加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务模块
# app.autodiscover_tasks()
# 列出所有需要的应用
app.autodiscover_tasks(['home']) 

settings.py 中配置 Celery 添加一些基本配置,这里使用 Redis 作为消息代理:

# celery_config
CELERY_BROKER_URL = 'redis://localhost:6379/7'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/8'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

创建任务 在你的 Django 应用中创建任务,app.tasks.py 文件中 app是你创建的app名 我这里是home

from celery import shared_task

# 定时任务
@shared_task
def my_task():
    # 任务逻辑
    print("Task is running!")
    return "Task done!"

# 并发任务
@shared_task
def add(x, y):
    print(x + y)
    return x + y

在app.views.py里添加视图函数

from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from home.tasks import add


def trigger_tasks(request):
    for i in range(10):
        add.delay(i, i + 1)  # 异步调用任务
    return HttpResponse("Tasks triggered!")


def trigger_task(request):
    # 调用并发任务
    result = add.delay(4, 6)  # 异步调用任务
    # 获取任务 ID
    task_id = result.id
    return JsonResponse({'task_id': task_id})


def get_result(request, task_id):
    from celery.result import AsyncResult
    result = AsyncResult(task_id)

    if result.ready():  # 检查任务是否完成
        response = {
            'task_id': task_id,
            'status': result.status,
            'result': result.result,  # 获取任务结果
        }
    else:
        response = {
            'task_id': task_id,
            'status': result.status,
            'result': None,
        }
    return JsonResponse(response)

设置定时任务 使用 Celery Beat 来设定定时任务。在你的 settings.py 中添加

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'run-my-task-every-midnight': {
        'task': 'home.tasks.my_task',
        # 'schedule': crontab(minute=0, hour=0),  # 每天的 0 点 0 分执行
        'schedule': crontab('*/1'),  # 每1分钟执行一次
    },
}

创建tasks.py

from celery import shared_task

# 定时任务
@shared_task
def my_task():
    # 任务逻辑
    print("Task is running!")
    return "Task done!"

# 并发任务
@shared_task
def add(x, y):
    print(x + y)
    return x + y

设置 URL 路由 在你的 urls.py 中添加相应的 URL 路由,以便可以访问触发任务和获取结果的视图:

# home/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('trigger_tasks/', views.trigger_tasks, name='trigger_tasks'),
    path('trigger-task/', views.trigger_task, name='trigger_task'),
    path('get-result/<str:task_id>/', views.get_result, name='get_result'),
]
# urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('home/', include('home.urls')),
]

启动 Celery Worker 和 Beat 在命令行中,启动 Celery Worker 和 Beat

celery -A celeryDemo worker --loglevel=info --concurrency=4 -P thread

celery -A celeryDemo beat --loglevel=info

启动django项目

python manage.py runserver 8002

监控和调试

确保你能看到 Worker 的日志输出,以验证任务是否成功执行。你可以使用 Flower 来监控 Celery 任务的执行情况:

pip install flower
celery -A celeryDemo flower

然后访问 http://localhost:5555 以查看任务的状态。

调用异步任务返回id
![调用异步任务返回id](https://i-blog.csdnimg.cn/direct/d46c5f92121745eabd5c5bec3223928e.png)

获取异步任务结果
在这里插入图片描述

查看记录
在这里插入图片描述


http://www.kler.cn/a/379162.html

相关文章:

  • 解锁C#语法的无限可能:从基础到进阶的编程之旅
  • DETRs with Collaborative Hybrid Assignments Training论文阅读与代码
  • Jira中bug的流转流程
  • 【机器学习实战入门】使用OpenCV和Keras的驾驶员疲劳检测系统
  • Java设计模式——单例模式(特性、各种实现、懒汉式、饿汉式、内部类实现、枚举方式、双重校验+锁)
  • @Query(org.springframework.data.jpa.repository.Query)
  • SOLIDWORKS 2025用户体验新功能
  • NineData云原生智能数据管理平台新功能发布|2024年10月版
  • distrobox install in ubuntu 22.04 / 在 ubuntu 22.04 上安装 distrobox (***) OK
  • qt的c++环境配置和c++基础【正点原子】嵌入式Qt5 C++开发视频
  • Stable Diffusion Web UI 1.9.4常用插件扩展-WD14-tagger
  • Spring Boot技术:校园社团信息管理的创新解决方案
  • 123.WEB渗透测试-信息收集-ARL(14)
  • 初始计算机网络
  • sqlserver、达梦、mysql的差异
  • React 组件生命周期与 Hooks 简明指南
  • HTTP代理是什么?有什么用?
  • git pull遇到一个问题
  • 揭秘Scam-as-a-Service:警惕钓鱼攻击的产业化
  • centos7之LVS-DR模式传统部署
  • 21 Docker容器集群网络架构:四、Docker集群网络验证
  • 在k8s环境中如何在本地和pod之间同步文件?
  • 基于微信小程序的生签到系统设计与实现(lw+演示+源码+运行)
  • 一键式配置适合 Web 开发的Ubuntu系统
  • 采用SpeedL模式控制UR5e机器人
  • HarmonyOS开发 - 餐饮APP中多门店多窗口打开实例补充