当前位置: 首页 > article >正文

【PyTorch][chapter 27][李宏毅深度学习][transformer-2]

摘要:

        前面重点介绍了transformer 的encoder 结构,这里重点介绍一下

transformer 的 decoder 结构

transformer 核心是采样了自注意机制,论文里面通过Complexity per Layer, Sequential Operations, Maximum path Length 跟其它模型对比,阐述了其优势

Complexity per Layer通过计算复杂度,越低越好
Sequential Operations顺序计算越少越好(并行计算)
Maximum path Length一个信息到另个一个信息最大长度(越短越好)

transformer主要优点是:两个长的比较远的点,只需要一步就可以到达,并行度好。信息的糅合性比较好。但是需要更多的数据和更大的模型才能做出来跟RNN,CNN同样的效果.

 attention 不会对数据顺序建模,使用了更广泛的偏纳规则,所以需要更大的数据

         归置偏纳‌是指在卷积神经网络中假设特征具有局部性的特性,即当我们把相邻的一些特征放在一起时,更容易得到“解”。这一概念强调了特征之间的关联性和局部性,对于理解和优化神经网络模型具有一定的指导意义。通过归置偏纳,我们可以更好地理解和利用数据的局部特征,从而提高模型的性能和准确性‌1。

           在卷积神经网络中,归置偏纳的应用体现在假设特征可以在局部范围内通过简单的模式进行组合。这种局部性的假设使得网络能够更容易地学习到输入数据的局部特征,进而通过这些局部特征的组合来识别更复杂的模式。这种方法的优点在于它能够减少模型的复杂度,同时保持较高的准确性,尤其是在处理图像、语音等具有局部相关性的数据时,归置偏纳的策略显得尤为重要。

         总的来说,归置偏纳是卷积神经网络中一个重要的概念,它通过利用特征的局部性和关联性,帮助网络更有效地学习和识别数据中的模式,从而提高模型的性能和准确性‌


目录:

  1.     encoder 架构
  2.     decoder 架构
  3.     mask
  4.     cross attention
  5.     decoder 前向传播以及损失函数
  6.     Decoder  Encoder代码实现

一    encoder 架构

torch.nn.TransformerEncoder(encoder_layer, num_layers)
torch.nn.TransformerEncoderLayer(d_model, nhead)

     1.1 架构

       

  1.    TransformerEncoder 由 6 个相同的 TransformerEncoderLayer 组成
  2.    TransformerEncoderLayer 主要由 Multi-head- Attention (多头注意力网络)和              FeedForward-Network 组成

   1.2       Multihead- Attention

               将Q,K,V 投影到子空间中, ScaledDot-Product 得到加权求和的向量

                

   1.3     Scaled Dot-Product Attention

                     

                     

      1.4 FeedForward-Network(前馈网络,缩写为 FFN) 主要架构如下:

               

            


二   decoder 架构

torch.nn.TransformerDecoderLayer(d_model, nhead)
torch.nn.TransformerDecoder(decoder_layer, num_layers)

      2.1  架构

               

     其中TransformerDecoder由6个相同的TransformerDecoderLayer 组成

     2.2   TransformerDecoderLayer

        由三个部分组成:

       1  Masked Multi-Head Attention: 输出 Query

       2:  Cross Multi-Head Attention:  其输入由三个部分组成

                           key(编码器)  value(编码器)   query(解码器)

       3   FFN (前馈神经网络)


三 :mask 原理

     mask 有两种方案:padding mask   和  sequence mask。

     解码器使用了    sequence mask & padding mask

      编码器使用了    padding  mask

      3.1 padding Mask

           在NLP中,文本一般是不定长的,所以在进行 batch训练之前,要先进行长度的统一,过长的句子可以通过truncating 截断到固定的长度,过短的句子可以通过 padding 增加到固定的长度,但是 padding 对应的字符只是为了统一长度,并没有实际的价值,因此希望在之后的计算中屏蔽它们,这时候就需要 Mask。

方案:

     根据seq 的valid_lens 情况,设置attention score     

      

 3.2 sequence Mask 

        sequence mask 主要应用在decoder 里面。它主要用于避免模型在训练时看到未来的词,从而确保预测时的自回归特性。

     如下图: 计算attention score 时候, 只计算到 \alpha_{21},\alpha_{22},其后面位置补0

 3.3 参考代码

      后面的例子跟这个有点差异,总体思想一致

# -*- coding: utf-8 -*-
"""
Created on Thu Aug 29 13:01:24 2024

@author: chengxf2
"""
import torch

def padding_mask(seq, pad):
    #编码器用到
    mask = (seq != pad)
    #print(mask)
    mask = mask.unsqueeze(-2)
    #print(mask.shape)
    #batch,1,seq_lenth
    return mask
    

def sequence_mask(seq):
    #解码器用到
    batch_size, seq_length = seq.shape
    mask = torch.ones((seq_length,seq_length))
    sequence_mask = torch.tril(mask).unsqueeze(0)
    #print("\n sequence mask\n",sequence_mask)
    mask_bool = sequence_mask.bool()
    
    return sequence_mask
   
    
    
    
    return sequence_mask,mask_bool

padding = 0
seq = torch.LongTensor([[1,2,padding],
                        [1,padding,padding]])

batch_size, seq_length =seq.shape

mask_p = padding_mask(seq, padding)
scores = torch.rand(batch_size, seq_length,seq_length)
print("\n scors:\n ",scores)

attention_score_p = scores.masked_fill (mask_p==0, -1e9)
print("\n padding mask 结果\n",attention_score_p)

mask_s = sequence_mask(seq)
attention_score_s = scores.masked_fill(mask_s==0, -1e9)
print("\n sequence_mask 结果\n",attention_score_s)


四  Cross Attention

     这个模型是2016年发表的论文,transorformer 借用了该方案。

编码器输出key, value(历史信息),Decoder 把当前的输入作为查询(query)

通过Cross attention 得到当前信息和历史信息的加权求和的一个效果.


五  decoder 前向传播以及损失函数

    5.1 前向传播

      采用自回归方式,当前时刻的输出作为下一时刻的输入

     1: decoder 输入: x, state

             x:    解码器开始输入的是一个特殊的符号 begin(one-hot编码)

            state:  保存 decoder 输出的结果,valid_lens, 以及编码器历史输入x

   

2  decoder 输出一个向量:  向量长度为vocab_size,选择概率最高的作为输出   

            如下图,"机” 概率最高

把【 BEGIN ,机”】 作为 Decoder 的 输入

4   decoder 输出一个向量:  向量长度为vocab_size,选择概率最高的作为输出   

          器 的概率最高,作为输出

5 依次类推,把自己的输出作为输入,直到遇到特殊的 stop token

    这种方案也称为 AT(auto regressive)自回归模型如下图

还有一种方案成为 NAT,可以并行输入,但是很多实验证明性能不如AT,优点是并行化。

            输入 n个begin, 输出 n 个 begin


5.2  损失函数

      使用cross entropy 作为损失函数

    

         5.3 技巧

      5.3.1  copy  工作:

      在对话系统中,使用复制操作(参考: pointer network)

       5.3.2  Guided attention(TTS 语音合成)

              领导attention 的过程,attention 有固定的方式,由左向右

            ​​​​​​​

  5.3.3 bing search

      现在的decoder 是贪心算法,每次都是只输出概率最高的。

参考HMM 模型,或者Veter bi 算法,可以允许每次搜寻概率较小的

追求总求概率最高的。

         ​​​​​​​


六  代码实现

     这里主要参考李沫的教程,Q,K,V以及DotProduct 求解过程,跟李宏毅的编码器结构有点差异,当总体思路是一致的.

   8.1 文本  embedding

          (1)   假设有两个句子

                   sentence1 =  i  want a plane

                    sentence2 = i  love  you

           (2)   句子转为ID 映射

           

ID
beg|stop0
i1
want2
a3
plane4
love5
you6

         (3)     sequence embedding

句子embedding
   sentence1[1, 2, 3, 4, 0]
   sentence2[1, 5, 6, 0, 0]

    8.2  Encoder 代码

      文件名: encoder.py

import math
import torch
from  torch import nn


class AddNorm(nn.Module):
    
    def __init__(self,  dropout=0.1):
        super(AddNorm,self).__init__()
        self.dropout = nn.Dropout(dropout)
    def forward(self,inputs,Y):
        norm = nn.LayerNorm(inputs.shape)
        output = norm(self.dropout(Y)+inputs)
        return output
    
class PositionWiseFFN(nn.Module):
    def __init__(self, input_size=512, hidden_size=2048,output_size=512):
        super(PositionWiseFFN,self).__init__()
        self.densel = nn.Linear(input_size, hidden_size,bias=True)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(hidden_size, output_size,bias=True)
    
    def forward(self, inputs):
        h = self.relu(self.densel(inputs))
        output = self.dense2(h)
        return output

def getMask(X, valid_len):
    """在序列中屏蔽不相关的项"""
    batch_size,seq_len,d_k = X.shape
    '''
    这种算法和下面求解结果是一样的
    paddingMask= torch.zeros((batch_size,seq_len))
    for m in range(batch_size):
            n = valid_len[m]
            for j in range(n):
                paddingMask[m,j]=1 
    paddingMask = paddingMask.unsqueeze(-1).bool()
    '''
    mask = torch.arange((seq_len), dtype=torch.float32, device=X.device)
    paddingMask = mask[None, :] < valid_len[:, None]
    paddingMask = paddingMask.unsqueeze(-1)
    return paddingMask

class DotProductAttention(nn.Module):
    def __init__(self, dropout):
        super(DotProductAttention, self).__init__()
        self.dropout = nn.Dropout(dropout)

    
    def forward(self, query, key, value, valid_lens= None):
        #query的形状  (batch_len, seq_len, d_k)
        batch_len, seq_len, d_k =query.shape
        scores = torch.bmm(query,  key.transpose(1,2))/math.sqrt(d_k)
        if valid_lens is not None:
            padding_mask = getMask(query, valid_lens)
            scores = scores.masked_fill(padding_mask==0, -1e9)
        attention_score = nn.functional.softmax(scores, dim=-1)
        output = torch.bmm(self.dropout(attention_score) ,value)
        return output
        


def transpose_qkv(X, nheads):
    #子空间投影 d_model=d_k*nheads
    batch_size, seq_len, d_model = X.shape
    #3维投影成4维的 (batch_size,seq_len, nheads,d_k) 
    X = X.reshape(batch_size, seq_len, nheads,-1)
    #矩阵转置 (batch_size, nheads,seq_len, d_k)
    X = X.permute(0,2,1,3)
    #(batch_size*nheads, seq_len,d_k)
    y = X.reshape(-1,X.shape[2],X.shape[3])
    return y


def transpose_output(X, num_heads):
    # 逆转 transpose_qkv 函数的操作
    #[batch_size, num_heads, seq_len,input_size]
    X = X.reshape(-1,num_heads,X.shape[1],X.shape[2])
    #[batch_size,seq_len, num_heads,input_size]
    X=  X.permute(0,2,1,3)
    return X.reshape(X.shape[0],X.shape[1],-1)

class MultiHeadAttention(nn.Module):
    #多头注意力机制
    def __init__(self,d_model, nheads, dropout):
        super(MultiHeadAttention, self).__init__()
        self.nheads = nheads
        self.attention = DotProductAttention(dropout)
        #子空间投影
        self.W_q = nn.Linear(d_model,d_model, bias=True)
        self.W_k = nn.Linear(d_model,d_model, bias=True)
        self.W_v = nn.Linear(d_model,d_model, bias=True)
        #输出
        self.W_o = nn.Linear(d_model,d_model, bias=True)
    
    
    def forward(self, query, key, value, valid_lens):
        #query.shape: (batch_size, seq_len, d_model)
        #d_model = nheads*d_k
        #q_projection.shape:(batch_size*nheads, seq_len, d_k)
        q_projection = transpose_qkv(self.W_q(query), self.nheads)
        k_projection = transpose_qkv(self.W_k(key),   self.nheads)
        v_projection = transpose_qkv(self.W_v(value), self.nheads)
        if  valid_lens is not None:
            valid_lens = torch.repeat_interleave(valid_lens,repeats=self.nheads, dim=0)
            
        output_1 = self.attention(q_projection, k_projection, v_projection ,valid_lens)
        output_concat = transpose_output(output_1, self.nheads)
        output = self.W_o(output_concat)
        return output
    
class EncoderBlock(nn.Module):
     #编码器Block
     def __init__(self, d_model=512, nheads=8, dropout=0.1):
         super(EncoderBlock, self).__init__()
         self.attention = MultiHeadAttention(d_model, nheads, dropout)
         self.addnorm1 = AddNorm(dropout)
         self.ffn = PositionWiseFFN(d_model, d_model*4, d_model)
         self.addnorm2 = AddNorm(dropout)
    
     def forward(self, inputs, valid_lens):
         
         output_1 = self.attention(inputs,inputs,inputs,valid_lens)
         output_2 = self.addnorm1(inputs,output_1)
         output = self.addnorm2(output_2, self.ffn(output_2))
         return output
     
class  PositionalEncoding(nn.Module):
    #位置编码
    def __init__(self,max_seq_length=1000, d_model=512):
        super(PositionalEncoding,self).__init__()
        pe = torch.zeros(max_seq_length,d_model)
        div_term =torch.pow(10000,  torch.arange(0, d_model, 2)/d_model)
        position = torch.arange(0, max_seq_length).unsqueeze(1)
        pe[:,0::2]= torch.sin(position/div_term)
        pe[:,1::2]= torch.cos(position/div_term)
        pe=pe.unsqueeze(0)
        self.register_buffer('pe', pe)
        
    def forward(self, inputs):
      
        batch_size, seq_len,d_model = inputs.shape
        outputs =inputs +self.pe[:,:seq_len,:].clone().detach()
        return outputs
    
    
class TransformerEncoder(nn.Module):
    #编码器
    def __init__(self, vocab_size=10000, embedding_dim=512, nheads=8, dropout=0.5,num_layers=6):
        #vocab_size: 字典中词的个数   embedding_dim: 词的维度
        super(TransformerEncoder, self).__init__()
        self.d_model = embedding_dim
        #先转为one-hot  输出a=Wx, 通过AE 编码器训练得到的
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.pos_enconding = PositionalEncoding(vocab_size ,embedding_dim)
        self.blocks = nn.Sequential()
        
        for i in range(num_layers):
            name = "block"+str(i)
            net = EncoderBlock(embedding_dim, nheads, dropout) 
            self.blocks.add_module(name, net)

    
    def forward(self, sentence_token, valid_len):
        #[batch,seq_length,d_model]
        x = self.embedding(sentence_token)
        x = self.pos_enconding(x*math.sqrt(self.d_model))
        #self.attention_weights = [None]*len(self.blks)
        
        for i ,blocks in enumerate(self.blocks):
            x =blocks(x,valid_len)
            #self.attention_weights[i]=blk.attention.attention.attention_weights
        
        return x


'''
if __name__ == "__main__":
    
    vocab_size = 200
    d_model = 512
    nheads = 8
    bias = False
    dropout = 0.1
    num_layers = 6
    max_seq_len = 30
    vocab_tokens = torch.LongTensor([[1, 2, 3, 0, 0, 0],
                                     [1, 2, 5, 0, 0, 0],
                                     [5, 5, 5, 5, 0, 0]])
    
    
    batch_size,seq_len = vocab_tokens.shape
    valid_lens = torch.tensor([3,3,4])
    batch = len(valid_lens)
    if  not(batch==batch_size):
        print("\n error ")
        
    net = TransformerEncoder(vocab_size, d_model, nheads, 
                              dropout,  num_layers)
    
    net.eval()
    
    out = net(vocab_tokens,valid_lens)
    
    print("\n encoder输出 ",out.shape)
'''   

8.3  decoder 代码

# -*- coding: utf-8 -*-
"""
Created on Tue Sep  3 16:22:28 2024

@author: chengxf2
"""

# -*- coding: utf-8 -*-
"""
Created on Mon Sep  2 14:26:34 2024

@author: chengxf2
"""
import torch
import torch.nn as nn
from encoder import MultiHeadAttention
from encoder import AddNorm
from encoder import PositionWiseFFN
from encoder import TransformerEncoder
from encoder import PositionalEncoding
import math

class DecoderBlock(nn.Module):
    #解码器
    def __init__(self, d_model=512, nheads=8, dropout=0.1, i=0):
        super(DecoderBlock, self).__init__()
        self.i = i 
        self.attention1 = MultiHeadAttention(d_model,nheads,dropout)
        self.addnorm1=    AddNorm(dropout)
        self.attention2=  MultiHeadAttention(d_model,nheads,dropout)
        self.addnorm2=    AddNorm(dropout)
        self.ffn=         PositionWiseFFN(d_model, d_model*4,d_model)
        self.addnorm3 =   AddNorm(dropout)
        
    def forward(self, inputs, state):
        #训练: 输出序列的所有词都在同一时间处理
        #state[0] 保存编码器输出  
        #state[1] 保存编码器valid_lens
        #state[2] 保存解码器6个解码器每层的输入
        #使用自回归方案
        encoder_outputs, encoder_valid_lens = state[0],state[1]
        #print("\n self.i",self.i,len(state[2]))
        if state[2][self.i] is None:
            input_history = inputs
        else:
            input_last = state[2][self.i]
            input_history = torch.cat((input_last,inputs),axis=1)
        state[2][self.i] = input_history
        if self.training:
            batch_size, seq_len, d_k = inputs.shape
            seq_padding = torch.arange(1, batch_size+1, device=inputs.device)
        else:
            seq_padding = None
        #自注意力
        y1 = self.attention1(inputs, input_history,input_history, seq_padding)
        x2 = self.addnorm1(inputs,y1)
        #编码器-解码器 cross attention
        y2 = self.attention2(x2, encoder_outputs, encoder_outputs, encoder_valid_lens)
        x3 = self.addnorm2(x2,y2)
        #最后输出
        y3 = self.ffn(x3)
        output= self.addnorm3(x3,y3)
        return output, state
    
    
class TransformerDecoder(nn.Module):
    #解码器
    def __init__(self, vocab_size=1000, d_model=512, nheads=8, dropout=0.5,num_layers=6):
        super(TransformerDecoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_enconding = PositionalEncoding(vocab_size ,d_model)
        self.blocks = nn.Sequential()
        
        for i in range(num_layers):
            name = "Block"+str(i)
            net =  DecoderBlock(d_model, nheads, dropout, i)
            self.blocks.add_module(name, net)
        self.layer = nn.Linear(d_model, vocab_size)
        self.num_layers = num_layers
        
    def init_state(self, encoder_outputs, encoder_valid_lens):
        #cross-attention 用到  编码器的输出,以及句子长度,解码器的query
        state = [encoder_outputs, encoder_valid_lens,[None]*self.num_layers]
        return state
    
    def forward(self, sentence_token, state):
        #[batch,seq_len,512]
        x = self.embedding(sentence_token)
        x = self.pos_enconding(x*math.sqrt(self.d_model))

        for i ,block in enumerate(self.blocks):
            x,state =block(x,state)
            #self.attention_weights[i]=blk.attention.attention.attention_weights
        #print("\n  x")
        output = self.layer(x)
        print("\n output",output.shape)
        return output, state
    


if __name__ == "__main__":
    
    vocab_size = 200
    d_model = 512
    nheads = 8
    dropout = 0.1
    num_layers = 6
 
    sequence_token_input = torch.LongTensor([[1, 2, 3, 0, 0, 0],
                                             [1, 2, 5, 0, 0, 0],
                                             [5, 5, 5, 5, 0, 0]])
    
    sequence_token_begin = torch.LongTensor([[0, 0, 0, 0, 0, 0],
                                             [0, 0, 0, 0, 0, 0],
                                             [0, 0, 0, 0, 0, 0]])
    
    
    batch_size,seq_len = sequence_token_input.shape
    encoder_valid_lens = torch.tensor([3,3,4])
    batch = len(encoder_valid_lens)
    if  not(batch==batch_size):
          print("\n error \n")
        
        
    encoder = TransformerEncoder(vocab_size, d_model, nheads, dropout, num_layers)  
    decoder = TransformerDecoder(vocab_size, d_model, nheads, dropout, num_layers)
    encoder_outputs = encoder(sequence_token_input,encoder_valid_lens)
    decoder_state= decoder.init_state(encoder_outputs, encoder_valid_lens)
    output,decoder_state =  decoder(sequence_token_begin,decoder_state)
    #判断一句话是否结束,是看其句子中单词是否有eof 标志,即最大值对应 

        
        

ResNet论文逐段精读【论文精读】_哔哩哔哩_bilibili

“AI”科普丨Transformer架构图解最强教程!

https://zhuanlan.zhihu.com/p/137615798

4.13.【李宏毅机器学习】Transformer (下)(P4)_哔哩哔哩_bilibili

pytorch 插件 pytorch内置transformer_detailtoo的技术博客_51CTO博客

震惊!transformer框架流程完全等于年会发奖品流程【强烈推荐】【系列 10-4-1】_哔哩哔哩_bilibili


http://www.kler.cn/a/290798.html

相关文章:

  • 解决win11的vmvare和docker冲突
  • 1、docker概念和基本使用命令
  • 《机器学习》——TF-IDF(关键词提取)
  • 【计算机网络】窥探计网全貌:说说计算机网络体系结构?
  • C# 继承(接口)
  • 学习及笔记
  • ARM微处理器编程模型与linux驱动开发
  • PYTHON发送邮件详细流程
  • JS设计模式之“名片设计师” - 工厂方法模式
  • 简述CCS平面线性光源
  • Java的时间复杂度和空间复杂度和常见排序
  • 【vite-plugin-vuetify】自动导入 vuetify 组件和指令
  • 单调栈
  • 缩点专题总结
  • 【HuggingFace Transformers】OpenAIGPTModel源码解析
  • 信息安全威胁
  • 递归算法专题——真正理解递归和正确使用递归力扣实战应用
  • 09-03 周二 ansible部署与使用指南
  • 论文解读 | ACL2024 Outstanding Paper:因果指导的主动学习方法:助力大语言模型自动识别并去除偏见...
  • 仕考网:公务员备考技巧有哪些?
  • 全国大学生数学建模竞赛赛区评阅工作规范(2023年修订稿)
  • MySQL——基础操作(1)
  • 选择服务器机柜租用要注意哪些方面?
  • python元组
  • 雅特力初步环境准备
  • 【202408最新】Anaconda+VSCode+CUDA+Pytorch安装配置保姆级教程