NLP从零开始------16.文本中阶处理之序列到序列模型(1)
1. 序列到序列模型简介
序列到序列( sequence to sequence, seq2seq) 是指输入和输出各为一个序列(如一句话) 的任务。本节将输入序列称作源序列,输出序列称作目标序列。序列到序列有非常多的重要应用, 其中最有名的是机器翻译( machine translation), 机器翻译模型的输入是待翻译语言(源语言) 的文本,输出则是翻译后的语言(目标语言) 的文本。
此外, 序列到序列的应用还有: 改写( paraphrase), 即将输入文本保留原意, 用意思相近的词进行重写; 风格迁移( style transfer), 即转换输入文本的风格(如口语转书面语、负面评价改为正面评价、现代文改为文言文等); 文本摘要( summarization), 即将较长的文本总结为简短精练的短文本; 问答( question answering), 即为用户输入的问题提供回答;对话( dialog),即对用户的输入进行回应。这些都是自然语言处理中非常重要的任务。另外,还有许多任务,尽管它们并非典型的序列到序列任务,但也可以使用序列到序列的方法解决,例如后面章节将要提到的命名实体识别任务,即识别输入句子中的人名、地名等实体,既可以使用后面章节将要介绍的序列标注方法来解决,也可以使用序列到序列方法解决,举例如下:
源序列: 小红在上海旅游。
目标序列:[小红|人名]在[上海|地名]旅游。
类似地,后续章节将要介绍的成分句法分析( constituency parsing)、语义角色标注( semantic role labeling, SRL)、共指消解( coreference resolution) 等任务也都可以使用序列到序列的方法解决。值得一提的是,虽然前面提到的这些任务可以利用序列到序列的方式解决,但是许多情况下效果不如其最常用的方式(如利用序列标注方法解决命名实体识别)。
对于像机器翻译这一类经典的序列到序列任务,采用基于神经网络的方法具有非常大的优势。在早期的相关研究被提出后不久,基于神经网络序列到序列的机器翻译模型效果就迅速提升并超过了更为传统的统计机器翻译模型,成为主流的机器翻译方案。因此,本节主要介绍基于神经网络的序列到序列方法,包括模型、学习和解码。随后介绍序列到序列模型中常用的指针网络与拷贝机制。最后介绍序列到序列任务的一些延伸和扩展。
2. 基于神经网络的序列到序列模型
序列到序列模型与上个章节介绍的语言模型十分类似,都需要在已有文字序列的基础上预测下一个词的概率分布。其区别是,语言模型只需建模一个序列, 而序列到序列模型需要建模两个序列,因此需要包含两个模块:一个编码器用于处理源序列,一个解码器用于生成目标序列。本小节将依次介绍基于循环神经网络、注意力机制以及 Transformer的序列到序列模型。
2.1 循环神经网络
如下图所示, 基于循环神经网络(包括长短期记忆等变体)的序列到序列模型与前几个章节介绍的循环神经网络非常相似,但是按输入不同分成了编码器、解码器两部分, 其中编码器依次接收源序列的词,但不计算任何输出。编码器最后一步的隐状态成为解码器的初始隐状态,这个隐状态向量有时称作上下文向量( context vector),它编码了整个源序列的信息。解码器在第一步接收特殊符号“< sos>”作为目标序列的起始符, 并预测第一个词的概率分布,从中解码出第一个词(解码方法将在下面讨论);随后将第一个词作为下一步的输入, 继续解码第二个词,以此类推, 直到最后解码出终止符“< oos>”,意味着目标序列已解码完毕。这种方式即前面所介绍的自回归过程。
下面介绍基于循环神经网络的编码器和解码器的代码实现。首先是作为编码器的循环神经网络。
import torch
import torch.nn as nn
class RNNEncoder(nn.Module):
def __init__(self, vocab_size, hidden_size):
super(RNNEncoder, self).__init__()
# 隐层大小
self.hidden_size = hidden_size
# 词表大小
self.vocab_size = vocab_size
# 词嵌入层
self.embedding = nn.Embedding(self.vocab_size,\
self.hidden_size)
self.gru = nn.GRU(self.hidden_size, self.hidden_size,\
batch_first=True)
def forward(self, inputs):
# inputs: batch * seq_len
# 注意门控循环单元使用batch_first=True,因此输入需要至少batch为1
features = self.embedding(inputs)
output, hidden = self.gru(features)
return output, hidden
接下来是作为解码器的另一个循环神经网络的代码实现。
class RNNDecoder(nn.Module):
def __init__(self, vocab_size, hidden_size):
super(RNNDecoder, self).__init__()
self.hidden_size = hidden_size
self.vocab_size = vocab_size
# 序列到序列任务并不限制编码器和解码器输入同一种语言,
# 因此解码器也需要定义一个嵌入层
self.embedding = nn.Embedding(self.vocab_size, self.hidden_size)
self.gru = nn.GRU(self.hidden_size, self.hidden_size,\
batch_first=True)
# 用于将输出的隐状态映射为词表上的分布
self.linear = nn.Linear(self.hidden_size, self.vocab_size)
# 解码整个序列
def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
batch_size = encoder_outputs.size(0)
# 从<sos>开始解码
decoder_input = torch.empty(batch_size, 1,\
dtype=torch.long).fill_(SOS_token)
decoder_hidden = encoder_hidden
decoder_outputs = []
# 如果目标序列确定,最大解码步数确定;
# 如果目标序列不确定,解码到最大长度
if target_tensor is not None:
seq_length = target_tensor.size(1)
else:
seq_length = MAX_LENGTH
# 进行seq_length次解码
for i in range(seq_length):
# 每次输入一个词和一个隐状态
decoder_output, decoder_hidden = self.forward_step(\
decoder_input, decoder_hidden)
decoder_outputs.append(decoder_output)
if target_tensor is not None:
# teacher forcing: 使用真实目标序列作为下一步的输入
decoder_input = target_tensor[:, i].unsqueeze(1)
else:
# 从当前步的输出概率分布中选取概率最大的预测结果
# 作为下一步的输入
_, topi = decoder_output.topk(1)
# 使用detach从当前计算图中分离,避免回传梯度
decoder_input = topi.squeeze(-1).detach()
decoder_outputs = torch.cat(decoder_outputs, dim=1)
decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
# 为了与AttnRNNDecoder接口保持统一,最后输出None
return decoder_outputs, decoder_hidden, None
# 解码一步
def forward_step(self, input, hidden):
output = self.embedding(input)
output = F.relu(output)
output, hidden = self.gru(output, hidden)
output = self.out(output)
return output, hidden
2.2 注意力机制
在序列到序列循环神经网络上加入注意力机制的方式同样与上一章节介绍的方式非常相似,区别在于,注意力机制在这里仅用于解码时建模从目标序列到源序列的依赖关系。具体而言,在解码的每一步,将解码器输出的隐状态特征作为查询,将编码器计算的源序列中每个元素的隐状态特征作为键和值,从而计算注意力输出向量; 这个输出向量会与解码器当前步骤的隐状态特征一起用于预测目标序列的下一个元素。
序列到序列中的注意力机制使得解码器能够直接“看到”源序列,而不再仅依赖循环神经网络的隐状态传递源序列的信息。此外,注意力机制提供了一种类似于人类处理此类任务时的序列到序列机制。人类在进行像翻译这样的序列到序列任务时,常常会边看源句边进行翻译,而不是一次性读完源句之后记住它再翻译, 而注意力机制模仿了这个过程。最后,注意力机制为序列到序列模型提供了一些可解释性: 通过观察注意力分布,可以知道解码器生成每个词时在注意源句中的哪些词,这可以看作源句和目标句之间的一种“软性”对齐。
下面介绍基于注意力机制的循环神经网络解码器的代码实现。我们使用一个注意力层来计算注意力权重,其输入为解码器的输入和隐状态。这里使用 Bahdanau注意力( Bahdanau attention) , 这是序列到序列模型中应用最广泛的注意力机制,特别是对于机器翻译任务。该注意力机制使用一个对齐模型( alignment model) 来计算编码器和解码器隐状态之间的注意力分数,具体来讲就是一个前馈神经网络。相比于点乘注意力, Bahdanau注意力利用了非线性变换。
import torch.nn.functional as F
class BahdanauAttention(nn.Module):
def __init__(self, hidden_size):
super(BahdanauAttention, self).__init__()
self.Wa = nn.Linear(hidden_size, hidden_size)
self.Ua = nn.Linear(hidden_size, hidden_size)
self.Va = nn.Linear(hidden_size, 1)
def forward(self, query, keys):
# query: batch * 1 * hidden_size
# keys: batch * seq_length * hidden_size
# 这一步用到了广播(broadcast)机制
scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))
scores = scores.squeeze(2).unsqueeze(1)
weights = F.softmax(scores, dim=-1)
context = torch.bmm(weights, keys)
return context, weights
class AttnRNNDecoder(nn.Module):
def __init__(self, vocab_size, hidden_size):
super(AttnRNNDecoder, self).__init__()
self.hidden_size = hidden_size
self.vocab_size = vocab_size
self.embedding = nn.Embedding(self.vocab_size, self.hidden_size)
self.attention = BahdanauAttention(hidden_size)
# 输入来自解码器输入和上下文向量,因此输入大小为2 * hidden_size
self.gru = nn.GRU(2 * self.hidden_size, self.hidden_size,\
batch_first=True)
# 用于将注意力的结果映射为词表上的分布
self.out = nn.Linear(self.hidden_size, self.vocab_size)
# 解码整个序列
def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
batch_size = encoder_outputs.size(0)
# 从<sos>开始解码
decoder_input = torch.empty(batch_size, 1, dtype=\
torch.long).fill_(SOS_token)
decoder_hidden = encoder_hidden
decoder_outputs = []
attentions = []
# 如果目标序列确定,最大解码步数确定;
# 如果目标序列不确定,解码到最大长度
if target_tensor is not None:
seq_length = target_tensor.size(1)
else:
seq_length = MAX_LENGTH
# 进行seq_length次解码
for i in range(seq_length):
# 每次输入一个词和一个隐状态
decoder_output, decoder_hidden, attn_weights = \
self.forward_step(
decoder_input, decoder_hidden, encoder_outputs
)
decoder_outputs.append(decoder_output)
attentions.append(attn_weights)
if target_tensor is not None:
# teacher forcing: 使用真实目标序列作为下一步的输入
decoder_input = target_tensor[:, i].unsqueeze(1)
else:
# 从当前步的输出概率分布中选取概率最大的预测结果
# 作为下一步的输入
_, topi = decoder_output.topk(1)
# 使用detach从当前计算图中分离,避免回传梯度
decoder_input = topi.squeeze(-1).detach()
decoder_outputs = torch.cat(decoder_outputs, dim=1)
decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
attentions = torch.cat(attentions, dim=1)
# 与RNNDecoder接口保持统一,最后输出注意力权重
return decoder_outputs, decoder_hidden, attentions
# 解码一步
def forward_step(self, input, hidden, encoder_outputs):
embeded = self.embedding(input)
# 输出的隐状态为1 * batch * hidden_size,
# 注意力的输入需要batch * 1 * hidden_size
query = hidden.permute(1, 0, 2)
context, attn_weights = self.attention(query, encoder_outputs)
input_gru = torch.cat((embeded, context), dim=2)
# 输入的隐状态需要1 * batch * hidden_size
output, hidden = self.gru(input_gru, hidden)
output = self.out(output)
return output, hidden, attn_weights
2.3 Transformer
Transformer模型同样也可以用于序列到序列任务。编码器与上一章介绍的 Transformer结构几乎相同,仅有两方面区别。一方面,由于不需要像语言模型那样每一步只能看到前置序列,而是需要看到完整的句子, 因此掩码多头自注意力模块中去除了注意力掩码。另一方面,由于编码器不需要输出,因此去掉了顶层的线性分类器。
解码器同样与上一章介绍的 Transformer结构几乎相同,但在掩码多头自注意力模块之后增加了一个交叉多头注意力模块,以便在解码时引入编码器所计算的源序列的信息。交叉注意力模块的设计与上面介绍的循环神经网络上的注意力机制是类似的。具体而言,交叉注意力模块使用解码器中自注意力模块的输出计算查询,使用编码器顶端的输出计算键和值,不使用任何注意力掩码, 其他部分与自注意力模块一样。
基于 Transformer的序列到序列模型通常也使用自回归的方式进行解码, 但 Transformer不同位置之间的并行性,使得非自回归方式的解码成为可能。非自回归解码器的结构与自回归解码器类似,但解码时需要先预测目标句的长度,将该长度对应个数的特殊符号作为输入,此外自注意力模块不需要掩码,所有位置的计算并行执行。有关具体细节这里不再展开。
接下来我们复用上一章的代码,实现基于 Transformer的编码器和解码器。
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import sys
sys.path.append('./code')
from transformer import *
class TransformerEncoder(nn.Module):
def __init__(self, vocab_size, max_len, hidden_size, num_heads,\
dropout, intermediate_size):
super().__init__()
self.embedding_layer = EmbeddingLayer(vocab_size, max_len,\
hidden_size)
# 直接使用TransformerLayer作为编码层,简单起见只使用一层
self.layer = TransformerLayer(hidden_size, num_heads,\
dropout, intermediate_size)
# 与TransformerLM不同,编码器不需要线性层用于输出
def forward(self, input_ids):
# 这里实现的forward()函数一次只能处理一句话,
# 如果想要支持批次运算,需要根据输入序列的长度返回隐状态
assert input_ids.ndim == 2 and input_ids.size(0) == 1
seq_len = input_ids.size(1)
assert seq_len <= self.embedding_layer.max_len
# 1 * seq_len
pos_ids = torch.unsqueeze(torch.arange(seq_len), dim=0)
attention_mask = torch.ones((1, seq_len), dtype=torch.int32)
input_states = self.embedding_layer(input_ids, pos_ids)
hidden_states = self.layer(input_states, attention_mask)
return hidden_states, attention_mask
class MultiHeadCrossAttention(MultiHeadSelfAttention):
def forward(self, tgt, tgt_mask, src, src_mask):
"""
tgt: query, batch_size * tgt_seq_len * hidden_size
tgt_mask: batch_size * tgt_seq_len
src: keys/values, batch_size * src_seq_len * hidden_size
src_mask: batch_size * src_seq_len
"""
# (batch_size * num_heads) * seq_len * (hidden_size / num_heads)
queries = self.transpose_qkv(self.W_q(tgt))
keys = self.transpose_qkv(self.W_k(src))
values = self.transpose_qkv(self.W_v(src))
# 这一步与自注意力不同,计算交叉掩码
# batch_size * tgt_seq_len * src_seq_len
attention_mask = tgt_mask.unsqueeze(2) * src_mask.unsqueeze(1)
# 重复张量的元素,用以支持多个注意力头的运算
# (batch_size * num_heads) * tgt_seq_len * src_seq_len
attention_mask = torch.repeat_interleave(attention_mask,\
repeats=self.num_heads, dim=0)
# (batch_size * num_heads) * tgt_seq_len * \
# (hidden_size / num_heads)
output = self.attention(queries, keys, values, attention_mask)
# batch * tgt_seq_len * hidden_size
output_concat = self.transpose_output(output)
return self.W_o(output_concat)
# TransformerDecoderLayer比TransformerLayer多了交叉多头注意力
class TransformerDecoderLayer(nn.Module):
def __init__(self, hidden_size, num_heads, dropout,\
intermediate_size):
super().__init__()
self.self_attention = MultiHeadSelfAttention(hidden_size,\
num_heads, dropout)
self.add_norm1 = AddNorm(hidden_size, dropout)
self.enc_attention = MultiHeadCrossAttention(hidden_size,\
num_heads, dropout)
self.add_norm2 = AddNorm(hidden_size, dropout)
self.fnn = PositionWiseFNN(hidden_size, intermediate_size)
self.add_norm3 = AddNorm(hidden_size, dropout)
def forward(self, src_states, src_mask, tgt_states, tgt_mask):
# 掩码多头自注意力
tgt = self.add_norm1(tgt_states, self.self_attention(\
tgt_states, tgt_states, tgt_states, tgt_mask))
# 交叉多头自注意力
tgt = self.add_norm2(tgt, self.enc_attention(tgt,\
tgt_mask, src_states, src_mask))
# 前馈神经网络
return self.add_norm3(tgt, self.fnn(tgt))
class TransformerDecoder(nn.Module):
def __init__(self, vocab_size, max_len, hidden_size, num_heads,\
dropout, intermediate_size):
super().__init__()
self.embedding_layer = EmbeddingLayer(vocab_size, max_len,\
hidden_size)
# 简单起见只使用一层
self.layer = TransformerDecoderLayer(hidden_size, num_heads,\
dropout, intermediate_size)
# 解码器与TransformerLM一样,需要输出层
self.output_layer = nn.Linear(hidden_size, vocab_size)
def forward(self, src_states, src_mask, tgt_tensor=None):
# 确保一次只输入一句话,形状为1 * seq_len * hidden_size
assert src_states.ndim == 3 and src_states.size(0) == 1
if tgt_tensor is not None:
# 确保一次只输入一句话,形状为1 * seq_len
assert tgt_tensor.ndim == 2 and tgt_tensor.size(0) == 1
seq_len = tgt_tensor.size(1)
assert seq_len <= self.embedding_layer.max_len
else:
seq_len = self.embedding_layer.max_len
decoder_input = torch.empty(1, 1, dtype=torch.long).\
fill_(SOS_token)
decoder_outputs = []
for i in range(seq_len):
decoder_output = self.forward_step(decoder_input,\
src_mask, src_states)
decoder_outputs.append(decoder_output)
if tgt_tensor is not None:
# teacher forcing: 使用真实目标序列作为下一步的输入
decoder_input = torch.cat((decoder_input,\
tgt_tensor[:, i:i+1]), 1)
else:
# 从当前步的输出概率分布中选取概率最大的预测结果
# 作为下一步的输入
_, topi = decoder_output.topk(1)
# 使用detach从当前计算图中分离,避免回传梯度
decoder_input = torch.cat((decoder_input,\
topi.squeeze(-1).detach()), 1)
decoder_outputs = torch.cat(decoder_outputs, dim=1)
decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
# 与RNNDecoder接口保持统一
return decoder_outputs, None, None
# 解码一步,与RNNDecoder接口略有不同,RNNDecoder一次输入
# 一个隐状态和一个词,输出一个分布、一个隐状态
# TransformerDecoder不需要输入隐状态,
# 输入整个目标端历史输入序列,输出一个分布,不输出隐状态
def forward_step(self, tgt_inputs, src_mask, src_states):
seq_len = tgt_inputs.size(1)
# 1 * seq_len
pos_ids = torch.unsqueeze(torch.arange(seq_len), dim=0)
tgt_mask = torch.ones((1, seq_len), dtype=torch.int32)
tgt_states = self.embedding_layer(tgt_inputs, pos_ids)
hidden_states = self.layer(src_states, src_mask, tgt_states,\
tgt_mask)
output = self.output_layer(hidden_states[:, -1:, :])
return output