Pytorch自定义算子反向传播
文章目录
- 自定义一个线性函数算子
- 如何实现反向传播
有关 自定义算子的实现前面已经提到,可以参考。本文讲述自定义算子如何前向推理+反向传播进行模型训练。
自定义一个线性函数算子
线性函数
Y
=
X
W
T
+
B
Y = XW^T + B
Y=XWT+B 定义输入M 个X变量,输出N个Y变量的线性方程组。
X
X
X 为一个 1 x M 矩阵,
W
W
W为 N x M 矩阵,
B
B
B 为 1xN 矩阵,根据公式,输出
Y
Y
Y为1xN 矩阵。其中 W 和 B 为算子权重参数,保存在模型中。
在训练时刻,模型输入
X
X
X , 和监督值
Y
Y
Y,根据 算子forward()计算的
Y
p
Y^p
Yp ,计算Loss = criterion(
Y
Y
Y,
Y
p
Y^p
Yp ),然后根据backward()链式求导反向传播计算梯度值。最后根据梯度更新W 和 B 参数。
class LinearF(torch.autograd.Function):
@staticmethod
def symbolic(g, input, weight, bias):
return g.op("MYLINEAR", input, weight, bias)
@staticmethod
def forward(ctx, input:Tensor, weight: Tensor, bias: Tensor) -> Tensor:
output = input @ weight.T + bias[None, ...]
ctx.save_for_backward(input, weight)
return output
@staticmethod
def backward(ctx, grad_output:Tensor)->Tuple[Tensor, Tensor, Tensor]:
# grad_output -- [B, N] = d(Loss) / d(Y)
input, weight = ctx.saved_tensors
grad_input = grad_output @ weight
grad_weight = grad_output.T @ input
grad_bias = grad_output.sum(0)
# print("grad_input: ", grad_input)
# print("grad_weight: ", grad_weight)
# print("grad_bias: ", grad_bias)
return grad_input, grad_weight, grad_bias
如何实现反向传播
前向推理比较简单,就根据公式来既可以。反向传播backward() 怎么写呢?
反向传播有两个输入参数,第一个为ctx,第二个grad_output,grad_output就是对forward() 输出output 的求导,如果是最后的节点,那就是loss对输出的求导,否则就是下一层对输出求导,输出grad_input, grad_weight, grad_bias则分别对应了forward的输入input、weight、bias的梯度。这很容易理解,因为是在做链式求导,LinearFunction是这条链上的某个节点,输入输出的数量和含义刚好相反且对应。
根据公式:
Y
=
X
W
T
+
B
Y = XW^T + B
Y=XWT+B
Loss = criterion(
Y
t
Y^t_{}
Yt,
Y
Y_{}
Y ), 假设我们选择判别函数为L2范数,Loss =
∑
j
=
0
N
0.5
∗
(
Y
j
t
−
Y
j
)
2
\sum_{j=0}^N0.5 * (Y^t_{j}-Y_{j} )^2
∑j=0N0.5∗(Yjt−Yj)2
grad_output(j) = d ( L o s s ) d ( Y j ) \frac{d(Loss) }{d(Y_{j})} d(Yj)d(Loss) = Y j t − Y j Y^t_{j} - Y_{j} Yjt−Yj
其中 Y j t Y^t_{j} Yjt为监督值, Y j Y_{j} Yj为模型输出值。
根据链式求导法则, 对输入 X i X_{i} Xi 的求导为:
grad_input[i] = ∑ j = 0 N d ( L o s s ) d ( Y j ) ∗ d ( Y j ) d ( X i ) \sum_{j=0}^N\frac{d(Loss) }{d(Y_{j})}*\frac{d(Y_{j}) }{d(X_{i})} ∑j=0Nd(Yj)d(Loss)∗d(Xi)d(Yj)= ∑ j = 0 N g r a d _ o u t p u t [ j ] ∗ d ( Y j ) d ( X i ) \sum_{j=0}^N{grad\_output}[j] *\frac{d(Y_{j}) }{d(X_{i})} ∑j=0Ngrad_output[j]∗d(Xi)d(Yj)
d ( Y j ) d ( X i ) \frac{d(Y_{j}) }{d(X_{i})} d(Xi)d(Yj) 即为 W i j T = W j i W^T_{ij} = W_{ji} WijT=Wji
其中i 对应X维度, j对应输出Y维度。
最后整理成矩阵形式:
g r a d _ i n p u t = g r a d _ o u t p u t ∗ W {grad\_input}={grad\_output} * W grad_input=grad_output∗W
同理:
g
r
a
d
_
w
e
i
g
h
t
=
g
r
a
d
_
o
u
t
p
u
t
T
∗
X
{grad\_weight}={grad\_output}^T * X
grad_weight=grad_outputT∗X
g r a d _ b i a s = ∑ q = 0 N g r a d _ o u t p u t {grad\_bias}=\sum_{q=0}^N{grad\_output} grad_bias=∑q=0Ngrad_output
最后根据公式形式得到backward()函数。
反向传播的梯度求解还是不容易的,一不小心可能算错了,所以务必在模型训练以前检查梯度计算的正确性。pytorch提供了torch.autograd.gradcheck方法来检验梯度计算的正确性。
其他参考文献:pytorch自定义算子实现详解及反向传播梯度推导
最后根据自定义算子,搭建模型,训练模型参数W,B。并导出onnx。参考代码如下:
import torch
from torch import Tensor
from typing import Tuple
import numpy as np
class LinearF(torch.autograd.Function):
@staticmethod
def symbolic(g, input, weight, bias):
return g.op("MYLINEAR", input, weight, bias)
@staticmethod
def forward(ctx, input:Tensor, weight: Tensor, bias: Tensor) -> Tensor:
output = input @ weight.T + bias[None, ...]
ctx.save_for_backward(input, weight)
return output
@staticmethod
def backward(ctx, grad_output:Tensor)->Tuple[Tensor, Tensor, Tensor]:
print("grad_output: ", grad_output)
# grad_output -- [B, N] = d(Loss) / d(Y)
input, weight = ctx.saved_tensors
grad_input = grad_output @ weight
grad_weight = grad_output.T @ input
grad_bias = grad_output.sum(0)
return grad_input, grad_weight, grad_bias
#对LinearFunction进行封装
class MyLinear(torch.nn.Module):
def __init__(self, in_features: int, out_features: int, dtype:torch.dtype) -> None:
super().__init__()
self.in_features = in_features
self.out_features = out_features
self.weight = torch.nn.Parameter(torch.empty((out_features, in_features), dtype=dtype))
self.bias = torch.nn.Parameter(torch.empty((out_features,), dtype=dtype))
self.reset_parameters()
# self.weight = torch.nn.Parameter(torch.Tensor([2.0, 3.0]))
# self.bias = torch.nn.Parameter(torch.Tensor([4.0]))
#y = 2 * x1 + 3 * x2 + 4
def reset_parameters(self) -> None:
torch.nn.init.uniform_(self.weight)
torch.nn.init.uniform_(self.bias)
def forward(self, input: Tensor) -> Tensor:
# for name, pa in self.named_parameters():
# print(name, pa)
return LinearF.apply(input, self.weight, self.bias) # 在此处使用
if __name__ == "__main__":
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device.type)
model = MyLinear(2, 1, dtype=torch.float64).to(device)
# torch.Tensor 默认类型为float32,使用gpu时,输入数据类型与W权重类型一致,否则报错
# torch.Tensor([3.0, 2.0].double() 转换为float64
#input = torch.Tensor([3.0, 2.0], ).requires_grad_(True).unsqueeze(0).double()
#input = input.to(device)
#assert torch.autograd.gradcheck(model, input)
import torch.optim as optim
#定义优化策略和判别函数
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
criterion = torch.nn.MSELoss()
for epoch in range(300):
print("************** epoch: ", epoch , " ************************************* ")
inputx = torch.Tensor(np.random.rand(2)).unsqueeze(0).double().to(device)
lable = torch.Tensor(2 * inputx[:, 0] + 3 * inputx[:, 1] + 4).double().to(device)
print("outlable", lable)
optimizer.zero_grad() # 梯度清零
prob = model(inputx)
print("prob", prob)
loss = criterion(lable, prob)
print("loss: ", loss)
loss.backward() #反向传播
optimizer.step() #更新参数
# 完成训练
model.cpu().eval()
input = torch.tensor([[3.0, 2.0]], dtype=torch.float64)
output = model(input)
torch.onnx.export(
model, # 这里的args,是指输入给model的参数,需要传递tuple,因此用括号
(input,),
"linear.onnx", # 储存的文件路径
verbose=True, # 打印详细信息
input_names=["x"], #为输入和输出节点指定名称,方便后面查看或者操作
output_names=["y"],
opset_version=11, #这里的opset,指,各类算子以何种方式导出,对应于symbolic_opset11
dynamic_axes={
"image": {0: "batch"},
"output": {0: "batch"},
},
operator_export_type=torch.onnx.OperatorExportTypes.ONNX_ATEN_FALLBACK
)