Encoder-Decoder 编码器-解码器架构 (Seq2Seq Model)
Encoder - Decoder
- Encoder编码器使用长度可变的序列作为输入,将其转换为固定的隐状态
- Decoder解码器基于输入通过编码器得到的编码信息和当前输出能看到的(已经生成)的词元来预测下一个词元
- 在编码器中使用< bos >作为序列开始的词元,在训练过程中使用Teacher Force策略进行训练即基于正确的输入进行训练
- Encoder编码器最终的隐状态用于初始化解码器的隐状态(Seq2Seq做法),在其他一些设计中,编码器最终的隐状态在每个时间步都作为解码器的输入序列的一部分。
nn.rnn()
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,dropout=dropout)
output, state = self.rnn(X)
- 其中返回的output和state分别是什么呢
- output 返回RNN在每个时间步的隐藏状态序列,其大小是**(num_steps,batch_size,num_hiddens)**
- state返回RNN最后一个时间步的隐藏状态,其大小是**(num_layers,batch_size,num_hiddens)**
在Encoder链接到Deocder中
- Encode的Output用于初始化
Encoder
class Encoder(nn.Module):
def __init__(self, **kwargs):
super(Encoder, self).__init__(**kwargs)
def forward(self, X, *args):
raise NotImplementedError
class Seq2SeqEncoder(Encoder):
"""用于序列到序列学习的循环神经网络编码器"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
super(Seq2SeqEncoder, self).__init__(**kwargs)
# 嵌入层
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,
dropout=dropout)
def forward(self, X, *args):
# 输出'X'的形状:(batch_size,num_steps,embed_size)
X = self.embedding(X)
# 在循环神经网络模型中,第一个轴对应于时间步
X = X.permute(1, 0, 2)
# 如果未提及状态,则默认为0
output, state = self.rnn(X)
# output的形状:(num_steps,batch_size,num_hiddens)
# state的形状:(num_layers,batch_size,num_hiddens)
return output, state
Input X: (batch_size,num_steps,vocab_size)
X = self.embedding(X) : (batch_size,num_steps,embed_size)
X = X.permute(1, 0, 2): (num_steps,batch_size,embed_size)
output, state = self.rnn(X):
output的形状:(num_steps,batch_size,num_hiddens)
state的形状:(num_layers,batch_size,num_hiddens)
# 实例化 需要定义vocab_size embed_size num_hiddens num_layers
encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16,num_layers=2)
Decoder
class Decoder(nn.Module):
"""编码器-解码器架构的基本解码器接口"""
def __init__(self, **kwargs):
super(Decoder, self).__init__(**kwargs)
def init_state(self, enc_outputs, *args):
raise NotImplementedError
def forward(self, X, state):
raise NotImplementedError
class Seq2SeqDecoder(d2l.Decoder):
"""用于序列到序列学习的循环神经网络解码器"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
super(Seq2SeqDecoder, self).__init__(**kwargs)
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers,
dropout=dropout)
self.dense = nn.Linear(num_hiddens, vocab_size)
def init_state(self, enc_outputs, *args):
# 这里的enc_outputs包括outputs和state这里使用state用于初始化Decoder
# 也就是不仅仅使用state作为Decoder的初始化,还是用State的最后一层作为Context
return enc_outputs[1]
def forward(self, X, state):
# 输出'X'的形状:(batch_size,num_steps,embed_size)
X = self.embedding(X).permute(1, 0, 2)
# 广播context,使其具有与X相同的num_steps
# context取自encoder输出的state的最后一层
context = state[-1].repeat(X.shape[0], 1, 1)
X_and_context = torch.cat((X, context), 2)
output, state = self.rnn(X_and_context, state)
output = self.dense(output).permute(1, 0, 2)
# output的形状:(batch_size,num_steps,vocab_size)
# state的形状:(num_layers,batch_size,num_hiddens)
return output, state
- 使用encoder输出的State的最后一层作为Context上下文信息和输入并在一起
input X : (batch_size,num_steps,vocab_size)
context = state[-1].repeat(X.shape[0], 1, 1): context是取到的上下文
X_and_context = torch.cat((X, context), 2)
output, state = self.rnn(X_and_context, state)
output的形状:(batch_size,num_steps,num_hiddens)
state的形状:(num_layers,batch_size,num_hiddens)
decoder = Seq2SeqDecoder(vocab_size=10, embed_size=8, num_hiddens=16,num_layers=2)
Encoder-Decoder
class EncoderDecoder(nn.Module):
"""编码器-解码器架构的基类"""
def __init__(self, encoder, decoder, **kwargs):
super(EncoderDecoder, self).__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
def forward(self, enc_X, dec_X, *args):
enc_outputs = self.encoder(enc_X, *args)
dec_state = self.decoder.init_state(enc_outputs, *args)
return self.decoder(dec_X, dec_state)
- 通过Encoder然后输出
- 使用Encoder的State初始化Decoder
- 通过Decoder然后输出
训练代码
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, d2l.try_gpu()
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers,
dropout)
decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,
dropout)
net = EncoderDecoder(encoder, decoder)
train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
- train_seq2seq是这样实现的:
#@save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""训练序列到序列模型"""
def xavier_init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
if type(m) == nn.GRU:
for param in m._flat_weights_names:
if "weight" in param:
nn.init.xavier_uniform_(m._parameters[param])
net.apply(xavier_init_weights)
net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss() # 在这里使用MaskedSoftmaxCELoss比较特殊
net.train()
for epoch in range(num_epochs):
timer = Timer()
metric = Accumulator(2) # 训练损失总和,词元数量
for batch in data_iter:
optimizer.zero_grad()
X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
# 强制教学:使用<bos>接上正确答案,作为dec_input的输入
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
device=device).reshape(-1, 1)
dec_input = torch.cat([bos, Y[:, :-1]], 1) # 强制教学
# 将预测出来的Y_hat和Y进行对比
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len)
l.sum().backward() # 损失函数的标量进行“反向传播”
grad_clipping(net, 1)
num_tokens = Y_valid_len.sum()
optimizer.step()
with torch.no_grad():
metric.add(l.sum(), num_tokens)
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device)}')