[PyTorch][Chapter 27][李宏毅 Deep Learning][Transformer-2]
Abstract:
The previous chapter focused on the Transformer encoder; this one focuses on the Transformer decoder.
The core of the Transformer is the self-attention mechanism. The paper compares it against other models on three metrics, Complexity per Layer, Sequential Operations, and Maximum Path Length, to argue for its advantages:
| Metric | Meaning |
| --- | --- |
| Complexity per Layer | computational complexity of one layer; lower is better |
| Sequential Operations | number of operations that must run sequentially; fewer is better (more parallelism) |
| Maximum Path Length | maximum path length for information to travel between any two positions; shorter is better |
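For concreteness, Table 1 of "Attention Is All You Need" reports roughly the following for the three layer types (n = sequence length, d = representation dimension, k = convolution kernel size):

| Layer type | Complexity per Layer | Sequential Operations | Maximum Path Length |
| --- | --- | --- | --- |
| Self-Attention | O(n²·d) | O(1) | O(1) |
| Recurrent | O(n·d²) | O(n) | O(n) |
| Convolutional | O(k·n·d²) | O(1) | O(log_k n) |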
The main advantage of the Transformer is that two positions far apart in the sequence can interact in a single step, the computation is highly parallel, and information is mixed well across the whole sequence. The drawback is that it needs more data and a larger model to reach the same performance as RNNs and CNNs.
Attention does not model the order of the data by itself and relies on a much weaker (more general) inductive bias, which is why it needs more data.
Inductive bias here refers to built-in assumptions such as locality in convolutional neural networks: nearby features are assumed to be related, so grouping adjacent features makes a solution easier to find. This notion emphasizes the correlation and locality of features and is useful for understanding and tuning neural network models: by exploiting the local structure of the data, a model can achieve better performance and accuracy.
In a CNN, this inductive bias shows up as the assumption that features can be composed from simple local patterns. The locality assumption lets the network learn local features of the input easily and then combine them to recognize more complex patterns. It reduces model complexity while keeping accuracy high, which is especially valuable for data with strong local correlations such as images and speech.
In short, inductive bias is an important concept in convolutional networks: by exploiting locality and the correlation between features, it helps the network learn and recognize patterns in the data more effectively, improving performance and accuracy.
Contents:
- Encoder architecture
- Decoder architecture
- Mask
- Cross attention
- Decoder forward pass and loss function
- Decoder/Encoder code implementation
1 Encoder Architecture
torch.nn.TransformerEncoder(encoder_layer, num_layers)
torch.nn.TransformerEncoderLayer(d_model, nhead)
1.1 Architecture
- The TransformerEncoder is a stack of 6 identical TransformerEncoderLayer modules
- Each TransformerEncoderLayer consists mainly of Multi-Head Attention and a Feed-Forward Network (a minimal usage sketch of the built-in modules follows this list)
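A minimal sketch of how these two built-in modules fit together (shapes assume batch_first=True, and the hyperparameters are simply the base values from the paper):

import torch
from torch import nn

# one encoder layer = multi-head self-attention + feed-forward network
encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8, batch_first=True)
# the full encoder stacks 6 identical layers
encoder = nn.TransformerEncoder(encoder_layer, num_layers=6)

x = torch.rand(2, 10, 512)   # (batch, seq_len, d_model)
memory = encoder(x)          # output keeps the same shape
print(memory.shape)          # torch.Size([2, 10, 512])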
1.2 Multi-Head Attention
Q, K and V are projected into per-head subspaces, and scaled dot-product attention produces a weighted sum of the value vectors.
1.3 Scaled Dot-Product Attention
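In formula form (from the original paper):

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{QK^{\top}}{\sqrt{d_k}}\right)V$$

where $d_k$ is the dimension of each query/key vector; dividing by $\sqrt{d_k}$ keeps the dot products from growing too large and saturating the softmax.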
1.4 Feed-Forward Network (FFN)
Its main structure is as follows:
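It is two linear layers with a ReLU in between, applied to every position independently; in the base model the inner dimension is 2048 while d_model = 512:

$$\mathrm{FFN}(x) = \max(0,\; xW_1 + b_1)\,W_2 + b_2$$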
2 Decoder Architecture
torch.nn.TransformerDecoderLayer(d_model, nhead)
torch.nn.TransformerDecoder(decoder_layer, num_layers)
2.1 Architecture
The TransformerDecoder is a stack of 6 identical TransformerDecoderLayer modules.
2.2 TransformerDecoderLayer
Each layer consists of three parts (a minimal usage sketch of the built-in PyTorch modules follows this list):
1. Masked Multi-Head Attention: its output serves as the query for the next sub-layer
2. Cross Multi-Head Attention: its input has three parts:
   key (from the encoder), value (from the encoder), query (from the decoder)
3. FFN (feed-forward network)
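As referenced above, a minimal sketch of the built-in decoder modules (batch_first=True is assumed; the causal mask built here is the sequence mask described in the next section):

import torch
from torch import nn

decoder_layer = nn.TransformerDecoderLayer(d_model=512, nhead=8, batch_first=True)
decoder = nn.TransformerDecoder(decoder_layer, num_layers=6)

memory = torch.rand(2, 10, 512)   # encoder output: (batch, src_len, d_model)
tgt = torch.rand(2, 7, 512)       # decoder input:  (batch, tgt_len, d_model)

# causal (sequence) mask: position i may only attend to positions <= i
tgt_len = tgt.shape[1]
tgt_mask = torch.triu(torch.full((tgt_len, tgt_len), float('-inf')), diagonal=1)

out = decoder(tgt, memory, tgt_mask=tgt_mask)
print(out.shape)                  # torch.Size([2, 7, 512])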
3 How the Masks Work
There are two kinds of masks: the padding mask and the sequence mask.
The decoder uses both the sequence mask and the padding mask.
The encoder uses only the padding mask.
3.1 Padding mask
In NLP, texts have variable lengths, so before batch training they must be brought to a common length: sentences that are too long are truncated to the fixed length, and sentences that are too short are padded up to it. The padding tokens exist only to equalize lengths and carry no real information, so we want to screen them out of the subsequent computation; that is what the mask is for.
Scheme:
Based on each sequence's valid length (valid_lens), set the attention scores of the padded positions to a large negative value so that they contribute almost nothing after the softmax.
3.2 Sequence mask
The sequence mask is used mainly in the decoder. It prevents the model from seeing future tokens during training, preserving the autoregressive property needed at prediction time.
As the figure below shows, when computing the attention scores for a position, only the positions up to and including it are used; the scores of the later positions are masked out (they become 0 after the softmax).
3.3 Reference code
The full example later in the post differs slightly from this snippet, but the overall idea is the same.
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 29 13:01:24 2024
@author: chengxf2
"""
import torch

def padding_mask(seq, pad):
    # used by the encoder: True where the token is not padding
    mask = (seq != pad)
    mask = mask.unsqueeze(-2)              # (batch, 1, seq_length)
    return mask

def sequence_mask(seq):
    # used by the decoder: lower-triangular mask that hides future positions
    batch_size, seq_length = seq.shape
    mask = torch.ones((seq_length, seq_length))
    mask = torch.tril(mask).unsqueeze(0)   # (1, seq_length, seq_length)
    return mask

padding = 0
seq = torch.LongTensor([[1, 2, padding],
                        [1, padding, padding]])
batch_size, seq_length = seq.shape

mask_p = padding_mask(seq, padding)
scores = torch.rand(batch_size, seq_length, seq_length)
print("\n scores:\n", scores)

attention_score_p = scores.masked_fill(mask_p == 0, -1e9)
print("\n padding mask result\n", attention_score_p)

mask_s = sequence_mask(seq)
attention_score_s = scores.masked_fill(mask_s == 0, -1e9)
print("\n sequence mask result\n", attention_score_s)
4 Cross Attention
This mechanism comes from a paper published in 2016; the Transformer borrowed the scheme.
The encoder outputs the key and value (the source information), and the decoder uses its current input as the query.
Cross attention then produces a weighted sum that fuses the current information with the encoder's information.
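A small illustrative sketch of cross attention with PyTorch's nn.MultiheadAttention: the query comes from the decoder, while key and value come from the encoder output (the shapes here are made up for the example):

import torch
from torch import nn

cross_attn = nn.MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True)

enc_out = torch.rand(2, 10, 512)   # encoder output: provides key and value
dec_x = torch.rand(2, 7, 512)      # decoder hidden states: provide the query

# every decoder position attends over the whole encoder sequence
out, attn_weights = cross_attn(query=dec_x, key=enc_out, value=enc_out)
print(out.shape, attn_weights.shape)   # torch.Size([2, 7, 512]) torch.Size([2, 7, 10])

The attention weight tensor has shape (batch, decoder length, encoder length): one distribution over the source sequence per target position.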
5 Decoder Forward Pass and Loss Function
5.1 Forward pass
Decoding is autoregressive: the output at the current time step becomes the input at the next time step.
1. Decoder input: x, state
   x: the decoder starts from a special BEGIN symbol (one-hot encoded)
   state: stores the encoder outputs, the encoder valid_lens, and the history of inputs seen by each decoder layer
2. The decoder outputs a vector of length vocab_size; the token with the highest probability is taken as the output.
   As in the figure below, "机" has the highest probability.
3. [BEGIN, "机"] is then fed back to the decoder as its input.
4. The decoder outputs another vocab_size vector; this time "器" has the highest probability and becomes the output.
5. And so on: the decoder keeps feeding its own outputs back as inputs until it produces the special stop token (a minimal greedy decoding loop is sketched below).
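A minimal sketch of this greedy, autoregressive loop; decode_step is a hypothetical placeholder for a function that maps the tokens generated so far (plus the encoder state) to logits over the vocabulary, and BEGIN/STOP share id 0 as in the vocabulary table of section 6.1:

import torch

def greedy_decode(decode_step, encoder_state, begin_id=0, stop_id=0, max_len=20):
    # decode_step(tokens, encoder_state) is assumed to return logits of
    # shape (1, current_length, vocab_size)
    tokens = torch.LongTensor([[begin_id]])            # (batch=1, 1)
    for _ in range(max_len):
        logits = decode_step(tokens, encoder_state)    # (1, t, vocab_size)
        next_id = logits[:, -1, :].argmax(dim=-1)      # most probable next token
        tokens = torch.cat([tokens, next_id.unsqueeze(1)], dim=1)
        if next_id.item() == stop_id:                  # stop at the special end token
            break
    return tokens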
This scheme is called AT (autoregressive), as shown in the figure below.
There is also a NAT (non-autoregressive) scheme that decodes in parallel: feed n BEGIN tokens and get n output tokens at once. Its advantage is parallelism, but many experiments show its quality usually falls short of AT.
5.2 Loss function
Cross entropy between the predicted distribution at each position and the ground-truth token is used as the loss function.
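A sketch of how the loss can be computed, assuming the decoder produces a (batch, seq_len, vocab_size) tensor of logits and the targets are token ids (the numbers are illustrative):

import torch
from torch import nn

vocab_size = 7
logits = torch.randn(2, 5, vocab_size)        # decoder output: (batch, seq_len, vocab_size)
targets = torch.LongTensor([[1, 2, 3, 4, 0],  # ground-truth token ids
                            [1, 5, 6, 0, 0]])

# nn.CrossEntropyLoss expects (N, C) logits and (N,) targets,
# so the batch and time dimensions are flattened together
criterion = nn.CrossEntropyLoss()
loss = criterion(logits.reshape(-1, vocab_size), targets.reshape(-1))
print(loss.item())

In practice the padded positions are usually excluded from the loss, for example via the ignore_index argument of nn.CrossEntropyLoss.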
5.3 Tricks
5.3.1 Copy mechanism
In dialogue systems, a copy operation can be used (see: pointer network).
5.3.2 Guided attention (TTS, speech synthesis)
Guide the attention process so that it follows a fixed pattern, e.g. moving monotonically from left to right.
5.3.3 Beam search
The decoder described so far is greedy: at every step it outputs only the token with the highest probability.
Borrowing the idea behind HMMs and the Viterbi algorithm, beam search also keeps candidates whose per-step probability is somewhat lower,
in pursuit of the sequence with the highest overall probability.
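A compact beam-search sketch, under the assumption that step_fn(tokens) is a placeholder returning log-probabilities of shape (1, vocab_size) for the next token; it keeps the beam_size partial sequences with the highest total log-probability instead of only the single best one:

import torch

def beam_search(step_fn, begin_id, stop_id, beam_size=3, max_len=20):
    beams = [([begin_id], 0.0)]                      # (token list, cumulative log-prob)
    for _ in range(max_len):
        candidates = []
        for tokens, score in beams:
            if len(tokens) > 1 and tokens[-1] == stop_id:
                candidates.append((tokens, score))   # finished hypotheses are kept as-is
                continue
            log_probs = step_fn(torch.LongTensor([tokens]))      # (1, vocab_size)
            top = torch.topk(log_probs, beam_size, dim=-1)
            for lp, idx in zip(top.values[0], top.indices[0]):
                candidates.append((tokens + [idx.item()], score + lp.item()))
        # keep only the beam_size best partial sequences
        beams = sorted(candidates, key=lambda c: c[1], reverse=True)[:beam_size]
    return beams[0][0]                               # best sequence found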
6 Code Implementation
The code below mainly follows 李沐's tutorial. The way Q, K, V and the dot-product attention are computed differs a little from 李宏毅's description of the encoder, but the overall idea is the same.
6.1 Text embedding
(1) Suppose there are two sentences:
sentence1 = i want a plane
sentence2 = i love you
(2) Map each word to an ID:
| Word | ID |
| --- | --- |
| beg \| stop | 0 |
| i | 1 |
| want | 2 |
| a | 3 |
| plane | 4 |
| love | 5 |
| you | 6 |
(3) Token ID sequences (padded to the same length):

| Sentence | Token IDs |
| --- | --- |
| sentence1 | [1, 2, 3, 4, 0] |
| sentence2 | [1, 5, 6, 0, 0] |
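These id sequences are turned into d_model-dimensional vectors by nn.Embedding, which is exactly the first step of the encoder/decoder code below (the numbers match the toy vocabulary above):

import torch
from torch import nn

vocab_size, d_model = 7, 512
embedding = nn.Embedding(vocab_size, d_model)

tokens = torch.LongTensor([[1, 2, 3, 4, 0],   # "i want a plane" + padding
                           [1, 5, 6, 0, 0]])  # "i love you" + padding + padding
vectors = embedding(tokens)
print(vectors.shape)   # torch.Size([2, 5, 512]) -> (batch, seq_len, d_model)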
6.2 Encoder code
File name: encoder.py
import math
import torch
from torch import nn
class AddNorm(nn.Module):
def __init__(self, dropout=0.1):
super(AddNorm,self).__init__()
self.dropout = nn.Dropout(dropout)
    def forward(self, inputs, Y):
        # residual connection followed by layer normalization over the feature dimension
        output = nn.functional.layer_norm(self.dropout(Y) + inputs, inputs.shape[-1:])
        return output
class PositionWiseFFN(nn.Module):
def __init__(self, input_size=512, hidden_size=2048,output_size=512):
super(PositionWiseFFN,self).__init__()
        self.dense1 = nn.Linear(input_size, hidden_size, bias=True)
self.relu = nn.ReLU()
self.dense2 = nn.Linear(hidden_size, output_size,bias=True)
def forward(self, inputs):
        h = self.relu(self.dense1(inputs))
output = self.dense2(h)
return output
def getMask(X, valid_len):
    """Mask the irrelevant (padded or future) key positions in the attention scores.

    valid_len may be 1-D (one valid length per sequence, used for the padding mask)
    or 2-D (one valid length per query position, used for the causal/sequence mask).
    """
    batch_size, seq_len, d_k = X.shape
    if valid_len.dim() == 1:
        valid_len = valid_len[:, None]                 # (batch, 1)
    arange = torch.arange(seq_len, dtype=torch.float32, device=X.device)
    # paddingMask[b, q, k] is True when key position k lies within the valid length
    paddingMask = arange[None, None, :] < valid_len[:, :, None]
    return paddingMask
class DotProductAttention(nn.Module):
def __init__(self, dropout):
super(DotProductAttention, self).__init__()
self.dropout = nn.Dropout(dropout)
    def forward(self, query, key, value, valid_lens=None):
        # query shape: (batch_size*nheads, seq_len_q, d_k)
        # key/value shape: (batch_size*nheads, seq_len_k, d_k)
        d_k = query.shape[-1]
        scores = torch.bmm(query, key.transpose(1, 2)) / math.sqrt(d_k)
        if valid_lens is not None:
            # mask the invalid key positions before the softmax
            padding_mask = getMask(key, valid_lens)
            scores = scores.masked_fill(padding_mask == 0, -1e9)
        attention_score = nn.functional.softmax(scores, dim=-1)
        output = torch.bmm(self.dropout(attention_score), value)
        return output
def transpose_qkv(X, nheads):
    # project into per-head subspaces: d_model = d_k * nheads
    batch_size, seq_len, d_model = X.shape
    # reshape 3-D -> 4-D: (batch_size, seq_len, nheads, d_k)
    X = X.reshape(batch_size, seq_len, nheads, -1)
    # transpose to (batch_size, nheads, seq_len, d_k)
    X = X.permute(0, 2, 1, 3)
    # merge the batch and head dimensions: (batch_size*nheads, seq_len, d_k)
    y = X.reshape(-1, X.shape[2], X.shape[3])
    return y
def transpose_output(X, num_heads):
    # reverse the operations of transpose_qkv
#[batch_size, num_heads, seq_len,input_size]
X = X.reshape(-1,num_heads,X.shape[1],X.shape[2])
#[batch_size,seq_len, num_heads,input_size]
X= X.permute(0,2,1,3)
return X.reshape(X.shape[0],X.shape[1],-1)
class MultiHeadAttention(nn.Module):
    # multi-head attention
def __init__(self,d_model, nheads, dropout):
super(MultiHeadAttention, self).__init__()
self.nheads = nheads
self.attention = DotProductAttention(dropout)
        # projections into the per-head subspaces
self.W_q = nn.Linear(d_model,d_model, bias=True)
self.W_k = nn.Linear(d_model,d_model, bias=True)
self.W_v = nn.Linear(d_model,d_model, bias=True)
        # output projection
self.W_o = nn.Linear(d_model,d_model, bias=True)
def forward(self, query, key, value, valid_lens):
#query.shape: (batch_size, seq_len, d_model)
#d_model = nheads*d_k
#q_projection.shape:(batch_size*nheads, seq_len, d_k)
q_projection = transpose_qkv(self.W_q(query), self.nheads)
k_projection = transpose_qkv(self.W_k(key), self.nheads)
v_projection = transpose_qkv(self.W_v(value), self.nheads)
        if valid_lens is not None:
            # repeat the valid lengths for every head: the batch dim becomes batch_size*nheads
            valid_lens = torch.repeat_interleave(valid_lens, repeats=self.nheads, dim=0)
output_1 = self.attention(q_projection, k_projection, v_projection ,valid_lens)
output_concat = transpose_output(output_1, self.nheads)
output = self.W_o(output_concat)
return output
class EncoderBlock(nn.Module):
    # encoder block
def __init__(self, d_model=512, nheads=8, dropout=0.1):
super(EncoderBlock, self).__init__()
self.attention = MultiHeadAttention(d_model, nheads, dropout)
self.addnorm1 = AddNorm(dropout)
self.ffn = PositionWiseFFN(d_model, d_model*4, d_model)
self.addnorm2 = AddNorm(dropout)
def forward(self, inputs, valid_lens):
output_1 = self.attention(inputs,inputs,inputs,valid_lens)
output_2 = self.addnorm1(inputs,output_1)
output = self.addnorm2(output_2, self.ffn(output_2))
return output
class PositionalEncoding(nn.Module):
    # sinusoidal positional encoding
def __init__(self,max_seq_length=1000, d_model=512):
super(PositionalEncoding,self).__init__()
pe = torch.zeros(max_seq_length,d_model)
div_term =torch.pow(10000, torch.arange(0, d_model, 2)/d_model)
position = torch.arange(0, max_seq_length).unsqueeze(1)
pe[:,0::2]= torch.sin(position/div_term)
pe[:,1::2]= torch.cos(position/div_term)
pe=pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, inputs):
batch_size, seq_len,d_model = inputs.shape
outputs =inputs +self.pe[:,:seq_len,:].clone().detach()
return outputs
class TransformerEncoder(nn.Module):
    # encoder
def __init__(self, vocab_size=10000, embedding_dim=512, nheads=8, dropout=0.5,num_layers=6):
        # vocab_size: number of words in the vocabulary; embedding_dim: dimension of each word vector
super(TransformerEncoder, self).__init__()
self.d_model = embedding_dim
        # nn.Embedding is equivalent to multiplying a one-hot vector by a learned weight matrix: a = W x
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.pos_enconding = PositionalEncoding(vocab_size ,embedding_dim)
self.blocks = nn.Sequential()
for i in range(num_layers):
name = "block"+str(i)
net = EncoderBlock(embedding_dim, nheads, dropout)
self.blocks.add_module(name, net)
def forward(self, sentence_token, valid_len):
#[batch,seq_length,d_model]
x = self.embedding(sentence_token)
x = self.pos_enconding(x*math.sqrt(self.d_model))
#self.attention_weights = [None]*len(self.blks)
for i ,blocks in enumerate(self.blocks):
x =blocks(x,valid_len)
#self.attention_weights[i]=blk.attention.attention.attention_weights
return x
'''
if __name__ == "__main__":
vocab_size = 200
d_model = 512
nheads = 8
bias = False
dropout = 0.1
num_layers = 6
max_seq_len = 30
vocab_tokens = torch.LongTensor([[1, 2, 3, 0, 0, 0],
[1, 2, 5, 0, 0, 0],
[5, 5, 5, 5, 0, 0]])
batch_size,seq_len = vocab_tokens.shape
valid_lens = torch.tensor([3,3,4])
batch = len(valid_lens)
if not(batch==batch_size):
print("\n error ")
net = TransformerEncoder(vocab_size, d_model, nheads,
dropout, num_layers)
net.eval()
out = net(vocab_tokens,valid_lens)
print("\n encoder输出 ",out.shape)
'''
8.3 decoder 代码
# -*- coding: utf-8 -*-
"""
Created on Tue Sep 3 16:22:28 2024
@author: chengxf2
"""
import torch
import torch.nn as nn
from encoder import MultiHeadAttention
from encoder import AddNorm
from encoder import PositionWiseFFN
from encoder import TransformerEncoder
from encoder import PositionalEncoding
import math
class DecoderBlock(nn.Module):
    # decoder block
def __init__(self, d_model=512, nheads=8, dropout=0.1, i=0):
super(DecoderBlock, self).__init__()
self.i = i
self.attention1 = MultiHeadAttention(d_model,nheads,dropout)
self.addnorm1= AddNorm(dropout)
self.attention2= MultiHeadAttention(d_model,nheads,dropout)
self.addnorm2= AddNorm(dropout)
self.ffn= PositionWiseFFN(d_model, d_model*4,d_model)
self.addnorm3 = AddNorm(dropout)
    def forward(self, inputs, state):
        # During training, all positions of the target sequence are processed at the same time.
        # state[0]: the encoder outputs
        # state[1]: the encoder valid_lens
        # state[2]: the input history of each of the decoder blocks
        # During inference the decoder runs autoregressively, one token at a time.
encoder_outputs, encoder_valid_lens = state[0],state[1]
#print("\n self.i",self.i,len(state[2]))
if state[2][self.i] is None:
input_history = inputs
else:
input_last = state[2][self.i]
input_history = torch.cat((input_last,inputs),axis=1)
state[2][self.i] = input_history
        if self.training:
            batch_size, seq_len, d_k = inputs.shape
            # causal valid lengths: query position t may attend to key positions 1..t
            seq_padding = torch.arange(1, seq_len + 1, device=inputs.device).repeat(batch_size, 1)
        else:
            seq_padding = None
        # masked self-attention over the tokens generated so far
        y1 = self.attention1(inputs, input_history, input_history, seq_padding)
x2 = self.addnorm1(inputs,y1)
        # encoder-decoder cross attention
y2 = self.attention2(x2, encoder_outputs, encoder_outputs, encoder_valid_lens)
x3 = self.addnorm2(x2,y2)
        # position-wise feed-forward network plus the final Add & Norm
y3 = self.ffn(x3)
output= self.addnorm3(x3,y3)
return output, state
class TransformerDecoder(nn.Module):
    # decoder
def __init__(self, vocab_size=1000, d_model=512, nheads=8, dropout=0.5,num_layers=6):
super(TransformerDecoder, self).__init__()
self.d_model = d_model
self.embedding = nn.Embedding(vocab_size, d_model)
self.pos_enconding = PositionalEncoding(vocab_size ,d_model)
self.blocks = nn.Sequential()
for i in range(num_layers):
name = "Block"+str(i)
net = DecoderBlock(d_model, nheads, dropout, i)
self.blocks.add_module(name, net)
self.layer = nn.Linear(d_model, vocab_size)
self.num_layers = num_layers
def init_state(self, encoder_outputs, encoder_valid_lens):
        # cross-attention uses the encoder outputs and their valid lengths; the decoder provides the query
state = [encoder_outputs, encoder_valid_lens,[None]*self.num_layers]
return state
def forward(self, sentence_token, state):
#[batch,seq_len,512]
x = self.embedding(sentence_token)
x = self.pos_enconding(x*math.sqrt(self.d_model))
for i ,block in enumerate(self.blocks):
x,state =block(x,state)
#self.attention_weights[i]=blk.attention.attention.attention_weights
#print("\n x")
output = self.layer(x)
print("\n output",output.shape)
return output, state
if __name__ == "__main__":
vocab_size = 200
d_model = 512
nheads = 8
dropout = 0.1
num_layers = 6
sequence_token_input = torch.LongTensor([[1, 2, 3, 0, 0, 0],
[1, 2, 5, 0, 0, 0],
[5, 5, 5, 5, 0, 0]])
sequence_token_begin = torch.LongTensor([[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0]])
batch_size,seq_len = sequence_token_input.shape
encoder_valid_lens = torch.tensor([3,3,4])
batch = len(encoder_valid_lens)
if not(batch==batch_size):
print("\n error \n")
encoder = TransformerEncoder(vocab_size, d_model, nheads, dropout, num_layers)
decoder = TransformerDecoder(vocab_size, d_model, nheads, dropout, num_layers)
encoder_outputs = encoder(sequence_token_input,encoder_valid_lens)
decoder_state= decoder.init_state(encoder_outputs, encoder_valid_lens)
output,decoder_state = decoder(sequence_token_begin,decoder_state)
    # To decide whether a sentence has ended, check whether the predicted token (the argmax of the output distribution) is the special eos/stop token.
References
- ResNet论文逐段精读【论文精读】, bilibili
- “AI”科普丨Transformer架构图解最强教程!
- https://zhuanlan.zhihu.com/p/137615798
- 4.13.【李宏毅机器学习】Transformer (下)(P4), bilibili
- pytorch内置transformer, detailtoo的技术博客, 51CTO博客
- 震惊!transformer框架流程完全等于年会发奖品流程【强烈推荐】【系列 10-4-1】, bilibili