大语言模型从理论到实践(第二版)-学习笔记(一)transformer理论与实践
Transformer 结构
机器翻译的目标是从源语言(Source Language)转换到目标语言(Target Language)。Transformer结构完全通过注意力机制完成对源语言序列和目标语言序列全局依赖的建模。
注意力层:使用多头注意力(Multi-Head Attention)机制整合上下文语义。多头注意力并行运行多个独立注意力机制,进而从多维度捕捉输入序列信息。它使得序列中任意两个单词之间的依赖关系可以直接被建模而不基于传统的循环结构,从而更好地解决文本的长程依赖问题。
• 位置感知前馈网络层(Position-wise Feed-Forward Network):通过全连接层对输入文本序列
中的每个单词表示进行更复杂的变换。
• 残差连接:对应图中的 Add 部分。它是一条分别作用在上述两个子层中的直连通路,被用于
连接两个子层的输入与输出,使信息流动更高效,有利于模型的优化。
• 层归一化:对应图中的 Norm 部分。它作用于上述两个子层的输出表示序列,对表示序列进
行层归一化操作,同样起到稳定优化的作用。
接下来介绍各层的实现方法:
嵌入表示层
对于输入文本序列,先通过输入嵌入层(Input Embedding)将每个单词转换为其相对应的向
量表示。在送入编码器端建模其上下文语义之前,一个非常重要的操作是在词嵌入中加入位置编码(Positional Encoding)这一特征。具体来说,序列中每一个单词所在的位置都对应一个向量。这一向量会与单词表示对应相加并送入后续模块中做进一步处理。在训练过程中,模型会自动地学习到如何利用这部分位置信息。为了得到不同位置所对应的编码,Transformer 结构使用不同频率的正余弦函数,如下所示。
其中,pos 表示单词所在的位置,2i 和 2i +1 表示位置编码向量中的对应维度,d 则对应位置编码
的总维度。通过上面这种方式计算位置编码有以下两个好处:第一,正余弦函数的范围是 [−1, +1],导出的位置编码与原词嵌入相加不会使得结果偏离过远而破坏原有单词的语义信息;
第二,依据三角函数的基本性质,可以得知第 pos + k 个位置编码是第 pos 个位置编码的线性组合,这就意味着位置编码中蕴含着单词之间的距离信息。解释:
#embed.py
import torch
import torch.nn as nn
import math
class Embedder(nn.Module):
def __init__(self, vocab_size, d_model):
super().__init__()
self.d_model = d_model
self.embed = nn.Embedding(vocab_size, d_model)
def forward(self, x):
return self.embed(x)
'''1.1.1 嵌入表示层'''
class PositionalEncoder(nn.Module):
def __init__(self, d_model, max_seq_len=80, dropout=0.1):
super().__init__()
self.d_model = d_model
self.dropout = nn.Dropout(dropout)
# 根据pos和i创建一个常量PE矩阵
pe = torch.zeros(max_seq_len, d_model)
for pos in range(max_seq_len):
for i in range(0, d_model, 2):
pe[pos, i] = math.sin(pos / (10000 ** (i / d_model)))
if i + 1 < d_model:
pe[pos, i + 1] = math.cos(pos / (10000 ** (i / d_model)))
pe = pe.unsqueeze(0)
'''
在 PyTorch 中,缓冲区(register_buffer)和参数(register_parameter)的区别在于:
参数(比如权重 weight)是需要训练的,会参与梯度计算,默认 requires_grad=True。
缓冲区(比如 pe)是固定的辅助张量,默认 requires_grad=False,不会参与梯度更新。
'''
self.register_buffer('pe', pe)
def forward(self, x):
# 让词嵌入的数值比位置编码(范围在 [-1, 1])更大一些。这样加法后,词嵌入的语义信息不会被位置编码“淹没”。
# 这是 Transformer 论文里的一个小技巧。
x = x * math.sqrt(self.d_model)
# 增加位置常量到单词嵌入表示中
seq_len = x.size(1)
x = x + self.pe[:, :seq_len, :]
'''
设置 requires_grad=False 表示这个张量(位置编码 pe)是固定的,不参与梯度更新。
为什么这么做?
因为位置编码是预先算好的常量,不是模型要学习的参数。Transformer 只学习词嵌入和后续的权重,位置编码只是“附加信息”,不需要训练。
'''
return self.dropout(x)
注意力层
解释:
关于qkv的维度:
前馈层
#Sublayer.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
'''1.1.2 注意力层'''
class MultiHeadAttention(nn.Module):
def __init__(self, heads, d_model, dropout=0.1):
super().__init__()
self.d_model = d_model
self.d_k = d_model // heads
self.h = heads
self.q_linear = nn.Linear(d_model, d_model)
self.k_linear = nn.Linear(d_model, d_model)
self.v_linear = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(dropout)
self.out = nn.Linear(d_model, d_model)
def attention(self, q, k, v, d_k, mask=None, dropout=None):
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
#q:(batch_size, num_heads, seq_len, d_k)
#k:(batch_size, num_heads, d_k, seq_len)
# 掩盖那些为了补全长度而增加的单元,使其通过Softmax计算后为0
if mask is not None:
mask = mask.unsqueeze(1)
scores = scores.masked_fill(mask == 0, -1e9)
scores = F.softmax(scores, dim=-1)
if dropout is not None:
scores = dropout(scores)
'''
为什么对 scores 进行 Dropout?
以下是几个主要原因:
1. 防止过拟合scores 是注意力机制的核心,它决定了模型对输入序列中不同位置的关注程度。如果某些位置的注意力权重过高
(例如,模型过于偏好某些特定词),可能会导致模型在训练数据上过拟合。通过对 scores 施加 dropout,
随机地将部分注意力权重置为 0,可以迫使模型学习更鲁棒的表示,避免过度依赖某些输入特征或位置。
Transformer 模型在多个地方使用了 dropout,例如:
词嵌入层之后。
Feed-Forward 网络中。
残差连接和层归一化之后。
处理噪声和不确定性
在自然语言处理任务中,输入序列可能包含噪声(例如不相关的词或冗余信息)。通过在 scores 上施加 dropout,模型可以模拟这种不确定性,
学习对输入的轻微扰动更具鲁棒性。
'''
output = torch.matmul(scores, v)
return output
def forward(self, q, k, v, mask=None):
bs = q.size(0)
# 利用线性计算划分成h个头
k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
# 矩阵转置 (batch_size, num_heads, seq_len, d_k)
k = k.transpose(1, 2)
q = q.transpose(1, 2)
v = v.transpose(1, 2)
# 计算attention
scores = self.attention(q, k, v, self.d_k, mask, self.dropout)
# 连接多个头并输入最后的线性层
'''
维度变化:
原始形状:(batch_size, num_heads, seq_len, d_k)
交换后形状:(batch_size, seq_len, num_heads, d_k)
.contiguous()
作用:确保张量在内存中是连续的。
后续的 view 操作要求张量是连续的(contiguous),否则会报错。
'''
concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
output = self.out(concat)
return output#(batch_size, seq_len, d_model)
'''1.1.3 前馈层'''
class FeedForward(nn.Module):
def __init__(self, d_model, d_ff=2048, dropout=0.1):
super().__init__()
# d_ff 默认设为 2048
self.linear_1 = nn.Linear(d_model, d_ff)
self.dropout = nn.Dropout(dropout)
self.linear_2 = nn.Linear(d_ff, d_model)
def forward(self, x):
x = self.dropout(F.relu(self.linear_1(x)))
x = self.linear_2(x)
return x
'''1.1.4 残差连接与层归一化'''
class Norm(nn.Module):
def __init__(self, d_model, eps=1e-6):
super().__init__()
self.size = d_model
# 层归一化包含两个可学习参数
self.alpha = nn.Parameter(torch.ones(self.size))
self.bias = nn.Parameter(torch.zeros(self.size))
self.eps = eps # 避免除零
def forward(self, x):
mean = x.mean(dim=-1, keepdim=True)#(batch_size, seq_len)
std = x.std(dim=-1, keepdim=True)#(batch_size, seq_len)
norm = self.alpha * (x - mean) / (std + self.eps) + self.bias
return norm
残差连接与层归一化
其中 µ 和 σ 分别表示均值和方差,用于将数据平移缩放到均值为 0、方差为 1 的标准分布,α 和
b 是可学习的参数。层归一化技术可以有效地缓解优化过程中潜在的不稳定、收敛速度慢等问题。
编码器和解码器结构
#layers
import torch
import torch.nn as nn
from Sublayer import FeedForward, MultiHeadAttention, Norm
'''1.1.5 编码器和解码器结构'''
class EncoderLayer(nn.Module):
def __init__(self, d_model, heads, dropout=0.1):
super().__init__()
self.norm_1 = Norm(d_model)
self.norm_2 = Norm(d_model)
self.attn = MultiHeadAttention(heads, d_model, dropout=dropout)
self.ff = FeedForward(d_model, dropout=dropout)
self.dropout_1 = nn.Dropout(dropout)
self.dropout_2 = nn.Dropout(dropout)
def forward(self, x, mask):
attn_output = self.attn(x, x, x, mask)
attn_output = self.dropout_1(attn_output)
'''在子模块输出上应用 dropout(如 self.dropout_1(attn_output)),
可以随机丢弃子模块输出的一部分,迫使模型更好地利用输入𝑥和子模块输出的组合,
从而增强鲁棒性。'''
x = x + attn_output
x = self.norm_1(x)
ff_output = self.ff(x)
ff_output = self.dropout_2(ff_output)
x = x + ff_output
x = self.norm_2(x)
return x
class DecoderLayer(nn.Module):
def __init__(self, d_model, heads, dropout=0.1):
super().__init__()
self.norm_1 = Norm(d_model)
self.norm_2 = Norm(d_model)
self.norm_3 = Norm(d_model)
self.dropout_1 = nn.Dropout(dropout)
self.dropout_2 = nn.Dropout(dropout)
self.dropout_3 = nn.Dropout(dropout)
self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)
self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)
self.ff = FeedForward(d_model, dropout=dropout)
def forward(self, x, e_outputs, src_mask, trg_mask):
attn_output_1 = self.attn_1(x, x, x, trg_mask)
attn_output_1 = self.dropout_1(attn_output_1)
x = x + attn_output_1
x = self.norm_1(x)
attn_output_2 = self.attn_2(x, e_outputs, e_outputs, src_mask)
attn_output_2 = self.dropout_2(attn_output_2)
x = x + attn_output_2
x = self.norm_2(x)
ff_output = self.ff(x)
ff_output = self.dropout_3(ff_output)
x = x + ff_output
x = self.norm_3(x)
return x
配套的代码 写的真的很规整 厉害!
最后的transformer:
'''
models.py
源序列嵌入+位置编码+encoder*N+对最后的输出归一化
目标嵌入+位置编码+decoder*N+对最后的输出归一化
'''
import torch
import torch.nn as nn
from layers import EncoderLayer, DecoderLayer
from embed import Embedder, PositionalEncoder
from Sublayer import Norm
import copy
def get_clones(module, N):
return nn.ModuleList([copy.deepcopy(module) for i in range(N)])
'''1.1.5 编码器和解码器结构'''
class Encoder(nn.Module):
def __init__(self, vocab_size, d_model, N, heads, dropout):
super().__init__()
self.N = N
self.embed = Embedder(vocab_size, d_model)
self.pe = PositionalEncoder(d_model, dropout=dropout)
self.layers = get_clones(EncoderLayer(d_model, heads, dropout), N)
self.norm = Norm(d_model)
def forward(self, src, mask):
x = self.embed(src)
x = self.pe(x)
for i in range(self.N):
x = self.layers[i](x, mask)
return self.norm(x)
class Decoder(nn.Module):
def __init__(self, vocab_size, d_model, N, heads, dropout):
super().__init__()
self.N = N
self.embed = Embedder(vocab_size, d_model)
self.pe = PositionalEncoder(d_model, dropout=dropout)
self.layers = get_clones(DecoderLayer(d_model, heads, dropout), N)
self.norm = Norm(d_model)
def forward(self, trg, e_outputs, src_mask, trg_mask):
x = self.embed(trg)
x = self.pe(x)
for i in range(self.N):
x = self.layers[i](x, e_outputs, src_mask, trg_mask)
return self.norm(x)
class Transformer(nn.Module):
def __init__(self, src_vocab, trg_vocab, d_model, N, heads, dropout):
super().__init__()
self.encoder = Encoder(src_vocab, d_model, N, heads, dropout)
self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)
self.out = nn.Linear(d_model, trg_vocab)
def forward(self, src, trg, src_mask, trg_mask):
e_outputs = self.encoder(src, src_mask)
d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
output = self.out(d_output)
return output
对最后的输出归一化的解释:
训练和测试
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from models import Transformer
from batch import create_masks
from process import *
import numpy as np
import time
# 数据
src_file = 'data/english.txt'
trg_file = 'data/french.txt'
src_lang = 'en_core_web_sm'
trg_lang = 'fr_core_news_sm'
max_strlen = 80
batchsize = 1500
src_data, trg_data = read_data(src_file, trg_file)
EN_TEXT, FR_TEXT = create_fields(src_lang, trg_lang)
train_iter, src_pad, trg_pad = create_dataset(src_data, trg_data, EN_TEXT, FR_TEXT, max_strlen, batchsize)
'''1.1.5 编码器和解码器结构'''
# 模型参数定义
d_model = 512
heads = 8
N = 6
dropout = 0.1
src_vocab = len(EN_TEXT.vocab)
trg_vocab = len(FR_TEXT.vocab)
model = Transformer(src_vocab, trg_vocab, d_model, N, heads, dropout)
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
optim = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
# 模型训练
def train_model(epochs, print_every=100):
model.train()
start = time.time()
temp = start
total_loss = 0
for epoch in range(epochs):
for i, batch in enumerate(train_iter):
src = batch.src.transpose(0, 1)
trg = batch.trg.transpose(0, 1)
# 将我们输入的英语句子中的所有单词翻译成法语
# 除了最后一个单词,因为它为结束符,不需要进行下一个单词的预测
trg_input = trg[:, :-1]
# 试图预测单词
targets = trg[:, 1:].contiguous().view(-1)
# 使用掩码代码创建函数来制作掩码
src_mask, trg_mask = create_masks(src, trg_input, src_pad, trg_pad)
preds = model(src, trg_input, src_mask, trg_mask)
optim.zero_grad()
loss = F.cross_entropy(preds.view(-1, preds.size(-1)),
targets, ignore_index=trg_pad)
loss.backward()
optim.step()
total_loss += loss.item()
if (i + 1) % print_every == 0:
loss_avg = total_loss / print_every
print("time = %dm, epoch %d, iter = %d, loss = %.3f, %ds per %d iters" %
((time.time() - start) // 60, epoch + 1, i + 1, loss_avg,
time.time() - temp, print_every))
total_loss = 0
temp = time.time()
# 模型测试
def translate(src, max_len=80, custom_string=False):
model.eval()
if custom_string == True:
src = tokenize_en(src, EN_TEXT)
src = torch.LongTensor(src)
print(src)
src_mask = (src != src_pad).unsqueeze(-2)
e_outputs = model.encoder(src.unsqueeze(0), src_mask)
outputs = torch.zeros(max_len).type_as(src.data)
outputs[0] = torch.LongTensor([FR_TEXT.vocab.stoi['<sos>']])
for i in range(1, max_len):
trg_mask = np.triu(np.ones((1, i, i)).astype('uint8'))
trg_mask = Variable(torch.from_numpy(trg_mask) == 0)
out = model.out(model.decoder(outputs[:i].unsqueeze(0),
e_outputs, src_mask, trg_mask))
out = F.softmax(out, dim=-1)
val, ix = out[:, -1].data.topk(1)
outputs[i] = ix[0][0]
if ix[0][0] == FR_TEXT.vocab.stoi['<eos>']:
break
return ' '.join(
[FR_TEXT.vocab.itos[ix] for ix in outputs[:i]]
)
if __name__ == "__main__":
train_model(2)
words = 'Let me see.'
print(translate(words, custom_string=True))
源码地址(可运行):intro-llm-code/chs/ch2-foundations/Transformer/main.py at main · intro-llm/intro-llm-code