深度学习7 梯度下降优化、过拟合、手机价格预测
三、BP算法
3、梯度下降
w = w - lr * grad: w 表示权重,lr表示学习率,grad表示梯度
传统下降方式分三类:(BGD)批量梯度下降、(MBGD)小批量梯度下降、(SGD)随机梯度下降。机器学习4 线性回归、逻辑回归-CSDN博客 第九点 线性回归的第5点
优化梯度下降:
梯度为 0时,传统三类方法无法优化参数
3.1、指数加权平均 EMA
加权平均 指给每个数赋予不同的权重求得平均数。
移动平均数 指的是计算最近邻的 N 个数来获得平均数。
指数移动加权平均 (EMA ) 指各数值的权重都不同,距离越远的数字对平均数计算的贡献就越小(权重较小),距离越近则对平均数的计算贡献就越大(权重越大)。
假设:与之明天天气,那么今天和昨天的气温就比较有参考性,而上个月的今天可能就失去了关系。
那么通过今天的气温来预料明天的气温可以表示为:
是平滑系数,取值范围为 。 越接近 1,表示对历史数据依赖性越高;越接近 0 则越依赖当前数据。该值越大平均数越平缓。
St 表示指数加权平均值(EMA)
Yt 表示 t 时刻的值
优点:解决了鞍点;使用历史数据可以保持整个结构处于紧密联系
缺点:依赖系数,系数的大小直接导致数据的联系;并没有对学习率优化
代码:
import torch
import matplotlib.pyplot as plt
ELEMENT_NUMBER = 30
# 1. 实际平均温度
def test01():
# 固定随机数种子
torch.manual_seed(0)
# 产生30天的随机温度
temperature = torch.randn(size=[ELEMENT_NUMBER,]) * 10
# 绘制随机温度
days = torch.arange(1, ELEMENT_NUMBER + 1, 1)
plt.plot(days, temperature, color='r')
plt.scatter(days, temperature)
plt.show()
# 2. 指数加权平均温度
def test02(beta=0.9): # beta 为权重系数
# 固定随机数种子
torch.manual_seed(0)
# 产生30天的随机温度
temperature = torch.randn(size=[ELEMENT_NUMBER,]) * 10
# 存放处理后的每天的温度值
exp_weight_avg = []
# 遍历温度信息
for idx, temp in enumerate(temperature):
# 第一个元素的的 EWA 值等于自身
if idx == 0:
exp_weight_avg.append(temp)
continue
# 第二个元素的 EWA 值等于上一个 EWA 乘以 β + 当前气温乘以 (1-β)
new_temp = exp_weight_avg[-1] * beta + (1 - beta) * temp
exp_weight_avg.append(new_temp)
days = torch.arange(1, ELEMENT_NUMBER + 1, 1)
plt.plot(days, exp_weight_avg, color='r')
plt.scatter(days, temperature)
plt.show()
if __name__ == '__main__':
test01()
test02(0.2)
test02(0.5)
test02(0.8)
3.2、Momentum 动量
更好地应对梯度变化和梯度消失问题,从而提高训练模型的效率和稳定性。
惯性效应: 加入前面梯度的累积,使得即便遇到鞍点0,算法依旧沿着当前的方向继续更新。
公式: = β * + (1- β) *
:历史梯度移动加权平均值
:当前时刻的梯度值
β : 为权重系数
优点:使用惯性平滑的解决了鞍点
缺点:并没有对学习率优化
代码:在优化器中添加参数 momentum= 内容
optimizer = optim.SGD(model.parameters(), lr=0.6, momentum=0.5)
# momentum 参数指定了动量系数,默认为0。动量系数通常设置为 0 到0.5 之间的一个值
3.3、AdaGrad
每个参数获取独立的学习率,根据历史梯度平方和调整学习率,使得参数具有较大的历史梯度的学习率减小,而参数具有较小的历史梯度的学习率保持较大,从而实现更有效的学习。
AdaGrad 避免了统一学习率的不足,更多用于处理稀疏数据和梯度变化较大的问题。
学习率公式: 参数公式:
初始化学习率 α、初始化参数 θ、小常数 σ = 1e-6、初始化梯度累积变量 s = 0
每次计算梯度g、累计梯度 s = s + g ⊙ g(⊙ 表示各个分量相乘 )
优点:优化了学习率
缺点: 可能存在学习率过度衰减(累积的时间步梯度平方值越来越大,导致学习率逐渐接近零);不适合非稀疏矩阵;没有优化梯度梯度本身
代码:创建优化器对象,参数一为模型对象,参数 lr 设置初始化学习率
optimizer = optim.Adagrad(model.parameters(), lr=0.6)
3.4、RMSProp
用于解决AdaGrad在训练过程中学习率过度衰减的问题。
修改AdaGrad 累计梯度方法( s = s + g ⊙ g) 为 指数移动平均累积历史梯度
优点:在AdaGrad基础上优化学习率的衰减 和 鞍点问题
缺点: 对于参数过度依赖
代码:创建优化器对象,参数一为模型对象,参数 lr 设置初始化学习率,参数 momentum 设置初始动量。
optimizer = optim.RMSprop(model.parameters(), lr=0.6, momentum=0.5)
3.5、Adam
将 动量法(优化) 和 RMSProp 的优点结合在一起
优点:一阶动量(即梯度的指数加权平均)来加速收敛;二阶动量(即梯度平方的指数加权平均)来调整学习率;几乎可以在不调整超参数的情况下应用于各种深度学习模型
缺点:超参数敏感:对初始超参数较为敏感;初始阶段快速收敛,可能导致模型陷入局部最优甚至过拟合。
代码:创建优化器对象,参数一为模型对象,参数 lr 设置初始化学习率,参数 momentum 设置初始动量。
optimizer = optim.Adam(model.parameters(), lr=0.6)
四、过拟合与欠拟合
判断过拟合还是欠拟合的方式:判断训练误差和测试误差,两者都高为欠拟合,训练误差效果可以,但是测试误差高则为过拟合
1、欠拟合
1.1、概念
训练误差和测试误差都高。模型在训练数据和测试数据上的表现都不好,说明模型可能太简单,无法捕捉到数据中的复杂模式。
1.2、解决方式
增加模型复杂度:引入更多的参数、增加神经网络的层数或节点数量,使模型能够捕捉到数据中的复杂模式。
增加特征:通过特征工程添加更多有意义的特征,使模型能够更好地理解数据。
减少正则化强度:适当减小 L1、L2 正则化强度,允许模型有更多自由度来拟合数据。
训练更长时间:如果是因为训练不足导致的欠拟合,可以增加训练的轮数或时间
2、过拟合
2.1、概念
对训练数据拟合能力很强并表现很好,但在测试数据上表现较差。
可能因为数据少、模型复杂、正则化强度不足导致过度学习
2.2、解决方式
加强正则化力度、降低模型复杂度
3.2.1、L1正则化
添加权重参数的绝对值之和:
优点:将一些权重变为零,实现特征选择,降低模型复杂度
3.2.2、L2正则化
添加权重参数的平方和:
优点:不会将权重直接变为 0,而是缩小,这样模型就更加平滑的拟合数据,同时保留足够的表达能力。
3.2.3、Dropout
随机丢弃部分神经元,类似PCA的保留信息。
import torch
x = torch.tensor([1,2,3,1,2,3,1,2,3,1,2,3],dtype=torch.float32)
drop = torch.nn.Dropout(p=0.6)
print(drop(x))
print(f"{sum(drop(x)!=0)}, {x.shape[0]}, {sum(drop(x)!=0)/x.shape[0]}")
五、综合运用
手机价格分类_数据集-阿里云天池 阿里云数据集,手机价格预测
代码思路:
1、加载数据,划分数据集和目标、主成分分析、进行训练和测试划分
2、批量标准化数据,并使用数据加载器每次返回小批量数据
3、创建模型对象
4、训练数据,迭代加载训练数据集
5、测试数据,根据训练数据标准化对象直接对测试数据进行标准化(transform,不需要fit)
6、预测数据,同测试数据方法
结果展示:
数据加载文件 dataloader_t.py
加载数据、标准化、主成分分析、生成数据加载器对象
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
# 基础数据集
class Data_Basics(Dataset):
def __init__(self, path):
super(Data_Basics, self).__init__()
# 基础数据加载
data = pd.read_csv(path)
print("基础数据已获取")
self.x = data.iloc[:, :-1]
self.y = data.iloc[:, -1]
print(f"已获取特征:{self.x.shape},目标:{self.x.shape}")
# 主成分分析
self.x = Data_Pca(self.x)
print(f"特征主成分分析已完成")
# 将数据转化为 tensor
self.x = torch.tensor(self.x, dtype=torch.float32)
self.y = torch.tensor(self.y.values, dtype=torch.int64)
def __len__(self):
return len(self.x)
def __getitem__(self, index):
return self.x[index], self.y[index]
# 预测数据集
class Data_Preddataset(Dataset):
def __init__(self, path):
super(Data_Preddataset, self).__init__()
# 基础数据加载
data = pd.read_csv(path)
self.x = data
# 主成分分析
self.x = Data_Pca(self.x)
# 将数据转化为 tensor
self.x = torch.tensor(self.x, dtype=torch.float32)
def __len__(self):
return len(self.x)
def __getitem__(self, index):
return self.x[index]
# 主成分分析
def Data_Pca(data):
transfer1 = PCA(n_components=0.95)
data = transfer1.fit_transform(data)
return data
# 数据标准化
# 训练数据创建对象,测试数据直接使用训练数据标准的内容
def Data_Standard(data, type="train"):
if type == "train":
global transfer
transfer = StandardScaler()
train_data = transfer.fit_transform(data, type)
return torch.tensor(train_data, dtype=torch.float32)
else:
test_data = transfer.transform(data)
return torch.tensor(test_data, dtype=torch.float32)
# 数据加载器
def Data_Loader(data_path,pred_path = None):
# 加载基础数据
dataset = Data_Basics(data_path)
# 获取数据集的类别特征
y_unique = dataset.y.unique()
# 划分训练集和测试集
train_data, test_data, train_labels, test_labels = train_test_split(
dataset.x, dataset.y, test_size=0.2, random_state=222, shuffle=True, stratify=dataset.y
)
# 数据标准化
train_data = Data_Standard(train_data)
# 根据训练好的数据标准化模型 标准化数据
test_data = Data_Standard(test_data,type="test")
print("训练和测试数据标准化完成")
# 将划分结果转化为 tensor
train_dataset = torch.utils.data.TensorDataset(train_data, train_labels)
test_dataset = torch.utils.data.TensorDataset(test_data, test_labels)
# 创建数据加载器
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)
print("训练和测试数据加载器创建完成")
# 判断是否有给出预测地址
if pred_path != None:
preddataset = Data_Preddataset(data_path)
pred_data = preddataset.x
pred_data = Data_Standard(pred_data,type="test")
print("预测数据标准化完成")
pred_dataset = torch.utils.data.TensorDataset(pred_data)
pred_loader = torch.utils.data.DataLoader(pred_dataset, batch_size=32, shuffle=False)
print("预测数据加载器创建完成")
return train_loader, test_loader,y_unique,pred_loader
# 使用字典
return {'train_loader':train_loader, 'test_loader':test_loader, 'y_unique':y_unique, 'pred_loader':pred_loader}
网络模型文件 datanet_t.py
五层网络模型:前四层使用 LeakyReLU 函数激活;最后一次因为需要使用交叉熵,所以无需使用激活函数。
import torch
import torch.nn as nn
class Net(nn.Module):
def __init__(self,input_size, out_size):
super(Net,self).__init__()
self.hide1 = nn.Sequential(
nn.Linear(input_size, 128),
nn.LeakyReLU(0.05)
)
self.hide2 = nn.Sequential(
nn.Linear(128, 256),
nn.LeakyReLU(0.05)
)
self.hide3 = nn.Sequential(
nn.Linear(256, 512),
nn.LeakyReLU(0.05),
)
self.hide4 = nn.Sequential(
nn.Linear(512, 128),
nn.LeakyReLU(0.05)
)
self.out = nn.Linear(128, out_size)
print("网络对象创建完成")
def forward(self, inpu_data):
temp_data = self.hide1(inpu_data)
temp_data = self.hide2(temp_data)
temp_data = self.hide3(temp_data)
temp_data = self.hide4(temp_data)
out_data = self.out(temp_data)
return out_data
def weight_start(self):
torch.nn.init.kaiming_normal(self.hide1[0].weight, nonlinearity='leaky_relu')
torch.nn.init.kaiming_normal(self.hide2[0].weight, nonlinearity='leaky_relu')
torch.nn.init.kaiming_normal(self.hide3[0].weight, nonlinearity='leaky_relu')
torch.nn.init.kaiming_normal(self.hide4[0].weight, nonlinearity='leaky_relu')
print("参数初始化完成")
数据训练文件 datatrain_t.py
创建交叉熵损失函数对象,小批量训练。
from dataloader_t import Data_Loader,Data_Basics
import torch
from sklearn.model_selection import train_test_split
from datanet_t import Net
def train(train_loader,y_unique,epochs = 200):
# 获取 数据加载器
for x,y in train_loader:
x_shape, y_shape = x.shape, y.shape
break
# 创建模型 引入每次训练集合的列数
# 调用模型初始化
model = Net(x_shape[1], len(y_unique))
model.weight_start()
# 创建损失函数
loss_fn = torch.nn.CrossEntropyLoss()
print("损失函数对象完成")
# 创建优化器
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
print("优化器对象完成,开始训练")
# 根据训练代数循环
for epoch in range(epochs):
print(f"训练代数:{epoch}")
for x_train, y_true in train_loader:
# 预测类别
y_pred = model(x_train)
# 计算损失函数
loss = loss_fn(y_pred, y_true)
# 梯度清零
optimizer.zero_grad()
# 反向传播
loss.backward()
# 梯度更新
optimizer.step()
print("训练完成")
# 保存模型
torch.save(model.state_dict(), "../model/model.pth")
print("训模型保存成功")
数据测试文件 datatest_t.py
import torch
from datanet_t import Net
def Data_test(train_loader,test_loader,y_unique):
# 获取创建模型的输入和输出特征
for x,y in train_loader:
x_shape, y_shape = x.shape, y.shape
break
# 创建模型
model = Net(x_shape[1], len(y_unique))
model.load_state_dict(torch.load(f"../model/model.pth"))
model.eval()
print("模型加载完成,开始测试")
with torch.no_grad(): # 关闭梯度计算
true_num = 0
sum_num = 0
for x_test,y_true in test_loader:
predictions = model(x_test)
predicted_classes = torch.argmax(predictions,dim=1) # 获取预测类别
true_num += (predicted_classes == y_true).sum().item()
sum_num += len(y_true)
print(f"测试结束,分数为:{true_num/sum_num}")
数据预测文件 datapred_t.py
import torch
import torch.nn as nn
from datanet_t import Net
def Data_pred(train_loader,pred_data,y_unique):
# 获取创建模型的输入和输出特征
for x,y in train_loader:
x_shape = x.shape
break
# 创建模型
model = Net(x_shape[1], len(y_unique))
model.load_state_dict(torch.load(f"../model/model.pth"))
model.eval()
print("模型加载完成,开始预测")
with torch.no_grad(): # 关闭梯度计算
predicted_classes_list =[]
for x_pred, in pred_data:
y_pred = model(x_pred)
predicted_classes = torch.argmax(y_pred,dim=1).tolist()
predicted_classes_list.extend(predicted_classes)
print(f"预测结果为:\n{predicted_classes_list}")
# predicted_classes.extend = torch.argmax(y_pred,dim=1).tolist() # 获取预测类别
# print(f"预测结果为:{predicted_classes}")
主运行文件 mian.py
from dataloader_t import Data_Loader,Data_Basics
from datatrain_t import train
from datatest_t import Data_test
from datapred_t import Data_pred
class Main:
def __init__(self):
self.x = 1
def mian_data(self):
# 加载数据
self.train_loader,self.test_loader,self.y_unique,self.pred_loader = Data_Loader("../data/手机价格预测.csv","../data/手机价格预测test.csv")
def mian_train(self):
# 训练
train(self.train_loader,self.y_unique,epochs=200)
def main_test(self):
# 测试
Data_test(self.train_loader,self.test_loader,self.y_unique)
def main_pred(self):
# 预测
Data_pred(self.train_loader,self.pred_loader,self.y_unique)
def mian_data():
global train_loader,test_loader,y_unique,pred_loader
# 加载数据
train_loader,test_loader,y_unique,pred_loader = Data_Loader("../data/手机价格预测.csv","../data/手机价格预测test.csv")
def mian_train():
global train_loader,test_loader,y_unique,pred_loader
# 训练
train(train_loader,y_unique,epochs=200)
def main_test():
global train_loader,test_loader,y_unique,pred_loader
# 测试
Data_test(train_loader,test_loader,y_unique)
def main_pred():
global train_loader,test_loader,y_unique,pred_loader
# 预测
Data_pred(train_loader,pred_loader,y_unique)
if __name__ == "__main__":
mian_data()
mian_train()
main_test()
main_pred()
# if __name__ == "__main__":
# main = Main()
# main.mian_data()
# main.mian_train()
# main.main_test()
# main.main_pred()