当前位置: 首页 > article >正文

【影刀RPA_启动任务api】

影刀RPA_启动任务api

#启动任务api
import requests
import json
from time import sleep

yingdao_Info={
    "accessKeyId":"XXX",
    "accessKeySecret":"XXX",
    "scheduleUuid":'XXX',
    "robotUuid1":"XXX",
    "robotUuid2":"XXX"}

#1.获取token
def get_access_token():
    url="https://api.yingdao.com/oapi/token/v2/token/create"
    headers={
        "Content-Type":"application/x-www-form-urlencoded"
    }
    params={
        "accessKeyId":yingdao_Info["accessKeyId"],
        "accessKeySecret":yingdao_Info["accessKeySecret"]
    }
    response = requests.post(url=url,headers=headers,params=params)
    return response.json()['data']['accessToken']

# print(get_access_token())

#2、启动任务
def start_task():
    url="https://api.yingdao.com/oapi/dispatch/v2/task/start"
    headers={
        "Authorization":f"Bearer {access_token}",
        "Content-Type":"application/json"
    }
    body={
        "scheduleUuid": yingdao_Info["scheduleUuid"],
        "scheduleRelaParams":[{
         "robotUuid":yingdao_Info["robotUuid1"],
         "params":[{"name":"单号","value":"123456","type":"str"}]
        },
        {
         "robotUuid":yingdao_Info["robotUuid2"],
         "params":[{"name":"姓名","value":"小王","type":"str"}]
        }]
    }
    response = requests.post(url=url,headers=headers,json=body)
    print(f"影刀返回:\n{response.json()}")
    return response.json()

#3、查询任务状态
def query_task():

    url = "https://api.yingdao.com/oapi/dispatch/v2/task/query"

    payload = json.dumps({
    "taskUuid": taskUuid
    })
    headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {access_token}'
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    # print(response.json())
    return response.json()

access_token=get_access_token()
taskUuid=start_task()['data']['taskUuid']

while True:
    result_data=query_task()
    status_name=result_data['data']["statusName"]
    sleep(5)
    print(f"任务状态:{status_name}")
    if status_name=="完成":
        output_parames=result_data['data']["jobDataList"]
        print(f"输出参数:{output_parames}")
        break
	elif status_name =="异常":
      	break

http://www.kler.cn/a/507764.html

相关文章:

  • 【物联网】ARM核介绍
  • 关于jwt和security
  • 自动驾驶3D目标检测综述(八)
  • 【Python基础篇】——第3篇:从入门到精通:掌握Python数据类型与数据结构
  • Stream流
  • Ubuntu把应用程序放到桌面
  • 23- TIME-LLM: TIME SERIES FORECASTING BY REPRO- GRAMMING LARGE LANGUAGE MODELS
  • Python语言的数据类型
  • python学opencv|读取图像(三十七 )截断处理
  • C# OpenCV机器视觉:区域生长算法
  • 数据库事务隔离级别
  • 网络信息安全技术研究
  • maven常见知识点
  • Python操作Excel——openpyxl使用笔记(3)
  • Spring Web MVC综合案例
  • unity学习19:unity里用C#脚本获取 gameobject 和 Componenet
  • 【ComfyUI专栏】Git Clone 下载自定义节点的代理设置
  • 运维巡检报告,运维巡检检查单,服务器系统及数据库性能检查,日常运维检查记录表格,信息系统日常运维检查(原件)
  • 【数据分享】1929-2024年全球站点的逐月平均气温数据(Shp\Excel\免费获取)
  • 【华为OD-E卷 - 计算疫情扩散时间 100分(python、java、c++、js、c)】
  • 三种文本相似计算方法:规则、向量与大模型裁判
  • 去哪儿kafka优化案例
  • 广播网络实验
  • VSCode 的部署
  • 【Flink系列】5. DataStream API
  • Solidity01 Solidity极简入门