Docker TorchServe workflow deployment process
1. First deploy the models involved; for the steps, see: Docker TorchServe model deployment process
Set up the models the workflow uses, e.g. ocr_detection, ocr_judge, ocr_text, xxx_detection, …
2. Workflow files
2.1 Sequential execution, example 1
(1) xxx_workflow.yaml
models:
    # global model parameters, inherited by every model below
    batch-size: 1
    max-batch-delay: 10    # max time (ms) to wait for a batch to fill
    retry-attempts: 3      # retries on a failed model invocation
    timeout-ms: 5000       # per-model execution timeout (ms)
    ocr_detection:
        url: ocr_detection.mar    # local or public URI
    ocr_judge:
        url: ocr_judge.mar
    ocr_text:
        url: ocr_text.mar

dag:
    # input -> ocr_detection -> first_processing -> ocr_judge -> second_processing -> ocr_text
    ocr_detection: [first_processing]
    first_processing: [ocr_judge]
    ocr_judge: [second_processing]
    second_processing: [ocr_text]
(2) xxx_workflow_handler.py

import json


def first_processing(data, context):
    # Intermediate-node functions must be named exactly as in the dag section above.
    # `data` carries the previous node's output under "data" or "body" as raw bytes.
    if data:
        image = data[0].get("data") or data[0].get("body")
        image = image.decode('utf-8')
        image = json.loads(image)
        return [{"image": image}]  # e.g. [{"image": ["xxx", "xxx"]}]
    return None


def second_processing(data, context):
    # Same pattern: reshape ocr_judge's output into the input expected by ocr_text.
    if data:
        image = data[0].get("data") or data[0].get("body")
        image = image.decode('utf-8')
        image = json.loads(image)
        return [{"image": image}]  # e.g. [{"image": ["xxx", "xxx"]}]
    return None
2.2 Sequential execution, example 2
(1) xxx_workflow.yaml
models:
    # global model parameters
    batch-size: 1
    max-batch-delay: 10
    retry-attempts: 3
    timeout-ms: 5000
    xxx_detection:
        url: xxx_detection.mar    # local or public URI
    xxx_recognition:
        url: xxx_recognition.mar

dag:
    # xxx_detection's output feeds xxx_recognition directly; no intermediate function is needed
    xxx_detection: [xxx_recognition]
2.3 Parallel execution, example
(1) xxx_workflow.yaml
models:
    # global model parameters
    batch-size: 1
    max-batch-delay: 10
    retry-attempts: 3
    timeout-ms: 5000
    xxx_branch_1:
        url: xxx_branch_1.mar    # local or public URI
    xxx_branch_2:
        url: xxx_branch_2.mar
    xxx_branch_3:
        url: xxx_branch_3.mar

dag:
    # pre_processing fans the input out to three branches; aggregate_func joins their outputs
    pre_processing: [xxx_branch_1, xxx_branch_2, xxx_branch_3]
    xxx_branch_1: [aggregate_func]
    xxx_branch_2: [aggregate_func]
    xxx_branch_3: [aggregate_func]
(2) xxx_workflow_handler.py

import json


def pre_processing(data, context):
    '''
    Empty start node: the DAG doesn't support multiple start nodes, so this
    function receives the request and fans it out to the three branches.
    '''
    if data:
        text = data[0].get("data") or data[0].get("body")
        return [text]
    return None


def aggregate_func(data, context):
    '''
    Renames the keys of the individual model outputs so they are more
    appropriate as the workflow output. Each branch's serialized output
    arrives under a key matching its node name in the dag.
    '''
    if data:
        xxx_branch_1_result = json.loads(data[0].get("xxx_branch_1"))
        xxx_branch_2_result = json.loads(data[0].get("xxx_branch_2"))
        xxx_branch_3_result = json.loads(data[0].get("xxx_branch_3"))
        response = {
            "xxx_branch_1_result": xxx_branch_1_result,
            "xxx_branch_2_result": xxx_branch_2_result,
            "xxx_branch_3_result": xxx_branch_3_result,
        }
        return [response]
    return None
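For reference, a sketch of the payload aggregate_func receives: TorchServe delivers each parallel branch's serialized output under a key matching its node name, which is what the json.loads calls above rely on. The branch outputs below are placeholders, not real model results:

# Hypothetical input to aggregate_func; the JSON strings are illustrative only
data = [{
    "xxx_branch_1": '{"label": "a", "score": 0.9}',
    "xxx_branch_2": '{"label": "b", "score": 0.7}',
    "xxx_branch_3": '{"label": "c", "score": 0.5}',
}]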
3. Package the workflow files
torch-workflow-archiver -f --workflow-name xxx_workflow --spec-file xxx_workflow.yaml --handler xxx_workflow_handler.py --export-path /home/model-server/model-store/
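On success the archiver writes xxx_workflow.war into the export path. A quick sanity check (a sketch; the path comes from the command above):

import os

war_path = "/home/model-server/model-store/xxx_workflow.war"
assert os.path.isfile(war_path), f"archive not found: {war_path}"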
4. Management API
(1) List the registered workflows
curl "http://localhost:8381/workflows"
(2) Register a workflow and allocate resources for it
Register the .war file. Note: the .war file must be placed in the model-store directory, i.e. /path/model-server/model-store
curl -X POST "$server/workflows?url=$model_url&workflow_name=$model_name"
# example
curl -X POST "localhost:8381/workflows?url=xxx_workflow.war&workflow_name=xxx_workflow"
(3) Check a workflow's status
curl http://localhost:8381/workflows/xxx_workflow
(4) Unregister a workflow
curl -X DELETE http://localhost:8381/workflows/xxx_workflow
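The same management calls can also be made from Python; a minimal sketch with requests, mirroring the curl examples above (same host and port):

import requests

BASE = "http://localhost:8381"  # management API

# (1) list registered workflows
print(requests.get(f"{BASE}/workflows").json())

# (2) register a workflow (the .war must already be in the model-store)
r = requests.post(f"{BASE}/workflows",
                  params={"url": "xxx_workflow.war", "workflow_name": "xxx_workflow"})
print(r.status_code, r.text)

# (3) check workflow status
print(requests.get(f"{BASE}/workflows/xxx_workflow").json())

# (4) unregister the workflow
print(requests.delete(f"{BASE}/workflows/xxx_workflow").status_code)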
5. Workflow inference
# -*- coding: utf-8 -*-
import base64
import io
import json

import requests
from PIL import Image


def get_image_bytes(image_path):
    # Open the image and convert it to RGB
    with Image.open(image_path) as img:
        img = img.convert("RGB")
        # Save the image into an in-memory byte stream as JPEG
        byte_stream = io.BytesIO()
        img.save(byte_stream, format="JPEG")
        # Rewind and return the byte content
        byte_stream.seek(0)
        return byte_stream.read()


image_path = '/path/img1.jpg'
image_bytes = get_image_bytes(image_path)
image_base64 = base64.b64encode(image_bytes).decode('utf-8')

data = {'data': json.dumps({'image': image_base64})}
# 8380 is the inference port here; workflow predictions go to /wfpredict/<workflow_name>
response = requests.post('http://localhost:8380/wfpredict/xxx_workflow', data=data)
print(response.status_code)
print(response.json())  # parse the JSON response rather than eval()-ing it
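A slightly more defensive variant of the same request (the timeout value is an assumption; tune it to your models' latency):

resp = requests.post(
    'http://localhost:8380/wfpredict/xxx_workflow',
    data={'data': json.dumps({'image': image_base64})},
    timeout=30,  # assumed value; adjust to the workflow's latency
)
resp.raise_for_status()  # fail fast on 4xx/5xx responses
print(resp.json())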
References:
The /serve README and examples inside the container
https://pytorch.org/serve/workflows.html