当前位置: 首页 > article >正文

从位置编码开始手搓transformer框架,transfromer讲解

目录

一、词嵌入和位置编码

1.词嵌入

原理:

2.位置编码

3.transformer中的编码

原因:

4.位置编码代码:

 二、多头自注意力机制

1.生成查询向量、键向量、值向量

2.用Q和K计算得分

3.将得到的分数除以根号下dk然后softmax

4.将归一化后的分数和V点积

5.如何实现多头

代码:

三、前馈网络

代码:

四、编码器层:

代码:

五、解码器层

那什么是掩码呢?

1.Padding Mask

2.Sequence Mask

代码:

六、完整Transformer模型

七、使用示例

八、完整代码

都看到这里了,点个赞呗!!!


先赞后看养成习惯

在开始之前大家先看看这个图:

一、词嵌入和位置编码

1.词嵌入

定义:词嵌入是自然语言处理(NLP)中语言模型与表征学习技术的统称,旨在把一个维数为所有词数量的高维空间嵌入到一个维数低得多的连续向量空间中,将每个单词或词组映射为实数域上的向量。将离散的词语转换为实数向量,使语义/语法相似的词在向量空间中距离更近。

原理:

1.分布假设:具有相似意义的单词往往出现在相似的上下文中,通过分析单词共现模式来捕捉语义关系。例如,“国王”“王后”“王子”“公主” 等词往往会在类似的语境中出现,那么在词嵌入空间中,它们的向量距离也会比较近。

2.向量空间操作:将文本表示为数值向量,可使用向量空间操作捕捉和操纵单词和短语之间的语义关系。如 “国王”-“男人”+“女人”≈“王后”,这种向量运算能反映出词语之间的语义联系。

3.降维:词嵌入是低维的密集向量,减少了计算复杂度和内存需求,适用于大规模 NLP 应用。相比独热编码等高维稀疏表示,词嵌入能更高效地处理大量文本数据。

4.上下文信息:通过考虑在给定上下文中共现的单词来捕捉上下文信息,有助于模型根据周围单词理解单词的含义,从而更好地表示短语和句子。例如 “苹果” 一词,在 “我吃了一个苹果” 和 “苹果公司发布了新产品” 这两个句子中,根据上下文,词嵌入能准确捕捉到 “苹果” 的不同语义。

2.位置编码

位置编码是自然语言处理尤其是 Transformer 架构及其衍生模型里的关键技术。因为 Transformer 架构的核心组件自注意力机制本身不具备捕捉序列中元素位置信息的能力,而位置编码可以将位置信息融入到词向量中,使模型能够感知到序列中元素的相对或绝对位置。

1.为模型引入位置信息:在 Transformer 中,输入的词向量是没有位置信息的,自注意力机制平等地处理输入序列中的每个元素,无法区分不同位置的元素。位置编码通过为每个位置生成一个独特的编码向量,并将其与词向量相加,从而为模型提供位置信息

2.相对位置与绝对位置:位置编码可以表示绝对位置,即每个位置对应一个固定的编码向量;也可以表示相对位置,即通过编码向量的差异来体现元素之间的相对位置关系。在实际应用中,相对位置编码对于捕捉序列中的长距离依赖关系更为重要。

3.可学习与位置编码:位置编码可以是可学习的,即模型在训练过程中自动学习每个位置的编码向量;也可以是固定的,即使用预先定义的函数生成编码向量。例如,Transformer 中使用的是固定的正弦和余弦函数生成位置编码。不知道正弦和余弦函数生成位置编码的可以去看看Transform架构中的位置编码-CSDN博客

3.transformer中的编码

在transformer中位置编码和词嵌入常被结合使用,共同完成对文本的编码。

原因:

1.词嵌入编码词语的语义信息(如猫和狗的相似性),位置编码用来编码词语的位置信息(如我爱你中爱是第二个词),在trnsformer中将词嵌入和位置编码相加,形成既包含语义有包含位置信息的输出向量。

2.在输入的时候,词嵌入和位置编码的向量维度相同,直接相加:输入向量=词嵌入向量+位置编码向量。

4.位置编码代码:

结合公式来看:

# 位置编码模块
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()  # 位置索引 [max_len,1]
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                           (-math.log(10000.0) / d_model)) # 这里是计算公式中括号里面的分母部分
        pe[:, 0::2] = torch.sin(position * div_term)  # 偶数维度用正弦波
        pe[:, 1::2] = torch.cos(position * div_term)  # 奇数维度用余弦波
        self.register_buffer('pe', pe.unsqueeze(0))  # [1, max_len, d_model],将位置编码注册到buffer中

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]  # 动态截取与输入序列长度匹配的位置编码

 二、多头自注意力机制

1.生成查询向量、键向量、值向量

就是从每个编码器的输入向量(即每个单词的词向量)生成三个向量:查询向量query、键向量key、值向量value。至于这三个向量的生成方法是把输入的向量分别乘以三个不同的权重矩阵Wq、Wk、Wv,得到Q、K、V,而这些权重矩阵是在模型训练阶段中训练出来的

2.用Q和K计算得分

这个得分是通过第一步中的Q和K点积出来的,比如说有五个词,那么就有五组的Q,K,V。使用第一组的Q分别点积其他组的K,就得到了第一组对其它组的得分

3.将得到的分数除以根号下dk然后softmax

按照第二步的说法,现在每组有5个得分,一共5*5的得分,按照transformer论文中的意思要先除以根号下df,而这个dk是每头(多头自注意)分到的数据维度。然后通过softmax传递结果,softmax的作用是归一化,将得到分数归一到0-1之间,且对于每组而言他们的和为1。

4.将归一化后的分数和V点积

其实第三步还有加权的意思,因为归一化后和为1,那么他们的值就是权,越大的说明和当前单词越相关,反之亦然。

如此,所有单词的自注意力计算就完成了,得到的向量就可以传给前馈神经网络。

5.如何实现多头

按照上面的意思好像是一个单词就分为一组Q,K,V。好像没有多头的概念啊。那么在编码哪里我们不是说了每个词都会被编码成向量吗,比如一个词被编码成的向量的shape为:[2,64,64](batch_size,w,h),那么我们只要将这个向量扩维,将他的shape变为:[2,8,64,64](batch_size, num_heads, w, h)不就可以了吗,这样不就等于是每个样本里都有八个相同的词吗

可能有些小伙伴就问了:那么这样在上面QKV运算的时候不会变吗?当然不会了,有这个问题的同学应该是线性代数不过关,在多维矩阵相乘的计算里,只与最后两个维度有关,前面的维度只要相等就可以了。

代码:


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0  # 确保可分割
        self.d_k = d_model // num_heads  # 每个头的维度
        self.num_heads = num_heads
        # 定义Q/K/V的线性投影
        self.Wq = nn.Linear(d_model, d_model)  # 查询向量投影
        self.Wk = nn.Linear(d_model, d_model)  # 键向量投影
        self.Wv = nn.Linear(d_model, d_model)  # 值向量投影
        self.Wo = nn.Linear(d_model, d_model)  # 输出投影

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        # 分割多头(网页6、7)
        q = self.Wq(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2)
        k = self.Wk(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2)
        v = self.Wv(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2)
        
        # 计算缩放点积注意力(网页4)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:  # 应用掩码(如因果掩码)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)  # 注意力权重
        
        # 合并多头(网页6)
        context = torch.matmul(attn, v).transpose(1,2).contiguous()
        context = context.view(batch_size, -1, self.num_heads * self.d_k)
        return self.Wo(context)  # 输出线性变换

三、前馈网络

这个就没什么好解释的了,将自注意力层传过来的值经过两层的神经网络,目的是增强模型的表达能力,使用激活函数ReLU提供非线性,防止过拟合

代码:

class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)  # 扩展维度
        self.linear2 = nn.Linear(d_ff, d_model)   # 降维回原空间
        self.dropout = nn.Dropout(0.1)  # 正则化

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

 

四、编码器层:

上图是transformer的组成

我们可以看到编码器的组成是:经过位置编码后的数据被分为了三支分别是Q,K,V。经过多头自注意里机制后在经过归一化层,在QKV的旁边那条线是代表残差连接(后面会说)。然后在经过前馈神经网络,然后在经过归一化层和残差连接,这样一个编码器就组装完毕了。

那什么是残差连接呢?残差连接是用来解决梯度消失和网格退化的问题,梯度反向传播时经过多层非线性激活函数(如 sigmoid、tanh)后会逐渐衰减,导致浅层网络难以更新(梯度消失)。而在transformer中,多头自注意力层和前馈神经网络层后均使用了残差连接,并搭配Layer Normalization(层归一化),计算为:在还没有经过多头自注意力层或者前馈神经网络层的数据加上已经经过多头自注意力层或者前馈神经网络层后数据直接相加。这样不管他怎么更新,数据都不会太小了。

代码:

我们只要将上面的多头自注意力,归一化,前馈神经网络组合一下就可以了

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff=2048):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)  # Post-LN结构(网页6)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, mask=None):
        # 残差连接+层归一化(网页4、6)
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ffn_output = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_output))
        return x

五、解码器层

其实解码器的组成部分和编码器的组成部分差不多,比编码器多一层多头自注意力,而且自注意里面还多了一个掩码Masked(后面会说),第二个注意力层好像不太一样。其实那三条线代表的还是Q,K,V。只不过K和V来之编码器中,只有Q来自自己。

那什么是掩码呢?

在 Transformer 模型中,掩码(Mask)用于选择性地忽略序列中的未来信息,确保模型在计算注意力时只关注历史信息。根据应用场景不同,掩码可分为两类:Padding Mask(填充掩码)和Sequence Mask(序列掩码,又称 Look-Ahead Mask)

1.Padding Mask

在处理变长序列时,忽略填充值的影响,避免无意义的位置参与注意力计算。比如在编码器中可能输入的序列长度不一致,短序列需填充到相同长度,填充部分需掩码覆盖,解码器中当自己的Q可能与编码器中的KV长度不一致,这时就需要Padding Mask来填充。一般来说填充值为0因为这样有值就是True没有值就是False,或者设置为负无穷大,这样在softmax后就趋近于0,那么在加权与V计算的时候就相当于忽略这些值了

2.Sequence Mask

这个掩码就是防止模型看到未来的信息,确保每个位置(V)都只能看到历史的信息。实现方式:生成上三角掩码矩阵,对于长度为L的序列,创建一个L×L的矩阵,对角线及左下三角为0(允许访问),右上三角为-inf(禁止访问)。这样经过掩码处理和softmax后与V计算时间,比如第一行除了第一列有值以外都为0,第二行除了第一列和第二列有值以外都为0,这样就保证了每次计算只能看到历史信息了

而在这里我们一般是将两种掩码结合来使用,来序列不一样长的时候就使用Padding Mask后在使用Sequence Mask,通过这两个函数,可以方便地为 Transformer 模型生成所需的掩码矩阵。

代码:

那么将上面的的多头自注意力层加上掩码,然后经过残差连接和归一化层,在经过只有Q是自己的多头自注意力层,在经过残差连接和归一化后解码器就完成啦!!!

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff=2048):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)
        
    def forward(self, x, enc_output, src_mask, tgt_mask):
        # 自注意力(带因果掩码)
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        
        # 交叉注意力(网页3、6)
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        
        # 前馈网络
        ffn_output = self.ffn(x)
        x = self.norm3(x + self.dropout(ffn_output))
        return x

六、完整Transformer模型

这个模型就是将上面的编码器、残差连接归一化、前馈神经网络、解码器以代码的方式结合起来训练就是Transformer模型了

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, 
                 num_heads=8, num_layers=6, max_seq_len=500):
        super().__init__()
        # 输入嵌入层
        self.encoder_embed = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embed = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)
        
        # 堆叠编码器/解码器层(网页6)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads) for _ in range(num_layers)])
        self.final_linear = nn.Linear(d_model, tgt_vocab_size)  # 输出投影

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        # 编码过程(网页3)
        src_emb = self.pos_encoding(self.encoder_embed(src))
        enc_output = src_emb
        for layer in self.encoder_layers:
            enc_output = layer(enc_output, src_mask)
        
        # 解码过程(网页6)
        tgt_emb = self.pos_encoding(self.decoder_embed(tgt))
        dec_output = tgt_emb
        for layer in self.decoder_layers:
            dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)
            
        return self.final_linear(dec_output)  # 生成目标词汇概率

七、使用示例

if __name__ == '__main__':
    d_model=512
    num_heads=8
    num_layers=6
    vocab_size=10000
    max_seq_len=100

    model = Transformer(
    src_vocab_size=10000,  # 源语言词汇量
    tgt_vocab_size=10000,  # 目标语言词汇量
    d_model=512,           # 模型维度(网页6)
    num_heads=8,           # 注意力头数
    num_layers=6           # 堆叠层数
    )

    # 生成测试数据
    src = torch.randint(0, 10000, (32, 100))  # [batch_size=32, seq_len=100]
    tgt = torch.randint(0, 10000, (32, 100))

    # 前向传播(网页7)
    output = model(src, tgt)
    print(output.shape)  # [32, 100, 10000] → 每个位置预测10000词概率

八、完整代码

import torch
import torch.nn as nn
import math


# 位置编码模块
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()  # 位置索引 [max_len,1]
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                           (-math.log(10000.0) / d_model))  # [d_model/2]
        pe[:, 0::2] = torch.sin(position * div_term)  # 偶数维度用正弦波
        pe[:, 1::2] = torch.cos(position * div_term)  # 奇数维度用余弦波
        self.register_buffer('pe', pe.unsqueeze(0))  # [1, max_len, d_model]

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]  # 动态截取与输入序列长度匹配的位置编码

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0  # 确保可分割
        self.d_k = d_model // num_heads  # 每个头的维度
        self.num_heads = num_heads
        # 定义Q/K/V的线性投影
        self.Wq = nn.Linear(d_model, d_model)  # 查询向量投影
        self.Wk = nn.Linear(d_model, d_model)  # 键向量投影
        self.Wv = nn.Linear(d_model, d_model)  # 值向量投影
        self.Wo = nn.Linear(d_model, d_model)  # 输出投影

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        # 分割多头(网页6、7)
        q = self.Wq(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2)
        k = self.Wk(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2)
        v = self.Wv(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2)
        
        # 计算缩放点积注意力(网页4)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:  # 应用掩码(如因果掩码)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)  # 注意力权重
        
        # 合并多头(网页6)
        context = torch.matmul(attn, v).transpose(1,2).contiguous()
        context = context.view(batch_size, -1, self.num_heads * self.d_k)
        return self.Wo(context)  # 输出线性变换

class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)  # 扩展维度
        self.linear2 = nn.Linear(d_ff, d_model)   # 降维回原空间
        self.dropout = nn.Dropout(0.1)  # 正则化

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff=2048):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)  # Post-LN结构(网页6)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, mask=None):
        # 残差连接+层归一化(网页4、6)
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ffn_output = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_output))
        return x

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff=2048):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)
        
    def forward(self, x, enc_output, src_mask, tgt_mask):
        # 自注意力(带因果掩码)
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        
        # 交叉注意力(网页3、6)
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        
        # 前馈网络
        ffn_output = self.ffn(x)
        x = self.norm3(x + self.dropout(ffn_output))
        return x

# 完整transformer模型
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, 
                 num_heads=8, num_layers=6, max_seq_len=500):
        super().__init__()
        # 输入嵌入层
        self.encoder_embed = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embed = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)
        
        # 堆叠编码器/解码器层(网页6)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads) for _ in range(num_layers)])
        self.final_linear = nn.Linear(d_model, tgt_vocab_size)  # 输出投影

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        # 编码过程(网页3)
        src_emb = self.pos_encoding(self.encoder_embed(src))
        enc_output = src_emb
        for layer in self.encoder_layers:
            enc_output = layer(enc_output, src_mask)
        
        # 解码过程(网页6)
        tgt_emb = self.pos_encoding(self.decoder_embed(tgt))
        dec_output = tgt_emb
        for layer in self.decoder_layers:
            dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)
            
        return self.final_linear(dec_output)  # 生成目标词汇概率


if __name__ == '__main__':
    d_model=512
    num_heads=8
    num_layers=6
    vocab_size=10000
    max_seq_len=100

    # 初始化模型参数
    model = Transformer(
    src_vocab_size=10000,  # 源语言词汇量
    tgt_vocab_size=10000,  # 目标语言词汇量
    d_model=512,           # 模型维度(网页6)
    num_heads=8,           # 注意力头数
    num_layers=6           # 堆叠层数
    )

    # 生成测试数据
    src = torch.randint(0, 10000, (32, 100))  # [batch_size=32, seq_len=100]
    tgt = torch.randint(0, 10000, (32, 100))

    # 前向传播(网页7)
    output = model(src, tgt)
    print(output.shape)  # [32, 100, 10000] → 每个位置预测10000词概率






都看到这里了,点个赞呗!!!


http://www.kler.cn/a/592246.html

相关文章:

  • DeepSeek算力服务器的选型--青岛佰优联创新科技有限公司
  • HTML语言的贪心算法
  • Golang | 每日一练 (6)
  • 植物知识分享论坛毕设
  • 【QA】CRTP在模板中有哪些用处?
  • Ollama + Open WebUI 本地部署DeepSeek
  • test_cases测试用例层/test_1_login
  • 2023 CSP-J 题解
  • 蓝桥杯练习day2:执行操作后的变化量
  • redis分布式锁实现Redisson+redlock中watch dog是如何判断当前线程是否持有锁进行续租的呢?
  • 事务隔离级别是?
  • kotlin 中的构造函数的作用
  • 黑盒问题的操作优化
  • TPAMI-2025 | 中山大学具身导航参数高效训练!NavCoT:通过解耦推理提升基于大模型的视觉语言导航
  • Python列表1
  • hexo+git pages搭建网站避坑QAQ
  • 基于BCLinux制作Apache HTTPD 2.4.63 的RPM安装包
  • JAVA-多线程join()等待一个线程
  • 精细护理:为进行性核上性麻痹患者筑牢生活防线
  • stm32第七天震动传感器