django-celery-beat搭建定时任务
一、创建django项目和app
1、安装定时任务第三方包
pip install django-celery-beat # 插件用来动态配置定时任务,一般会配合 django_celery_results 一起使用,所以一起安装 django_celery_results
pip install django_celery_results
pip install eventlet # windows下运行celery 4以后版本,还需额外安装eventlet库
2、创建django项目并创建一个使用定时任务的app
1.1创建django项目并创建app
创建的过程省略,不在这里展开,需要注意的是setting文件注册app的配置如下:
INSTALLED_APPS = [
......
'myapp', # 刚创建的使用定时任务的app
'django_celery_beat', # 插件用来动态配置定时任务,只要进行了第一步pip安装就可以直接注册了
]
1.2 创建定时任务数据库
依次执行: python manage.py makemigrations 和 python manage.py migrate
打开数据库发现,自动创建了一些表如下:
这些都是定时任务需要的表格,自动创建不需要手动管理
django_celery_beat.models.ClockedSchedule # 特定时刻任务
django_celery_beat.models.CrontabSchedule # 特定时间表任务,例如每周1运行的计划
django_celery_beat.models.IntervalSchedule # 以特定间隔(例如,每5秒)运行的计划。
django_celery_beat.models.PeriodicTask # 此模型定义要运行的单个周期性任务。
django_celery_beat.models.PeriodicTasks # 此模型仅用作索引以跟踪计划何时更改
django_celery_beat.models.SolarSchedule # 定制任务
如果安装注册了django_celery_results 还会有另外三个表:
2、新建一个celery.py文件
文件的作用是指定django环境、创建Celery app和指定Celery配置文件的启动位置,类似与wsgi.py或asgi.py,因此也建议文件创建位置与wsgi.py或asgi.py同级。
文件内容如下:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
# 设置django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lc_manage.settings.settings')
# 创建一个Celery app
app = Celery('djangotask')
platforms.C_FORCE_ROOT = True # 如果配置没有生效需要在启动时设置 export C_FORCE_ROOT="true"
# 使用CELERY_ 作为前缀,在celeryconfig.py中写配置
# app.config_from_object('lc_manage.celeryconfig', namespace="CELERY")
app.config_from_object('lc_manage.celeryconfig')
# 发现任务文件每个app下的tasks.py
app.autodiscover_tasks()
3、创建配置文件config.py
这个文件里的内容可以写到项目的 settings.py里面,因为上面的celery.py 中可以指定配置文件位置;不过内容比较多,还是建议单独创建一个配置文件celeryconfig.py,可以与celery.py 同级目录,文件内容如下:
from __future__ import absolute_import
# broker 设置 指定中间代理人将任务存到哪里,这里是redis的11号库
CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/11'
# 指定 Backend 储存结果的地方,可以使用django数据库(django-db),也可以使用redis,
# 使用django数据库(django-db),以后运行worker就会保存到数据库中,可以通过ORM进行访问
# CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'django-db'
# 使用django_celery_beat插件用来动态配置任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 指定时区,默认是 UTC
CELERY_TIMEZONE = 'Asia/Shanghai'
# celery 序列化与反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_TASK_IGNORE_RESULT = True
# 有些情况下可以防止死锁 非常重要!
CELERYD_FORCE_EXECV = True
# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
# CELERY_RESULT_EXPIRES = xx
# CELERY_TASK_RESULT_EXPIRES = 60*60*24 # 后端存储的任务超过一天时,自动删除数据库中的任务数据,单位秒
CELERY_MAX_TASKS_PER_CHILD = 1000 # 每个worker执行1000次任务后,自动重启worker,防止任务占用太多内存导致内存泄漏
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True
# celery beat配置(周期性任务设置)
CELERY_ENABLE_UTC = False
# 官方用来修复CELERY_ENABLE_UTC=False and USE_TZ = False 时时间比较错误的问题;
# 详情见:https://github.com/celery/django-celery-beat/pull/216/files
DJANGO_CELERY_BEAT_TZ_AWARE = False
这里需要注意的是,如果在celery.py中配置指定了confiig配置文件使用CELERY前缀:app.config_from_object(‘lc_manage.celeryconfig’, namespace=“CELERY”),那么celeryconfig.py配置文件的参数都应加:CELERY_,当然你也可以不用第二个参数,namespace=“CELERY"写成app.config_from_object(‘lc_manage.celeryconfig’”), 那么celeryconfig.py中就不需要加CELERY_ 前缀,注意一定要统一!!!否则可能 会报错:
consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
4、加载celery.py
我们自己创建的celery.py虽然与wsgi.py 或者 asgi.py等同级,但是 不会像他们一样自动加载,需要我们通过本级文件下的__init__.py 把celery.py 加载进来,打开__init__.py文件,添加如下内容:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
# 使得django启动时加载celery的app
__all__ = ('celery_app',)
5、创建定时任务执行内容
经过上面的配置,django-celery_beta 会自动去扫描每个app目录下是否有 tasks.py 文件,需要创建定时任务的app下我们可以手动创建tasks.py,定时任务就写在这个文件上:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
print("x + y = ", x + y)
return x + y
@shared_task
def mul(x, y):
print("x * y = ", x * y)
return x * y
二、定时器创建和定时任务添加
1、时间和周期控制:IntervalSchedule、ClockedSchedule和CrontabSchedule
其他帖子都把上面三个称为定时任务与PeriodicTask放一起结束,但是个人理解以上三个都是定时任务时控制时间和周期执行的控制器,并非创建定时任务,真正创建定时任务的只有PeriodicTask,所以这里个人把这三个称为定时任务的“时间和频率控制器”。
IntervalSchedule 按时间间隔频率执行定时任务的控制器,(例:每间隔1H/1M/…执行一次)
ClockedSchedule 指定某个时刻执行定时任务的控制器, (例:2018年8月8号 8:00这个时刻执行)
CrontabSchedule 指定某个时间执行定时任务的控制器 (例:每年的12月星期一的8:30)
导入这四个模块:
from django_celery_beat.models import CrontabSchedule, PeriodicTask, IntervalSchedule,ClockedSchedule
1.2 IntervalSchedule 时间间隔控制器
参数:every 间隔数,period 间隔单位
schedule, created = IntervalSchedule.objects.update_or_create(
every=1,
period=IntervalSchedule.MINUTES) # 按分钟间隔执行
第二个参数可选
IntervalSchedule.DAYS 固定间隔天数
IntervalSchedule.HOURS 固定间隔小时数
IntervalSchedule.MINUTES 固定间隔分钟数
IntervalSchedule.SECONDS 固定间隔秒数
IntervalSchedule.MICROSECONDS 固定间隔微秒
返回值可直接解包,其实只有第一个参数schedule有用,在PeriodicTask创建定时任务时作为时间和周期控制参数传入。解包获得的第二个数据created 可以直接用下划线取代
1.3 ClockedSchedule特定时刻定时器
参数:clocked_time 指定时间
clocked, _ = ClockedSchedule.objects.update_or_create(
clocked_time =datetime.strptime("2020-08-18 16:58:46","%Y-%m-%d %H:%M:%S")) # 按指定时间执行
这里第二个参数直接用下划线取代。特定时刻控制一般时执行一次,适合在view中调用执行一次性计划。
2、周期性任务 CrontabSchedule
参数:
month_of_year # 几月执行
day_of_month # 几号执行
day_of_week # 周几执行
hour # 几点执行
minute # 几分执行
timezone # 时区
crontab, _ = CrontabSchedule.objects.update_or_create(
minute="00",
hour="23",
day_of_week="*", # 周几执行
day_of_month='1', # 几点执行
month_of_year='*',
timezone=pytz.timezone("Asia/Shanghai"),
)
上面的星号代表不使用,上面的配置即: 每月1日的23点执行一次。如果day_of_month=“*”则代表每天23点执行。
3、动态添加任务 PeriodicTask
参数:
name:任务名
task:指定的任务
interval:时间间隔
crontab:时间控制器
clocked:指定时刻控制器
expires:有效日期
one_off:启用状态(如果为True,调度将只运行该任务一次)
start_time:开始时间
enabled:启用
last_run_at:最后运行时间
total_run_count:运行总次数
date_changed:最后的更改时间
description:描述
args: 传参
**注意:
1、interval、crontab、clocked三选一,不可同时使用。
2、name :任务名,确保其唯一性!!!!
3、task:后面是以字符串形式调用定时任务的具体工作内容函数,即第一项的第5条用@shared_task装饰器创建的方法。
4、enabled:是否启用,这里动态创建的话一般设为true
5、args: 传参(task中指定的任务需要传参时使用),注意需要json序列化 json.dumps(…)
其他参数均为非必填项。
**
PeriodicTask.objects.update_or_create(
name=working_point_record_id + 'working_time_1',
defaults={
"task": "apps.tasks.add",
"crontab": crontab,
"enabled": True,
"args": json.dumps([working_point_record_id])
},
4、封装方法
一般情况下,可以把IntervalSchedule、ClockedSchedule和CrontabSchedule根据业务需求单独封装一个文件,而PeriodicTask.objects直接在对应view视图中调用即可。
三、启动定时任务beat
定时任务是独立与django项目运行的,django只是定时任务的入口和操作数据库的入口,而这前提是django-celery-beat 已经独立启动,django-celery-beat的启动分两步,一是生产者单独启动(beat),而是工作者单独启动(worker),这里启动顺序需要注意一点,建议先启动worker 再启动beat ,曾经遇到先启动beat有可能beat会启动失败。
1、启动工作者worker
此时仍然需要重新打开一个命令窗口,进入django项目的根目录(manage.py同级目录)下:
# Linux下测试,启动Celery
Celery -A 【项目名称】worker -l info
# Windows下测试,启动Celery
Celery -A 【项目名称】worker -l info -P eventlet
# 如果Windows下Celery不工作,输入如下命令
Celery -A 【项目名称】worker -l info --pool=solo
2、启动生产者beat
beat 是一个生产者角色,是单独运行,生产者完全不依赖django,需要与django在一个不同的命令窗口运行,但启动时需要先进入django项目的根目录,即与manage.py在同一目录下:
celery -A 【项目名称】 beat -l info
四、定时任务创建
这里要划重点了,其实看完上面的已经可以使用了,下面的属于优化选择性理解,看个人理解能力和需求:
**一般情况下,定时器创建和定时任务添加 PeriodicTask会在view视图中创建,但是如果view视图比较频繁,那么每次执行view都创建一个定时任务的话会有无数个,会比较占用内存资源,当然可以在celeryconfig.py设置执行多少次后重启任务,高并发下view会不会导致频繁重启,这里需要根据业务逻辑把@shared_task包裹下的任务处理方法做成批量处理放到单独文件中(不需要在view视图中调用,定时调用批处理,那么就是一个任务批量处理数据而已),在间隔固定时间下执行,单纯语言难以表达清楚,理解的就理解了,不理解的就慢慢体会吧。这里需要画重点的是:如果你理解了单个文件写@shared_task批处理方法,那么你的问题重点就是:我如何启动它呢?难道每次在启动beat后在命令行启动吗?虽然也可以,但是如果项目需要创建的定时任务很多怎么办,不用多,几百行可以了,每次在命令行复制粘贴执行命令 吗?当然不要,其实直接创建一个单独的python文件,然后再根目录下的url中导入即可,因为url在项目启动的时候会自动加载一次,也就相当于启动django项目的时候就自动创建了一次定时任务!
**