当前位置: 首页 > article >正文

【最后203篇系列】007 使用APS搭建本地定时任务

说明

最大的好处是方便。

其实所有任务的源头,应该都是通过定时的方式,在每个时隙发起轮询。当然在任务的后续传递中,可以通过CallBack或者WebHook的方式,以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。

总结一下:源头是定时轮询,中间过程是事件传递。

本次使用APS搭建本地定时任务的目的是为了简化实验性质的定时任务,通过在git项目下进行编辑任务脚本和执行任务清单,而运行容器本身会周期性的自动拉取代码,然后按照任务清单执行。

执行过程采用多线程方式,任务的负载通常都不高。整体设计上,复杂和繁重的任务会包在微服务中,定时任务主要是向这些微服务发起触发动作。通常,微服务收到触发元信息后进行自动的任务/数据拉取处理,处理完毕后通过webhook将结果持久化,或进一步发起其他的触发动作。

另外,具有共性的任务将会被提取出来,之后会交给celery以分布&协程方式执行,这些任务包括:

  • 1 数据库IO。例如从队列里取数,存到数据库中。
  • 2 网络数据获取IO。爬取网页、或者通过接口,获取数据。
  • 3 接口化标准操作。按url, json input这样的标准web请求,这种灵活性很强。表面上是一个IO动作,但背后可能触发密集计算,但是又不需要celery集群承担。(可能是ray集群、dask集群、基于显卡计算的集群)

内容

1 读取任务列表

主要为了简单的读入任务(脚本),同时可以方便的进行注释

# 用于将代表任务列表的数据读入
# 去掉换行和空格
# 如果以# 号开头表示注释
def read_all_lines_clean(fpath):
    with open(fpath, 'r') as f:
        lines = f.readlines()
    lines1 = [x.replace('\n','').strip() for x in lines]
    lines2 = [x for x in lines1 if len(x) and not x.startswith('#')]
    return lines2

任务文件如下task_list.txt

task_01_probably_git_pull.py
task_02_del_event_null_recs.py
# task_03_sync_xs_backup.py
#task_04_rotate_data.py
# task_05_sync_milvus.py
#task_06_rotate_mysql_time.py

读入后

In [4]: a = read_all_lines_clean('task_list.txt')

In [5]: a
Out[5]: ['task_01_probably_git_pull.py', 'task_02_del_event_null_recs.py']

这些就是之后要定时调度的任务

2 并行执行

为了使得每一次定时任务都可以执行,且保证效率,需要用一些简单的调度(容错问题均在脚本内解决)。调度器可以保证每30秒起来一次。

线程的并行执行:

def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):
    tasks = read_all_lines_clean(project_folder + task_list_file)
    dedup_tasks = remove_duplicates_preserve_order(tasks)
    pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]
    thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)

每一次执行os_system_python

import subprocess

def os_system_python(some_path=None, timeout=30):
    try:
        result = subprocess.run(['python3', some_path], timeout=timeout)
        return result
    except subprocess.TimeoutExpired:
        print(f"Task {some_path} timed out after {timeout} seconds.")
        return None

'''
代码说明
subprocess.run:

这是 subprocess 模块的高级 API,用于运行命令并等待其完成。

它支持 timeout 参数,如果命令在指定时间内未完成,会抛出 TimeoutExpired 异常。

timeout 参数:

你设置了默认超时时间为 30 秒,这是一个合理的默认值。

如果任务在 30 秒内未完成,subprocess.run 会抛出 TimeoutExpired 异常。

异常处理:

捕获 TimeoutExpired 异常后,打印超时信息并返回 None。

这样可以避免程序因超时而崩溃,同时提供清晰的日志信息。
'''

3 自动更新

更新git项目,作为一个任务脚本被周期执行。由于代码更新并不是高频事件,所以一般概率上保证5分钟会更新一次代码。

(base) root@76a14afa199b:/workspace/local_aps_v2/base# python3 task_01_probably_git_pull.py
2000-01-01 08:00:00
2000-01-01 08:00:00
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
task_01_probably_git_pull running
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
Git pull executed successfully for branch 'master':
Already up to date.

2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
(base) root@76a14afa199b:/workspace/local_aps_v2/base#

4 定时调度

调度器在每分钟的0/30秒执行,我把30秒定为一拍(pace),一分钟定位一时隙(slot)。绝大部分任务都应该在30秒内完成。

# 执行本地脚本
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

from base_config import base_config
from Basefuncs import * 
def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):
    tasks = read_all_lines_clean(project_folder + task_list_file)
    dedup_tasks = remove_duplicates_preserve_order(tasks)
    pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]
    thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)


# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &

if __name__ == '__main__':
    # 创建调度器
    sche1 = BlockingScheduler()

    # 添加任务,使用 cron 表达式每分钟的第 0 秒和第 30 秒执行
    sche1.add_job(
        exe_tasks_threads,
        'cron',
        second='0,30',  # 每分钟的第 0 秒和第 30 秒
        kwargs={},
        coalesce=True,
        max_instances=1
    )

    print('[S] Starting scheduler with cron (0s and 30s of every minute)...')
    try:
        sche1.start()  # 启动调度器
    except (KeyboardInterrupt, SystemExit):
        print('[S] Scheduler stopped.')

5 Docker运行

为了保证执行的稳定性,使用docker执行

docker run -d --name=local_aps_v2 \
 --restart=always \
 -v /etc/localtime:/etc/localtime -v /etc/timezone:/etc/timezone -v /etc/hostname:/etc/hostname -e "LANG=C.UTF-8" \
 -w /workspace/local_aps_v2/base \
 YOURIMAGE  \
 sh -c "git pull && python3 aps.py"

只有环境改变时才需要修改镜像重发布,大部分时候只要调试和修改代码,然后推送就可以了。


http://www.kler.cn/a/525055.html

相关文章:

  • [论文总结] 深度学习在农业领域应用论文笔记14
  • 抠图神器,全离线使用,支持win和mac
  • 性能优化2-删除无效引用
  • 设计模式的艺术-策略模式
  • 准备知识——旋转机械的频率和振动基础
  • 如何获取小程序的code在uniapp开发中
  • 1.27补题 回训练营
  • ODP(OBProxy)路由初探
  • 【starrocks学习】之catalog
  • java面试题:10个线程如何按顺序分别输出1-100
  • Airflow:掌握Airflow调度器基本原理
  • LangChain的开发流程
  • HTB:Active[RE-WriteUP]
  • Go语言中的Select
  • 芯片AI深度实战:进阶篇之vim内verilog实时自定义检视
  • 03链表+栈+队列(D2_栈)
  • Windows 11 应用开发实战指南
  • 【Elasticsearch 】悬挂索引(Dangling Indices)
  • 指针的介绍3后
  • Kafka 日志存储 — 磁盘存储
  • LeetCode:40. 组合总和 II(回溯 + 剪枝 Java)
  • Python3 【高阶函数】水平考试:30道精选试题和答案
  • SOME/IP--协议英文原文讲解4
  • 人工智能如何驱动SEO关键词优化策略的转型与效果提升
  • Unity阿里云OpenAPI 获取 Token的C#【记录】
  • Windows程序设计4:API函数URLDownloadToFile和ShellExecuteEx