当前位置: 首页 > article >正文

APScheduler:强大的Python定时任务调度器

安装

使用pip安装APScheduler

pip install apscheduler

基本概念

APScheduler有四种组件:

  • Triggers:包含调度逻辑,每个作业有其专属触发器,决定下次运行时间。触发器无状态,仅依据初始配置工作。
  • Job Stores:存储预定作业。默认存储于内存,但支持多种数据库存储。作业数据在持久化存储时序列化,加载时反序列化。非默认存储不保存作业数据于内存,而是作为后端存储、加载、更新和搜索作业的桥梁。注意,作业存储不可在调度器间共享。
  • Executors:负责执行作业,通常通过线程或进程池提交可调用对象。作业完成时,执行器通知调度器,触发相应事件。
  • Schedulers:连接其他组件。应用中通常运行一个调度器。开发者通常不直接处理作业存储、执行器或触发器,而是通过调度器接口管理。

选择合适的组件

调度器

选择调度器主要依据编程环境和用途:

  • BlockingScheduler:适用于调度程序为进程唯一运行时。
  • BackgroundScheduler:适用于无特定框架且希望调度器后台运行的应用。
  • AsyncIOScheduler:适用于使用 asyncio 模块的应用。
  • GeventScheduler:适用于使用 gevent 的应用。
  • TornadoScheduler:适用于构建 Tornado 应用。
  • TwistedScheduler:适用于构建 Twisted 应用。
  • QtScheduler:适用于构建 Qt 应用。

作业存储

选择作业存储需考虑作业是否需要持久性。若作业在应用启动时重建,可使用默认值 MemoryJobStore。若需作业在调度器重启或应用崩溃后保持不变,推荐在编程环境支持的数据库中选择,如 SQLAlchemyJobStore(基于 PostgreSQL)。

执行器

若使用上述框架之一,执行器通常已选定。否则,默认的 ThreadPoolExecutor 适用于大多数场景。CPU 密集型操作应考虑 ProcessPoolExecutor,甚至可同时使用两者,将进程池执行器作为辅助。

触发器

触发器决定作业运行逻辑,有三种类型:

  • Date:一次性运行作业。

    from apscheduler.schedulers.background import BackgroundScheduler  
    from apscheduler.triggers.date import DateTrigger  
    from datetime import datetime, timedelta  
    import time  
      
    def my_job():  
        print("在指定时间执行一次任务...")  
      
    # 创建调度器实例  
    scheduler = BackgroundScheduler()  
      
    # 获取当前时间,并加上30秒作为任务执行时间  
    run_date = datetime.now() + timedelta(seconds=30)  
      
    # 添加一个在指定时间执行一次的任务  
    scheduler.add_job(my_job, DateTrigger(run_date=run_date))  
      
    # 启动调度器  
    scheduler.start()  
      
    try:  
        # 保持主线程运行,直到任务执行完毕  
        while True:  
            time.sleep(2)  
            # 检查任务是否已执行完毕,并关闭调度器  
            if not scheduler.get_jobs():  
                break  
    except (KeyboardInterrupt, SystemExit):  
        # 关闭调度器  
        scheduler.shutdown()
    
  • Interval:固定时间间隔运行作业。

    from apscheduler.schedulers.background import BackgroundScheduler  
    import time  
      
    def my_job():  
        print("每隔10秒执行一次任务...")  
      
    # 创建调度器实例  
    scheduler = BackgroundScheduler()  
      
    # 添加一个每隔10秒执行一次的任务  
    scheduler.add_job(my_job, 'interval', seconds=10)  
      
    # 启动调度器  
    scheduler.start()  
      
    try:  
        # 保持主线程运行  
        while True:  
            time.sleep(2)  
    except (KeyboardInterrupt, SystemExit):  
        # 关闭调度器  
        scheduler.shutdown()
    
  • cron:定期在特定时间运行作业。

    from apscheduler.schedulers.background import BackgroundScheduler  
    from apscheduler.triggers.cron import CronTrigger  
    import time  
      
    def my_job():  
        print("每天凌晨2点执行一次任务...")  
      
    # 创建调度器实例  
    scheduler = BackgroundScheduler()  
      
    # 添加一个每天凌晨2点执行一次的任务  
    scheduler.add_job(my_job, CronTrigger(hour=2, minute=0))  
      
    # 启动调度器  
    scheduler.start()  
      
    try:  
        # 保持主线程运行  
        while True:  
            time.sleep(2)  
    except (KeyboardInterrupt, SystemExit):  
        # 关闭调度器  
        scheduler.shutdown()
    

可组合多个触发器,在参与触发器均同意或任一触发时触发。

启动scheduler

调用 start() 启动调度器。对于 BlockingScheduler,需在初始化步骤后调用;其他调度器调用后立即返回,继续应用初始化。

添加jobs

添加作业有两种方法:

  • add_job():最常见方法。

  • scheduled_job():适用于不变作业。

add_job() 返回一个 Job 实例,用于修改或删除作业。在持久存储中安排作业时,需定义显式 ID 并使用 replace_existing=True,避免每次应用重启时创建新作业副本。

[!NOTE]

如果在应用程序初始化期间在持久作业存储中安排作业,则必须为作业定义一个显式ID并使用replace_existeting=True,否则每次应用程序重新启动时都会得到作业的新副本!

删除jobs

删除作业有两种方法:

  • remove_job(job_id, jobstore=None):使用作业 ID 和作业存储别名。

  • Job.remove():对 add_job() 返回的 Job 实例调用。

作业计划结束时自动删除。

示例:

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

或:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

暂停和恢复jobs

通过 Job 实例或调度器本身暂停和恢复作业。暂停时,清除下次运行时间,恢复前不计算运行时间。

  • Job.pause() / BaseScheduler.pause_job():暂停作业。
  • Job.resume() / BaseScheduler.resume_job():恢复作业。

修改jobs

调用 Job.modify()scheduler.modify_job() 修改作业属性(除 ID 外)。要更改触发器,使用 Job.reschedule()scheduler.reschedule_job()

示例:

job.modify(max_instances=6, name='Alternate name')

或:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

关闭scheduler

要关闭调度程序,请执行以下操作:

scheduler.shutdown()

默认情况下,调度程序会关闭其作业存储和执行器,并等待所有当前正在执行的作业完成。如果不想等待:

scheduler.shutdown(wait=False)

这仍将关闭作业存储和执行器,但不会等待任何正在运行的任务完成。

暂停/恢复job processing

暂停作业处理:

scheduler.pause()

恢复作业处理:

scheduler.resume()

也可以在暂停状态下启动调度器:

scheduler.start(paused=True)

当您需要在不需要的作业有机会运行之前对其进行修剪时,这很有用。

限制作业并发执行实例的数量

默认每个作业仅同时运行一个实例。使用 max_instances 关键字参数设置并发运行实例数。

scheduler.add_job(myfunc, 'interval', minutes=2, max_instances=10)

Scheduler事件

将事件监听器附加到调度器,监听特定事件。使用 add_listener() 提供掩码参数监听特定类型事件。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

故障排除

若调度器未按预期工作,将 apscheduler 记录器日志级别提升至 DEBUG。

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

http://www.kler.cn/a/380402.html

相关文章:

  • 密码学知识点整理一:密码学概论
  • 【格式化查看JSON文件】coco的json文件内容都在一行如何按照json格式查看
  • 【linux】HTTPS 协议原理
  • git rebase 使用 - 【nolankywu】
  • energy 发布 v2.4.5
  • solidity selfdestruct合约销毁
  • Flutter鸿蒙next中的按钮封装:自定义样式与交互
  • AI绘画大热门!用AI做副业兼职3个月赚了10w,想辞职了
  • stl_list
  • 利用蒙特卡洛方法求定积分
  • Redis 初学者指南
  • 论文阅读-用于图像识别的深度残差学习
  • 应用targetsdk版本低于30,不符合华为应用市场审核标准
  • 【学习】软件测试中V模型、W模型、螺旋模型三者介绍
  • Docker Compose部署XXL-JOB
  • STM32实现串口接收不定长数据
  • 【专题】基于服务的体系结构
  • JS实现漂亮的登录页面(氛围感页面)
  • 【linux 多进程并发】0203 网络资源的多进程处理,子进程完全继承网络套接字,避免“惊群”问题
  • TypeScript实用笔记(三):泛型<T>的使用 <T>的12种工具类型的使用
  • python代码主要实现了对供水网络的水质模拟,并对模拟结果进行一系列处理
  • ‌5G SSB(同步信号块)位于物理层‌
  • Python淘宝数据挖掘与词云图制作指南
  • Python 继承、多态、封装、抽象
  • 华为HarmonyOS打造开放、合规的广告生态 - 原生广告
  • JVM出现OOM错误排查