APScheduler:强大的Python定时任务调度器
安装
使用pip
安装APScheduler
:
pip install apscheduler
基本概念
APScheduler
有四种组件:
- Triggers:包含调度逻辑,每个作业有其专属触发器,决定下次运行时间。触发器无状态,仅依据初始配置工作。
- Job Stores:存储预定作业。默认存储于内存,但支持多种数据库存储。作业数据在持久化存储时序列化,加载时反序列化。非默认存储不保存作业数据于内存,而是作为后端存储、加载、更新和搜索作业的桥梁。注意,作业存储不可在调度器间共享。
- Executors:负责执行作业,通常通过线程或进程池提交可调用对象。作业完成时,执行器通知调度器,触发相应事件。
- Schedulers:连接其他组件。应用中通常运行一个调度器。开发者通常不直接处理作业存储、执行器或触发器,而是通过调度器接口管理。
选择合适的组件
调度器
选择调度器主要依据编程环境和用途:
- BlockingScheduler:适用于调度程序为进程唯一运行时。
- BackgroundScheduler:适用于无特定框架且希望调度器后台运行的应用。
- AsyncIOScheduler:适用于使用
asyncio
模块的应用。 - GeventScheduler:适用于使用
gevent
的应用。 - TornadoScheduler:适用于构建 Tornado 应用。
- TwistedScheduler:适用于构建 Twisted 应用。
- QtScheduler:适用于构建 Qt 应用。
作业存储
选择作业存储需考虑作业是否需要持久性。若作业在应用启动时重建,可使用默认值 MemoryJobStore
。若需作业在调度器重启或应用崩溃后保持不变,推荐在编程环境支持的数据库中选择,如 SQLAlchemyJobStore
(基于 PostgreSQL)。
执行器
若使用上述框架之一,执行器通常已选定。否则,默认的 ThreadPoolExecutor
适用于大多数场景。CPU 密集型操作应考虑 ProcessPoolExecutor
,甚至可同时使用两者,将进程池执行器作为辅助。
触发器
触发器决定作业运行逻辑,有三种类型:
-
Date:一次性运行作业。
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.date import DateTrigger from datetime import datetime, timedelta import time def my_job(): print("在指定时间执行一次任务...") # 创建调度器实例 scheduler = BackgroundScheduler() # 获取当前时间,并加上30秒作为任务执行时间 run_date = datetime.now() + timedelta(seconds=30) # 添加一个在指定时间执行一次的任务 scheduler.add_job(my_job, DateTrigger(run_date=run_date)) # 启动调度器 scheduler.start() try: # 保持主线程运行,直到任务执行完毕 while True: time.sleep(2) # 检查任务是否已执行完毕,并关闭调度器 if not scheduler.get_jobs(): break except (KeyboardInterrupt, SystemExit): # 关闭调度器 scheduler.shutdown()
-
Interval:固定时间间隔运行作业。
from apscheduler.schedulers.background import BackgroundScheduler import time def my_job(): print("每隔10秒执行一次任务...") # 创建调度器实例 scheduler = BackgroundScheduler() # 添加一个每隔10秒执行一次的任务 scheduler.add_job(my_job, 'interval', seconds=10) # 启动调度器 scheduler.start() try: # 保持主线程运行 while True: time.sleep(2) except (KeyboardInterrupt, SystemExit): # 关闭调度器 scheduler.shutdown()
-
cron:定期在特定时间运行作业。
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger import time def my_job(): print("每天凌晨2点执行一次任务...") # 创建调度器实例 scheduler = BackgroundScheduler() # 添加一个每天凌晨2点执行一次的任务 scheduler.add_job(my_job, CronTrigger(hour=2, minute=0)) # 启动调度器 scheduler.start() try: # 保持主线程运行 while True: time.sleep(2) except (KeyboardInterrupt, SystemExit): # 关闭调度器 scheduler.shutdown()
可组合多个触发器,在参与触发器均同意或任一触发时触发。
启动scheduler
调用 start()
启动调度器。对于 BlockingScheduler
,需在初始化步骤后调用;其他调度器调用后立即返回,继续应用初始化。
添加jobs
添加作业有两种方法:
-
add_job():最常见方法。
-
scheduled_job():适用于不变作业。
add_job()
返回一个 Job
实例,用于修改或删除作业。在持久存储中安排作业时,需定义显式 ID 并使用 replace_existing=True
,避免每次应用重启时创建新作业副本。
[!NOTE]
如果在应用程序初始化期间在持久作业存储中安排作业,则必须为作业定义一个显式ID并使用
replace_existeting=True
,否则每次应用程序重新启动时都会得到作业的新副本!
删除jobs
删除作业有两种方法:
-
remove_job(job_id, jobstore=None):使用作业 ID 和作业存储别名。
-
Job.remove():对
add_job()
返回的Job
实例调用。
作业计划结束时自动删除。
示例:
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
或:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
暂停和恢复jobs
通过 Job
实例或调度器本身暂停和恢复作业。暂停时,清除下次运行时间,恢复前不计算运行时间。
- Job.pause() / BaseScheduler.pause_job():暂停作业。
- Job.resume() / BaseScheduler.resume_job():恢复作业。
修改jobs
调用 Job.modify()
或 scheduler.modify_job()
修改作业属性(除 ID 外)。要更改触发器,使用 Job.reschedule()
或 scheduler.reschedule_job()
。
示例:
job.modify(max_instances=6, name='Alternate name')
或:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
关闭scheduler
要关闭调度程序,请执行以下操作:
scheduler.shutdown()
默认情况下,调度程序会关闭其作业存储和执行器,并等待所有当前正在执行的作业完成。如果不想等待:
scheduler.shutdown(wait=False)
这仍将关闭作业存储和执行器,但不会等待任何正在运行的任务完成。
暂停/恢复job processing
暂停作业处理:
scheduler.pause()
恢复作业处理:
scheduler.resume()
也可以在暂停状态下启动调度器:
scheduler.start(paused=True)
当您需要在不需要的作业有机会运行之前对其进行修剪时,这很有用。
限制作业并发执行实例的数量
默认每个作业仅同时运行一个实例。使用 max_instances
关键字参数设置并发运行实例数。
scheduler.add_job(myfunc, 'interval', minutes=2, max_instances=10)
Scheduler事件
将事件监听器附加到调度器,监听特定事件。使用 add_listener()
提供掩码参数监听特定类型事件。
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
故障排除
若调度器未按预期工作,将 apscheduler
记录器日志级别提升至 DEBUG。
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)