实验17 优化算法的比较分析
一、实验原理:
除了批大小对模型收敛速度的影响外,学习率和梯度估计也是影响神经网络优化的重要因素。神经网络优化中常用的优化方法也主要是如下两方面的改进,包括:
学习率调整:主要通过自适应地调整学习率使得优化更稳定。这类算法主要有AdaGrad、RMSprop、AdaDelta算法等。
梯度估计修正:主要通过修正每次迭代时估计的梯度方向来加快收敛速度。这类算法主要有动量法、Nesterov加速梯度方法等。
(一)学习率调整
1.相同学习率
每个参数在每次迭代时都使用相同的学习率
①SGD
呈“之”字形移动,这是一个相当低效的路径,因为:梯度的方向没有指向最小值的方向。
2.自适应学习率
自适应地调整每个参数的学习率.
①AdaGrad算法
借鉴l2正则化的思想,每次迭代时自适应调整每个参数的学习率。
第t次迭代时,先计算每个参数梯度平方的累计值,公式为:
参数更新的差值是:
其中是为了保持数值稳定性而设置的非常小的常数,一般取值为到
如果某个参数的偏导数累计比较大,其学习率相对较小;
如果某个参数的偏导数累计比较小,其学习率相对较大。
按参数的元素进行学习率衰减,使变动大的参数的学习率逐渐减小。在上课讲的例子中,由于y轴方向上的梯度较大,因此刚开始变动较大,但是后面会根据这个较大的变动按比例进行调整,减小更新的步伐。因此y轴方向上的更新程度被减弱,“之”字形的变动程度有所衰减。函数的取值高效地向着最小值移动。
但是存在缺点:
随着迭代次数的增加,学习率逐渐缩小,经过一定次数的迭代没有找到最优点时,学习率已经非常小,很难再继续找到最优点。在上课讲的例子中,虽然y轴的变化抑制了,但是x轴的步长也变得很慢。
②RMSProp算法
可在有些情况下避免AdaGrad中学习率不断单调下降,防止过早衰减,是AdaGrad的改进方案。
RMSProp并不是将过去所有的梯度一视同仁,而是逐渐地遗忘过去的梯度,在做加法运算时将新梯度的信息更多地反映出来。专业上讲,称为“指数移动平均”,呈指数函数式地减小过去的梯度的尺度。
计算每次迭代梯度平方的指数衰减移动平均
越小,过去累计值的权重越低,当前抽样值的权重越高,实时性越强。
越大,过去累计值的权重越高,当前抽样值的权重越低,平稳性更好。
在这个算法中,的取值是0.9。
参数更新差值为:
RMSProp和AdaGrad的区别是:的计算由累计方式变成了指数衰减移动平均。在迭代过程中,每个参数的学习率并不是呈衰减趋势,既可以变小也可以变大。
③AdaDelta算法
类似于RMSProp算法,通过梯度平方的指数衰减移动平均来调整学习率此外,AdaDelta算法引入每次参数更新差值平方的指数衰减移动平均。
第t次迭代时,参数更新差值的平方的指数衰减移动平均是:
参数更新差值为:
AdaDelta动态调整学习率,不需要提前设置,将初始学习率改为动态计算,在一定程度上平抑了学习率的波动。
(二)梯度估计修正:
在小批量梯度下降法中,由于每次迭代的样本具有一定的随机性,因此每次迭代的梯度估计和整个训练集上的最优梯度并不一致。如果每次选取样本数量比较小,损失会呈振荡的方式下降。一种有效地缓解梯度估计随机性的方式是通过使用最近一段时间内的平均梯度来代替当前时刻的随机梯度来作为参数更新的方向,从而提高优化速度。
①动量法
计算负梯度的指数加权移动平均作为参数的更新方向
参数更新差值为:
为上一步的参数更新方向和当前梯度的反方向的叠加。(动量因子通常设为0.9)
某个参数在最近一段时间内的梯度方向:
不一致:参数更新幅度变小,动量法会起到减速作用,可以增加稳定性,减少震荡
一致:参数更新幅度变大,动量法会起到加速作用,可以更加快速地到达最优点
一般而言,在迭代初期,梯度方向都比较一致,动量法会起到加速作用,可以更快地到达最优点。在迭代后期,梯度方向会不一致,在收敛值附近振荡,动量法会起到减速作用,增加稳定性。
②Adam算法
可以看作动量法和RMSprop算法的结合,不但使用动量作为参数更新方向,而且可以自适应调整学习率。
先计算两个移动平均:
,跟动量法类似
,跟RMSProp类似
其中和分别为两个移动平均的衰减率,通常取值为=0.9, =0.99。我们可以把和
分别看作梯度的均值(一阶矩)和未减去均值的方差(二阶矩)。
假设=0,=0,那么在迭代初期和的值会比真实的均值和方差要小。特别是当和
都接近于1时,偏差会很大。因此,需要对偏差进行修正。
参数更新差值为:
其中学习率通常设为0.001,并且也可以进行衰减,。
二、实验步骤
(一)不用优化算法的比较分析(被优化函数是)
1.优化算法的实验设定
①SGD算法2D可视化实验
import copy
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from nndl.op import Op
from abc import abstractmethod
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class SimpleBatchGD(Optimizer):
def __init__(self, init_lr, model):
super(SimpleBatchGD, self).__init__(init_lr=init_lr, model=model)
def step(self):
#参数更新
if isinstance(self.model.params, dict):
for key in self.model.params.keys():
self.model.params[key] = self.model.params[key] - self.init_lr * self.model.grads[key]
#这个类是一个被优化函数,forward是函数的前向计算,backward是函数对x的偏导
class OptimizedFunction(Op):
def __init__(self, w):
super(OptimizedFunction, self).__init__()
self.w = torch.as_tensor(w,dtype=torch.float32)#权重矩阵
self.params = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储前向传播中的x
self.grads = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储反向传播中x的梯度
def forward(self, x):
self.params['x'] = x#保存输入x
return torch.matmul(self.w.T, torch.square(self.params['x']))#转置w与x的平方相乘
def backward(self):
self.grads['x'] = 2 * torch.multiply(self.w.T, self.params['x'])#计算关于x的梯度
#这个类是一个训练函数,用于返回每个epoch中x的值和loss的值
# 训练函数
def train_f(model, optimizer, x_init, epoch):
x = x_init
all_x = []
losses = []
for i in range(epoch):
all_x.append(copy.copy(x.numpy())) #保存每轮的x值
loss = model(x)
losses.append(loss)
model.backward()
optimizer.step()
x = model.params['x']
return torch.as_tensor(all_x), losses #返回每个epoch的x值和损失值
#这个类是一个可视化函数,用于绘制x的更新轨迹
class Visualization(object):
def __init__(self):
#只画出参数x1和x2在区间[-5,5]的曲线部分
x1 = np.arange(-5, 5, 0.1)
x2 = np.arange(-5, 5, 0.1)
x1, x2 = np.meshgrid(x1, x2)#创建网格
self.init_x = torch.as_tensor([x1, x2])#转换为pytorch张量
def plot_2d(self, model, x, fig_name):#可视化参数轨迹
#model是模型对象,用于计算损失值;x参数更新的轨迹;fig_name保存的文件名
fig, ax = plt.subplots(figsize=(10, 6))
cp = ax.contourf(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors=['#e4007f', '#f19ec2', '#e86096', '#eb7aaa', '#f6c8dc', '#f5f5f5', '#000000'])
c = ax.contour(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors='black')
cbar = fig.colorbar(cp)
#绘制参数更新轨迹
ax.plot(x[:, 0], x[:, 1], '-o', color='#000000')
#绘制目标点(最优解)
ax.plot(0, 'r*', markersize=18, color='#fefefe')
ax.set_xlabel('$x1$')
ax.set_ylabel('$x2$')
ax.set_xlim((-2, 5))
ax.set_ylim((-2, 5))
plt.savefig(fig_name)
#这个类用于模型训练并可视化
def train_and_plot_f(model, optimizer, epoch, fig_name):
#设置x的初始值
x_init = torch.as_tensor([3, 4], dtype=torch.float32)
print('x1 initiate: {}, x2 initiate: {}'.format(x_init[0].numpy(), x_init[1].numpy()))
#训练函数调用,进行模型训练并返回更新后的参数 x 和每一轮的损失 losses
x, losses = train_f(model, optimizer, x_init, epoch)
losses = np.array(losses)
# 展示x1、x2的更新轨迹
vis = Visualization()
vis.plot_2d(model, x, fig_name)
# 固定随机种子,保证实验可重复性
torch.seed()
# 初始化模型的权重
w = torch.as_tensor([0.2, 2])
# 定义模型,可以进行前向计算(返回函数)和反向传播(返回求导)
model = OptimizedFunction(w)
# 定义优化器
opt = SimpleBatchGD(init_lr=0.2, model=model)
# 训练并可视化结果
train_and_plot_f(model, opt, epoch=20, fig_name='opti-vis-para.pdf')
②SGD算法简单拟合实验
# 固定随机种子
import numpy as np
import torch
from nndl.op import Op, SimpleBatchGD
from matplotlib import pyplot as plt
torch.seed()
# 随机生成shape为(1000,2)的训练数据
X = torch.randn([1000, 2])
w = torch.as_tensor([0.5, 0.8])
w = torch.unsqueeze(w, dim=1)
noise = 0.01 * torch.rand([1000])
noise = torch.unsqueeze(noise, dim=1)
# 计算y
y = torch.matmul(X, w) + noise
# 打印X, y样本
print('X: ', X[0].numpy())
print('y: ', y[0].numpy())
# X,y组成训练样本数据
data = torch.concat((X, y), dim=1)
print('input data shape: ', data.shape)
print('data: ', data[0].numpy())
class Linear(Op):
def __init__(self, input_size, weight_init=np.random.standard_normal, bias_init=torch.zeros):
self.params = {}
self.params['W'] = weight_init([input_size, 1])
self.params['W'] = torch.as_tensor(self.params['W'],dtype=torch.float32)
self.params['b'] = bias_init([1])
self.inputs = None
self.grads = {}
def forward(self, inputs):
self.inputs = inputs
self.outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']
return self.outputs
def backward(self, labels):
K = self.inputs.shape[0]
self.grads['W'] = 1./ K*torch.matmul(self.inputs.T, (self.outputs - labels))
self.grads['b'] = 1./K* torch.sum(self.outputs-labels, dim=0)
def train(data, num_epochs, batch_size, model, calculate_loss, optimizer, verbose=False):
# 记录每个回合损失的变化
epoch_loss = []
# 记录每次迭代损失的变化
iter_loss = []
N = len(data)
for epoch_id in range(num_epochs):
# np.random.shuffle(data) #不再随机打乱数据
# 将训练数据进行拆分,每个mini_batch包含batch_size条的数据
mini_batches = [data[i:i+batch_size] for i in range(0, N, batch_size)]
for iter_id, mini_batch in enumerate(mini_batches):
# data中前两个分量为X
inputs = mini_batch[:, :-1]
# data中最后一个分量为y
labels = mini_batch[:, -1:]
# 前向计算
outputs = model(inputs)
# 计算损失
loss = calculate_loss(outputs, labels).numpy()
# 计算梯度
model.backward(labels)
# 梯度更新
optimizer.step()
iter_loss.append(loss)
# verbose = True 则打印当前回合的损失
if verbose:
print('Epoch {:3d}, loss = {:.4f}'.format(epoch_id, np.mean(iter_loss)))
epoch_loss.append(np.mean(iter_loss))
return iter_loss, epoch_loss
def plot_loss(iter_loss, epoch_loss, fig_name):
"""
可视化损失函数的变化趋势
"""
plt.figure(figsize=(10, 4))
ax1 = plt.subplot(121)
ax1.plot(iter_loss, color='#e4007f')
plt.title('iteration loss')
ax2 = plt.subplot(122)
ax2.plot(epoch_loss, color='#f19ec2')
plt.title('epoch loss')
plt.savefig(fig_name)
plt.show()
import torch.nn as nn
def train_and_plot(optimizer, fig_name):
"""
训练网络并画出损失函数的变化趋势
输入:
- optimizer:优化器
"""
# 定义均方差损失
mse = nn.MSELoss()
iter_loss, epoch_loss = train(data, num_epochs=30, batch_size=64, model=model, calculate_loss=mse, optimizer=optimizer)
plot_loss(iter_loss, epoch_loss, fig_name)
# 固定随机种子
torch.seed()
# 定义网络结构
model = Linear(2)
# 定义优化器
opt = SimpleBatchGD(init_lr=0.01, model=model)
train_and_plot(opt, 'opti-loss.pdf')
2.学习率调整
(1)AdaGrad算法
①2D可视化实验
from abc import abstractmethod
import copy
import numpy as np
import torch
from matplotlib import pyplot as plt
from nndl.op import Op
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class Adagrad(Optimizer):
def __init__(self, init_lr, model, epsilon):
super(Adagrad, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.epsilon = epsilon
def adagrad(self, x, gradient_x, G, init_lr):
G += gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.adagrad(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
#这个类是一个被优化函数,forward是函数的前向计算,backward是函数对x的偏导
class OptimizedFunction(Op):
def __init__(self, w):
super(OptimizedFunction, self).__init__()
self.w = torch.as_tensor(w,dtype=torch.float32)#权重矩阵
self.params = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储前向传播中的x
self.grads = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储反向传播中x的梯度
def forward(self, x):
self.params['x'] = x#保存输入x
return torch.matmul(self.w.T, torch.square(self.params['x']))#转置w与x的平方相乘
def backward(self):
self.grads['x'] = 2 * torch.multiply(self.w.T, self.params['x'])#计算关于x的梯度
#这个类是一个训练函数,用于返回每个epoch中x的值和loss的值
# 训练函数
def train_f(model, optimizer, x_init, epoch):
x = x_init
all_x = []
losses = []
for i in range(epoch):
all_x.append(copy.copy(x.numpy())) #保存每轮的x值
loss = model(x)
losses.append(loss)
model.backward()
optimizer.step()
x = model.params['x']
return torch.as_tensor(all_x), losses #返回每个epoch的x值和损失值
#这个类是一个可视化函数,用于绘制x的更新轨迹
class Visualization(object):
def __init__(self):
#只画出参数x1和x2在区间[-5,5]的曲线部分
x1 = np.arange(-5, 5, 0.1)
x2 = np.arange(-5, 5, 0.1)
x1, x2 = np.meshgrid(x1, x2)#创建网格
self.init_x = torch.as_tensor([x1, x2])#转换为pytorch张量
def plot_2d(self, model, x, fig_name):#可视化参数轨迹
#model是模型对象,用于计算损失值;x参数更新的轨迹;fig_name保存的文件名
fig, ax = plt.subplots(figsize=(10, 6))
cp = ax.contourf(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors=['#e4007f', '#f19ec2', '#e86096', '#eb7aaa', '#f6c8dc', '#f5f5f5', '#000000'])
c = ax.contour(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors='black')
cbar = fig.colorbar(cp)
#绘制参数更新轨迹
ax.plot(x[:, 0], x[:, 1], '-o', color='#000000')
#绘制目标点(最优解)
ax.plot(0, 'r*', markersize=18, color='#fefefe')
ax.set_xlabel('$x1$')
ax.set_ylabel('$x2$')
ax.set_xlim((-2, 5))
ax.set_ylim((-2, 5))
plt.savefig(fig_name)
#这个类用于模型训练并可视化
def train_and_plot_f(model, optimizer, epoch, fig_name):
#设置x的初始值
x_init = torch.as_tensor([3, 4], dtype=torch.float32)
print('x1 initiate: {}, x2 initiate: {}'.format(x_init[0].numpy(), x_init[1].numpy()))
#训练函数调用,进行模型训练并返回更新后的参数 x 和每一轮的损失 losses
x, losses = train_f(model, optimizer, x_init, epoch)
losses = np.array(losses)
# 展示x1、x2的更新轨迹
vis = Visualization()
vis.plot_2d(model, x, fig_name)
# 固定随机种子
torch.seed()
w = torch.as_tensor([0.2, 2])
model2 = OptimizedFunction(w)
opt2 = Adagrad(init_lr=0.5, model=model2, epsilon=1e-7)
train_and_plot_f(model2, opt2, epoch=50, fig_name='opti-vis-para2.pdf')
②简单拟合实验
# 固定随机种子
from abc import abstractmethod
import numpy as np
import torch
from nndl.op import Op, SimpleBatchGD
from matplotlib import pyplot as plt
torch.seed()
# 随机生成shape为(1000,2)的训练数据
X = torch.randn([1000, 2])
w = torch.as_tensor([0.5, 0.8])
w = torch.unsqueeze(w, dim=1)
noise = 0.01 * torch.rand([1000])
noise = torch.unsqueeze(noise, dim=1)
# 计算y
y = torch.matmul(X, w) + noise
# 打印X, y样本
print('X: ', X[0].numpy())
print('y: ', y[0].numpy())
# X,y组成训练样本数据
data = torch.concat((X, y), dim=1)
print('input data shape: ', data.shape)
print('data: ', data[0].numpy())
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class Adagrad(Optimizer):
def __init__(self, init_lr, model, epsilon):
super(Adagrad, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.epsilon = epsilon
def adagrad(self, x, gradient_x, G, init_lr):
G += gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.adagrad(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
class Linear(Op):
def __init__(self, input_size, weight_init=np.random.standard_normal, bias_init=torch.zeros):
self.params = {}
self.params['W'] = weight_init([input_size, 1])
self.params['W'] = torch.as_tensor(self.params['W'],dtype=torch.float32)
self.params['b'] = bias_init([1])
self.inputs = None
self.grads = {}
def forward(self, inputs):
self.inputs = inputs
self.outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']
return self.outputs
def backward(self, labels):
K = self.inputs.shape[0]
self.grads['W'] = 1./ K*torch.matmul(self.inputs.T, (self.outputs - labels))
self.grads['b'] = 1./K* torch.sum(self.outputs-labels, dim=0)
def train(data, num_epochs, batch_size, model, calculate_loss, optimizer, verbose=False):
# 记录每个回合损失的变化
epoch_loss = []
# 记录每次迭代损失的变化
iter_loss = []
N = len(data)
for epoch_id in range(num_epochs):
# np.random.shuffle(data) #不再随机打乱数据
# 将训练数据进行拆分,每个mini_batch包含batch_size条的数据
mini_batches = [data[i:i+batch_size] for i in range(0, N, batch_size)]
for iter_id, mini_batch in enumerate(mini_batches):
# data中前两个分量为X
inputs = mini_batch[:, :-1]
# data中最后一个分量为y
labels = mini_batch[:, -1:]
# 前向计算
outputs = model(inputs)
# 计算损失
loss = calculate_loss(outputs, labels).numpy()
# 计算梯度
model.backward(labels)
# 梯度更新
optimizer.step()
iter_loss.append(loss)
# verbose = True 则打印当前回合的损失
if verbose:
print('Epoch {:3d}, loss = {:.4f}'.format(epoch_id, np.mean(iter_loss)))
epoch_loss.append(np.mean(iter_loss))
return iter_loss, epoch_loss
def plot_loss(iter_loss, epoch_loss, fig_name):
"""
可视化损失函数的变化趋势
"""
plt.figure(figsize=(10, 4))
ax1 = plt.subplot(121)
ax1.plot(iter_loss, color='#e4007f')
plt.title('iteration loss')
ax2 = plt.subplot(122)
ax2.plot(epoch_loss, color='#f19ec2')
plt.title('epoch loss')
plt.savefig(fig_name)
plt.show()
import torch.nn as nn
def train_and_plot(optimizer, fig_name):
"""
训练网络并画出损失函数的变化趋势
输入:
- optimizer:优化器
"""
# 定义均方差损失
mse = nn.MSELoss()
iter_loss, epoch_loss = train(data, num_epochs=30, batch_size=64, model=model, calculate_loss=mse, optimizer=optimizer)
plot_loss(iter_loss, epoch_loss, fig_name)
# 固定随机种子
torch.seed()
# 定义网络结构
model = Linear(2)
# 定义优化器
opt = Adagrad(init_lr=0.1, model=model, epsilon=1e-7)
train_and_plot(opt, 'opti-loss2.pdf')
(2)RMSprop算法
①2D可视化实验
from abc import abstractmethod
import copy
import numpy as np
import torch
from matplotlib import pyplot as plt
from nndl.op import Op
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class RMSprop(Optimizer):
def __init__(self, init_lr, model, beta, epsilon):
super(RMSprop, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.beta = beta
self.epsilon = epsilon
def rmsprop(self, x, gradient_x, G, init_lr):
"""
rmsprop算法更新参数,G为迭代梯度平方的加权移动平均
"""
G = self.beta * G + (1 - self.beta) * gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.rmsprop(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
#这个类是一个被优化函数,forward是函数的前向计算,backward是函数对x的偏导
class OptimizedFunction(Op):
def __init__(self, w):
super(OptimizedFunction, self).__init__()
self.w = torch.as_tensor(w,dtype=torch.float32)#权重矩阵
self.params = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储前向传播中的x
self.grads = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储反向传播中x的梯度
def forward(self, x):
self.params['x'] = x#保存输入x
return torch.matmul(self.w.T, torch.square(self.params['x']))#转置w与x的平方相乘
def backward(self):
self.grads['x'] = 2 * torch.multiply(self.w.T, self.params['x'])#计算关于x的梯度
#这个类是一个训练函数,用于返回每个epoch中x的值和loss的值
# 训练函数
def train_f(model, optimizer, x_init, epoch):
x = x_init
all_x = []
losses = []
for i in range(epoch):
all_x.append(copy.copy(x.numpy())) #保存每轮的x值
loss = model(x)
losses.append(loss)
model.backward()
optimizer.step()
x = model.params['x']
return torch.as_tensor(all_x), losses #返回每个epoch的x值和损失值
#这个类是一个可视化函数,用于绘制x的更新轨迹
class Visualization(object):
def __init__(self):
#只画出参数x1和x2在区间[-5,5]的曲线部分
x1 = np.arange(-5, 5, 0.1)
x2 = np.arange(-5, 5, 0.1)
x1, x2 = np.meshgrid(x1, x2)#创建网格
self.init_x = torch.as_tensor([x1, x2])#转换为pytorch张量
def plot_2d(self, model, x, fig_name):#可视化参数轨迹
#model是模型对象,用于计算损失值;x参数更新的轨迹;fig_name保存的文件名
fig, ax = plt.subplots(figsize=(10, 6))
cp = ax.contourf(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors=['#e4007f', '#f19ec2', '#e86096', '#eb7aaa', '#f6c8dc', '#f5f5f5', '#000000'])
c = ax.contour(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors='black')
cbar = fig.colorbar(cp)
#绘制参数更新轨迹
ax.plot(x[:, 0], x[:, 1], '-o', color='#000000')
#绘制目标点(最优解)
ax.plot(0, 'r*', markersize=18, color='#fefefe')
ax.set_xlabel('$x1$')
ax.set_ylabel('$x2$')
ax.set_xlim((-2, 5))
ax.set_ylim((-2, 5))
plt.savefig(fig_name)
#这个类用于模型训练并可视化
def train_and_plot_f(model, optimizer, epoch, fig_name):
#设置x的初始值
x_init = torch.as_tensor([3, 4], dtype=torch.float32)
print('x1 initiate: {}, x2 initiate: {}'.format(x_init[0].numpy(), x_init[1].numpy()))
#训练函数调用,进行模型训练并返回更新后的参数 x 和每一轮的损失 losses
x, losses = train_f(model, optimizer, x_init, epoch)
losses = np.array(losses)
# 展示x1、x2的更新轨迹
vis = Visualization()
vis.plot_2d(model, x, fig_name)
torch.seed()
w = torch.as_tensor([0.2, 2])
model3 = OptimizedFunction(w)
opt3 = RMSprop(init_lr=0.1, model=model3, beta=0.9, epsilon=1e-7)
train_and_plot_f(model3, opt3, epoch=50, fig_name='opti-vis-para3.pdf')
②简单拟合实验
# 固定随机种子
from abc import abstractmethod
import numpy as np
import torch
from nndl.op import Op, SimpleBatchGD
from matplotlib import pyplot as plt
torch.seed()
# 随机生成shape为(1000,2)的训练数据
X = torch.randn([1000, 2])
w = torch.as_tensor([0.5, 0.8])
w = torch.unsqueeze(w, dim=1)
noise = 0.01 * torch.rand([1000])
noise = torch.unsqueeze(noise, dim=1)
# 计算y
y = torch.matmul(X, w) + noise
# 打印X, y样本
print('X: ', X[0].numpy())
print('y: ', y[0].numpy())
# X,y组成训练样本数据
data = torch.concat((X, y), dim=1)
print('input data shape: ', data.shape)
print('data: ', data[0].numpy())
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class RMSprop(Optimizer):
def __init__(self, init_lr, model, beta, epsilon):
super(RMSprop, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.beta = beta
self.epsilon = epsilon
def rmsprop(self, x, gradient_x, G, init_lr):
"""
rmsprop算法更新参数,G为迭代梯度平方的加权移动平均
"""
G = self.beta * G + (1 - self.beta) * gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.rmsprop(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
class Linear(Op):
def __init__(self, input_size, weight_init=np.random.standard_normal, bias_init=torch.zeros):
self.params = {}
self.params['W'] = weight_init([input_size, 1])
self.params['W'] = torch.as_tensor(self.params['W'],dtype=torch.float32)
self.params['b'] = bias_init([1])
self.inputs = None
self.grads = {}
def forward(self, inputs):
self.inputs = inputs
self.outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']
return self.outputs
def backward(self, labels):
K = self.inputs.shape[0]
self.grads['W'] = 1./ K*torch.matmul(self.inputs.T, (self.outputs - labels))
self.grads['b'] = 1./K* torch.sum(self.outputs-labels, dim=0)
def train(data, num_epochs, batch_size, model, calculate_loss, optimizer, verbose=False):
# 记录每个回合损失的变化
epoch_loss = []
# 记录每次迭代损失的变化
iter_loss = []
N = len(data)
for epoch_id in range(num_epochs):
# np.random.shuffle(data) #不再随机打乱数据
# 将训练数据进行拆分,每个mini_batch包含batch_size条的数据
mini_batches = [data[i:i+batch_size] for i in range(0, N, batch_size)]
for iter_id, mini_batch in enumerate(mini_batches):
# data中前两个分量为X
inputs = mini_batch[:, :-1]
# data中最后一个分量为y
labels = mini_batch[:, -1:]
# 前向计算
outputs = model(inputs)
# 计算损失
loss = calculate_loss(outputs, labels).numpy()
# 计算梯度
model.backward(labels)
# 梯度更新
optimizer.step()
iter_loss.append(loss)
# verbose = True 则打印当前回合的损失
if verbose:
print('Epoch {:3d}, loss = {:.4f}'.format(epoch_id, np.mean(iter_loss)))
epoch_loss.append(np.mean(iter_loss))
return iter_loss, epoch_loss
def plot_loss(iter_loss, epoch_loss, fig_name):
"""
可视化损失函数的变化趋势
"""
plt.figure(figsize=(10, 4))
ax1 = plt.subplot(121)
ax1.plot(iter_loss, color='#e4007f')
plt.title('iteration loss')
ax2 = plt.subplot(122)
ax2.plot(epoch_loss, color='#f19ec2')
plt.title('epoch loss')
plt.savefig(fig_name)
plt.show()
import torch.nn as nn
def train_and_plot(optimizer, fig_name):
"""
训练网络并画出损失函数的变化趋势
输入:
- optimizer:优化器
"""
# 定义均方差损失
mse = nn.MSELoss()
iter_loss, epoch_loss = train(data, num_epochs=30, batch_size=64, model=model, calculate_loss=mse, optimizer=optimizer)
plot_loss(iter_loss, epoch_loss, fig_name)
# 固定随机种子
torch.seed()
# 定义网络结构
model = Linear(2)
# 定义优化器
opt3 = RMSprop(init_lr=0.1, model=model, beta=0.9, epsilon=1e-7)
train_and_plot(opt3, 'opti-loss3.pdf')
3.梯度估计修正
(1)动量法
①2D可视化实验
from abc import abstractmethod
import copy
import numpy as np
import torch
from matplotlib import pyplot as plt
from nndl.op import Op
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class Momentum(Optimizer):
def __init__(self, init_lr, model, rho):
super(Momentum, self).__init__(init_lr=init_lr, model=model)
self.delta_x = {}
for key in self.model.params.keys():
self.delta_x[key] = 0
self.rho = rho
def momentum(self, x, gradient_x, delta_x, init_lr):
"""
momentum算法更新参数,delta_x为梯度的加权移动平均
"""
delta_x = self.rho * delta_x - init_lr * gradient_x
x += delta_x
return x, delta_x
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.delta_x[key] = self.momentum(self.model.params[key],
self.model.grads[key],
self.delta_x[key],
self.init_lr)
#这个类是一个被优化函数,forward是函数的前向计算,backward是函数对x的偏导
class OptimizedFunction(Op):
def __init__(self, w):
super(OptimizedFunction, self).__init__()
self.w = torch.as_tensor(w,dtype=torch.float32)#权重矩阵
self.params = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储前向传播中的x
self.grads = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储反向传播中x的梯度
def forward(self, x):
self.params['x'] = x#保存输入x
return torch.matmul(self.w.T, torch.square(self.params['x']))#转置w与x的平方相乘
def backward(self):
self.grads['x'] = 2 * torch.multiply(self.w.T, self.params['x'])#计算关于x的梯度
#这个类是一个训练函数,用于返回每个epoch中x的值和loss的值
# 训练函数
def train_f(model, optimizer, x_init, epoch):
x = x_init
all_x = []
losses = []
for i in range(epoch):
all_x.append(copy.copy(x.numpy())) #保存每轮的x值
loss = model(x)
losses.append(loss)
model.backward()
optimizer.step()
x = model.params['x']
return torch.as_tensor(all_x), losses #返回每个epoch的x值和损失值
#这个类是一个可视化函数,用于绘制x的更新轨迹
class Visualization(object):
def __init__(self):
#只画出参数x1和x2在区间[-5,5]的曲线部分
x1 = np.arange(-5, 5, 0.1)
x2 = np.arange(-5, 5, 0.1)
x1, x2 = np.meshgrid(x1, x2)#创建网格
self.init_x = torch.as_tensor([x1, x2])#转换为pytorch张量
def plot_2d(self, model, x, fig_name):#可视化参数轨迹
#model是模型对象,用于计算损失值;x参数更新的轨迹;fig_name保存的文件名
fig, ax = plt.subplots(figsize=(10, 6))
cp = ax.contourf(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors=['#e4007f', '#f19ec2', '#e86096', '#eb7aaa', '#f6c8dc', '#f5f5f5', '#000000'])
c = ax.contour(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors='black')
cbar = fig.colorbar(cp)
#绘制参数更新轨迹
ax.plot(x[:, 0], x[:, 1], '-o', color='#000000')
#绘制目标点(最优解)
ax.plot(0, 'r*', markersize=18, color='#fefefe')
ax.set_xlabel('$x1$')
ax.set_ylabel('$x2$')
ax.set_xlim((-2, 5))
ax.set_ylim((-2, 5))
plt.savefig(fig_name)
#这个类用于模型训练并可视化
def train_and_plot_f(model, optimizer, epoch, fig_name):
#设置x的初始值
x_init = torch.as_tensor([3, 4], dtype=torch.float32)
print('x1 initiate: {}, x2 initiate: {}'.format(x_init[0].numpy(), x_init[1].numpy()))
#训练函数调用,进行模型训练并返回更新后的参数 x 和每一轮的损失 losses
x, losses = train_f(model, optimizer, x_init, epoch)
losses = np.array(losses)
# 展示x1、x2的更新轨迹
vis = Visualization()
vis.plot_2d(model, x, fig_name)
# 固定随机种子
torch.seed()
w = torch.as_tensor([0.2, 2])
model4 = OptimizedFunction(w)
opt4 = Momentum(init_lr=0.01, model=model4, rho=0.9)
train_and_plot_f(model4, opt4, epoch=50, fig_name='opti-vis-para4.pdf')
②简单拟合实验
# 固定随机种子
from abc import abstractmethod
import numpy as np
import torch
from nndl.op import Op, SimpleBatchGD
from matplotlib import pyplot as plt
torch.seed()
# 随机生成shape为(1000,2)的训练数据
X = torch.randn([1000, 2])
w = torch.as_tensor([0.5, 0.8])
w = torch.unsqueeze(w, dim=1)
noise = 0.01 * torch.rand([1000])
noise = torch.unsqueeze(noise, dim=1)
# 计算y
y = torch.matmul(X, w) + noise
# 打印X, y样本
print('X: ', X[0].numpy())
print('y: ', y[0].numpy())
# X,y组成训练样本数据
data = torch.concat((X, y), dim=1)
print('input data shape: ', data.shape)
print('data: ', data[0].numpy())
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class Momentum(Optimizer):
def __init__(self, init_lr, model, rho):
super(Momentum, self).__init__(init_lr=init_lr, model=model)
self.delta_x = {}
for key in self.model.params.keys():
self.delta_x[key] = 0
self.rho = rho
def momentum(self, x, gradient_x, delta_x, init_lr):
"""
momentum算法更新参数,delta_x为梯度的加权移动平均
"""
delta_x = self.rho * delta_x - init_lr * gradient_x
x += delta_x
return x, delta_x
class Linear(Op):
def __init__(self, input_size, weight_init=np.random.standard_normal, bias_init=torch.zeros):
self.params = {}
self.params['W'] = weight_init([input_size, 1])
self.params['W'] = torch.as_tensor(self.params['W'],dtype=torch.float32)
self.params['b'] = bias_init([1])
self.inputs = None
self.grads = {}
def forward(self, inputs):
self.inputs = inputs
self.outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']
return self.outputs
def backward(self, labels):
K = self.inputs.shape[0]
self.grads['W'] = 1./ K*torch.matmul(self.inputs.T, (self.outputs - labels))
self.grads['b'] = 1./K* torch.sum(self.outputs-labels, dim=0)
def train(data, num_epochs, batch_size, model, calculate_loss, optimizer, verbose=False):
# 记录每个回合损失的变化
epoch_loss = []
# 记录每次迭代损失的变化
iter_loss = []
N = len(data)
for epoch_id in range(num_epochs):
# np.random.shuffle(data) #不再随机打乱数据
# 将训练数据进行拆分,每个mini_batch包含batch_size条的数据
mini_batches = [data[i:i+batch_size] for i in range(0, N, batch_size)]
for iter_id, mini_batch in enumerate(mini_batches):
# data中前两个分量为X
inputs = mini_batch[:, :-1]
# data中最后一个分量为y
labels = mini_batch[:, -1:]
# 前向计算
outputs = model(inputs)
# 计算损失
loss = calculate_loss(outputs, labels).numpy()
# 计算梯度
model.backward(labels)
# 梯度更新
optimizer.step()
iter_loss.append(loss)
# verbose = True 则打印当前回合的损失
if verbose:
print('Epoch {:3d}, loss = {:.4f}'.format(epoch_id, np.mean(iter_loss)))
epoch_loss.append(np.mean(iter_loss))
return iter_loss, epoch_loss
def plot_loss(iter_loss, epoch_loss, fig_name):
"""
可视化损失函数的变化趋势
"""
plt.figure(figsize=(10, 4))
ax1 = plt.subplot(121)
ax1.plot(iter_loss, color='#e4007f')
plt.title('iteration loss')
ax2 = plt.subplot(122)
ax2.plot(epoch_loss, color='#f19ec2')
plt.title('epoch loss')
plt.savefig(fig_name)
plt.show()
import torch.nn as nn
def train_and_plot(optimizer, fig_name):
"""
训练网络并画出损失函数的变化趋势
输入:
- optimizer:优化器
"""
# 定义均方差损失
mse = nn.MSELoss()
iter_loss, epoch_loss = train(data, num_epochs=30, batch_size=64, model=model, calculate_loss=mse, optimizer=optimizer)
plot_loss(iter_loss, epoch_loss, fig_name)
# 固定随机种子
torch.seed()
# 定义网络结构
model = Linear(2)
# 定义优化器
opt = Momentum(init_lr=0.01, model=model, rho=0.9)
train_and_plot(opt, 'opti-loss4.pdf')
(2)Adam算法
①2D可视化实验
from abc import abstractmethod
import copy
import numpy as np
import torch
from matplotlib import pyplot as plt
from nndl.op import Op
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class Adam(Optimizer):
def __init__(self, init_lr, model, beta1, beta2, epsilon):
super(Adam, self).__init__(init_lr=init_lr, model=model)
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
self.M, self.G = {}, {}
for key in self.model.params.keys():
self.M[key] = 0
self.G[key] = 0
self.t = 1
def adam(self, x, gradient_x, G, M, t, init_lr):
M = self.beta1 * M + (1 - self.beta1) * gradient_x
G = self.beta2 * G + (1 - self.beta2) * gradient_x ** 2
M_hat = M / (1 - self.beta1 ** t)
G_hat = G / (1 - self.beta2 ** t)
t += 1
x -= init_lr / torch.sqrt(G_hat + self.epsilon) * M_hat
return x, G, M, t
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key], self.M[key], self.t = self.adam(self.model.params[key],
self.model.grads[key],
self.G[key],
self.M[key],
self.t,
self.init_lr)
#这个类是一个被优化函数,forward是函数的前向计算,backward是函数对x的偏导
class OptimizedFunction(Op):
def __init__(self, w):
super(OptimizedFunction, self).__init__()
self.w = torch.as_tensor(w,dtype=torch.float32)#权重矩阵
self.params = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储前向传播中的x
self.grads = {'x': torch.as_tensor(0,dtype=torch.float32)}#存储反向传播中x的梯度
def forward(self, x):
self.params['x'] = x#保存输入x
return torch.matmul(self.w.T, torch.square(self.params['x']))#转置w与x的平方相乘
def backward(self):
self.grads['x'] = 2 * torch.multiply(self.w.T, self.params['x'])#计算关于x的梯度
#这个类是一个训练函数,用于返回每个epoch中x的值和loss的值
# 训练函数
def train_f(model, optimizer, x_init, epoch):
x = x_init
all_x = []
losses = []
for i in range(epoch):
all_x.append(copy.copy(x.numpy())) #保存每轮的x值
loss = model(x)
losses.append(loss)
model.backward()
optimizer.step()
x = model.params['x']
return torch.as_tensor(all_x), losses #返回每个epoch的x值和损失值
#这个类是一个可视化函数,用于绘制x的更新轨迹
class Visualization(object):
def __init__(self):
#只画出参数x1和x2在区间[-5,5]的曲线部分
x1 = np.arange(-5, 5, 0.1)
x2 = np.arange(-5, 5, 0.1)
x1, x2 = np.meshgrid(x1, x2)#创建网格
self.init_x = torch.as_tensor([x1, x2])#转换为pytorch张量
def plot_2d(self, model, x, fig_name):#可视化参数轨迹
#model是模型对象,用于计算损失值;x参数更新的轨迹;fig_name保存的文件名
fig, ax = plt.subplots(figsize=(10, 6))
cp = ax.contourf(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors=['#e4007f', '#f19ec2', '#e86096', '#eb7aaa', '#f6c8dc', '#f5f5f5', '#000000'])
c = ax.contour(self.init_x[0], self.init_x[1], model(self.init_x.transpose(1,0)), colors='black')
cbar = fig.colorbar(cp)
#绘制参数更新轨迹
ax.plot(x[:, 0], x[:, 1], '-o', color='#000000')
#绘制目标点(最优解)
ax.plot(0, 'r*', markersize=18, color='#fefefe')
ax.set_xlabel('$x1$')
ax.set_ylabel('$x2$')
ax.set_xlim((-2, 5))
ax.set_ylim((-2, 5))
plt.savefig(fig_name)
#这个类用于模型训练并可视化
def train_and_plot_f(model, optimizer, epoch, fig_name):
#设置x的初始值
x_init = torch.as_tensor([3, 4], dtype=torch.float32)
print('x1 initiate: {}, x2 initiate: {}'.format(x_init[0].numpy(), x_init[1].numpy()))
#训练函数调用,进行模型训练并返回更新后的参数 x 和每一轮的损失 losses
x, losses = train_f(model, optimizer, x_init, epoch)
losses = np.array(losses)
# 展示x1、x2的更新轨迹
vis = Visualization()
vis.plot_2d(model, x, fig_name)
# 固定随机种子
torch.seed()
w = torch.as_tensor([0.2, 2])
model5 = OptimizedFunction(w)
opt5 = Adam(init_lr=0.2, model=model5, beta1=0.9, beta2=0.99, epsilon=1e-7)
train_and_plot_f(model5, opt5, epoch=20, fig_name='opti-vis-para5.pdf')
②简单拟合实验
# 固定随机种子
from abc import abstractmethod
import numpy as np
import torch
from nndl.op import Op, SimpleBatchGD
from matplotlib import pyplot as plt
torch.seed()
# 随机生成shape为(1000,2)的训练数据
X = torch.randn([1000, 2])
w = torch.as_tensor([0.5, 0.8])
w = torch.unsqueeze(w, dim=1)
noise = 0.01 * torch.rand([1000])
noise = torch.unsqueeze(noise, dim=1)
# 计算y
y = torch.matmul(X, w) + noise
# 打印X, y样本
print('X: ', X[0].numpy())
print('y: ', y[0].numpy())
# X,y组成训练样本数据
data = torch.concat((X, y), dim=1)
print('input data shape: ', data.shape)
print('data: ', data[0].numpy())
# 优化器基类
class Optimizer(object):
def __init__(self, init_lr, model):
self.init_lr = init_lr
#指定优化器需要优化的模型
self.model = model
@abstractmethod
def step(self):
pass
class Adam(Optimizer):
def __init__(self, init_lr, model, beta1, beta2, epsilon):
super(Adam, self).__init__(init_lr=init_lr, model=model)
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
self.M, self.G = {}, {}
for key in self.model.params.keys():
self.M[key] = 0
self.G[key] = 0
self.t = 1
def adam(self, x, gradient_x, G, M, t, init_lr):
M = self.beta1 * M + (1 - self.beta1) * gradient_x
G = self.beta2 * G + (1 - self.beta2) * gradient_x ** 2
M_hat = M / (1 - self.beta1 ** t)
G_hat = G / (1 - self.beta2 ** t)
t += 1
x -= init_lr / torch.sqrt(G_hat + self.epsilon) * M_hat
return x, G, M, t
class Linear(Op):
def __init__(self, input_size, weight_init=np.random.standard_normal, bias_init=torch.zeros):
self.params = {}
self.params['W'] = weight_init([input_size, 1])
self.params['W'] = torch.as_tensor(self.params['W'],dtype=torch.float32)
self.params['b'] = bias_init([1])
self.inputs = None
self.grads = {}
def forward(self, inputs):
self.inputs = inputs
self.outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']
return self.outputs
def backward(self, labels):
K = self.inputs.shape[0]
self.grads['W'] = 1./ K*torch.matmul(self.inputs.T, (self.outputs - labels))
self.grads['b'] = 1./K* torch.sum(self.outputs-labels, dim=0)
def train(data, num_epochs, batch_size, model, calculate_loss, optimizer, verbose=False):
# 记录每个回合损失的变化
epoch_loss = []
# 记录每次迭代损失的变化
iter_loss = []
N = len(data)
for epoch_id in range(num_epochs):
# np.random.shuffle(data) #不再随机打乱数据
# 将训练数据进行拆分,每个mini_batch包含batch_size条的数据
mini_batches = [data[i:i+batch_size] for i in range(0, N, batch_size)]
for iter_id, mini_batch in enumerate(mini_batches):
# data中前两个分量为X
inputs = mini_batch[:, :-1]
# data中最后一个分量为y
labels = mini_batch[:, -1:]
# 前向计算
outputs = model(inputs)
# 计算损失
loss = calculate_loss(outputs, labels).numpy()
# 计算梯度
model.backward(labels)
# 梯度更新
optimizer.step()
iter_loss.append(loss)
# verbose = True 则打印当前回合的损失
if verbose:
print('Epoch {:3d}, loss = {:.4f}'.format(epoch_id, np.mean(iter_loss)))
epoch_loss.append(np.mean(iter_loss))
return iter_loss, epoch_loss
def plot_loss(iter_loss, epoch_loss, fig_name):
"""
可视化损失函数的变化趋势
"""
plt.figure(figsize=(10, 4))
ax1 = plt.subplot(121)
ax1.plot(iter_loss, color='#e4007f')
plt.title('iteration loss')
ax2 = plt.subplot(122)
ax2.plot(epoch_loss, color='#f19ec2')
plt.title('epoch loss')
plt.savefig(fig_name)
plt.show()
import torch.nn as nn
def train_and_plot(optimizer, fig_name):
"""
训练网络并画出损失函数的变化趋势
输入:
- optimizer:优化器
"""
# 定义均方差损失
mse = nn.MSELoss()
iter_loss, epoch_loss = train(data, num_epochs=30, batch_size=64, model=model5, calculate_loss=mse, optimizer=optimizer)
plot_loss(iter_loss, epoch_loss, fig_name)
# 固定随机种子
torch.seed()
# 定义网络结构
model5 = Linear(2)
# 定义优化器
opt5 = Adam(init_lr=0.1, model=model5, beta1=0.9, beta2=0.99, epsilon=1e-7)
train_and_plot(opt5, 'opti-loss5.pdf')
4.不同优化器的3D可视化对比,被优化函数为:
import torch
import numpy as np
import copy
from matplotlib import pyplot as plt
from matplotlib import animation
from itertools import zip_longest
class Op(object):
def __init__(self):
pass
def __call__(self, inputs):
return self.forward(inputs)
# 输入:张量inputs
# 输出:张量outputs
def forward(self, inputs):
# return outputs
raise NotImplementedError
# 输入:最终输出对outputs的梯度outputs_grads
# 输出:最终输出对inputs的梯度inputs_grads
def backward(self, outputs_grads):
# return inputs_grads
raise NotImplementedError
class Optimizer(object): # 优化器基类
def __init__(self, init_lr, model):
"""
优化器类初始化
"""
# 初始化学习率,用于参数更新的计算
self.init_lr = init_lr
# 指定优化器需要优化的模型
self.model = model
def step(self):
"""
定义每次迭代如何更新参数
"""
pass
class SimpleBatchGD(Optimizer):
def __init__(self, init_lr, model):
super(SimpleBatchGD, self).__init__(init_lr=init_lr, model=model)
def step(self):
# 参数更新
if isinstance(self.model.params, dict):
for key in self.model.params.keys():
self.model.params[key] = self.model.params[key] - self.init_lr * self.model.grads[key]
class Adagrad(Optimizer):
def __init__(self, init_lr, model, epsilon):
"""
Adagrad 优化器初始化
输入:
- init_lr: 初始学习率 - model:模型,model.params存储模型参数值 - epsilon:保持数值稳定性而设置的非常小的常数
"""
super(Adagrad, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.epsilon = epsilon
def adagrad(self, x, gradient_x, G, init_lr):
"""
adagrad算法更新参数,G为参数梯度平方的累计值。
"""
G += gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
"""
参数更新
"""
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.adagrad(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
class RMSprop(Optimizer):
def __init__(self, init_lr, model, beta, epsilon):
"""
RMSprop优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- beta:衰减率
- epsilon:保持数值稳定性而设置的常数
"""
super(RMSprop, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.beta = beta
self.epsilon = epsilon
def rmsprop(self, x, gradient_x, G, init_lr):
"""
rmsprop算法更新参数,G为迭代梯度平方的加权移动平均
"""
G = self.beta * G + (1 - self.beta) * gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.rmsprop(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
class Momentum(Optimizer):
def __init__(self, init_lr, model, rho):
"""
Momentum优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- rho:动量因子
"""
super(Momentum, self).__init__(init_lr=init_lr, model=model)
self.delta_x = {}
for key in self.model.params.keys():
self.delta_x[key] = 0
self.rho = rho
def momentum(self, x, gradient_x, delta_x, init_lr):
"""
momentum算法更新参数,delta_x为梯度的加权移动平均
"""
delta_x = self.rho * delta_x - init_lr * gradient_x
x += delta_x
return x, delta_x
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.delta_x[key] = self.momentum(self.model.params[key],
self.model.grads[key],
self.delta_x[key],
self.init_lr)
class Adam(Optimizer):
def __init__(self, init_lr, model, beta1, beta2, epsilon):
"""
Adam优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- beta1, beta2:移动平均的衰减率
- epsilon:保持数值稳定性而设置的常数
"""
super(Adam, self).__init__(init_lr=init_lr, model=model)
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
self.M, self.G = {}, {}
for key in self.model.params.keys():
self.M[key] = 0
self.G[key] = 0
self.t = 1
def adam(self, x, gradient_x, G, M, t, init_lr):
"""
adam算法更新参数
输入:
- x:参数
- G:梯度平方的加权移动平均
- M:梯度的加权移动平均
- t:迭代次数
- init_lr:初始学习率
"""
M = self.beta1 * M + (1 - self.beta1) * gradient_x
G = self.beta2 * G + (1 - self.beta2) * gradient_x ** 2
M_hat = M / (1 - self.beta1 ** t)
G_hat = G / (1 - self.beta2 ** t)
t += 1
x -= init_lr / torch.sqrt(G_hat + self.epsilon) * M_hat
return x, G, M, t
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key], self.M[key], self.t = self.adam(self.model.params[key],
self.model.grads[key],
self.G[key],
self.M[key],
self.t,
self.init_lr)
class OptimizedFunction3D(Op):
def __init__(self):
super(OptimizedFunction3D, self).__init__()
self.params = {'x': 0}
self.grads = {'x': 0}
def forward(self, x):
self.params['x'] = x
return x[0] ** 2 + x[1] ** 2 + x[1] ** 3 + x[0] * x[1]
def backward(self):
x = self.params['x']
gradient1 = 2 * x[0] + x[1]
gradient2 = 2 * x[1] + 3 * x[1] ** 2 + x[0]
grad1 = torch.Tensor([gradient1])
grad2 = torch.Tensor([gradient2])
self.grads['x'] = torch.cat([grad1, grad2])
class Visualization3D(animation.FuncAnimation):
""" 绘制动态图像,可视化参数更新轨迹 """
def __init__(self, *xy_values, z_values, labels=[], colors=[], fig, ax, interval=600, blit=True, **kwargs):
"""
初始化3d可视化类
输入:
xy_values:三维中x,y维度的值
z_values:三维中z维度的值
labels:每个参数更新轨迹的标签
colors:每个轨迹的颜色
interval:帧之间的延迟(以毫秒为单位)
blit:是否优化绘图
"""
self.fig = fig
self.ax = ax
self.xy_values = xy_values
self.z_values = z_values
frames = max(xy_value.shape[0] for xy_value in xy_values)
self.lines = [ax.plot([], [], [], label=label, color=color, lw=2)[0]
for _, label, color in zip_longest(xy_values, labels, colors)]
super(Visualization3D, self).__init__(fig, self.animate, init_func=self.init_animation, frames=frames,
interval=interval, blit=blit, **kwargs)
def init_animation(self):
# 数值初始化
for line in self.lines:
line.set_data([], [])
# line.set_3d_properties(np.asarray([])) # 源程序中有这一行,加上会报错。 Edit by David 2022.12.4
return self.lines
def animate(self, i):
# 将x,y,z三个数据传入,绘制三维图像
for line, xy_value, z_value in zip(self.lines, self.xy_values, self.z_values):
line.set_data(xy_value[:i, 0], xy_value[:i, 1])
line.set_3d_properties(z_value[:i])
return self.lines
def train_f(model, optimizer, x_init, epoch):
x = x_init
all_x = []
losses = []
for i in range(epoch):
all_x.append(copy.copy(x.numpy())) # 浅拷贝 改为 深拷贝, 否则List的原值会被改变。 Edit by David 2022.12.4.
loss = model(x)
losses.append(loss)
model.backward()
optimizer.step()
x = model.params['x']
return torch.Tensor(np.array(all_x)), losses
# 构建5个模型,分别配备不同的优化器
model1 = OptimizedFunction3D()
opt_gd = SimpleBatchGD(init_lr=0.01, model=model1)
model2 = OptimizedFunction3D()
opt_adagrad = Adagrad(init_lr=0.5, model=model2, epsilon=1e-7)
model3 = OptimizedFunction3D()
opt_rmsprop = RMSprop(init_lr=0.1, model=model3, beta=0.9, epsilon=1e-7)
model4 = OptimizedFunction3D()
opt_momentum = Momentum(init_lr=0.01, model=model4, rho=0.9)
model5 = OptimizedFunction3D()
opt_adam = Adam(init_lr=0.1, model=model5, beta1=0.9, beta2=0.99, epsilon=1e-7)
models = [model1, model2, model3, model4, model5]
opts = [opt_gd, opt_adagrad, opt_rmsprop, opt_momentum, opt_adam]
x_all_opts = []
z_all_opts = []
# 使用不同优化器训练
for model, opt in zip(models, opts):
x_init = torch.FloatTensor([2, 3])
x_one_opt, z_one_opt = train_f(model, opt, x_init, 150) # epoch
# 保存参数值
x_all_opts.append(x_one_opt.numpy())
z_all_opts.append(np.squeeze(z_one_opt))
# 使用numpy.meshgrid生成x1,x2矩阵,矩阵的每一行为[-3, 3],以0.1为间隔的数值
x1 = np.arange(-3, 3, 0.1)
x2 = np.arange(-3, 3, 0.1)
x1, x2 = np.meshgrid(x1, x2)
init_x = torch.Tensor(np.array([x1, x2]))
model = OptimizedFunction3D()
# 绘制 f_3d函数 的 三维图像
fig = plt.figure()
ax = plt.axes(projection='3d')
X = init_x[0].numpy()
Y = init_x[1].numpy()
Z = model(init_x).numpy() # 改为 model(init_x).numpy() David 2022.12.4
ax.plot_surface(X, Y, Z, cmap='rainbow')
ax.set_xlabel('x1')
ax.set_ylabel('x2')
ax.set_zlabel('f(x1,x2)')
labels = ['SGD', 'AdaGrad', 'RMSprop', 'Momentum', 'Adam']
colors = ['#f6373c', '#f6f237', '#45f637', '#37f0f6', '#000000']
animator = Visualization3D(*x_all_opts, z_values=z_all_opts, labels=labels, colors=colors, fig=fig, ax=ax)
ax.legend(loc='upper left')
plt.show()
(二)不同优化算法的比较分析(被优化函数是)
1.2D可视化
import numpy as np
import matplotlib.pyplot as plt
from collections import OrderedDict
# --------------------------------------------------------优化器---------------------------------------------------------
class SGD:
"""随机梯度下降法(Stochastic Gradient Descent)"""
def __init__(self, lr=0.01):
self.lr = lr
def update(self, params, grads):
for key in params.keys():
params[key] -= self.lr * grads[key]
class Momentum:
"""Momentum SGD"""
def __init__(self, lr=0.01, momentum=0.9):
self.lr = lr
self.momentum = momentum
self.v = None
def update(self, params, grads):
if self.v is None:
self.v = {}
for key, val in params.items():
self.v[key] = np.zeros_like(val)
for key in params.keys():
self.v[key] = self.momentum * self.v[key] - self.lr * grads[key]
params[key] += self.v[key]
class Nesterov:
"""Nesterov's Accelerated Gradient (http://arxiv.org/abs/1212.0901)"""
def __init__(self, lr=0.01, momentum=0.9):
self.lr = lr
self.momentum = momentum
self.v = None
def update(self, params, grads):
if self.v is None:
self.v = {}
for key, val in params.items():
self.v[key] = np.zeros_like(val)
for key in params.keys():
self.v[key] *= self.momentum
self.v[key] -= self.lr * grads[key]
params[key] += self.momentum * self.momentum * self.v[key]
params[key] -= (1 + self.momentum) * self.lr * grads[key]
class AdaGrad:
"""AdaGrad"""
def __init__(self, lr=0.01):
self.lr = lr
self.h = None
def update(self, params, grads):
if self.h is None:
self.h = {}
for key, val in params.items():
self.h[key] = np.zeros_like(val)
for key in params.keys():
self.h[key] += grads[key] * grads[key]
params[key] -= self.lr * grads[key] / (np.sqrt(self.h[key]) + 1e-7)
class RMSprop:
"""RMSprop"""
def __init__(self, lr=0.01, decay_rate=0.99):
self.lr = lr
self.decay_rate = decay_rate
self.h = None
def update(self, params, grads):
if self.h is None:
self.h = {}
for key, val in params.items():
self.h[key] = np.zeros_like(val)
for key in params.keys():
self.h[key] *= self.decay_rate
self.h[key] += (1 - self.decay_rate) * grads[key] * grads[key]
params[key] -= self.lr * grads[key] / (np.sqrt(self.h[key]) + 1e-7)
class Adam:
"""Adam (http://arxiv.org/abs/1412.6980v8)"""
def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
self.lr = lr
self.beta1 = beta1
self.beta2 = beta2
self.iter = 0
self.m = None
self.v = None
def update(self, params, grads):
if self.m is None:
self.m, self.v = {}, {}
for key, val in params.items():
self.m[key] = np.zeros_like(val)
self.v[key] = np.zeros_like(val)
self.iter += 1
lr_t = self.lr * np.sqrt(1.0 - self.beta2 ** self.iter) / (1.0 - self.beta1 ** self.iter)
for key in params.keys():
self.m[key] += (1 - self.beta1) * (grads[key] - self.m[key])
self.v[key] += (1 - self.beta2) * (grads[key] ** 2 - self.v[key])
params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)
class AdaDelta:
"""AdaDelta Optimizer (http://arxiv.org/abs/1212.5701)"""
def __init__(self, rho=0.95, epsilon=1e-6):
self.rho = rho
self.epsilon = epsilon
self.s = None
self.d = None
def update(self, params, grads):
if self.s is None:
self.s = {}
self.d = {}
for key, val in params.items():
self.s[key] = np.zeros_like(val)
self.d[key] = np.zeros_like(val)
for key in params.keys():
self.s[key] = self.rho * self.s[key] + (1 - self.rho) * (grads[key] ** 2)
step = np.sqrt(self.d[key] + self.epsilon) / np.sqrt(self.s[key] + self.epsilon)
params[key] -= step * grads[key]
self.d[key] = self.rho * self.d[key] + (1 - self.rho) * (grads[key] ** 2)
# ----------------------------------------------------被优化函数----------------------------------------------------------
def f(x, y):
return x ** 2 / 20.0 + y ** 2
def df(x, y):
return x / 10.0, 2.0 * y
# ------------------------------------------------------实例化-----------------------------------------------------------
init_pos = (-7.0, 2.0)
params = {}
params['x'], params['y'] = init_pos[0], init_pos[1]
grads = {}
grads['x'], grads['y'] = 0, 0
optimizers = OrderedDict()
optimizers["SGD"] = SGD(lr=0.1)
optimizers["Momentum"] = Momentum(lr=0.1)
optimizers["AdaGrad"] = AdaGrad(lr=0.1)
optimizers["Adam"] = Adam(lr=0.1)
optimizers["Nesterov"] = Nesterov(lr=0.1)
optimizers["RMSprop"] = RMSprop(lr=0.1)
optimizers["AdaDelta"] = AdaDelta()
idx = 1
# -----------------------------------------------------迭代30次----------------------------------------------------------
for key in optimizers:
optimizer = optimizers[key]
x_history = []
y_history = []
params['x'], params['y'] = init_pos[0], init_pos[1]
for i in range(30):
x_history.append(params['x'])
y_history.append(params['y'])
grads['x'], grads['y'] = df(params['x'], params['y'])
optimizer.update(params, grads)
x = np.arange(-10, 10, 0.01)
y = np.arange(-5, 5, 0.01)
X, Y = np.meshgrid(x, y)
Z = f(X, Y)
# for simple contour line
mask = Z > 7
Z[mask] = 0
# ----------------------------------------------------可视化---------------------------------------------------------
plt.subplot(3, 3, idx)
idx += 1
plt.plot(x_history, y_history, 'o-', color="red")
plt.contour(X, Y, Z) # 绘制等高线
plt.ylim(-10, 10)
plt.xlim(-10, 10)
plt.plot(0, 0, '+')
plt.title(key)
plt.xlabel("x")
plt.ylabel("y")
plt.subplots_adjust(wspace=0, hspace=0) # 调整子图间距
plt.show()
2.分析不同轨迹的形成原因,分析各个算法的优缺点
(三)优化算法3D可视化(被优化函数为)
1.结合3D动画,用自己的语言,从轨迹、速度等多个角度讲解各个算法优缺点
import torch
import numpy as np
import copy
from matplotlib import pyplot as plt
from matplotlib import animation
from itertools import zip_longest
from matplotlib import cm
class Op(object):
def __init__(self):
pass
def __call__(self, inputs):
return self.forward(inputs)
# 输入:张量inputs
# 输出:张量outputs
def forward(self, inputs):
# return outputs
raise NotImplementedError
# 输入:最终输出对outputs的梯度outputs_grads
# 输出:最终输出对inputs的梯度inputs_grads
def backward(self, outputs_grads):
# return inputs_grads
raise NotImplementedError
class Optimizer(object): # 优化器基类
def __init__(self, init_lr, model):
"""
优化器类初始化
"""
# 初始化学习率,用于参数更新的计算
self.init_lr = init_lr
# 指定优化器需要优化的模型
self.model = model
def step(self):
"""
定义每次迭代如何更新参数
"""
pass
class SimpleBatchGD(Optimizer):
def __init__(self, init_lr, model):
super(SimpleBatchGD, self).__init__(init_lr=init_lr, model=model)
def step(self):
# 参数更新
if isinstance(self.model.params, dict):
for key in self.model.params.keys():
self.model.params[key] = self.model.params[key] - self.init_lr * self.model.grads[key]
class Adagrad(Optimizer):
def __init__(self, init_lr, model, epsilon):
"""
Adagrad 优化器初始化
输入:
- init_lr: 初始学习率 - model:模型,model.params存储模型参数值 - epsilon:保持数值稳定性而设置的非常小的常数
"""
super(Adagrad, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.epsilon = epsilon
def adagrad(self, x, gradient_x, G, init_lr):
"""
adagrad算法更新参数,G为参数梯度平方的累计值。
"""
G += gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
"""
参数更新
"""
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.adagrad(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
class RMSprop(Optimizer):
def __init__(self, init_lr, model, beta, epsilon):
"""
RMSprop优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- beta:衰减率
- epsilon:保持数值稳定性而设置的常数
"""
super(RMSprop, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.beta = beta
self.epsilon = epsilon
def rmsprop(self, x, gradient_x, G, init_lr):
"""
rmsprop算法更新参数,G为迭代梯度平方的加权移动平均
"""
G = self.beta * G + (1 - self.beta) * gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.rmsprop(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
class Momentum(Optimizer):
def __init__(self, init_lr, model, rho):
"""
Momentum优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- rho:动量因子
"""
super(Momentum, self).__init__(init_lr=init_lr, model=model)
self.delta_x = {}
for key in self.model.params.keys():
self.delta_x[key] = 0
self.rho = rho
def momentum(self, x, gradient_x, delta_x, init_lr):
"""
momentum算法更新参数,delta_x为梯度的加权移动平均
"""
delta_x = self.rho * delta_x - init_lr * gradient_x
x += delta_x
return x, delta_x
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.delta_x[key] = self.momentum(self.model.params[key],
self.model.grads[key],
self.delta_x[key],
self.init_lr)
class Adam(Optimizer):
def __init__(self, init_lr, model, beta1, beta2, epsilon):
"""
Adam优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- beta1, beta2:移动平均的衰减率
- epsilon:保持数值稳定性而设置的常数
"""
super(Adam, self).__init__(init_lr=init_lr, model=model)
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
self.M, self.G = {}, {}
for key in self.model.params.keys():
self.M[key] = 0
self.G[key] = 0
self.t = 1
def adam(self, x, gradient_x, G, M, t, init_lr):
"""
adam算法更新参数
输入:
- x:参数
- G:梯度平方的加权移动平均
- M:梯度的加权移动平均
- t:迭代次数
- init_lr:初始学习率
"""
M = self.beta1 * M + (1 - self.beta1) * gradient_x
G = self.beta2 * G + (1 - self.beta2) * gradient_x ** 2
M_hat = M / (1 - self.beta1 ** t)
G_hat = G / (1 - self.beta2 ** t)
t += 1
x -= init_lr / torch.sqrt(G_hat + self.epsilon) * M_hat
return x, G, M, t
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key], self.M[key], self.t = self.adam(self.model.params[key],
self.model.grads[key],
self.G[key],
self.M[key],
self.t,
self.init_lr)
class OptimizedFunction3D(Op):
def __init__(self):
super(OptimizedFunction3D, self).__init__()
self.params = {'x': 0}
self.grads = {'x': 0}
def forward(self, x):
self.params['x'] = x
return - x[0] * x[0] / 2 + x[1] * x[1] / 1 # x[0] ** 2 + x[1] ** 2 + x[1] ** 3 + x[0] * x[1]
def backward(self):
x = self.params['x']
gradient1 = - 2 * x[0] / 2
gradient2 = 2 * x[1] / 1
grad1 = torch.Tensor([gradient1])
grad2 = torch.Tensor([gradient2])
self.grads['x'] = torch.cat([grad1, grad2])
class Visualization3D(animation.FuncAnimation):
""" 绘制动态图像,可视化参数更新轨迹 """
def __init__(self, *xy_values, z_values, labels=[], colors=[], fig, ax, interval=100, blit=True, **kwargs):
"""
初始化3d可视化类
输入:
xy_values:三维中x,y维度的值
z_values:三维中z维度的值
labels:每个参数更新轨迹的标签
colors:每个轨迹的颜色
interval:帧之间的延迟(以毫秒为单位)
blit:是否优化绘图
"""
self.fig = fig
self.ax = ax
self.xy_values = xy_values
self.z_values = z_values
frames = max(xy_value.shape[0] for xy_value in xy_values)
# , marker = 'o'
self.lines = [ax.plot([], [], [], label=label, color=color, lw=2)[0]
for _, label, color in zip_longest(xy_values, labels, colors)]
print(self.lines)
super(Visualization3D, self).__init__(fig, self.animate, init_func=self.init_animation, frames=frames,
interval=interval, blit=blit, **kwargs)
def init_animation(self):
# 数值初始化
for line in self.lines:
line.set_data([], [])
# line.set_3d_properties(np.asarray([])) # 源程序中有这一行,加上会报错。 Edit by David 2022.12.4
return self.lines
def animate(self, i):
# 将x,y,z三个数据传入,绘制三维图像
for line, xy_value, z_value in zip(self.lines, self.xy_values, self.z_values):
line.set_data(xy_value[:i, 0], xy_value[:i, 1])
line.set_3d_properties(z_value[:i])
return self.lines
def train_f(model, optimizer, x_init, epoch):
x = x_init
all_x = []
losses = []
for i in range(epoch):
all_x.append(copy.deepcopy(x.numpy())) # 浅拷贝 改为 深拷贝, 否则List的原值会被改变。 Edit by David 2022.12.4.
loss = model(x)
losses.append(loss)
model.backward()
optimizer.step()
x = model.params['x']
return torch.Tensor(np.array(all_x)), losses
# 构建5个模型,分别配备不同的优化器
model1 = OptimizedFunction3D()
opt_gd = SimpleBatchGD(init_lr=0.05, model=model1)
model2 = OptimizedFunction3D()
opt_adagrad = Adagrad(init_lr=0.05, model=model2, epsilon=1e-7)
model3 = OptimizedFunction3D()
opt_rmsprop = RMSprop(init_lr=0.05, model=model3, beta=0.9, epsilon=1e-7)
model4 = OptimizedFunction3D()
opt_momentum = Momentum(init_lr=0.05, model=model4, rho=0.9)
model5 = OptimizedFunction3D()
opt_adam = Adam(init_lr=0.05, model=model5, beta1=0.9, beta2=0.99, epsilon=1e-7)
models = [model5, model2, model3, model4, model1]
opts = [opt_adam, opt_adagrad, opt_rmsprop, opt_momentum, opt_gd]
x_all_opts = []
z_all_opts = []
# 使用不同优化器训练
for model, opt in zip(models, opts):
x_init = torch.FloatTensor([0.00001, 0.5])
x_one_opt, z_one_opt = train_f(model, opt, x_init, 100) # epoch
# 保存参数值
x_all_opts.append(x_one_opt.numpy())
z_all_opts.append(np.squeeze(z_one_opt))
# 使用numpy.meshgrid生成x1,x2矩阵,矩阵的每一行为[-3, 3],以0.1为间隔的数值
x1 = np.arange(-1, 2, 0.01)
x2 = np.arange(-1, 1, 0.05)
x1, x2 = np.meshgrid(x1, x2)
init_x = torch.Tensor(np.array([x1, x2]))
model = OptimizedFunction3D()
# 绘制 f_3d函数 的 三维图像
fig = plt.figure()
ax = plt.axes(projection='3d')
X = init_x[0].numpy()
Y = init_x[1].numpy()
Z = model(init_x).numpy() # 改为 model(init_x).numpy() David 2022.12.4
surf = ax.plot_surface(X, Y, Z, edgecolor='grey', cmap=cm.coolwarm)
# fig.colorbar(surf, shrink=0.5, aspect=1)
ax.set_zlim(-3, 2)
ax.set_xlabel('x1')
ax.set_ylabel('x2')
ax.set_zlabel('f(x1,x2)')
labels = ['Adam', 'AdaGrad', 'RMSprop', 'Momentum', 'SGD']
colors = ['#8B0000', '#0000FF', '#000000', '#008B00', '#FF0000']
animator = Visualization3D(*x_all_opts, z_values=z_all_opts, labels=labels, colors=colors, fig=fig, ax=ax)
ax.legend(loc='upper right')
plt.show()
(四)复现cs231
1.结合3D动画,用自己的语言,从轨迹、速度等多个角度讲解各个算法优缺点
import torch
import numpy as np
import copy
from matplotlib import pyplot as plt
from matplotlib import animation
from itertools import zip_longest
from matplotlib import cm
class Op(object):
def __init__(self):
pass
def __call__(self, inputs):
return self.forward(inputs)
# 输入:张量inputs
# 输出:张量outputs
def forward(self, inputs):
# return outputs
raise NotImplementedError
# 输入:最终输出对outputs的梯度outputs_grads
# 输出:最终输出对inputs的梯度inputs_grads
def backward(self, outputs_grads):
# return inputs_grads
raise NotImplementedError
class Optimizer(object): # 优化器基类
def __init__(self, init_lr, model):
"""
优化器类初始化
"""
# 初始化学习率,用于参数更新的计算
self.init_lr = init_lr
# 指定优化器需要优化的模型
self.model = model
def step(self):
"""
定义每次迭代如何更新参数
"""
pass
class SimpleBatchGD(Optimizer):
def __init__(self, init_lr, model):
super(SimpleBatchGD, self).__init__(init_lr=init_lr, model=model)
def step(self):
# 参数更新
if isinstance(self.model.params, dict):
for key in self.model.params.keys():
self.model.params[key] = self.model.params[key] - self.init_lr * self.model.grads[key]
class Adagrad(Optimizer):
def __init__(self, init_lr, model, epsilon):
"""
Adagrad 优化器初始化
输入:
- init_lr: 初始学习率 - model:模型,model.params存储模型参数值 - epsilon:保持数值稳定性而设置的非常小的常数
"""
super(Adagrad, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.epsilon = epsilon
def adagrad(self, x, gradient_x, G, init_lr):
"""
adagrad算法更新参数,G为参数梯度平方的累计值。
"""
G += gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
"""
参数更新
"""
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.adagrad(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
class RMSprop(Optimizer):
def __init__(self, init_lr, model, beta, epsilon):
"""
RMSprop优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- beta:衰减率
- epsilon:保持数值稳定性而设置的常数
"""
super(RMSprop, self).__init__(init_lr=init_lr, model=model)
self.G = {}
for key in self.model.params.keys():
self.G[key] = 0
self.beta = beta
self.epsilon = epsilon
def rmsprop(self, x, gradient_x, G, init_lr):
"""
rmsprop算法更新参数,G为迭代梯度平方的加权移动平均
"""
G = self.beta * G + (1 - self.beta) * gradient_x ** 2
x -= init_lr / torch.sqrt(G + self.epsilon) * gradient_x
return x, G
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key] = self.rmsprop(self.model.params[key],
self.model.grads[key],
self.G[key],
self.init_lr)
class Momentum(Optimizer):
def __init__(self, init_lr, model, rho):
"""
Momentum优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- rho:动量因子
"""
super(Momentum, self).__init__(init_lr=init_lr, model=model)
self.delta_x = {}
for key in self.model.params.keys():
self.delta_x[key] = 0
self.rho = rho
def momentum(self, x, gradient_x, delta_x, init_lr):
"""
momentum算法更新参数,delta_x为梯度的加权移动平均
"""
delta_x = self.rho * delta_x - init_lr * gradient_x
x += delta_x
return x, delta_x
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.delta_x[key] = self.momentum(self.model.params[key],
self.model.grads[key],
self.delta_x[key],
self.init_lr)
class Adam(Optimizer):
def __init__(self, init_lr, model, beta1, beta2, epsilon):
"""
Adam优化器初始化
输入:
- init_lr:初始学习率
- model:模型,model.params存储模型参数值
- beta1, beta2:移动平均的衰减率
- epsilon:保持数值稳定性而设置的常数
"""
super(Adam, self).__init__(init_lr=init_lr, model=model)
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
self.M, self.G = {}, {}
for key in self.model.params.keys():
self.M[key] = 0
self.G[key] = 0
self.t = 1
def adam(self, x, gradient_x, G, M, t, init_lr):
"""
adam算法更新参数
输入:
- x:参数
- G:梯度平方的加权移动平均
- M:梯度的加权移动平均
- t:迭代次数
- init_lr:初始学习率
"""
M = self.beta1 * M + (1 - self.beta1) * gradient_x
G = self.beta2 * G + (1 - self.beta2) * gradient_x ** 2
M_hat = M / (1 - self.beta1 ** t)
G_hat = G / (1 - self.beta2 ** t)
t += 1
x -= init_lr / torch.sqrt(G_hat + self.epsilon) * M_hat
return x, G, M, t
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.G[key], self.M[key], self.t = self.adam(self.model.params[key],
self.model.grads[key],
self.G[key],
self.M[key],
self.t,
self.init_lr)
class AdaDelta(Optimizer):
def __init__(self, init_lr, model, rho=0.9, epsilon=1e-6):
super(AdaDelta, self).__init__(init_lr=init_lr, model=model)
self.rho = rho
self.epsilon = epsilon
self.Eg = {} # 累积梯度平方的加权平均
self.Ed = {} # 累积参数更新平方的加权平均
for key in self.model.params.keys():
self.Eg[key] = 0
self.Ed[key] = 0
def adadelta(self, x, gradient_x, Eg, Ed, rho, epsilon):
Eg = rho * Eg + (1 - rho) * gradient_x ** 2 # 更新梯度的平方的加权平均
step = np.sqrt(Ed + epsilon) / np.sqrt(Eg + epsilon) * gradient_x # 计算步长
x -= step # 更新参数
Ed = rho * Ed + (1 - rho) * step ** 2 # 更新参数更新的平方的加权平均
return x, Eg, Ed
def step(self):
"""参数更新"""
for key in self.model.params.keys():
self.model.params[key], self.Eg[key], self.Ed[key] = self.adadelta(self.model.params[key],
self.model.grads[key],
self.Eg[key],
self.Ed[key],
self.rho,
self.epsilon)
class Nesterov(Optimizer):
def __init__(self, init_lr, model, momentum=0.9):
super(Nesterov, self).__init__(init_lr=init_lr, model=model)
self.momentum = momentum
self.v = {} # 存储动量的累积值
for key in self.model.params.keys():
self.v[key] = 0
def nesterov(self, x, gradient_x, v, momentum, init_lr):
# 预测:基于动量调整参数
v_prev = v
v = momentum * v - init_lr * gradient_x
x += momentum * (v - v_prev) # 基于预测调整参数
return x, v
def step(self):
for key in self.model.params.keys():
self.model.params[key], self.v[key] = self.nesterov(self.model.params[key],
self.model.grads[key],
self.v[key],
self.momentum,
self.init_lr)
class OptimizedFunction3D(Op):
def __init__(self):
super(OptimizedFunction3D, self).__init__()
self.params = {'x': 0}
self.grads = {'x': 0}
def forward(self, x):
self.params['x'] = x
return - x[0] * x[0] / 2 + x[1] * x[1] / 1 # x[0] ** 2 + x[1] ** 2 + x[1] ** 3 + x[0] * x[1]
def backward(self):
x = self.params['x']
gradient1 = - 2 * x[0] / 2
gradient2 = 2 * x[1] / 1
grad1 = torch.Tensor([gradient1])
grad2 = torch.Tensor([gradient2])
self.grads['x'] = torch.cat([grad1, grad2])
class Visualization3D(animation.FuncAnimation):
""" 绘制动态图像,可视化参数更新轨迹 """
def __init__(self, *xy_values, z_values, labels=[], colors=[], fig, ax, interval=100, blit=True, **kwargs):
"""
初始化3d可视化类
输入:
xy_values:三维中x,y维度的值
z_values:三维中z维度的值
labels:每个参数更新轨迹的标签
colors:每个轨迹的颜色
interval:帧之间的延迟(以毫秒为单位)
blit:是否优化绘图
"""
self.fig = fig
self.ax = ax
self.xy_values = xy_values
self.z_values = z_values
frames = max(xy_value.shape[0] for xy_value in xy_values)
self.lines = [ax.plot([], [], [], label=label, color=color, lw=2)[0]
for _, label, color in zip_longest(xy_values, labels, colors)]
self.points = [ax.plot([], [], [], color=color, markeredgewidth=1, markeredgecolor='black', marker='o')[0]
for _, color in zip_longest(xy_values, colors)]
# print(self.lines)
super(Visualization3D, self).__init__(fig, self.animate, init_func=self.init_animation, frames=frames,
interval=interval, blit=blit, **kwargs)
def init_animation(self):
# 数值初始化
for line in self.lines:
line.set_data_3d([], [], [])
for point in self.points:
point.set_data_3d([], [], [])
return self.points + self.lines
def animate(self, i):
# 将x,y,z三个数据传入,绘制三维图像
for line, xy_value, z_value in zip(self.lines, self.xy_values, self.z_values):
line.set_data_3d(xy_value[:i, 0], xy_value[:i, 1], z_value[:i])
for point, xy_value, z_value in zip(self.points, self.xy_values, self.z_values):
point.set_data_3d([xy_value[i, 0]], [xy_value[i, 1]], [z_value[i]])
return self.points + self.lines
def train_f(model, optimizer, x_init, epoch):
x = x_init
all_x = []
losses = []
for i in range(epoch):
all_x.append(copy.deepcopy(x.numpy())) # 浅拷贝 改为 深拷贝, 否则List的原值会被改变。 Edit by David 2022.12.4.
loss = model(x)
losses.append(loss)
model.backward()
optimizer.step()
x = model.params['x']
return torch.Tensor(np.array(all_x)), losses
# 构建5个模型,分别配备不同的优化器
model1 = OptimizedFunction3D()
opt_gd = SimpleBatchGD(init_lr=0.05, model=model1)
model2 = OptimizedFunction3D()
opt_adagrad = Adagrad(init_lr=0.05, model=model2, epsilon=1e-7)
model3 = OptimizedFunction3D()
opt_rmsprop = RMSprop(init_lr=0.05, model=model3, beta=0.9, epsilon=1e-7)
model4 = OptimizedFunction3D()
opt_momentum = Momentum(init_lr=0.05, model=model4, rho=0.9)
model5 = OptimizedFunction3D()
opt_adam = Adam(init_lr=0.05, model=model5, beta1=0.9, beta2=0.99, epsilon=1e-7)
model6 = OptimizedFunction3D()
opt_adadelta = AdaDelta(init_lr=0.1, model=model6)
model7 = OptimizedFunction3D()
opt_nag = Nesterov(init_lr=0.1, model=model7)
models = [model1, model2, model3, model4, model5, model6, model7]
opts = [opt_gd, opt_adagrad, opt_rmsprop, opt_momentum, opt_adam, opt_adadelta, opt_nag]
x_all_opts = []
z_all_opts = []
# 使用不同优化器训练
for model, opt in zip(models, opts):
x_init = torch.FloatTensor([0.00001, 0.5])
x_one_opt, z_one_opt = train_f(model, opt, x_init, 100) # epoch
# 保存参数值
x_all_opts.append(x_one_opt.numpy())
z_all_opts.append(np.squeeze(z_one_opt))
# 使用numpy.meshgrid生成x1,x2矩阵,矩阵的每一行为[-3, 3],以0.1为间隔的数值
x1 = np.arange(-1, 2, 0.01)
x2 = np.arange(-1, 1, 0.05)
x1, x2 = np.meshgrid(x1, x2)
init_x = torch.Tensor(np.array([x1, x2]))
model = OptimizedFunction3D()
# 绘制 f_3d函数 的 三维图像
fig = plt.figure()
ax = plt.axes(projection='3d')
X = init_x[0].numpy()
Y = init_x[1].numpy()
Z = model(init_x).numpy() # 改为 model(init_x).numpy() David 2022.12.4
surf = ax.plot_surface(X, Y, Z, edgecolor='grey', cmap=cm.coolwarm)
# fig.colorbar(surf, shrink=0.5, aspect=1)
ax.set_zlim(-3, 2)
ax.set_xlabel('x1')
ax.set_ylabel('x2')
ax.set_zlabel('f(x1,x2)')
labels = ['SGD', 'AdaGrad', 'RMSprop', 'Momentum', 'Adam', 'AdaDelta', 'Nesterov']
colors = ['#8B0000', '#0000FF', '#000000', '#008B00', '#FF0000', '#4169E1', '#00BFFF']
animator = Visualization3D(*x_all_opts, z_values=z_all_opts, labels=labels, colors=colors, fig=fig, ax=ax)
ax.legend(loc='upper right')
plt.show()
# animator.save('teaser' + '.gif', writer='imagemagick',fps=10) # 效果不好,估计被挡住了…… 有待进一步提高 Edit by David 2022.12.4
# save不好用,不费劲了,安装个软件做gif https://pc.qq.com/detail/13/detail_23913.html
三 总结
SGD
轨迹:“之”字形
形成原因:因为在此函数中梯度的方向并没有指向最小值的方向,SGD只是单纯的朝着梯度方向,放弃了对梯度准确性的追求,会使得其在 函数的形状非均向(比如y方向变化很大时,x方向变化很小 ),能迂回往复地寻找,效率很低。
优点:简单易实现
缺点:收敛速度慢,特别是在凹谷形函数中;缺乏方向调整,导致路径震荡。
AdaGrad
轨迹:由于y轴方向上的梯度较大,因此刚开始变动较大,但是后面会根据这个较大的变动按比例进行调整,减小更新的步伐。因此y轴方向上的更新程度被减弱,“之”字形的变动程度有所衰减。函数的取值高效地向着最小值移动。
形成原因:动量法通过引入指数加权平均,积累历史梯度信息,减少了震荡。
缺点:随着迭代次数的增加,学习率逐渐缩小,经过一定次数的迭代没有找到最优点时,学习率已经非常小,很难再继续找到最优点。
RMSProp
轨迹:表现出平滑的收敛轨迹,路径集中且无明显震荡。
形成原因:RMSprop通过对梯度平方的指数加权平均动态调整学习率,避免了学习率过快下降。
优点:学习率调整更加合理。
缺点:可能陷入局部最优解
AdaDelta
轨迹: 相比SGD更稳定,震荡较小
形成原因:AdaDelta对RMSprop进行了改进,通过限制学习率的范围,进一步平滑了更新过程。
动量法
轨迹:相比SGD,震荡幅度明显减小,更接近目标点,整体轨迹更平滑
形成原因:动量法通过引入指数加权平均,积累历史梯度信息,减少了震荡。
优点:提升收敛速度,减小震荡
Adam算法
轨迹:整体轨迹平滑且快速接近目标点,震荡很少。、
形成原因:Adam结合了动量法和RMSprop的优势,动态调整学习率,并平滑了梯度方向
优点:收敛速度快,适应性强。
本次实验中,首先设置了一个2D可视化实验,就是选择一个二维空间的凸函数,然后用不同的优化算法来寻找最优解(对于这个函数是函数值为0的点),并可视化梯度下降的过程。
SGD在寻找最优解时呈现一种“之”字形路线。AdaGrad算法路线基本上就是朝着最优点走,但是后期点很密,说明步长比较小,走的比较慢,结果显示也没有走到最优点。RMSProp算法就是在AdaGrad上的改进,成功的走到了最优点,因为学习率不是单调减小的。动量法也是在SGD基础上的改进,实验结果显示训练初期,速度比较快,是因为初期梯度方向比较一致,参数更新幅度增大。但是在训练后期,参数更新幅度就减小了,在收敛处振荡。Adam算法可以自适应调整学习率,参数更新更加平稳。
同时对两种函数进行3D可视化,一个是
,一个是,并观察了实验结果,观察他们寻找最优点的过程。
对于第一个函数,从输出结果看,对于构建的函数,Momentum在参数更新时成功逃离鞍点,其他优化器在本次实验中收敛到鞍点处没有成功逃离。同时也可以看到RMSProp的迭代速度比AdaGrad算法的迭代速度快,Adam比RMSProp还要快,SGD更新速度最慢。
对于第二个函数,SGD依然是“之”字形,走的很慢。Adagrad是SGD的改进,“之”字形改进了,一开始走的还比较快,但是到后面走的就比较慢了。RMSProp是在Adagrad上的改进,走的非常快,因为其学习率不是单调下降的。动量法一开始非常慢,并且走的方向也不对,可能是开始一段时间内,梯度方向不一致,所以动量法会起到减速作用,增加稳定性,减少震荡。但是到了后面一段时间,找到了正确方向,梯度方向一致,动量法起到加速作用,可以更块到达最优点。Adam结合了RMSProp和动量法,表现也非常好,路线平稳,速度也比较快。这里面最快的是RMSProp。
参考:NNDL实验 优化算法3D轨迹 复现cs231经典动画_优化算法寻优过程3d-CSDN博客
【NNDL 作业】优化算法比较 增加 RMSprop、Nesterov_rmsprop加入nesterov momentum python代码实现-CSDN博客
NNDL 实验八 网络优化与正则化(3)不同优化算法比较_不同优化算法绘图比较-CSDN博客