Pytorch实现Transformer
实现add&Normalize
首先实现这个部分是因为不论解码器还是编码器都需要这一部分。
首先我们假设归一化(LayerNorm)已经做好了。
class SublayerConnection(nn.Module):
def __init__(self,size,dropout=0.1):
super(SublayerConnection,self).__init__()
self.layer_norm = LayerNorm(size)
# 假设LayerNorm已经做好,做一个dropout防止过拟合
self.dropout = nn.Dropout(p=dropout)
def forward(self,x,sublayer):
# x+sublayer(x)就是add,sublayer就是上一步的操作
return self.dropout(self.layer_norm(x+sublayer(x)))
因为我们不确定是Encoder还是Decoder,所以用sublayer传入来作为上一层。
接下来实现LayerNorm
class LayerNorm(nn.Module):
def __init__(self,x_size,eps=1e-6):
super(LayerNorm,self).__init__()
self.a_2 = nn.Parameter(torch.ones(x_size))
self.b_2 = nn.Parameter(torch.zeros(x_size))
self.eps = eps
def forward(self,x):
mean = x.mean(-1,keepdim = True)
std = x.std(-1,keepdim = True)
return self.a_2*(x-mean)/(math.sqrt(std)+self.eps)+self.b_2
这一串代码很简单,就是公式的实现而已,注意要把a_2和b_2作为可以学习的变量。
实现多头注意力机制和自注意力机制
自注意力机制
def self_attention(query,key,value,dropout = None,mask = None):
d_k = query.size(-1)
sores = np.matmul(query,key.transpose(-2,-1))/math.sqrt(d_k)
attn_softmax = F.softmax(sores,dim=-1)
if dropout is not None:
attn_softmax = dropout(attn_softmax)
return np.matmul(attn_softmax,value),attn_softmax
实现QKV的相乘,除以 d k \sqrt{d_k} dk为了便于softmax得出的概率差距不会太极端。
多头自注意力机制
class MultiHeadattention(nn.Module):
def __init__(self,head,d_model,dropout = 0.1):
super(MultiHeadattention,self).__init__()
assert(d_model % head == 0)
self.d_k = d_model//head
self.head = head
self.d_model = d_model
self.linear_quary = nn.Linear(d_model,d_model)
self.linear_key = nn.Linear(d_model,d_model)
self.linear_value = nn.Linear(d_model,d_model)
self.linear_out = nn.Linear(d_model,d_model)
self.dropout = nn.Dropout(p=dropout)
self.attn = None
def forward(self,quary,key,value,mask = None):
n_batch = quary.size(0)
quary = self.linear_quary(quary).view(n_batch,-1,self.head,self.d_k).transpose(1,2)
key = self.linear_key(key).view(n_batch,-1,self.head,self.d_k).transpose(1,2)
value = self.linear_value(value).view(n_batch,-1,self.head,self.d_k).transpose(1,2)
x,self.attn = self_attention(quary,key,value,dropout=self.dropout,mask=mask)
x = x.transpose(1,2).contiguous().view(n_batch,-1,self.head*self.d_k)
return self.linear_out(x)
还有掩码自注意力需要添加,此时没有添加,等讲解到掩码的时候添加。
QKV实现分割并行计算之后,对x进行拼接重塑维度,做一个线性变换输出x。
位置编码
class PositionalEncoding(nn.Module):
def __init__(self,dim,dropout,max_len=5000):
super(PositionalEncoding,self).__init__()
if dim%2 != 0:
raise ValueError("Can't use sin/cos positional encoding with"
"odd dim (got dim={:d})".format(dim))
'''
维度必须是偶数,这个if就是如果维度是奇数的情况下跳出异常,
只有维度是偶数的情况下才能都有对称性,确保每一个位置都有一个sin一个cos
能够提供更完整的位置属性
'''
pe = torch.zeros(max_len,dim)
position = torch.arange(0,max_len).unsqueeze(1)
div_term = torch.exp((torch.arange(0,dim,2,dtype=torch.float)*-(math.log(10000.0)/dim)))
pe[:,1::2] = torch.sin(position.float() * div_term)
pe[:,0::2] = torch.cos(position.float() * div_term)
pe = pe.unsqueeze(1)
self.register_buffer('pe',pe)
# 创建pe的时候用的zeros使得pe是一个可学习的参数,但是位置编码是一直不变的,因此用这个函数使得pe不会改变,但会存储
self.drop_out = nn.Dropout(p = dropout)
self.dim = dim
def forward(self,emb,step=None):
if step == None:
emb = emb + self.pe[:emb.size(0)]
else:
emb = emb + self.pe[:step]
emb = self.drop_out(emb)
return emb
FeedForward实现
class PositionWiseFeedForward(nn.Module):
def __init__(self,d_model,d_ff,dropout=0.1) -> None:
super(PositionWiseFeedForward,self).__init__()
self.w_1 = nn.Linear(d_model,d_ff)
self.w_2 = nn.Linear(d_ff,d_model)
self.relu = nn.ReLU()
self.layer_norm = nn.LayerNorm(d_model,eps=1e-6)
self.dropout1 = nn.Dropout(p=dropout)
self.dropout2 = nn.Dropout(p=dropout)
def forward(self,x):
inter = self.dropout1(self.relu(self.w_1(self.layer_norm(x))))
output = self.dropout2(self.relu(self.w_2(inter)))
return output
FFN非常简单,就是两次wx+b,加一个激活函数即可,代码里面初始 w 1 , w 2 , r e l u w_1,w_2,relu w1,w2,relu等,为什么没有b呢,pytorch linear在初始化的时候已经帮助我们初始化好了不需要我们自己初始化。
EncoderDecoder层实现
def clones(module,n):
return nn.ModuleList([deepcopy(module) for _ in range(n)])
class EncoderLayer(nn.Module):
def __init__(self,size,attn,feed_forward,dropout=0.1):
super(EncoderLayer,self).__init__()
self.attn = attn
self.feed_forward = feed_forward
self.sublayer_connection_list = clones(SublayerConnection(size,dropout),2)
def forward(self,x,mask):
first_x = self.sublayer_connection_list[0](x,lambda x_attn:self.attn(x,x,x,mask))
return self.sublayer_connection_list[1](x,self.feed_forward)
class Encoder(nn.Module):
def __init__(self,n,encoder_layer):
super(Encoder,self).__init__()
self.encoder_layer_list = clones(encoder_layer,n)
def forward(self,x,src_mask):
for encoder_layer in self.encoder_layer_list:
x = encoder_layer(x,src_mask)
return x
class DecoderLayer(nn.model):
def __init__(self,d_model,attn,feed_forward,sublayer_num,dropout=0.1):
super(DecoderLayer,self).__init__()
self.attn = attn
self.feed_forward = feed_forward
self.sublayer_connection_list = clones(SublayerConnection(d_model,dropout),sublayer_num)
def forward(self,x,l2r_memory,src_mask,trg_mask,r2l_memory=None,r2l_trg_mask = None):
first_x = self.sublayer_connection_list[0](x,lambda x_attn: self.attn(x,l2r_memory,l2r_memory,trg_mask))
second_x = self.sublayer_connection_list[1](first_x,lambda sencond_x_attn:self.attn(first_x,l2r_memory,l2r_memory,src_mask))
if r2l_memory is not None:
third_x = self.sublayer_connection_list[-2](second_x,lambda third_x_attn:self.attn(second_x,r2l_memory,r2l_memory,r2l_trg_mask))
return self.sublayer_connection_list[-1](third_x,self.feed_forward)
else:
return self.sublayer_connection_list[-1](second_x,self.feed_forward)
class Decoder(nn.Module):
def __init__(self,n_layers,decoder_layer):
super(Decoder,self).__init__()
self.decoder_layer_list = clones(decoder_layer,n_layers)
def forward(self,x,memory,src_mask,trg_mask):
for decoder_layer in self.decoder_layer_list:
x = decoder_layer(x,memory,src_mask,trg_mask)
return x
首先实现一个用来复制的函数,因为不管是decoder还是encoder都有两个add和Normalize。
encoder和decoder都先实现一层,再用一个类来多次创建实现多层叠加。
encoderlayer就是一层encoder,就是实现多头+add和Normalize+ffn+add和Normalize,其中传入的参数attn是创建好的多头自注意力模型,ffn同理
decoderlayer就是一层decoder,同理是实现多头+add和Normalize+ffn+add和Normalize,只不过接受的参数不同,x是输入,memory是encoder层的输出,后面两个是掩码,最后两个参数不用管,是反向解码。
拼凑Transformer
class WordProbGenerator(nn.Module):
def __init__(self,d_model,vocab_size):
super(WordProbGenerator,self).__init__()
self.linear = nn.Linear(d_model,vocab_size)
def forward(self,x):
return F.log_softmax(self.linear(x),dim=-1)
class WordEmbedding(nn.Module):
def __init__(self,vocab_size,d_model) -> None:
super(WordEmbedding,self).__init__()
self.embedding = nn.Embedding(vocab_size,d_model)
self.embed = self.embedding
self.d_model = d_model
def forward(self,x):
return self.embed(x)*math.sqrt(self.d_model)
class ABDTransformer(nn.Module):
def __init__(self,vocab,d_model,d_ff,n_head,n_layer,dropout,device='cuda'):
super(ABDTransformer,self).__init__()
self.vocab = vocab
self.device = device
attn = MultiHeadattention(n_head,d_model,dropout)
feed_forward = PositionWiseFeedForward(d_model,d_ff,dropout)
self.src_embed = WordEmbedding(vocab,d_model)
self.pos_embed = PositionalEncoding(d_model,dropout)
self.encoder = Encoder(n_layer,EncoderLayer(d_model,deepcopy(attn),deepcopy(feed_forward),dropout=dropout))
self.decoder = Decoder(n_layer,DecoderLayer(d_model,deepcopy(attn),deepcopy(feed_forward),sublayer_num=3,dropout=dropout))
self.word_prob_generator = WordProbGenerator(d_model,vocab)
def encode(self,src,src_mask):
x = self.src_embed(src[0])
x = self.pos_embed(x)
x = self.encoder(x,src_mask)
return x
def decode(self,trg,memory,src_mask,trg_mask):
x = self.src_embed(trg)
x = self.pos_embed(x)
return self.decoder(x,memory,src_mask,trg_mask)
def forward(self,src,trg,mask):
src_mask,trg_mask = mask
encoder_output = self.encode(src,src_mask)
decoder_output = self.decode(trg,encoder_output,src_mask,trg_mask)
pred = self.word_prob_generator(decoder_output)
return pred
第一个函数是得到各个词的概率
第二个函数是将输入词向量的维度,就是字典的维度改成d_model的维度。
组建transformer
第一步的init中将模型都初始化,包括encoder,decoder,ffn,多头自注意力机制,位置编码等。
第二步写decode和encode的输入输出,简单来说我们已经实现了内部的操作,现在写两个函数,将他需要的输入进去即可。
第三步调用函数,最后得到概率输出。