PyTorch3
自动微分:
自动微分模块torch.autograd负责自动计算张量操作的梯度,具有自动求导功能。自动微分模块是构成神经网络训练的必要模块,可以实现网络权重参数的更新,使得反向传播算法的实现变得简单而高效。
张量:
Torch中一切皆为张量,属性requires_grad决定是否对其进行梯度计算。默认是 False,如需计算梯度则设置为True。
计算图:
torch.autograd通过创建一个动态计算图来跟踪张量的操作,每个张量是计算图中的一个节点,节点之间的操作构成图的边。
反向传播:
使用tensor.backward()方法执行反向传播,从而计算张量的梯度。这个过程会自动计算每个张量对损失函数的梯度。
梯度:
计算得到的梯度通过tensor.grad访问,这些梯度用于优化模型参数,以最小化损失函数。
标量梯度计算:
import torch
def test001():
# 1. 创建张量:必须为浮点类型
x = torch.tensor(7.0, requires_grad=True, dtype=torch.float32)
# 2. 操作张量
y = x**2 + 2 * x + 7
# 3. 计算梯度,也就是反向传播
y.backward()
# 4. 读取梯度值
# 结果:tensor(16.)
print(x.grad)
if __name__ == "__main__":
test001()
向量梯度计算:
import torch
def test002():
# 1. 创建张量:必须为浮点类型
x = torch.tensor([1.0, 2.0, 5.3], requires_grad=True)
# 2. 操作张量
y = x**2 + 2 * x + 7
# 3. 需要变为标量:求和或者求均值都可以
# z = y.sum()
z = y.mean()
# 4. 计算梯度,也就是反向传播
z.backward()
# 5. 读取梯度值
print(x.grad)
if __name__ == "__main__":
test002()
多标量梯度计算:
import torch
def test003():
# 1. 创建两个标量
x1 = torch.tensor(5.0, requires_grad=True, dtype=torch.float64)
x2 = torch.tensor(3.0, requires_grad=True, dtype=torch.float64)
# 2. 构建运算公式
y = x1**2 + 2 * x2 + 7
# 3. 计算梯度,也就是反向传播
y.backward()
# 4. 读取梯度值
print(x1.grad, x2.grad)
if __name__ == "__main__":
test003()
多向量梯度计算:
import torch
def test004():
# 1. 创建两个向量
x1 = torch.tensor([5.0, 6.0, 7.5], requires_grad=True)
x2 = torch.tensor([3.0, 5.2, 6.4], requires_grad=True)
# 2. 构建运算公式
y = x1**2 + 2 * x2 + 7
# 3. 向量构建
# z = y.sum()
z = y.mean()
# 4. 计算梯度,也就是反向传播
z.backward()
# 5. 读取梯度值
print(x1.grad, x2.grad)
if __name__ == "__main__":
test004()
控制梯度计算:
梯度计算是有性能开销的,有些时候我们只是简单的运算,并不需要梯度
import torch
def test001():
x = torch.tensor(10.5, requires_grad=True)
print(x.requires_grad) # True
# 1. 默认y的requires_grad=True
y = x**2 + 2 * x + 3
print(y.requires_grad) # True
# 2. 如果不需要y计算梯度-with进行上下文管理
with torch.no_grad():
y = x**2 + 2 * x + 3
print(y.requires_grad) # False
# 3. 如果不需要y计算梯度-使用装饰器
@torch.no_grad()
def y_fn(x):
return x**2 + 2 * x + 3
y = y_fn(x)
print(y.requires_grad) # False
# 4. 如果不需要y计算梯度-全局设置,需要谨慎
torch.set_grad_enabled(False)
y = x**2 + 2 * x + 3
print(y.requires_grad) # False
if __name__ == "__main__":
test001()
累计梯度:
默认情况下,当我们重复对一个自变量进行梯度计算时,梯度是累加的
import torch
def test002():
# 1. 创建张量:必须为浮点类型
x = torch.tensor([1.0, 2.0, 5.3], requires_grad=True)
# 2. 累计梯度:每次计算都会累计梯度
for i in range(3):
y = x**2 + 2 * x + 7
z = y.mean()
z.backward()
print(x.grad)
if __name__ == "__main__":
test002()
梯度清零:
大多数情况下是不需要梯度累加的,反向传播之前可以先对梯度进行清零
import torch
def test002():
# 1. 创建张量:必须为浮点类型
x = torch.tensor([1.0, 2.0, 5.3], requires_grad=True)
# 2. 累计梯度:每次计算都会累计梯度
for i in range(3):
y = x**2 + 2 * x + 7
z = y.mean()
# 2.1 反向传播之前先对梯度进行清零
if x.grad is not None:
x.grad.zero_()
z.backward()
print(x.grad)
if __name__ == "__main__":
test002()
叶子结点:
当requires_grad=True时,在调用numpy转换为ndarray时会报如下错误:
RuntimeError: Can't call numpy() on Tensor that requires grad. Use tensor.detach().numpy() instead.
detach()产生的张量是作为叶子结点存在的,并且该张量和原张量共享数据,只是该张量不需要计算梯度。
import torch
def test001():
x = torch.tensor([1, 2, 3], requires_grad=True, dtype=torch.float32)
x_np = x.detach()
print("是两个东西:", id(x), id(x_np))
print("数据是共享的:", id(x.data), id(x_np.data))
# 修改其中一个张量的值
x_np[1:] = 100
print(x, x_np)
if __name__ == "__main__":
test001()
模型训练:
名词 | 定义 |
---|---|
Epoch | 使用训练集的全部数据对模型进行一次完整训练,被称为“一代训练” |
Batch | 使用训练集中的一小部分样本对模型权重进行一次反向传播的参数更新,这一小部分样本被称为“一批数据” |
Iteration | 使用一个Batch数据对模型进行一次参数更新的过程,被称为“一次训练” |
完整的模型训练如下:
import math
import random
import torch
import numpy as np
from sklearn.datasets import make_regression
import pandas as pd
def build_dataset():
"""
使用 sklearn 的 make_regression 方法来构建一个模拟的回归数据集。
make_regression 方法的参数解释:
- n_samples: 生成的样本数量,决定了数据集的规模。
- n_features: 生成的特征数量,决定了数据维度。
- noise: 添加到目标变量的噪声标准差,用于模拟真实世界数据的不完美。
- coef: 如果为 True, 会返回生成数据的真实系数,用于了解特征与目标变量间的真实关系。
- random_state: 随机数生成的种子,确保在多次运行中能够复现相同的结果。
返回:
- X: 生成的特征矩阵。
- y: 生成的目标变量。
- coef: 如果在调用时 coef 参数为 True,则还会返回真实系数。
"""
noise = random.randint(1, 3)
bias = 14.5
X, y, coef = make_regression(
n_samples=1000, n_features=5, bias=bias, noise=noise, coef=True, random_state=0
)
# 数据转换为张量
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32)
coef = torch.tensor(coef, dtype=torch.float32)
bias = torch.tensor(bias, dtype=torch.float32)
return X, y, coef, bias
def data_loader(x, y, batch_size=16):
"""
将数据集转换为迭代器,以便在训练过程中进行批量处理。
"""
# 获取样本数量
num_samples = x.shape[0]
# 构建数据索引
indices = list(range(num_samples))
# 打乱数据顺序
random.shuffle(indices)
# 计算总的批次数量:向上取整
num_batches = math.ceil(num_samples / batch_size)
for i in range(num_batches):
start = i * batch_size
end = min((i + 1) * batch_size, num_samples)
# 开始切片数据
train_X = x[indices[start:end]]
train_y = y[indices[start:end]]
# 异步响应数据集
yield train_X, train_y
# 构建模型函数:权重参数和偏执参数
# 初始化参数
def initialize_params(n_features):
# 随机初始化权重w,并将偏置b初始化为0
w = torch.randn(n_features, requires_grad=True, dtype=torch.float32)
b = torch.tensor(0.0, requires_grad=True, dtype=torch.float32)
return w, b
def linear_regression(x, w, b):
return torch.matmul(x, w) + b
# 构建模型函数:损失函数和优化器
def mean_squared_error(y_pred, y_true):
return torch.mean((y_pred - y_true) ** 2)
# 优化器(SGD,手动实现)
def sgd(w, b, dw, db, learning_rate, batch_size):
w.data -= learning_rate * dw.data / batch_size
b.data -= learning_rate * db.data / batch_size
return w, b
# 训练函数
def train():
# 1. 构建数据集
X, y, coef, bias = build_dataset()
# 2. 初始化模型参数
w, b = initialize_params(X.shape[1])
# 3. 定义训练参数
learning_rate = 0.01
epochs = 120
batch_size = 16
# 4. 开始训练
for epoch in range(epochs):
epoch_loss = 0
num_batches = 0
for train_X, train_y in data_loader(X, y, batch_size):
num_batches += 1
# 5. 前向传播
y_pred = linear_regression(train_X, w, b)
# 6. 计算损失
loss = mean_squared_error(y_pred, train_y)
# 9. 梯度清零
if w.grad is not None:
w.grad.zero_()
if b.grad is not None:
b.grad.zero_()
# 7. 反向传播:会自动计算梯度
loss.backward()
# 8. 更新参数
w, b = sgd(w, b, w.grad, b.grad, learning_rate, batch_size)
# 10. 训练批次及损失率
epoch_loss += loss.item()
print(f"Epoch: {epoch}, Loss: {epoch_loss / num_batches}")
return coef, bias, w, b
if __name__ == "__main__":
coef, bias, w, b = train()
print(f"真实系数: {coef}")
print(f"预测系数: {w}")
print(f"真实偏置: {bias}")
print(f"预测偏置: {b}")
模型定义组件:
官方文档:torch.nn — PyTorch 2.5 documentation
损失函数组件:
PyTorch已内置多种损失函数,在构建神经网络时随用随取!
import torch
import torch.nn as nn
def test001():
criterion = nn.MSELoss()
# 真实值
y_true = torch.randn(5, 3)
# 预测值
y_pred = torch.randn(5, 3, requires_grad=True)
print(criterion(y_true, y_pred))
if __name__ == "__main__":
test001()
线性层组件:
import torch
import torch.nn as nn
def test002():
model = nn.Linear(20, 60)
input = torch.randn(128, 20)
output = model(input)
print(output.size())
if __name__ == "__main__":
test002()
优化器方法:
import torch
import torch.nn as nn
import torch.optim as optim
# 优化方法SGD的学习
def test003():
model = nn.Linear(20, 60)
criterion = nn.MSELoss()
# 优化器:更新模型参数
optimizer = optim.SGD(model.parameters(), lr=0.01)
input = torch.randn(128, 20)
output = model(input)
# 计算损失及反向传播
loss = criterion(output, torch.randn(128, 60))
# 梯度清零
optimizer.zero_grad()
# 反向传播
loss.backward()
# 更新模型参数
optimizer.step()
if __name__ == "__main__":
test003()
加载Excel数据集:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
class MyExcelDataset(Dataset):
def __init__(self, filename):
# 使用pandas读取excel
df = pd.read_excel(filename)
# 数据处理根据具体情况去做,没有固定的思路
# 你也可以不处理,直接根据下标取你要的数据
# 数据处理:删除NAN的列
df = df.dropna(axis=1, how="all")
# 将数据转换为DataFrame
df = pd.DataFrame(df)
# 删除原有的表头,并重命名列名为英文
df.columns = [
"zubie",
"student_id",
"name",
"expression",
"ppt",
"answer",
"present",
"defense",
"comments",
]
# 删除列name和comments
df = df.drop(["zubie","student_id", "name", "comments"], axis=1)
# 转换为tensor
data = torch.tensor(df.values)
self.data = data[:, :-1]
self.label = data[:, -1]
self.len = len(self.data)
def __len__(self):
return self.len
def __getitem__(self, index):
idx = min(max(index, 0), self.len - 1)
return self.data[idx], self.label[idx]
def test001():
excel_path = r"./test.xlsx"
dataset = MyExcelDataset(excel_path)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)
for i, (data, label) in enumerate(dataloader):
print(i, data, label)
if __name__ == "__main__":
test001()
数据增强:
数据增强是提高模型泛化能力(鲁棒性)的一种有效方法,尤其在图像分类、目标检测等任务中。数据增强可以模拟更多的训练样本,从而减少过拟合风险。
from PIL import Image
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import torch
from torchvision.transforms import v2
plt.rcParams["savefig.bbox"] = "tight"
torch.manual_seed(0)
orig_img = Image.open("./origin.jpg")
def plot(imgs, title, with_orig=True, row_title=None, **imshow_kwargs):
if not isinstance(imgs[0], list):
# Make a 2d grid even if there's just 1 row
imgs = [imgs]
num_rows = len(imgs)
num_cols = len(imgs[0]) + with_orig
fig, axs = plt.subplots(nrows=num_rows, ncols=num_cols, squeeze=False)
plt.title(title)
for row_idx, row in enumerate(imgs):
row = [orig_img] + row if with_orig else row
for col_idx, img in enumerate(row):
ax = axs[row_idx, col_idx]
ax.imshow(np.asarray(img), **imshow_kwargs)
ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])
if with_orig:
axs[0, 0].set(title="Original image")
axs[0, 0].title.set_size(8)
if row_title is not None:
for row_idx in range(num_rows):
axs[row_idx, 0].set(ylabel=row_title[row_idx])
plt.tight_layout()
plt.show()
# 边缘填充
padded_imgs = [v2.Pad(padding=padding)(orig_img) for padding in (3, 10, 30, 50)]
plot([orig_img] + padded_imgs, "v2.Pad")
# 大小调整
resized_imgs = [v2.Resize(size=size)(orig_img) for size in (30, 50, 100, orig_img.size)]
plot([orig_img] + resized_imgs, "v2.Resize")
# 中心裁剪
center_crops = [
v2.CenterCrop(size=size)(orig_img) for size in (30, 50, 100, orig_img.size)
]
plot([orig_img] + center_crops, "v2.CenterCrop")
# 周边裁剪
(top_left, top_right, bottom_left, bottom_right, center) = v2.FiveCrop(size=(100, 100))(
orig_img
)
plot(
[orig_img] + [top_left, top_right, bottom_left, bottom_right, center], "v2.FiveCrop"
)
模型的保存和加载:
训练一个模型通常需要大量的数据、时间和计算资源。通过保存训练好的模型,可以满足后续的模型部署、模型更新、迁移学习、训练恢复等各种业务需要求。
import torch
import torch.nn as nn
import torch.optim as optim
import pickle
class MyModle(nn.Module):
def __init__(self, input_size, output_size):
super(MyModle, self).__init__()
self.fc1 = nn.Linear(input_size, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.fc2(x)
output = self.fc3(x)
return output
def test003():
model = MyModle(input_size=128, output_size=32)
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 自己构建要存储的模型参数
save_dict = {
"init_params": {
"input_size": 128, # 输入特征数
"output_size": 32, # 输出特征数
},
"accuracy": 0.99, # 模型准确率
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
}
torch.save(save_dict, "model_dict.pth")
def test004():
save_dict = torch.load("model_dict.pth")
model = MyModle(
input_size=save_dict["init_params"]["input_size"],
output_size=save_dict["init_params"]["output_size"],
)
# 初始化模型参数
model.load_state_dict(save_dict["model_state_dict"])
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 初始化优化器参数
optimizer.load_state_dict(save_dict["optimizer_state_dict"])
# 打印模型信息
print(save_dict["accuracy"])
print(model)
if __name__ == "__main__":
test003()
test004()