Python批量发送任务请求(POST)和批量查询任务状态(GET)
目的
在之前的博客《基于 Celery 的任务管理》(处理自己的算法、暴露 API、管理任务并发与多线程)的基础上,本文编写脚本以便批量发布任务和批量查询任务状态,所有任务信息都存储在本地 log 文件中。
代码
task_sender_config.py
配置文件,所有的发送任务和查询任务的配置信息都放在这里
# task_sender_config.py
"""Shared configuration for task_sender.py (batch POST) and task_checker.py (batch GET)."""
# Server information
server_ip: str = "your ip"  # IP address of the task server
port: int = 5000  # Port the task server listens on
endpoint: str = "/new_task_rainfall_simulation"  # POST endpoint for submitting tasks
interval: int = 1  # Seconds to wait between consecutive task POST calls
status_endpoint: str = "/task-status"  # GET endpoint for checking task status
# Data path: directory containing the DSM input files
data_path: str = "/home/xxx/"
# File prefixes and inclusive number ranges expanded into file names.
# File format for me is 0009_0001_DSM.tif
prefixes: dict = {
    "0009": {"start": 6, "end": 25},
    "0010": {"start": 5, "end": 25}
}
# Single files added individually, outside the prefix/range expansion
single_files: list = ["0010_0000_DSM.tif"]  # List of single files
# Task template: payload fields shared by every submitted task
task_template: dict = {
    "starting_precip_mmhr": 142.0,
    "storm_duration": 3600.0,
    "elapsed_time": 0.0,
    "model_run_time": 3600.0,
    "GPU": 1
}
# Log directory
log_dir: str = "./log"  # Directory where task/status log files are saved
log_file_to_check: str = "task_1737011612.txt"  # Log file task_checker.py reads
task_sender.py
import requests
import time
import os
import json
import task_sender_config as config
from datetime import datetime
def generate_tasks(config):
    """
    Generate a list of task payloads based on the configuration.

    Expands every (prefix, range) pair in ``config.prefixes`` into file
    names of the form ``{prefix}_{NNNN}_DSM.tif``, appends the explicit
    ``config.single_files``, and builds one task dict (a copy of
    ``config.task_template`` plus a ``watershed_dem`` path) for each
    file that exists on disk.  Missing files are reported and skipped.

    :param config: Configuration module/object exposing ``data_path``,
        ``prefixes``, ``single_files`` and ``task_template``
    :return: List of task dictionaries
    """
    data_path = config.data_path
    task_template = config.task_template

    # Build the full candidate list first, then validate/expand each file
    # in a single loop (the original duplicated this logic twice).
    filenames = [
        f"{prefix}_{number:04d}_DSM.tif"
        for prefix, bounds in config.prefixes.items()
        for number in range(bounds["start"], bounds["end"] + 1)
    ]
    filenames.extend(config.single_files)

    tasks = []
    for filename in filenames:
        full_path = os.path.join(data_path, filename)
        if os.path.exists(full_path):
            # Shallow copy is sufficient: the template holds only scalars.
            task = task_template.copy()
            task["watershed_dem"] = full_path
            tasks.append(task)
        else:
            print(f"File not found: {full_path}")
    return tasks
def send_tasks(server_ip, port, data_list, interval=1, log_dir=".",
               endpoint="/new_task_rainfall_simulation", timeout=60):
    """
    Send tasks to a server one by one via POST requests.

    Successful submissions (HTTP 200/202) are appended to a timestamped
    log file, one JSON object per line, so task_checker.py can later
    look up each task_id.

    :param server_ip: IP address of the server
    :param port: Port number of the server
    :param data_list: List of JSON-serializable payloads to send; each
        must contain a "watershed_dem" key
    :param interval: Time interval (in seconds) between tasks
    :param log_dir: Directory to save the log file (created if missing)
    :param endpoint: API endpoint to POST to.  Previously hard-coded
        here even though the config declares ``endpoint``; pass
        ``config.endpoint`` to keep it in one place.
    :param timeout: Per-request timeout in seconds so a dead server
        cannot hang the sender forever
    """
    url = f"http://{server_ip}:{port}{endpoint}"
    # makedirs(exist_ok=True) also creates missing parent directories and
    # avoids the check-then-create race of the original os.mkdir.
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, f"task_{int(time.time())}.txt")
    with open(log_path, "w") as log_file:
        total = len(data_list)
        for idx, data in enumerate(data_list, start=1):
            try:
                response = requests.post(url, json=data, timeout=timeout)
                if response.status_code in (200, 202):
                    response_data = response.json()
                    log_entry = {
                        "file_path": data["watershed_dem"],
                        "task_parameters": data,
                        "task_id": response_data.get("task_id", "N/A"),
                    }
                    log_file.write(json.dumps(log_entry) + "\n")
                    print(f"Task {idx}/{total} sent successfully: {response_data}")
                else:
                    print(f"Task {idx}/{total} failed with status code {response.status_code}: {response.text}")
            except requests.RequestException as e:
                print(f"Error sending task {idx}/{total}: {e}")
            time.sleep(interval)  # Wait for the specified interval before sending the next task
if __name__ == "__main__":
    # Build the task list from the shared configuration, then POST each
    # task to the server at the configured interval, logging task IDs.
    send_tasks(
        config.server_ip,
        config.port,
        generate_tasks(config),
        config.interval,
        config.log_dir,
    )
task_checker.py
import requests
import os
import json
from datetime import datetime
import task_sender_config as config
def extract_task_info(log_file_path):
    """
    Read a task log and return the (task_id, file_path) pairs it contains.

    Each line is expected to be a JSON object with "task_id" and
    "file_path" keys; malformed or incomplete lines are reported to
    stdout and skipped.

    :param log_file_path: Path to the log file
    :return: List of tuples containing (task_id, file_path)
    """
    pairs = []
    with open(log_file_path, 'r') as handle:
        for raw in handle:
            stripped = raw.strip()
            try:
                entry = json.loads(stripped)
            except json.JSONDecodeError as err:
                print(f"Error decoding JSON in line: {stripped} - {err}")
                continue
            task_id = entry.get("task_id")
            file_path = entry.get("file_path")
            if not (task_id and file_path):
                print(f"Missing task_id or file_path in log entry: {stripped}")
                continue
            pairs.append((task_id, file_path))
    return pairs
def check_task_status(server_ip, port, status_endpoint, task_id, timeout=30):
    """
    Check the status of a task by sending a GET request to the server.

    :param server_ip: IP address of the server
    :param port: Port number of the server
    :param status_endpoint: API endpoint for checking task status
    :param task_id: The task ID to check
    :param timeout: Seconds to wait for the server; a finite timeout
        turns a hung/dead server into a RequestException instead of
        blocking the checker forever
    :return: Response body text on success, or the error message string
        if the request failed (callers treat both as opaque status text)
    """
    url = f"http://{server_ip}:{port}{status_endpoint}/{task_id}"
    print(f"Sending GET request to URL: {url}")
    try:
        response = requests.get(url, timeout=timeout)
        print(f"Received response with status code: {response.status_code}")
        return response.text  # Store only the response body
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        return str(e)
def main():
    """Query the status of every task recorded in the configured log file
    and write the responses to a new timestamped status log."""
    log_name = config.log_file_to_check
    log_path = os.path.join(config.log_dir, log_name)
    print(f"Log file path: {log_path}")
    if not os.path.exists(log_path):
        print(f"Log file {log_path} does not exist.")
        return

    entries = extract_task_info(log_path)
    print(f"Extracted {len(entries)} tasks.")

    # Status log name: "<original stem>_status_HH-MM-SS_DD-MM-YYYY.txt",
    # e.g. timestamp "10-31-50_16-01-2025".
    stamp = datetime.now().strftime("%H-%M-%S_%d-%m-%Y")
    status_log_path = os.path.join(
        config.log_dir, f"{log_name.replace('.txt', '')}_status_{stamp}.txt"
    )
    print(f"Status log file will be saved to: {status_log_path}")

    with open(status_log_path, 'w') as out:
        for task_id, file_path in entries:
            record = {
                "task_id": task_id,
                "file_path": file_path,
                "status": check_task_status(
                    config.server_ip,
                    config.port,
                    config.status_endpoint,
                    task_id,
                ),
            }
            out.write(json.dumps(record) + "\n")
            print(f"Checked Task ID: {task_id}, Corresponding file path: {file_path}")
    print(f"Status log saved to {status_log_path}")


if __name__ == "__main__":
    main()