机器学习编译器(二)
四、端到端模型执行
示例模型:两层神经网络包含两个全连接层和一个Relu激活层
4.1 numpy实现
1.高层API
def numpy_mlp(data,w0,b0,w1,b1):
linear0 = data @ w0.T + b0
relu = np.maximum(linear0, 0)
linear1 = relu @ w1.T + b1
return linear1
上述的示例代码显示了利用高层numpy数组操作执行端到端模型执行的过程,
2.底层细节
为了说明底层细节,我们将数组函数展开成循环并用np.empty显示分配数组来传递数组。
# 定义全连接层0
def lnumpy_linear0(X: np.ndarray, W: np.ndarray, B: np.ndarray, Z: np.ndarray):
Y = np.empty((1, 128), dtype="float32")
for i in range(1):
for j in range(128):
for k in range(784):
if k == 0:
Y[i, j] = 0
Y[i, j] += X[i, k] * W[j, k]
for i in range(1):
for j in range(128):
Z[i, j] = Y[i, j] + B[j]
# 定义ReLU激活层
def lnumpy_relu0(X: np.ndarray, Y: np.ndarray):
for i in range(1):
for j in range(128):
Y[i, j] = np.maximum(X[i, j], 0)
# 定义全连接层1
def lnumpy_linear1(X: np.ndarray, W: np.ndarray, B: np.ndarray, Z: np.ndarray):
Y = np.empty((1, 10), dtype="float32")
for i in range(1):
for j in range(10):
for k in range(128):
if k == 0:
Y[i, j] = 0
Y[i, j] += X[i, k] * W[j, k]
for i in range(1):
for j in range(10):
Z[i, j] = Y[i, j] + B[j]
# 定义整体模型
def lnumpy_mlp(data, w0, b0, w1, b1):
lv0 = np.empty((1, 128), dtype="float32")
lnumpy_linear0(data, w0, b0, lv0)
lv1 = np.empty((1, 128), dtype="float32")
lnumpy_relu0(lv0, lv1)
out = np.empty((1, 10), dtype="float32")
lnumpy_linear1(lv1, w1, b1, out)
return out
3.预测模型
3.1 下载模型参数
!wget https://github.com/mlc-ai/web-data/raw/main/models/fasionmnist_mlp_params.pkl
3.2 加载模型
import torchvision
import torch
import matplotlib.pyplot as plt
# 加载数据集
test_data = torchvision.datasets.FashionMNIST(
root="data",
train=False,
download=True,
transform=torchvision.transforms.ToTensor()
)
classes = [
"T-shirt/top", "Trouser", "Pullover", "Dress", "Coat",
"Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"
]
# 创建 DataLoader
test_DataLoader = torch.utils.data.DataLoader(test_data, batch_size=1, shuffle=True)
# 获取一个批次的数据
img, label = next(iter(test_DataLoader))
# 将图像从 Tensor 转换为 NumPy 数组,并调整形状
img = img.squeeze().numpy() # 从 (1, 1, 28, 28) 转换为 (28, 28)
# 打印标签
print(f"Label: {label.item()} ({classes[label.item()]})")
# 显示图像
plt.imshow(img, cmap='gray')
plt.title(f"Label: {label}")
plt.axis('off') # 不显示坐标轴
plt.show()
4.使用函数预测
4.1 高层
import pickle as pkl
# 加载模型参数,用numpy测试下
mlp_params = pkl.load(open("fasionmnist_mlp_params.pkl", "rb"))
res = numpy_mlp(img.reshape(1, 784),
mlp_params["w0"],
mlp_params["b0"],
mlp_params["w1"],
mlp_params["b1"])
pred_kind = np.argmax(res, axis=1)
print("预测的类别:", classes[pred_kind[0]])
预测的类别: Sneaker
4.2 底层
import pickle as pkl
# 加载模型参数,用numpy测试下
mlp_params = pkl.load(open("fasionmnist_mlp_params.pkl", "rb"))
res = lnumpy_mlp(img.reshape(1, 784),
mlp_params["w0"],
mlp_params["b0"],
mlp_params["w1"],
mlp_params["b1"])
pred_kind = np.argmax(res, axis=1)
print("预测的类别:", classes[pred_kind[0]])
预测的类别: Sneaker
4.2 在TVMScript中构建端到端IRModule
以低级Numpy为例,现在准备利用MLC抽象来实现端到端模型运行,一下是TVMScript实现
import tvm
from tvm.script import tir as T
from tvm.script import relax as R
# 上图简单神经网络的实现,编译执行要去掉注释
@tvm.script.ir_module
class MyModule:
@T.prim_func
def relu0(x: T.handle, y: T.handle):
n = T.int64()
X = T.match_buffer(x, (1, n), "float32")
Y = T.match_buffer(y, (1, n), "float32")
for i, j in T.grid(1, n):
with T.block("Y"):
vi, vj = T.axis.remap("SS", [i, j])
Y[vi, vj] = T.max(X[vi, vj], T.float32(0))
@T.prim_func
def linear0(x: T.handle,
w: T.handle,
b: T.handle,
z: T.handle):
m, n, k = T.int64(), T.int64(), T.int64()
X = T.match_buffer(x, (1, m), "float32")
W = T.match_buffer(w, (n, m), "float32")
B = T.match_buffer(b, (n, ), "float32")
Z = T.match_buffer(z, (1, n), "float32")
Y = T.alloc_buffer((1, n), "float32")
for i, j, k in T.grid(1, n, m):
with T.block("Y"):
vi, vj, vk = T.axis.remap("SSR", [i, j, k])
with T.init():
Y[vi, vj] = T.float32(0)
Y[vi, vj] = Y[vi, vj] + X[vi, vk] * W[vj, vk]
for i, j in T.grid(1, n):
with T.block("Z"):
vi, vj = T.axis.remap("SS", [i, j])
Z[vi, vj] = Y[vi, vj] + B[vj]
@R.function
def main(x: R.Tensor((1, "m"), "float32"),
w0: R.Tensor(("n", "m"), "float32"),
b0: R.Tensor(("n", ), "float32"),
w1: R.Tensor(("k", "n"), "float32"),
b1: R.Tensor(("k", ), "float32")):
m, n, k = T.int64(), T.int64(), T.int64()
with R.dataflow():
lv0 = R.call_dps_packed("linear0", (x, w0, b0), R.Tensor((1, n), "float32"))
lv1 = R.call_dps_packed("relu0", (lv0, ), R.Tensor((1, n), "float32"))
out = R.call_dps_packed("linear0", (lv1, w1, b1), R.Tensor((1, k), "float32"))
R.output(out)
return out
对上述程序进行如下部分的注释:
1.T.handle
与T.match_buffer()
:表示数据缓冲区的抽象数据类型,表示一个数据对象但不会指定其具体形状、数据类型和内存位置。通过T.match_buffer()
函数,可以将抽象的T.handle
转换为具体的T.Buffer
对象。
2.R.function
:Relax函数是一种上层神经网络执行的全新抽象。
4.3 在TVMScript中构建端到端 IRModule
1.计算图
使用图(graph)来可视化高层模型执行通常很有帮助,上面是main函数的计算图视图:
-
图中的每个框都对应于计算操作
-
箭头对应于中间张量的输入输出
图本身可以看做是一种抽象,在机器学习框架中称为计算图。
2.call_dps_packed
计算图中的每个操作步骤都包含一个R.call_dps_packed操作。这是引入元张量函数的过程:
lv0 = R.call_dps_packed("linear0", (x, w0, b0), R.Tensor((1, n), "float32"))
在其他语言中,我们通常在函数形参中引入输入和输出两个参数,在底层显式地把所需内存开出来。而在计算图中,我们关注的是输入经过计算操作后得到输出,所以提前开输出内存可能会比较占内存。这是因为计算图的设计更倾向于延迟内存分配,直到实际需要输出时才分配内存,从而减少内存占用并可能提高性能。
为了解释R.call_dps_packed
,先看一下操作等效底层Numpy实现:
def lnumpy_call_dps_packed(prim_func, inputs, shape, dtype):
res = np.empty(shape, dtype=dtype)
prim_func(*inputs, res)
return res
call_dps_packed
接受一个原函数(prim_func)的输入列表,并分配一个输出张量 res
,然后将输入和输出传递给 prim_func
。执行 prim_func
后,结果会填充到 res
中。这种机制被称为目标传递(destination passing),即函数的输入需要我们先为输出结果开辟内存。
为什么需要call_dps_packed
这样一个函数呢?因为元张量函数采用以下约定:
def low_level_prim_func(in0, in1, ..., out):
# implementations
此约定称为:目标传递(destination passing),具体来说,输入和输出在外部显式分配并传递给底层元函数(即代码中的prim_func
)。这种风格通常用于底层库设计,因此高层框架可以处理内存分配决策。
注:并非所有张量操作都可以通过这种方式呈现(比如,有些操作的输出形状取决于输入)
这里的关键思想是我们想要隐藏可能的分配或对函数的显式写入。 用更正式的术语来说,我们希望函数是 pure 或 side-effect free。如果一个函数只从其输入中读取并通过其输出返回结果,它不会改变程序的其他部分(例如递增全局计数器),那么它是pure或side-effect free的。
3.Dataflow Block
dataflow block是我们标记程序计算图区域的一种方式。 具体来说,在dataflow block中,所有操作都需要side-effect free。 在dataflow block之外,操作可能包含side-effect。示例如下:
with R.dataflow():
lv0 = R.call_dps_packed("linear0", (x, w0, b0), R.Tensor((1, n), "float32"))
gv0 = R.call_dps_packed("relu0", (lv0, ), R.Tensor((1, n), "float32"))
R.output(gv0)
-
模块化和重用:通过将计算分解为多个步骤,每个步骤的输出可以作为下一个步骤的输入,从而实现模块化设计。这种方式使得代码更容易理解和维护,同时也便于重用各个计算步骤。
-
优化和分析:在构建计算图的过程中,可以更容易地分析和优化整个计算流程。例如,可以识别出可以并行执行的计算步骤,或者找到可以合并的重复计算,从而提高程序的执行效率。
-
动态计算图:使用
R.call_dps_packed
构建的计算图是动态的,这意味着可以在运行时根据需要动态地构建和修改计算图。这为实现灵活的计算流程提供了可能,特别是在处理复杂的机器学习模型时。 -
资源管理:通过计算图,可以更有效地管理计算资源,如内存和计算单元。计算图框架通常会自动处理内存分配和释放,以及计算资源的调度,从而减少资源浪费和提高程序性能。
-
易于集成:构建的计算图可以方便地与其他系统或库集成,因为计算图提供了一种标准化的方式来描述和执行计算。这使得将模型部署到不同的硬件平台或与其他软件系统集成变得更加容易。
4.4 构建并运行模型
调用 relax.build
来构建这个函数。 注意:Relax 仍在开发中,因此某些 API 可能会更改。 不过,我们的主要目标是熟悉端到端模型的整体 MLC 流程(构造、转换、构建)。
ex = relax.build(MyModule, target="llvm")
build 函数会给我们一个可执行文件(译者注:“可执行文件”并非传统操作系统中的可执行文件,不能直接在系统中运行,而是针对Relax VM设计的一种文件格式)。
vm = relax.VirtualMachine(ex, tvm.cpu())
首先构建包含输入数据和权重的 tvm NDArray,然后我们可以通过传入输入参数和权重来运行 main
函数。
data_nd = tvm.nd.array(img.reshape(1, 784))
nd_params = {k: tvm.nd.array(v) for k, v in mlp_params.items()}
nd_res = vm["main"](data_nd,
nd_params["w0"],
nd_params["b0"],
nd_params["w1"],
nd_params["b1"])
pred_kind = np.argmax(nd_res.numpy(), axis=1)
print("预测的类别:", classes[pred_kind[0]])
4.5 在环境中集成现有运行库
很多情况下我们希望将现有的库函数集成到MLC过程中 ,一下示例IRModule展示了如何做到这一点:
@tvm.script.ir_module
class MyModuleWithExternCall:
@R.function
def main(x: R.Tensor((1, "m"), "float32"),
w0: R.Tensor(("n", "m"), "float32"),
b0: R.Tensor(("n", ), "float32"),
w1: R.Tensor(("k", "n"), "float32"),
b1: R.Tensor(("k", ), "float32")):
# block 0
m, n, k = T.int64(), T.int64(), T.int64()
with R.dataflow():
lv0 = R.call_dps_packed("env.linear", (x, w0, b0), R.Tensor((1, n), "float32"))
lv1 = R.call_dps_packed("env.relu", (lv0, ), R.Tensor((1, n), "float32"))
out = R.call_dps_packed("env.linear", (lv1, w1, b1), R.Tensor((1, k), "float32"))
R.output(out)
return out
"env.linear"是我们期望在模型执行期间的运行函数的名称
1.注册运行时函数
为了能够执行调用外部函数的代码,我们需注册相应的函数。下面的代码注册了函数的两个实现:
@tvm.register_func("env.linear", override=True)
def torch_linear(x: tvm.nd.NDArray,
w: tvm.nd.NDArray,
b: tvm.nd.NDArray,
out: tvm.nd.NDArray):
x_torch = torch.from_dlpack(x)
w_torch = torch.from_dlpack(w)
b_torch = torch.from_dlpack(b)
out_torch = torch.from_dlpack(out)
torch.mm(x_torch, w_torch.T, out=out_torch)
torch.add(out_torch, b_torch, out=out_torch)
@tvm.register_func("env.relu", override=True)
def lnumpy_relu(x: tvm.nd.NDArray,
out: tvm.nd.NDArray):
x_torch = torch.from_dlpack(x)
out_torch = torch.from_dlpack(out)
torch.maximum(x_torch, torch.Tensor([0.0]), out=out_torch)
代码注解:
-
from_dlpack
将 TVM NDArray 转换为 torch NDArray。 请注意,这是一个零拷贝转换,这意味着 Torch 阵列与 TVM NDArray 共享底层内存。 -
DLPack 是一种通用的交换标准,允许不同的框架交换 Tensor/NDArray 而无需参与数据复制。
2.构建和运行
现在我们可以构建并运行MyModuleWithExternCall
,我们可以验证模型得到了相同的结果。
ex = relax.build(MyModuleWithExternCall, target="llvm")
vm = relax.VirtualMachine(ex, tvm.cpu())
nd_res = vm["main"](data_nd,
nd_params["w0"],
nd_params["b0"],
nd_params["w1"],
nd_params["b1"])
pred_kind = np.argmax(nd_res.numpy(), axis=1)
print("MyModuleWithExternCall Prediction:", class_names[pred_kind[0]])
4.6 混合TensorIR Code和库
在上一个示例中,我们构建了一个IRModule,其中所有的元操作都被分配给运行库。有时将两者混合使用。
@tvm.script.ir_module
class MyModuleMixture:
@T.prim_func
def linear0(x: T.handle,
w: T.handle,
b: T.handle,
z: T.handle):
m, n, k = T.int64(), T.int64(), T.int64()
X = T.match_buffer(x, (1, m), "float32")
W = T.match_buffer(w, (n, m), "float32")
B = T.match_buffer(b, (n, ), "float32")
Z = T.match_buffer(z, (1, n), "float32")
Y = T.alloc_buffer((1, n), "float32")
for i, j, k in T.grid(1, n, m):
with T.block("Y"):
vi, vj, vk = T.axis.remap("SSR", [i, j, k])
with T.init():
Y[vi, vj] = T.float32(0)
Y[vi, vj] = Y[vi, vj] + X[vi, vk] * W[vj, vk]
for i, j in T.grid(1, n):
with T.block("Z"):
vi, vj = T.axis.remap("SS", [i, j])
Z[vi, vj] = Y[vi, vj] + B[vj]
@R.function
def main(x: R.Tensor((1, "m"), "float32"),
w0: R.Tensor(("n", "m"), "float32"),
b0: R.Tensor(("n", ), "float32"),
w1: R.Tensor(("k", "n"), "float32"),
b1: R.Tensor(("k", ), "float32")):
m, n, k = T.int64(), T.int64(), T.int64()
with R.dataflow():
lv0 = R.call_dps_packed("linear0", (x, w0, b0), R.Tensor((1, n), "float32"))
lv1 = R.call_dps_packed("env.relu", (lv0, ), R.Tensor((1, n), "float32"))
out = R.call_dps_packed("env.linear", (lv1, w1, b1), R.Tensor((1, k), "float32"))
R.output(out)
return out
4.7 将参数绑定到IRModule
在到目前为止的所有示例中,我们通过显式传递参数来构造主函数。 在许多情况下,将参数绑定为附加到 IRModule 的常量通常会降低API的复杂程度。
前置代码:
mlp_params = pkl.load(open("fasionmnist_mlp_params.pkl", "rb"))
nd_params = {k: tvm.nd.array(v) for k, v in mlp_params.items()}
以下代码通过将参数名称与 nd_params 中的键匹配来创建绑定。
MyModuleWithParams = relax.transform.BindParams("main", nd_params)(MyModuleMixture)
IPython.display.Code(MyModuleWithParams.script(), language="python")
结果如下:
# from tvm.script import ir as I
# from tvm.script import tir as T
# from tvm.script import relax as R
@I.ir_module
class Module:
@T.prim_func
def linear0(x: T.handle, w: T.handle, b: T.handle, z: T.handle):
m = T.int64()
X = T.match_buffer(x, (1, m))
n = T.int64()
W = T.match_buffer(w, (n, m))
B = T.match_buffer(b, (n,))
Z = T.match_buffer(z, (1, n))
# with T.block("root"):
Y = T.alloc_buffer((1, n))
for i, j, k in T.grid(1, n, m):
with T.block("Y"):
vi, vj, vk = T.axis.remap("SSR", [i, j, k])
T.reads(X[vi, vk], W[vj, vk])
T.writes(Y[vi, vj])
with T.init():
Y[vi, vj] = T.float32(0)
Y[vi, vj] = Y[vi, vj] + X[vi, vk] * W[vj, vk]
for i, j in T.grid(1, n):
with T.block("Z"):
vi, vj = T.axis.remap("SS", [i, j])
T.reads(Y[vi, vj], B[vj])
T.writes(Z[vi, vj])
Z[vi, vj] = Y[vi, vj] + B[vj]
@R.function
def main(x: R.Tensor((1, 784), dtype="float32")) -> R.Tensor((1, 10), dtype="float32"):
with R.dataflow():
lv0 = R.call_dps_packed("linear0", (x, metadata["relax.expr.Constant"][0], metadata["relax.expr.Constant"][1]), out_sinfo=R.Tensor((1, 128), dtype="float32"))
lv1 = R.call_dps_packed("env.relu", (lv0,), out_sinfo=R.Tensor((1, 128), dtype="float32"))
out = R.call_dps_packed("env.linear", (lv1, metadata["relax.expr.Constant"][2], metadata["relax.expr.Constant"][3]), out_sinfo=R.Tensor((1, 10), dtype="float32"))
R.output(out)
return out
# Metadata omitted. Use show_meta=True in script() method to show it.
其中如果要展示Metadata,可以这样写
MyModuleWithParams = relax.transform.BindParams("main", nd_params)(MyModuleMixture)
script_with_meta = MyModuleWithParams.script(show_meta=True)
print(script_with_meta)
五、自动程序优化
前文知识回顾:
-
驱动高层执行的计算图抽象
-
元张量函数的抽象(TensorIR表示)
-
通过注册环境函数从而能被调用的库函数
所有的元素被分装在一个IRModule中,大多数MLC过程可以看作是元张量函数之间的变换。
5.1 随机调度变换
1.固定调度
之前我们在矩阵乘法那块对j这个变量进行了切割,固定j1大小为4,j0为128/4=32
sch=tvm.tir.Schedule(Module)
block_c=sch.get_block("C","main")
i,j,k=sch.get_loops(block_c)
j0,j1=sch.split(j,factors=[None,4])
IPython.display.Code(sch.mod.script(), language="python")
2.概率式编程
在实际编程当中我们一般会固定一些值,但有些值我们无法确定,因此采用概率式编程,增加一些随机元素
def stochastic_schedule_mm(sch: tvm.tir.Schedule):
block_C = sch.get_block("C", "main")
i, j, k = sch.get_loops(block=block_C)
j_factors = sch.sample_perfect_tile(loop=j, n=2)
j_0, j_1 = sch.split(loop=j, factors=j_factors)
sch.reorder(i, j_0, k, j_1)
sch.decompose_reduction(block_C, k)
return sch
这里sch.sample_perfect_tile(loop=j,n=2)
方法表示对循环j进行随机采样分解成2个因子
通过print(sch.trace)
我们可以多次运行了j_factors会得到不同的值。
5.2 深入研究随机变换
在我们尝试逐步运行随机变换并且获得j_factors时,我们发现j_factors不是一个实整数,而是一个被采样随机变量的符号变量
sch=tvm.tir.Schedule(Module)
sch=stochastic_schedule_mm(sch)
IPython.display.Code(sch.mod.script(), language="python")
print(sch.trace)
观察sch调度器当中发生的事
每次跑都会丢出一个随机的值,会创造一个搜索空间。
当我们使用自己实现的测评函数
def random_search(mod:tvm.IRModule,num_trials=10):
best_result=None
best_sch=None
for i in range(num_trials):
sch=stochastic_schedule_mm(tvm.tir.Schedule(mod))
lib=tvm.build(sch.mod,target="llvm")
f_time_after=lib.time_evaluator("main",tvm.cpu())
result=f_time_after(a_nd, b_nd, c_nd).mean
print("==========Attempt %d,time-cost:%.9f ms======="%(i,result*1000))
print(sch.trace)
if best_result is None or result<best_result:
best_result=result
best_sch=sch
return best_sch
sch=random_search(Module)
==========Attempt 0,time-cost:0.000054800 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[8, 16]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 1,time-cost:0.000059300 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[16, 8]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 2,time-cost:0.000061500 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[8, 16]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 3,time-cost:0.000059100 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[128, 1]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 4,time-cost:0.000055800 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[128, 1]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 5,time-cost:0.000054400 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[8, 16]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 6,time-cost:0.000059500 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[32, 4]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 7,time-cost:0.000059300 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[32, 4]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 8,time-cost:0.000055500 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[16, 8]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 9,time-cost:0.000064800 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[8, 16]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3)==========Attempt 0,time-cost:0.000054800 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[8, 16]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 1,time-cost:0.000059300 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[16, 8]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 2,time-cost:0.000061500 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[8, 16]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 3,time-cost:0.000059100 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[128, 1]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 4,time-cost:0.000055800 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[128, 1]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 5,time-cost:0.000054400 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[8, 16]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 6,time-cost:0.000059500 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[32, 4]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 7,time-cost:0.000059300 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[32, 4]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 8,time-cost:0.000055500 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[16, 8]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3) ==========Attempt 9,time-cost:0.000064800 ms======= # from tvm import tir def apply_trace(sch: tir.Schedule) -> None: b0 = sch.get_block(name="C", func_name="main") l1, l2, l3 = sch.get_loops(block=b0) v4, v5 = sch.sample_perfect_tile(loop=l2, n=2, max_innermost_factor=16, decision=[8, 16]) l6, l7 = sch.split(loop=l2, factors=[v4, v5], preserve_unit_iters=True, disable_predication=False) sch.reorder(l1, l6, l3, l7) b8 = sch.decompose_reduction(block=b0, loop=l3)
得出最优结果是[8,16],当然这个结果会随着每次发生变化,只能得到本次调用中的最优。
5.3 随机变换搜索
事实上,stochastic_schedule_mm
创建了一个可能程序的搜索空间。
以它指定的是一组程序,那么问题是什么是最佳选择呢?
为此我们需要一个搜索算法,例如:连续运行很多次,取运行时间最短的那一次作为最佳选择。当然这是最直接简单的想法。在实践中,TVM 的 Meta-Schedule API
提供了一些附加功能:
-
跨越多个进程的并行基准测试。
-
使用代价模型 (cost model) 来避免每次都进行基准测试。
-
基于历史轨迹进行遗传搜索 (evolutionary search),而不是每次都随机采样。
纵使有这么多工具,我们的核心思想是保持不变的:使用随机变换来指定好的程序的搜索空间,使用 tune_tir
API 帮助在搜索空间内搜索并找到最优的调度变换。
以下是在指定的搜索空间进搜索,tune_tir
函数返回在调优过程中找到的优化后的调度。
from tvm import meta_schedule as ms
database = ms.tune_tir(
mod=MyModule,
target="llvm --num-cores=1",
max_trials_global=64,
num_trials_per_iter=64,
space=ms.space_generator.ScheduleFn(stochastic_schedule_mm),
work_dir="./tune_tmp",
task_name="main"
)
sch = ms.tir_integration.compile_tir(database, MyModule, "llvm --num-cores=1")
Meta-Schedule 带有内置通用随机变换集合,能够适用于广泛的 TensorIR 计算。这种方法也称为自动调度 (auto-scheduling),因为搜索空间是由系统生成的。我们可以通过删除行 space=ms.space_generator.ScheduleFn(stochastic_schedule_mm)
来运行它。
database = ms.tune_tir(
mod=MyModule,
target="llvm --num-cores=1",
max_trials_global=64,
num_trials_per_iter=64,
work_dir="./tune_tmp",
task_name="main",
)
sch = ms.tir_integration.compile_tir(database, MyModule, "llvm --num-cores=1")
2025-03-10 10:47:07 [INFO] Logging directory: ./tune_tmp/logs 2025-03-10 10:47:07 [INFO] LocalBuilder: max_workers = 1 2025-03-10 10:47:09 [INFO] LocalRunner: max_workers = 1 2025-03-10 10:47:11 [INFO] [task_scheduler.cc:159] Initializing Task #0: "main" 2025-03-10 10:47:11 [INFO] [task_scheduler.cc:320] ID | Name | FLOP | Weight | Speed (GFLOPS) | Latency (us) | Weighted Latency (us) | Trials | Done ------------------------------------------------------------------------------------------------------ 0 | main | 4194304 | 1 | N/A | N/A | N/A | 0 | ------------------------------------------------------------------------------------------------------ Total trials: 0 Total latency (us): 0 2025-03-10 10:47:11 [INFO] [task_scheduler.cc:180] TaskScheduler picks Task #0: "main" 2025-03-10 10:47:23 [INFO] [task_scheduler.cc:193] Sending 64 sample(s) to builder 2025-03-10 10:47:58 [INFO] [task_scheduler.cc:195] Sending 64 sample(s) to runner 2025-03-10 10:48:14 [DEBUG] XGB iter 0: tr-p-rmse: 0.714534 tr-a-peak@32: 1.000000 tr-rmse: 0.396155 tr-rmse: 0.396155 2025-03-10 10:48:14 [DEBUG] XGB iter 25: tr-p-rmse: 0.226917 tr-a-peak@32: 1.000000 tr-rmse: 0.407556 tr-rmse: 0.407556 2025-03-10 10:48:14 [DEBUG] XGB iter 50: tr-p-rmse: 0.225508 tr-a-peak@32: 1.000000 tr-rmse: 0.407631 tr-rmse: 0.407631 2025-03-10 10:48:14 [DEBUG] XGB iter 75: tr-p-rmse: 0.225508 tr-a-peak@32: 1.000000 tr-rmse: 0.407631 tr-rmse: 0.407631 2025-03-10 10:48:15 [DEBUG] XGB stopped. Best iteration: [41] tr-p-rmse:0.22551 tr-a-peak@32:1.00000 tr-rmse:0.40763 tr-rmse:0.40763 2025-03-10 10:48:15 [INFO] [task_scheduler.cc:237] [Updated] Task #0: "main" 2025-03-10 10:48:15 [INFO] [task_scheduler.cc:320] ID | Name | FLOP | Weight | Speed (GFLOPS) | Latency (us) | Weighted Latency (us) | Trials | Done ------------------------------------------------------------------------------------------------------ 0 | main | 4194304 | 1 | 765.2431 | 5.4810 | 5.4810 | 64 | ------------------------------------------------------------------------------------------------------ Total trials: 64 Total latency (us): 5.48101 2025-03-10 10:48:15 [INFO] [task_scheduler.cc:260] Task #0 has finished. Remaining task(s): 0 2025-03-10 10:48:15 [INFO] [task_scheduler.cc:320] ID | Name | FLOP | Weight | Speed (GFLOPS) | Latency (us) | Weighted Latency (us) | Trials | Done ------------------------------------------------------------------------------------------------------ 0 | main | 4194304 | 1 | 765.2431 | 5.4810 | 5.4810 | 64 | Y ------------------------------------------------------------------------------------------------------ Total trials: 64 Total latency (us): 5.48101
得出还是[16,8]是最优
5.4 集成到端到端模型部署中
下载数据集
import torchvision
import torch
import matplotlib.pyplot as plt
# 加载数据集
test_data = torchvision.datasets.FashionMNIST(
root="data",
train=False,
download=True,
transform=torchvision.transforms.ToTensor()
)
classes = [
"T-shirt/top", "Trouser", "Pullover", "Dress", "Coat",
"Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"
]
# 创建 DataLoader
test_DataLoader = torch.utils.data.DataLoader(test_data, batch_size=1, shuffle=True)
# 获取一个批次的数据
img, label = next(iter(test_DataLoader))
# 将图像从 Tensor 转换为 NumPy 数组,并调整形状
img = img.squeeze().numpy() # 从 (1, 1, 28, 28) 转换为 (28, 28)
# 打印标签
print(f"Label: {label.item()} ({classes[label.item()]})")
# 显示图像
plt.imshow(img, cmap='gray')
plt.title(f"Label: {label}")
plt.axis('off') # 不显示坐标轴
plt.show()
下载权重参数
!wget https://github.com/mlc-ai/web-data/raw/main/models/fasionmnist_mlp_params.pkl
获取参数
import pickle as pkl
import numpy as np
import tvm
# 加载模型参数,用numpy测试下
mlp_params = pkl.load(open("fasionmnist_mlp_params.pkl", "rb"))
data_nd=tvm.nd.array(img.reshape(1,784))
nd_params={k:tvm.nd.array(v) for k,v in mlp_params.items()}
主要步骤一览:
-
构建原模型
MyModuleMixture
-
注册原环境中的算子
env.linear
和env.relu
-
对IRModule的自定义函数
linear0
进行自动程序优化 -
用自动程序优化好的
linear0
代替原来的`linear0 -
构建运行
以下为具体的实现:
(1)构建原模型MyModuleMixture
:
@tvm.script.ir_module
class MyModuleMixture:
@T.prim_func
def linear0(X: T.Buffer((1, 784), "float32"),
W: T.Buffer((128, 784), "float32"),
B: T.Buffer((128,), "float32"),
Z: T.Buffer((1, 128), "float32")):
T.func_attr({"global_symbol": "linear0", "tir.noalias": True})
Y = T.alloc_buffer((1, 128), "float32")
for i, j, k in T.grid(1, 128, 784):
with T.block("Y"):
vi, vj, vk = T.axis.remap("SSR", [i, j, k])
with T.init():
Y[vi, vj] = T.float32(0)
Y[vi, vj] = Y[vi, vj] + X[vi, vk] * W[vj, vk]
for i, j in T.grid(1, 128):
with T.block("Z"):
vi, vj = T.axis.remap("SS", [i, j])
Z[vi, vj] = Y[vi, vj] + B[vj]
@R.function
def main(x: R.Tensor((1, 784), "float32"),
w0: R.Tensor((128, 784), "float32"),
b0: R.Tensor((128,), "float32"),
w1: R.Tensor((10, 128), "float32"),
b1: R.Tensor((10,), "float32")):
with R.dataflow():
lv0 = R.call_dps_packed("linear0", (x, w0, b0), R.Tensor((1, 128), dtype="float32"))
lv1 = R.call_dps_packed("env.relu", (lv0,), R.Tensor((1, 128), dtype="float32"))
out = R.call_dps_packed("env.linear", (lv1, w1, b1), R.Tensor((1, 10), dtype="float32"))
R.output(out)
return out
(2)注册原环境中的算子env.linear
和env.relu
@tvm.register_func("env.linear", override=True)
def torch_linear(x: tvm.nd.NDArray,
w: tvm.nd.NDArray,
b: tvm.nd.NDArray,
out: tvm.nd.NDArray):
x_torch = torch.from_dlpack(x)
w_torch = torch.from_dlpack(w)
b_torch = torch.from_dlpack(b)
out_torch = torch.from_dlpack(out)
torch.mm(x_torch, w_torch.T, out=out_torch)
torch.add(out_torch, b_torch, out=out_torch)
@tvm.register_func("env.relu", override=True)
def lnumpy_relu(x: tvm.nd.NDArray,
out: tvm.nd.NDArray):
x_torch = torch.from_dlpack(x)
out_torch = torch.from_dlpack(out)
torch.maximum(x_torch, torch.Tensor([0.0]), out=out_torch)
(3)先绑定参数跑一遍模型
MyModuleWithParams=relax.transform.BindParams("main",nd_params)(MyModule)
用于将预训练的参数(如权重和偏置)绑定到 Relax 模块中。绑定参数后,这些参数将不再被视为模块的输入,而是直接嵌入到模块中.
ex = relax.build(MyModuleWithParams, target="llvm")
vm = relax.VirtualMachine(ex, tvm.cpu())
nd_res = vm["main"](data_nd)
pred_kind = np.argmax(nd_res.numpy(), axis=1)
print("MyModuleWithExternCall Prediction:", classes[pred_kind[0]])
绑定场所,然后编译运行模型,进行预测
观察旧模型运行时间
ftimer=vm.module.time_evaluator("main",tvm.cpu(),number=100) print("==========Attempt %d,time-cost:%.9f ms======="%(0,ftimer(data_nd).mean*1000))
(3)对IRModule的自定义函数linear0
进行自动程序优化
调优 API 只接受一个带有一个 main
函数的 IRModule,所以我们首先将 linear0
取出到另一个模块的 main 函数中并将其传递给 tune_tir
。(据说以后会优化这个操作?)
mod_linear = tvm.IRModule.from_expr(MyModuleMixture["linear0"].with_attr("global_symbol", "main"))
database = ms.tune_tir(
mod=mod_linear,
target="llvm --num-cores=1",
max_trials_global=64,
num_trials_per_iter=64,
work_dir="./tune_tmp",
task_name="main",
)
sch = ms.tir_integration.compile_tir(database, mod_linear, "llvm --num-cores=1")
(4)用自动程序优化好的linear0
代替原来的linear0
现在我们需要在调优后用新函数替换原来的 linear0
。我们可以通过首先获得一个 global_var
(一个指向 IRModule 中函数的 pointer
引用),然后调用 update_func
来用新的函数替换原本的函数。
MyModuleWithParams2 = relax.transform.BindParams("main", nd_params)(MyModuleMixture)
new_func = sch.mod["main"].with_attr("global_symbol", "linear0")
gv = MyModuleWithParams2.get_global_var("linear0")
MyModuleWithParams2.update_func(gv, new_func)
(5)构建运行
ex = relax.build(MyModuleWithParams2, target="llvm")
vm = relax.VirtualMachine(ex, tvm.cpu())
nd_res = vm["main"](data_nd)
pred_kind = np.argmax(nd_res.numpy(), axis=1)
print("MyModuleWithParams2 Prediction:", class_names[pred_kind[0]])
再观察优化后新模型的运行时间
ftimer=vm.module.time_evaluator("main",tvm.cpu(),number=100)
print("==========Attempt %d,time-cost:%.9f ms======="%(0,ftimer(data_nd).mean*1000))
六、与机器学习框架的整合
1.使用张量表达式(te)构建TensorIR(即@T.prim_func)
2.使用BlockBuilder构造IRModule
6.1 使用张量表达式构建TensorIR
1.创建输入
from tvm import te
# 表示TensorIR的输入
A = te.placeholder((128, 128), name="A", dtype="float32")
B = te.placeholder((128, 128), name="B", dtype="float32")
这里的A和B的类型都是te.Tensor
对象,每个te.Tensor
都有一个shape字段和dtype字段
TE 当中,tvm.te.Tensor 是指计算图中的某个数据块,概念类似于神经网络中的一个 feature map。例如,神经网络的 RGB Input 就是一个 Tensor;神经网络中 Conv、Pooling 算子的计算结果也是一个 Tensor。
2.描述计算过程
def te_matmul(A: te.Tensor, B: te.Tensor) -> te.Tensor:
assert A.shape[1] == B.shape[0]
n = A.shape[0]
m = B.shape[1]
k = te.reduce_axis((0, A.shape[1]), name="k")
return te.compute((n, m), lambda i, j: te.sum(A[i, k] * B[k, j], axis=k), name="matmul")
C = te_matmul(A, B)
这里使用到了te.compute
这样的接口,从一个或者多个前序节点接收数据,并按初始化的时候传入的 lambda 表达式计算 Tensor 内的数据。
之后我们使用te_matmul
使用A和B获得计算结果,至此完成了计算图的描述。
3.创建TensorIR函数
te.create_prim_func([A, B, C]).show()
可以调用 te.create_prim_func
并传入输入和输出值,至此生成一个TensorIR函数
利用类似的方法,我们可以为ReLu
生成一个张量表达式
def te_relu(A: te.Tensor) -> te.Tensor:
return te.compute(A.shape, lambda i, j: te.max(A[i, j], 0), name="relu")
X1=te.placeholder((1,128),name="X1",dtype="float32")
Y1=te_relu(X1)
te.create_prim_func([X1,Y1]).show()
4.算子融合
te API
允许我们做的另一件事是组合操作并创建“融合 (fused)”算子。例如,我们可以将 matmul 的结果再次应用 relu。
C = te_matmul(A, B)
D = te_relu(C)
我们可以通过只传递感兴趣的输入和输出值,跳过中间值来创建一个 TensorIR 函数。 这将导致 matmul 的结果被分配为 TensorIR 函数中的临时空间(TensorIR函数中会出现中间结果分配函数:matmul = T.alloc_buffer((128, 128))的语句)
te.create_prim_func([A, B, D]).show()
我们还可以将中间结果 C 传递到参数列表中。在这种情况下,TensorIR 函数希望我们也从调用方传入 C。通常我们建议只传入输入和输出,这样我们就可以在里面进行更高级的融合。
te.create_prim_func([A, B, C, D]).show()
6.2 使用BlockBuilder构造IRModule
目前我们已构建了一个TensorIR函数,为了构建端到端的模型执行,我们还需要能够通过计算图连接多个TensorIR函数
我们可以创建一个 block builder
,他可以帮助我们逐步创建一个relax.Function
A = relax.Var("A", relax.TensorStructInfo((128, 128), "float32"))
B = relax.Var("B", relax.TensorStructInfo((128, 128), "float32"))
bb = relax.BlockBuilder()
with bb.function("main"):
with bb.dataflow():
C = bb.emit_te(te_matmul, A, B)
D = bb.emit_te(te_relu, C)
R = bb.emit_output(D)
bb.emit_func_output(R, params=[A, B])
MyModule = bb.get()
MyModule.show()
终端得到的输出为:
# from tvm.script import ir as I
# from tvm.script import tir as T
# from tvm.script import relax as R
@I.ir_module
class Module:
@T.prim_func(private=True)
def te_matmul(A: T.Buffer((T.int64(128), T.int64(128)), "float32"), B: T.Buffer((T.int64(128), T.int64(128)), "float32"), matmul: T.Buffer((T.int64(128), T.int64(128)), "float32")):
T.func_attr({"tir.noalias": T.bool(True)})
# with T.block("root"):
for i, j, k in T.grid(T.int64(128), T.int64(128), T.int64(128)):
with T.block("matmul"):
v_i, v_j, v_k = T.axis.remap("SSR", [i, j, k])
T.reads(A[v_i, v_k], B[v_k, v_j])
T.writes(matmul[v_i, v_j])
with T.init():
matmul[v_i, v_j] = T.float32(0.0)
matmul[v_i, v_j] = matmul[v_i, v_j] + A[v_i, v_k] * B[v_k, v_j]
@T.prim_func(private=True)
def te_relu(lv: T.Buffer((T.int64(128), T.int64(128)), "float32"), relu: T.Buffer((T.int64(128), T.int64(128)), "float32")):
T.func_attr({"tir.noalias": T.bool(True)})
# with T.block("root"):
for i, j in T.grid(T.int64(128), T.int64(128)):
with T.block("relu"):
v_i, v_j = T.axis.remap("SS", [i, j])
T.reads(lv[v_i, v_j])
T.writes(relu[v_i, v_j])
relu[v_i, v_j] = T.max(lv[v_i, v_j], T.float32(0.0))
@R.function
def main(A: R.Tensor((128, 128), dtype="float32"), B: R.Tensor((128, 128), dtype="float32")) -> R.Tensor((128, 128), dtype="float32"):
cls = Module
with R.dataflow():
lv = R.call_tir(cls.te_matmul, (A, B), out_sinfo=R.Tensor((128, 128), dtype="float32"))
lv1 = R.call_tir(cls.te_relu, (lv,), out_sinfo=R.Tensor((128, 128), dtype="float32"))
gv: R.Tensor((128, 128), dtype="float32") = lv1
R.output(gv)
return gv
1. 对比BlockBuilder
代码和生成的IRModule

BlockBuilder
带有与 Relax
函数中相应的作用域。例如,bb.dataflow()
创建一个 dataflow block
,其中所有对 BlockBuilder
的调用都处在 dataflow block
的作用域中。
其中每个中间结果都是一个relax.Var
,对应一个存储计算结果的变量。
type(C)
tvm.relax.expr.DataflowVar # DataflowVar表示该变量是dataflow block(和计算图)内的中间步骤
isinstance(C, relax.Var)
True
Relax 函数中的每一行都是由 emit_te
调用生成的。 例如,
lv = R.call_dps_packed(te_matmul, (A, B), (128, 128), dtype="float32")
是由如下代码所生成
C = bb.emit_te(te_matmul, A, B).
综上所述,bb.emit_te
做了以下事情:
-
为 A 和 B 创建一个输入
te.placeholder
。 -
通过
te_matmul
函数运行它们(构建计算图?) -
调用
te.create_prim_func
来创建一个 TensorIR 函数。 -
通过
call_dps_packed
生成对函数的调用。
值得注意的是我们有两种方式来指定函数的参数列表:
方式一:
python with bb.function("main"): ... # specify parameters in the end bb.emit_func_output(R, params=[A, B])
方式二:
python # specify parameters in the beginning. with bb.function("main", params=[A, B]): ... bb.emit_func_output(R)
6.3 从Pytorch导入模型
前文已经学习了以编程方式构建 IRModule 的工具,现在我们将使用它们将机器学习模型从 Pytorch
导入成为IRModule
。
大多数机器学习框架都带有计算图抽象,其中每个节点对应一个操作,边对应它们之间的依赖关系。 我们将采用 PyTorch
模型,获取 PyTorch
原生格式的计算图,并将其转换为IRModule
。
模型定义如下,示例为一个matmul
+ReLU
class MyModel(nn.Module):
def __init__(self):
super(MyModel, self).__init__()
self.weight = nn.Parameter(torch.randn(128, 128))
def forward(self, x):
x = torch.matmul(x, self.weight)
x = torch.relu(x)
return x
6.3.1 创建TorchFX GraphModule
model = MyModel()
fx_module = fx.symbolic_trace(model)
type(fx_module)
# fx_module 包含一个简单的计算图,可以打印成表格便于查看。
fx_module.graph.print_tabular()
我们的目标是将这个计算图转化为IRModule
我们可以查看计算图中的节点和对应的操作
for node in fx_module.graph.nodes:
print(node)
print(node.op)
6.3.2 构造映射函数
让我们定义整体的翻译逻辑。 主要流程如下:
-
创建一个
node_map
,将fx.Node
映射到相应的relax.Var
,该relax.Var
代表 IRModule 中的已翻译节点。 -
以拓扑顺序迭代 FX 图中的节点。
-
给定映射输入,获取节点的映射输出。
def map_param(param: nn.Parameter):
return relax.const(
param.data.cpu().numpy(), relax.TensorStructInfo(param.data.shape, "float32")
)
def fetch_attr(fx_mod, target: str):
"""Helper function to fetch an attr"""
target_atoms = target.split('.')
attr_itr = fx_mod
for i, atom in enumerate(target_atoms):
if not hasattr(attr_itr, atom):
raise RuntimeError(f"Node referenced nonexistant target {'.'.join(target_atoms[:i])}")
attr_itr = getattr(attr_itr, atom)
return attr_itr
def from_fx(fx_mod, input_shapes, call_function_map, call_module_map):
input_index = 0
node_map = {}
named_modules = dict(fx_mod.named_modules())
bb = relax.BlockBuilder()
fn_inputs = []
fn_output = None
with bb.function("main"):
with bb.dataflow():
for node in fx_mod.graph.nodes:
if node.op == "placeholder":
# create input placeholder
shape = input_shapes[input_index]
input_index += 1
input_var = relax.Var(
node.target, relax.TensorStructInfo(shape, "float32")
)
fn_inputs.append(input_var)
node_map[node] = input_var
elif node.op == "get_attr":
node_map[node] = map_param(fetch_attr(fx_mod, node.target))
elif node.op == "call_function":
node_map[node] = call_function_map[node.target](bb, node_map, node)
elif node.op == "call_module":
named_module = named_modules[node.target]
node_map[node] = call_module_map[type(named_module)](bb, node_map, node, named_module)
elif node.op == "output":
output = node_map[node.args[0]]
assert fn_output is None
fn_output = bb.emit_output(output)
# output and finalize the function
bb.emit_func_output(output, fn_inputs)
return bb.get()
1.函数 map_param
def map_param(param: nn.Parameter): return relax.const( param.data.cpu().numpy(), relax.TensorStructInfo(param.data.shape, "float32") )
将 PyTorch 的 nn.Parameter
转换为 Relax 的常量张量。
2.函数 fetch_attr
def fetch_attr(fx_mod, target: str): """Helper function to fetch an attribute""" target_atoms = target.split('.') attr_itr = fx_mod for i, atom in enumerate(target_atoms): if not hasattr(attr_itr, atom): raise RuntimeError(f"Node referenced nonexistant target {'.'.join(target_atoms[:i])}") attr_itr = getattr(attr_itr, atom) return attr_itr
-
功能:从 PyTorch 的 FX 模块中获取指定的属性。
-
参数:
-
fx_mod
: 一个 PyTorch FX 模块。 -
target
: 一个字符串,表示要获取的属性路径(例如"layer1.conv.weight"
)。
-
-
逻辑:
-
将
target
按.
分割为多个部分(target_atoms
)。 -
逐级访问
fx_mod
的属性,直到找到目标属性。 -
如果某个属性不存在,抛出
RuntimeError
。
-
-
返回值:目标属性的值。
-
举个例子:
假设目标路径是
"layer1.0.weight"
,fetch_attr
的执行过程如下:-
target_atoms = ["layer1", "0", "weight"]
-
初始化
attr_itr = fx_mod
-
遍历
target_atoms
:-
第一步:
attr_itr = fx_mod.layer1
(访问layer1
属性) -
第二步:
attr_itr = fx_mod.layer1[0]
(访问layer1
中的第一个模块,即Conv2d
) -
第三步:
attr_itr = fx_mod.layer1[0].weight
(访问Conv2d
的weight
属性)
-
-
返回
attr_itr
,即Conv2d
的权重。
-
3.函数 from_fx
功能:将pytorch fx模块转换为 tvm relax模块
参数:
-
fx_mod
: 一个 PyTorch FX 模块。 -
input_shapes
: 一个列表,包含输入张量的形状。 -
call_function_map
: 一个字典,将 FX 中的函数节点映射到 Relax 的实现。 -
call_module_map
: 一个字典,将 FX 中的模块节点映射到 Relax 的实现。
处理输入节点(placeholer):
if node.op == "placeholder": # create input placeholder shape = input_shapes[input_index] input_index += 1 input_var = relax.Var( node.target, relax.TensorStructInfo(shape, "float32") ) fn_inputs.append(input_var) node_map[node] = input_var
-
如果节点类型是
"placeholder"
,表示这是一个输入节点。 -
创建一个 Relax 的输入变量
relax.Var
,其形状为input_shapes[input_index]
。 -
将输入变量添加到
fn_inputs
列表中,并将其存储到node_map
中。
elif node.op == "get_attr": node_map[node] = map_param(fetch_attr(fx_mod, node.target))
-
如果节点类型是
"get_attr"
,表示这是一个属性节点。 -
使用
fetch_attr
获取目标属性(例如权重或偏置)。 -
使用
map_param
将属性转换为 Relax 的常量张量,并存储到node_map
中。
elif node.op == "call_function": node_map[node] = call_function_map[node.target](bb, node_map, node)
-
如果节点类型是
"call_function"
,表示这是一个函数调用节点。 -
使用
call_function_map
查找对应的 Relax 实现。 -
调用 Relax 实现,并将结果存储到
node_map
中。
elif node.op == "call_module": named_module = named_modules[node.target] node_map[node] = call_module_map[type(named_module)](bb, node_map, node, named_module)
-
如果节点类型是
"call_module"
,表示这是一个模块调用节点。 -
使用
named_modules
获取目标模块。 -
使用
call_module_map
查找对应的 Relax 实现。 -
调用 Relax 实现,并将结果存储到
node_map
中。
elif node.op == "output": output = node_map[node.args[0]] assert fn_output is None fn_output = bb.emit_output(output)
-
如果节点类型是
"output"
,表示这是 FX 图的输出节点。 -
获取输出节点的值,并使用
bb.emit_output
将其设置为 Relax 函数的输出。 -
确保
fn_output
之前未被设置。
我们没有在 from_fx
函数中定义函数映射。 我们将通过映射提供每个 torch function 的翻译规则。 具体来说,以下代码块显示了我们如何通过 emit_te
API 做到这一点。
def map_matmul(bb, node_map, node: fx.Node):
A = node_map[node.args[0]]
B = node_map[node.args[1]]
return bb.emit_te(te_matmul, A, B)
def map_relu(bb, node_map, node: fx.Node):
A = node_map[node.args[0]]
return bb.emit_te(te_relu, A)
MyModule = from_fx(
fx_module,
input_shapes = [(1, 128)],
call_function_map = {
torch.matmul: map_matmul,
torch.relu: map_relu,
},
call_module_map={},
)
MyModule.show()
# from tvm.script import ir as I
# from tvm.script import tir as T
# from tvm.script import relax as R
@I.ir_module
class Module:
@T.prim_func(private=True)
def te_matmul(x: T.Buffer((T.int64(1), T.int64(128)), "float32"), B: T.Buffer((T.int64(128), T.int64(128)), "float32"), matmul: T.Buffer((T.int64(1), T.int64(128)), "float32")):
T.func_attr({"tir.noalias": T.bool(True)})
# with T.block("root"):
for i, j, k in T.grid(T.int64(1), T.int64(128), T.int64(128)):
with T.block("matmul"):
v_i, v_j, v_k = T.axis.remap("SSR", [i, j, k])
T.reads(x[v_i, v_k], B[v_k, v_j])
T.writes(matmul[v_i, v_j])
with T.init():
matmul[v_i, v_j] = T.float32(0.0)
matmul[v_i, v_j] = matmul[v_i, v_j] + x[v_i, v_k] * B[v_k, v_j]
@T.prim_func(private=True)
def te_relu(lv: T.Buffer((T.int64(1), T.int64(128)), "float32"), relu: T.Buffer((T.int64(1), T.int64(128)), "float32")):
T.func_attr({"tir.noalias": T.bool(True)})
# with T.block("root"):
for i, j in T.grid(T.int64(1), T.int64(128)):
with T.block("relu"):
v_i, v_j = T.axis.remap("SS", [i, j])
T.reads(lv[v_i, v_j])
T.writes(relu[v_i, v_j])
relu[v_i, v_j] = T.max(lv[v_i, v_j], T.float32(0.0))
@R.function
def main(x: R.Tensor((1, 128), dtype="float32")) -> R.Tensor((1, 128), dtype="float32"):
cls = Module
with R.dataflow():
lv = R.call_tir(cls.te_matmul, (x, metadata["relax.expr.Constant"][0]), out_sinfo=R.Tensor((1, 128), dtype="float32"))
lv1 = R.call_tir(cls.te_relu, (lv,), out_sinfo=R.Tensor((1, 128), dtype="float32"))
gv: R.Tensor((1, 128), dtype="float32") = lv1
R.output(gv)
return lv1
# Metadata omitted. Use show_meta=True in script() method to show it.
6.4 FashionMNIST示例
import torch
import torchvision
test_data = torchvision.datasets.FashionMNIST(
root="data",
train=False,
download=True,
transform=torchvision.transforms.ToTensor()
)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=1, shuffle=True)
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
img, label = next(iter(test_loader))
img = img.reshape(1, 28, 28).numpy()
# Hide outputs
!wget -nc https://github.com/mlc-ai/web-data/raw/main/models/fasionmnist_mlp_params.pkl
class MLP(nn.Module):
def __init__(self):
super(MLP, self).__init__()
self.linear0 = nn.Linear(784, 128, bias=True)
self.relu = nn.ReLU()
self.linear1 = nn.Linear(128, 10, bias=True)
def forward(self, x):
x = self.linear0(x)
x = self.relu(x)
x = self.linear1(x)
return x
import pickle as pkl
mlp_model = MLP()
mlp_params = pkl.load(open("fasionmnist_mlp_params.pkl", "rb"))
mlp_model.linear0.weight.data = torch.from_numpy(mlp_params["w0"])
mlp_model.linear0.bias.data = torch.from_numpy(mlp_params["b0"])
mlp_model.linear1.weight.data = torch.from_numpy(mlp_params["w1"])
mlp_model.linear1.bias.data = torch.from_numpy(mlp_params["b1"])
torch_res = mlp_model(torch.from_numpy(img.reshape(1, 784)))
pred_kind = np.argmax(torch_res.detach().numpy(), axis=1)
print("Torch Prediction:", class_names[pred_kind[0]])
from tvm import topi
def map_nn_linear(bb, node_map, node, nn_mod):
x = node_map[node.args[0]]
w = map_param(nn_mod.weight)
if nn_mod.bias is not None:
b = map_param(nn_mod.bias)
y = bb.emit_te(topi.nn.dense, x, w)
return bb.emit_te(topi.add, y, b)
def map_nn_relu(bb, node_map, node, nn_mod):
return map_relu(bb, node_map, node)
MLPModule = from_fx(
fx.symbolic_trace(mlp_model),
input_shapes = [(1, 784)],
call_function_map={
},
call_module_map={
torch.nn.Linear: map_nn_linear,
torch.nn.ReLU: map_nn_relu,
},
)
MLPModule.show()
结果:
# from tvm.script import ir as I
# from tvm.script import tir as T
# from tvm.script import relax as R
@I.ir_module
class Module:
@T.prim_func(private=True)
def add(lv: T.Buffer((T.int64(1), T.int64(128)), "float32"), B: T.Buffer((T.int64(128),), "float32"), T_add: T.Buffer((T.int64(1), T.int64(128)), "float32")):
T.func_attr({"tir.noalias": T.bool(True)})
# with T.block("root"):
for ax0, ax1 in T.grid(T.int64(1), T.int64(128)):
with T.block("T_add"):
v_ax0, v_ax1 = T.axis.remap("SS", [ax0, ax1])
T.reads(lv[v_ax0, v_ax1], B[v_ax1])
T.writes(T_add[v_ax0, v_ax1])
T_add[v_ax0, v_ax1] = lv[v_ax0, v_ax1] + B[v_ax1]
@T.prim_func(private=True)
def add1(lv3: T.Buffer((T.int64(1), T.int64(10)), "float32"), B: T.Buffer((T.int64(10),), "float32"), T_add: T.Buffer((T.int64(1), T.int64(10)), "float32")):
T.func_attr({"tir.noalias": T.bool(True)})
# with T.block("root"):
for ax0, ax1 in T.grid(T.int64(1), T.int64(10)):
with T.block("T_add"):
v_ax0, v_ax1 = T.axis.remap("SS", [ax0, ax1])
T.reads(lv3[v_ax0, v_ax1], B[v_ax1])
T.writes(T_add[v_ax0, v_ax1])
T_add[v_ax0, v_ax1] = lv3[v_ax0, v_ax1] + B[v_ax1]
@T.prim_func(private=True)
def dense(x: T.Buffer((T.int64(1), T.int64(784)), "float32"), B: T.Buffer((T.int64(128), T.int64(784)), "float32"), T_matmul_NT: T.Buffer((T.int64(1), T.int64(128)), "float32")):
T.func_attr({"layout_free_buffers": [1], "tir.noalias": T.bool(True)})
# with T.block("root"):
for i0, i1, k in T.grid(T.int64(1), T.int64(128), T.int64(784)):
with T.block("T_matmul_NT"):
v_i0, v_i1, v_k = T.axis.remap("SSR", [i0, i1, k])
T.reads(x[v_i0, v_k], B[v_i1, v_k])
T.writes(T_matmul_NT[v_i0, v_i1])
with T.init():
T_matmul_NT[v_i0, v_i1] = T.float32(0.0)
T_matmul_NT[v_i0, v_i1] = T_matmul_NT[v_i0, v_i1] + x[v_i0, v_k] * B[v_i1, v_k]
@T.prim_func(private=True)
def dense1(lv2: T.Buffer((T.int64(1), T.int64(128)), "float32"), B: T.Buffer((T.int64(10), T.int64(128)), "float32"), T_matmul_NT: T.Buffer((T.int64(1), T.int64(10)), "float32")):
T.func_attr({"layout_free_buffers": [1], "tir.noalias": T.bool(True)})
# with T.block("root"):
for i0, i1, k in T.grid(T.int64(1), T.int64(10), T.int64(128)):
with T.block("T_matmul_NT"):
v_i0, v_i1, v_k = T.axis.remap("SSR", [i0, i1, k])
T.reads(lv2[v_i0, v_k], B[v_i1, v_k])
T.writes(T_matmul_NT[v_i0, v_i1])
with T.init():
T_matmul_NT[v_i0, v_i1] = T.float32(0.0)
T_matmul_NT[v_i0, v_i1] = T_matmul_NT[v_i0, v_i1] + lv2[v_i0, v_k] * B[v_i1, v_k]
@T.prim_func(private=True)
def te_relu(lv1: T.Buffer((T.int64(1), T.int64(128)), "float32"), relu: T.Buffer((T.int64(1), T.int64(128)), "float32")):
T.func_attr({"tir.noalias": T.bool(True)})
# with T.block("root"):
for i, j in T.grid(T.int64(1), T.int64(128)):
with T.block("relu"):
v_i, v_j = T.axis.remap("SS", [i, j])
T.reads(lv1[v_i, v_j])
T.writes(relu[v_i, v_j])
relu[v_i, v_j] = T.max(lv1[v_i, v_j], T.float32(0.0))
@R.function
def main(x: R.Tensor((1, 784), dtype="float32")) -> R.Tensor((1, 10), dtype="float32"):
cls = Module
with R.dataflow():
lv = R.call_tir(cls.dense, (x, metadata["relax.expr.Constant"][0]), out_sinfo=R.Tensor((1, 128), dtype="float32"))
lv1 = R.call_tir(cls.add, (lv, metadata["relax.expr.Constant"][1]), out_sinfo=R.Tensor((1, 128), dtype="float32"))
lv2 = R.call_tir(cls.te_relu, (lv1,), out_sinfo=R.Tensor((1, 128), dtype="float32"))
lv3 = R.call_tir(cls.dense1, (lv2, metadata["relax.expr.Constant"][2]), out_sinfo=R.Tensor((1, 10), dtype="float32"))
lv4 = R.call_tir(cls.add1, (lv3, metadata["relax.expr.Constant"][3]), out_sinfo=R.Tensor((1, 10), dtype="float32"))
gv: R.Tensor((1, 10), dtype="float32") = lv4
R.output(gv)
return lv4