当前位置: 首页 > article >正文

Python批量发送任务请求(POST)和批量查询任务状态(GET)

目的

本文在我之前的博客《基于Celery的任务管理》(处理自己的算法、暴露API、管理任务并发与多线程)的基础上,编写脚本来方便地批量发布任务和批量查询任务状态,相关信息都存储在本地log文件中。

代码

task_sender_config.py

配置文件,所有的发送任务和查询任务的配置信息都放在这里

# task_sender_config.py
#
# Settings shared by task_sender.py (batch POST of new tasks) and
# task_checker.py (batch GET of task status); both scripts import this
# module as `config`.

# Server information
server_ip = "your ip"  # placeholder: replace with the task server's address
port = 5000
endpoint = "/new_task_rainfall_simulation"  # path of the task-submission (POST) endpoint
interval = 1  # seconds to wait between consecutive task POST calls

status_endpoint = "/task-status"  # status (GET) endpoint; the task id is appended as "/<task_id>"

# Data path
# Directory holding the input DEM files; every filename below is joined onto it.
data_path = "/home/xxx/"

# File prefixes and ranges
# Filenames follow the pattern <prefix>_<NNNN>_DSM.tif, e.g. 0009_0001_DSM.tif.
# For each prefix, NNNN runs from "start" to "end" inclusive and is zero-padded
# to four digits; files that do not exist are reported and skipped.
prefixes = {
    "0009": {"start": 6, "end": 25},
    "0010": {"start": 5, "end": 25}
}

# Single files
# Extra filenames (relative to data_path) queued after the prefix ranges.
single_files = ["0010_0000_DSM.tif"]

# Task template
# Base parameters copied into every task payload; the sender adds a
# "watershed_dem" key holding the full path of each DEM file.
# NOTE(review): units are presumably mm/hr for the precipitation rate and
# seconds for the durations — confirm against the server API.
task_template = {
    "starting_precip_mmhr": 142.0,
    "storm_duration": 3600.0,
    "elapsed_time": 0.0,
    "model_run_time": 3600.0,
    "GPU": 1
}

# Log directory
# The sender writes task_<unix timestamp>.txt here; the checker reads that file
# and writes its status log into the same directory.
log_dir = "./log"

# Name of a sender log inside log_dir for the checker to read; update this after
# each sending run.
log_file_to_check = "task_1737011612.txt"

task_sender.py

import requests
import time
import os
import json
import task_sender_config as config
from datetime import datetime

def generate_tasks(config):
    """
    Generate a list of task payloads based on the configuration.

    Candidate DEM filenames come from two sources, in this order:

    * ``config.prefixes``: for each ``prefix -> {"start": s, "end": e}``, the
      files ``{prefix}_{NNNN}_DSM.tif`` for NNNN in ``s..e`` inclusive,
      zero-padded to four digits;
    * ``config.single_files`` (optional): explicit filenames.

    Each candidate is resolved against ``config.data_path``. Every candidate
    that is an existing regular file becomes one task: a shallow copy of
    ``config.task_template`` plus a ``"watershed_dem"`` key holding the full
    path. Missing candidates are reported and skipped.

    :param config: Configuration module (or any object with the same attributes)
    :return: List of task dictionaries, in candidate order
    """
    data_path = config.data_path
    task_template = config.task_template

    # Build the full candidate list first so both sources share one code path.
    candidate_names = [
        f"{prefix}_{str(number).zfill(4)}_DSM.tif"
        for prefix, numbers in config.prefixes.items()
        for number in range(numbers["start"], numbers["end"] + 1)
    ]
    candidate_names.extend(getattr(config, "single_files", []))

    tasks = []
    for filename in candidate_names:
        full_path = os.path.join(data_path, filename)
        # isfile (not exists): a directory with a DEM-like name is not a DEM.
        if not os.path.isfile(full_path):
            print(f"File not found: {full_path}")
            continue
        # Copy so the shared template is never mutated.
        task = task_template.copy()
        task["watershed_dem"] = full_path
        tasks.append(task)

    return tasks

def send_tasks(server_ip, port, data_list, interval=1, log_dir=".", *,
               endpoint="/new_task_rainfall_simulation", timeout=60):
    """
    Send tasks to a server one by one via POST requests and log the accepted ones.

    Every task the server accepts (HTTP 200 or 202) is appended to
    ``<log_dir>/task_<unix timestamp>.txt`` as one JSON object per line, with
    the keys ``file_path``, ``task_parameters`` and ``task_id``. This is the
    file task_checker.py reads. Rejected or failed requests are only printed.

    :param server_ip: IP address of the server
    :param port: Port number of the server
    :param data_list: List of task payloads (dicts) to send
    :param interval: Time interval (in seconds) between consecutive tasks
    :param log_dir: Directory to save the log file (created if missing)
    :param endpoint: Path of the task-submission endpoint on the server
    :param timeout: Per-request timeout in seconds; ``None`` waits forever.
        A timed-out request is reported as an error, although the server may
        still have created the task.
    :return: Path of the log file that was written
    """
    url = f"http://{server_ip}:{port}{endpoint}"
    log_path = os.path.join(log_dir, f"task_{int(time.time())}.txt")

    # makedirs also creates missing parent directories and tolerates an
    # already-existing directory.
    os.makedirs(log_dir, exist_ok=True)

    total = len(data_list)
    with open(log_path, "w", encoding="utf-8") as log_file:
        for idx, data in enumerate(data_list, start=1):
            if idx > 1:
                # Pace submissions; there is no need to wait after the last one.
                time.sleep(interval)

            try:
                response = requests.post(url, json=data, timeout=timeout)
            except requests.RequestException as e:
                print(f"Error sending task {idx}/{total}: {e}")
                continue

            if response.status_code not in (200, 202):
                print(f"Task {idx}/{total} failed with status code {response.status_code}: {response.text}")
                continue

            try:
                response_data = response.json()
            except ValueError:  # older requests versions raise a plain ValueError here
                print(f"Task {idx}/{total} accepted but the response is not JSON: {response.text}")
                continue

            # "N/A" marks an accepted task whose id the server did not report.
            if isinstance(response_data, dict):
                task_id = response_data.get("task_id", "N/A")
            else:
                task_id = "N/A"
            log_entry = {
                "file_path": data.get("watershed_dem"),
                "task_parameters": data,
                "task_id": task_id,
            }
            log_file.write(json.dumps(log_entry) + "\n")
            # Flush so accepted tasks are on disk even if the run is killed midway.
            log_file.flush()
            print(f"Task {idx}/{total} sent successfully: {response_data}")

    return log_path

if __name__ == "__main__":
    # Build the payloads from task_sender_config, then submit them using the
    # endpoint, pacing and log directory configured there.
    tasks = generate_tasks(config)
    log_path = send_tasks(
        config.server_ip,
        config.port,
        tasks,
        interval=config.interval,
        log_dir=config.log_dir,
        endpoint=config.endpoint,
    )
    # Set task_sender_config.log_file_to_check to this file's name to query statuses.
    print(f"Task log saved to {log_path}")

task_checker.py

import requests
import os
import json
from datetime import datetime
import task_sender_config as config

def extract_task_info(log_file_path):
    """
    Extract task IDs and corresponding file paths from the specified log file.

    The log holds one JSON object per line, as written by task_sender.py.
    Blank lines are skipped silently. The following lines are reported and
    skipped:

    * lines that are not valid JSON;
    * lines that are not JSON objects;
    * lines lacking a usable ``task_id`` (including the sender's ``"N/A"``
      placeholder) or a ``file_path``.

    :param log_file_path: Path to the log file
    :return: List of tuples containing (task_id, file_path), in file order
    """
    task_info = []
    with open(log_file_path, "r", encoding="utf-8") as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line:
                continue  # blank line: nothing to parse, not an error
            try:
                log_entry = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON in line: {line} - {e}")
                continue
            if not isinstance(log_entry, dict):
                # e.g. a JSON list or number; .get() below would raise.
                print(f"Log entry is not a JSON object: {line}")
                continue
            task_id = log_entry.get("task_id")
            file_path = log_entry.get("file_path")
            # "N/A" is the sender's placeholder for a missing server task id;
            # querying it would only produce a bogus status request.
            if task_id and task_id != "N/A" and file_path:
                task_info.append((task_id, file_path))
            else:
                print(f"Missing task_id or file_path in log entry: {line}")
    return task_info

def check_task_status(server_ip, port, status_endpoint, task_id, *, timeout=30):
    """
    Check the status of a task by sending a GET request to the server.

    The request goes to ``http://<server_ip>:<port><status_endpoint>/<task_id>``,
    with the task id percent-encoded as a single path segment.

    :param server_ip: IP address of the server
    :param port: Port number of the server
    :param status_endpoint: API endpoint for checking task status
    :param task_id: The task ID to check
    :param timeout: Request timeout in seconds; ``None`` waits forever
    :return: Response body from the server (for any HTTP status), or the
        error message if the request itself failed
    """
    from urllib.parse import quote

    # Quote the id so characters such as "/" or "?" cannot alter the URL.
    url = f"http://{server_ip}:{port}{status_endpoint}/{quote(str(task_id), safe='')}"
    print(f"Sending GET request to URL: {url}")
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        return str(e)
    print(f"Received response with status code: {response.status_code}")
    return response.text  # Store only the response body

def main():
    """
    Query the status of every task recorded in the configured sender log.

    Reads ``<config.log_dir>/<config.log_file_to_check>`` (the JSON-lines log
    written by task_sender.py) and sends one status request per task through
    ``check_task_status``. Results are written as one JSON object per line
    (``task_id``, ``file_path``, ``status``) to
    ``<log name>_status_<HH-MM-SS_DD-MM-YYYY>.txt`` in the same directory.
    """
    # Log file name and directory specified in the configuration
    log_file = config.log_file_to_check
    log_path = os.path.join(config.log_dir, log_file)
    print(f"Log file path: {log_path}")

    if not os.path.exists(log_path):
        print(f"Log file {log_path} does not exist.")
        return

    task_info_list = extract_task_info(log_path)
    print(f"Extracted {len(task_info_list)} tasks.")
    if not task_info_list:
        # Nothing to query; do not leave an empty status log behind.
        print("No tasks to check.")
        return

    # Timestamp for the status log file, formatted like "10-31-50_16-01-2025"
    timestamp = datetime.now().strftime("%H-%M-%S_%d-%m-%Y")
    # Drop only the trailing extension; str.replace(".txt", "") would also strip
    # ".txt" occurring elsewhere in the name.
    base_name = os.path.splitext(log_file)[0]
    status_log_file = f"{base_name}_status_{timestamp}.txt"
    status_log_path = os.path.join(config.log_dir, status_log_file)
    print(f"Status log file will be saved to: {status_log_path}")

    with open(status_log_path, "w", encoding="utf-8") as status_file:
        for task_id, file_path in task_info_list:
            status = check_task_status(config.server_ip, config.port, config.status_endpoint, task_id)
            status_entry = {
                "task_id": task_id,
                "file_path": file_path,
                "status": status
            }
            status_file.write(json.dumps(status_entry) + "\n")
            # Keep results written so far if a long run is interrupted.
            status_file.flush()
            print(f"Checked Task ID: {task_id}, Corresponding file path: {file_path}")

    print(f"Status log saved to {status_log_path}")

if __name__ == "__main__":
    main()

http://www.kler.cn/a/511514.html

相关文章:

  • UllnnovationHub,一个开源的WPF控件库
  • 反转字符串中的单词 II:Swift 实现与详解
  • CV 图像处理基础笔记大全(超全版哦~)!!!
  • 无降智o1 pro——一次特别的ChatGPT专业模式探索
  • 【tailscale 和 ssh】当服务器建立好节点,但通过客户端无法通过 ssh 连接
  • Python自动化测试中定位隐藏菜单元素的策略
  • RC2在线加密工具
  • 游戏行业销售数据分析可视化
  • C#中的Timers.Timer使用用法及常见报错
  • 后端之路——阿里云OSS云存储
  • 关于机器学习的一份总结
  • Linux第五讲:进程概念
  • Nginx请求访问流程
  • 【VRChat · 改模】Unity2019、2022的版本选择哪个如何决策,功能有何区别;
  • iOS 性能优化:实战案例分享
  • 设置 Git 默认推送不需要输入账号和密码【Ubuntu、SSH】
  • 数据结构:栈和队列详解(上)
  • 郑州大学2022级大三期末复习总结(数据库,传感器,嵌入式,人工智能,移动终端开发,计算机英语)
  • Unity中不使用场景和预制体保存关卡信息(附源文件)
  • Gitblit 一些使用说明记录
  • 【React】静态组件动态组件
  • Jetpack 介绍
  • 删除字符串中的所有相邻重复项(力扣1047)
  • 怎么投稿各大媒体网站?如何快速辨别一家媒体是否适合自己?
  • 2025年01月17日Github流行趋势
  • 资源管理模块集成Spring Cache