四、自然语言处理_08Transformer翻译任务案例
0、前言
在Seq2Seq模型的学习过程中,做过一个文本翻译任务案例,多轮训练后,效果还算能看
Transformer作为NLP领域的扛把子,对于此类任务的处理会更为强大,下面将以基于Transformer模型来重新处理此任务,看看效果如何
1、需求概述
现有一个《data.txt》文件,里面存放了很多组翻译对(即:英文句子 - 中文句子 的组合)
要求针对此《data.txt》文件,使用Seq2Seq模型构建一个翻译系统,并验证翻译效果
2、需求分析
这是一个典型的翻译任务,要求系统在用户输入英文句子之后,输出与之对应的中文句子,可以用Transformer模型来实现
具体来说,至少应该考虑以下几个要点:
-
1、分词器:自定义一个分词器,用于根据输入的语料构建字典
-
输入的句子(src --> source,英文句子)构建两个字典
-
src_token2idx字典的格式为:{英文词:id}
-
src_idx2token字典的格式为:{id:英文词}
-
-
输出的句子(tgt --> target,中文句子)构建两个字典
-
tgt_token2idx字典的格式为:{中文词:id}
-
tgt_idx2token字典的格式为:{id:中文词}
-
-
-
2、数据打包工具:自定义一个数据集和数据的批处理函数,用于将《data.txt》文件中的内容打包成模型处理所需的数据格式
-
3、Transformer模型:包含编码器和解码器方法
-
4、模型训练和推理:自定义模型的训练和推理方法
3、代码实现
3.1 导包
import os
import joblib
import copy
import math
import pandas as pd
import jieba
import opencc
import random
import time
from tqdm import tqdm
import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.nn.functional import log_softmax
from torch.optim.lr_scheduler import LambdaLR
from sklearn.model_selection import train_test_split
3.2 构建分词器
class Tokenizer(object):
"""
自定义一个分词器,用于根据输入的语料构建字典:
1、输入的句子(src --> source,英文句子)构建两个字典
src_token2idx字典的格式为:{英文词:id}
src_idx2token字典的格式为:{id:英文词}
2、输出的句子(tgt --> target,中文句子)构建两个字典
tgt_token2idx字典的格式为:{中文词:id}
tgt_idx2token字典的格式为:{id:中文词}
"""
def __init__(self, data_file, saved_dict):
"""
初始化
"""
# 定义语料文件的路径
self.data_file = data_file
# 定义字典存储的路径
self.saved_dict = saved_dict
# 输入侧 src --> source
self.src_token2idx = None
self.src_idx2token = None
self.src_dict_len = None
self.src_embed_dim = 512
self.src_hidden_size = 512
# 输出侧 tgt --> target
self.tgt_token2idx = None
self.tgt_idx2token = None
self.tgt_dict_len = None
self.tgt_embed_dim = 512
self.tgt_hidden_size = 512
self.tgt_max_len = 100
# 构建字典
self._build_dict()
def _build_dict(self):
"""
构建字典
"""
# 1、如果四个字典都有值,则不需要浪费资源重复构建,跳出这个构建字典的_build_dict方法即可
if all([self.src_token2idx, self.src_idx2token, self.tgt_token2idx, self.tgt_idx2token]):
print("字典已经构建过了")
return
# 2、如果缓存里面有都通过joblib保存的字典文件,则也不需要浪费资源重复构建,直接从字典文件中获取字典,再跳出这个构建字典的_build_dict方法即可
elif os.path.exists(self.saved_dict):
print("从缓存中读取字典")
self.load()
print("读取缓存字典成功")
return
# 3、如果上面两个条件都不满足,则开始从零构建字典
# 3.1 构建标记元素集
# <UNK>:未知标记,用于表示在词汇表中未出现的词,当模型遇到一个它在训练数据中未曾见过的词时,会用 <UNK> 来代替
# <PAD>:填充标记,用于将所有输入序列填充到相同的长度,以便于能够使用批处理和固定大小的神经网络输入
# <SOS>:序列开始标记,用于指示序列生成的开始
# <EOS>:序列结束标记,用于指示序列生成的结束
# 输入侧不需要"<SOS>"和"<EOS>",输出侧需要"<SOS>"和"<EOS>"
src_tokens = {"<UNK>", "<PAD>"}
tgt_tokens = {"<UNK>", "<PAD>", "<SOS>", "<EOS>"}
# 3.2 从语料文件中读取数据
with open(file=self.data_file, mode="r", encoding="utf8") as f:
# 读取每一行内容
for line in tqdm(f.readlines()):
# 如果内容不为空,则执行下面代码
if line:
# 并将英文句子和中文句子通过中间的制表符分开
src_sentence, tgt_sentence = line.strip().split("\t")
# 分别调用split_src和split_tgt方法,对src_sentence和tgt_sentence进行 【句子-->词】 的切分处理
src_sentence_tokens = self.split_src(src_sentence)
tgt_sentence_tokens = self.split_tgt(tgt_sentence)
# 通过union方法取并集,来得到src_tokens和tgt_tokens
src_tokens = src_tokens.union(set(src_sentence_tokens))
tgt_tokens = tgt_tokens.union(set(tgt_sentence_tokens))
# 3.3 构建src的字典,包括src_token2idx和src_idx2token,并获取对应的字典长度
self.src_token2idx = {token: idx for idx, token in enumerate(src_tokens)}
self.src_idx2token = {idx: token for token, idx in self.src_token2idx.items()}
self.src_dict_len = len(self.src_token2idx)
# 3.4 构建tgt的字典,包括tgt_token2idx和tgt_idx2token,并获取对应的字典长度
self.tgt_token2idx = {token: idx for idx, token in enumerate(tgt_tokens)}
self.tgt_idx2token = {idx: token for token, idx in self.tgt_token2idx.items()}
self.tgt_dict_len = len(self.tgt_token2idx)
# 3.5 将上面构建好的四个字典都存放至缓存文件中,方便后面读取
self.save()
print("保存字典成功")
def split_src(self, sentence):
"""
英文句子分词
"""
# 1、去除首尾空格
sentence = sentence.strip()
# 2、将句子进行进行 转小写-->分词-->去除空词和'-->存列表 的操作
tokens = [token for token in jieba.lcut(sentence.lower()) if token not in ("", " ", "'")]
# 3、返回处理后的列表
return tokens
def split_tgt(self, sentence):
"""
中文句子分词
"""
# 1、实例化opencc工具,并设置繁体转简体模式
# t2s,即:Traditional Chinese to Simplified Chinese,表示将繁体句子转换为简体句子
# s2t,即:Simplified Chinese to Traditional Chinese,表示将简体句子转换为繁体句子
converter = opencc.OpenCC(config="t2s")
# 2、进行句子转换
sentence = converter.convert(text=sentence)
# 3、将句子进行进行 分词-->去除空词-->存列表 的操作
tokens = [token for token in jieba.lcut(sentence) if token not in ["", " "]]
# 4、返回处理后的列表
return tokens
def encode_src(self, src_sentence, src_sentence_len):
"""
对src进行编码:把英文句子分词后变成 id
1、按本批次的最大长度来填充,没有见过的词置为"<UNK>",长度不够的用多个"<PAD>"填充
2、src不用加"<SOS>和"<EOS>"
"""
# 1、"<UNK>"转换
src_idx = [self.src_token2idx.get(token, self.src_token2idx.get("<UNK>")) for token in src_sentence]
# 2、"<PAD>"填充
src_idx = (src_idx + [self.src_token2idx.get("<PAD>")] * src_sentence_len)[:src_sentence_len]
# 3、返回处理后的id值
return src_idx
def encode_tgt(self, tgt_sentence, tgt_sentence_len):
"""
对tgt进行编码:把中文句子分词后变成 id
1、按本批次的最大长度来填充,没有见过的词置为"<UNK>",长度不够的用多个"<PAD>"填充
2、tgt需要在首尾分别加"<SOS>和"<EOS>"
"""
# 1、在首尾分别加"<SOS>和"<EOS>"
tgt_sentence = ["<SOS>"] + tgt_sentence + ["<EOS>"]
# 2、因为在首尾分别加了"<SOS>和"<EOS>",共两个词,所以最大长度要加2
tgt_sentence_len += 2
# 3、"<UNK>"转换
tgt_idx = [self.tgt_token2idx.get(token, self.tgt_token2idx.get("<UNK>")) for token in tgt_sentence]
# 4、"<PAD>"填充
tgt_idx = (tgt_idx + [self.tgt_token2idx.get("<PAD>")] * tgt_sentence_len)[:tgt_sentence_len]
# 5、返回处理后的id值
return tgt_idx
def decode_tgt(self, tgt_ids):
"""
对src进行解码:把生成的id序列转换成中文token
输入:[6360, 7925, 8187, 7618, 1653, 4509]
输出:['我', '爱', '北京', '<UNK>']
"""
preds = []
# [batch_size, seq_len]
for temp_tgt in tgt_ids:
# 到EOS则代表结束,需要退出
if temp_tgt == self.tgt_token2idx.get("<EOS>"):
break
# 将不等于"<SOS>"、"<PAD>"的id获取出来,并输出id对应的token(中文词)
# 正如人们赚不到自己认知范围之外的钱一样,生成的序列也不会有找不到的id,所以不需要替换输出"<UNK>"
preds.append([self.tgt_idx2token.get(tgt_id) for tgt_id in temp_tgt if tgt_id not in (tokenizer.tgt_token2idx.get("<SOS>"),
tokenizer.tgt_token2idx.get("<PAD>"))])
return preds
# classmethod是一个装饰器,表示这个方法是类方法,可以通过类名直接调用,而不需要实例化对象(cls是这个方法的第一个参数,代表类本身)
# 主要用途:
# (1)将一个方法定义为类方法,可以清晰地表明它是一个与类相关联的操作,而不是与特定实例相关,这有助于代码的组织和可读性,使得其他开发者更容易理解这个方法的用途
# (2)如果一个方法需要在子类中被重写,那么使用类方法可以确保子类可以覆盖这个方法并提供特定的实现,同时仍然保持类方法的特性
@classmethod
def subsequent_mask(cls, size):
"""
生成屏蔽未来词的subsequent_mask矩阵:size=seq_len
"""
# 1、定义了一个形状为(1, size, size)的张量,size是序列的长度,这个张量用于后续创建对应的上三角矩阵
attn_shape = (1, size, size)
# 2、创建一个上三角矩阵,大小为attn_shape,其中对角线上的值为1,其余为0
# triu-->triangle upper(上三角矩阵),tril-->triangle lower(下三角矩阵)
# diagonal=1参数表示从对角线上方1个位置开始填充为1,对角线本身及以下的元素填充为0
# (如果是下三角矩阵,则diagonal=1表示从对角线下方1个位置开始填充为1,对角线本身及以上的元素填充为0)
subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8)
# 3、上面得到的是【上三角元素为1,对角线本身及以下的元素为0】的矩阵,现在将其与0值作比较,得到的是【上三角元素为False,对角线本身及以下的元素为True】的矩阵
# 比如:(由于False的遮挡效果,使得每一步只能看当前和之前的True)
# [[True, False, False],
# [True, True, False],
# [True, True, True]]
return subsequent_mask == 0
@classmethod
def make_std_mask(cls, tgt, pad):
"""
使用subsequent_mask屏蔽未来词
"""
# 1、去掉"<PAD>",并用unsqueeze改变形状
# unsqueeze(-2):让pad_mask的形状 [batch_size, seq_len] 变为 [batch_size, 1, seq_len]
pad_mask = (tgt != pad).unsqueeze(-2)
# 2、去掉未来词
# tgt.size(-1)是当前序列的长度,即seq_len;
# type_as(pad_mask.data)是将subsequent_mask转换为与pad_mask.data相同的数据类型,以确保两个张量可以进行按位与操作
tgt_mask = pad_mask & Tokenizer.subsequent_mask(tgt.size(-1)).type_as(pad_mask.data)
# 3、返回屏蔽未来词之后的矩阵
return tgt_mask
def save(self):
"""
定义保存字典的方法
"""
# 1、定义字典中的元素内容
state_dict = {
"src_token2idx": self.src_token2idx,
"src_idx2token": self.src_idx2token,
"src_dict_len": self.src_dict_len,
"tgt_token2idx": self.tgt_token2idx,
"tgt_idx2token": self.tgt_idx2token,
"tgt_dict_len": self.tgt_dict_len
}
# 2、保存文件到.cache目录下
if not os.path.exists(".cache"):
os.mkdir(path=".cache")
torch.save(obj=state_dict, f=self.saved_dict)
def load(self):
"""
加载字典
"""
if os.path.exists(path=self.saved_dict):
state_dict = torch.load(f=self.saved_dict, weights_only=True)
self.src_token2idx = state_dict.get("src_token2idx")
self.src_idx2token = state_dict.get("src_idx2token")
self.src_dict_len = state_dict.get("src_dict_len")
self.tgt_token2idx = state_dict.get("tgt_token2idx")
self.tgt_idx2token = state_dict.get("tgt_idx2token")
self.tgt_dict_len = state_dict.get("tgt_dict_len")
实例化分词器
tokenizer = Tokenizer(data_file="./data.txt", saved_dict="./.cache/dicts.bin")
print(tokenizer)
3.3 数据打包
class TransformerDataset(Dataset):
"""
自定义数据集
"""
def __init__(self, data_file, tokenizer, part="train"):
"""
初始化方法
"""
# 定义语料文件的路径
self.data_file = data_file
# 获取分词器实例
self.tokenizer = tokenizer
# 确认数据集的用途
self.part = part
# 将数据置空
self.data = None
# 加载数据
self._load_data()
def _load_data(self):
"""
加载数据
"""
# 1、如果数据有值,则不需要浪费资源重新加载,跳出这个加载数据的_load_data方法即可
if self.data:
print("数据集已经构建过了")
return
# 2、如果缓存里面有通过joblib保存的数据文件,则也不需要浪费资源重新加载,直接从数据文件中获取数据,再跳出这个加载数据的_load_data方法即可
elif os.path.exists(path=fr"./.cache/{self.part}.bin"):
print("从缓存中读取数据")
# 原始数据
self.data = torch.load(f=fr"./.cache/{self.part}.bin", weights_only=True)
print("加载本地数据集成功")
return
# 3、数据读取
data = []
with open(file=self.data_file, mode="r", encoding="utf8") as f:
# 读取每一行内容
for line in tqdm(f.readlines()):
# 如果内容不为空,则执行下面代码
if line:
# 并将英文句子和中文句子通过中间的制表符分开
src_sentence, tgt_sentence = line.strip().split("\t")
# 分别调用split_src和split_tgt方法,对src_sentence和tgt_sentence进行 【句子-->词】 的切分处理
src_sentence_tokens = self.tokenizer.split_src(src_sentence)
tgt_sentence_tokens = self.tokenizer.split_tgt(tgt_sentence)
# src_sentence_tokens--tgt_sentence_tokens列表存放至data中
data.append([src_sentence_tokens, tgt_sentence_tokens])
# 4、数据集切分:80%训练集,20%测试集
train_data, test_data = train_test_split(data, test_size=0.2, random_state=0)
if self.part == "train":
self.data = train_data
else:
self.data = test_data
# 5、保存数据
if not os.path.exists(".cache"):
os.mkdir(path=".cache")
torch.save(obj=self.data, f=fr"./.cache/{self.part}.bin")
def __len__(self):
"""
DataLoader在每次迭代时也都会默认调用__len__方法,从而获取样本数量
"""
return len(self.data)
def __getitem__(self, idx):
"""
DataLoader在每次迭代时都会默认调用Dataset中的__getitem__方法,从而获取单个数据样本并进行批处理
"""
src_sentence, tgt_sentence = self.data[idx]
return src_sentence, len(src_sentence), tgt_sentence, len(tgt_sentence)
回调函数
def collate_fn(batch, tokenizer):
# 1、将所有batch中的内容打包,并以元组的形式赋值给src_sentences, src_sentence_lens, tgt_sentences, tgt_sentence_lens
# zip中可以用(batch[0], batch[1], batch[2]),表示前三个batch,如果用*batch,则表示所有的batch
# 用zip可以把每一batch中的“列元素”对应性的组合到一起,比如src_sentences中存放的是每一batch的src_sentence连起来的元组,src_sentence_lens中存放的是每一batch的src_len连起来的元组,依此类推
src_sentences, src_sentence_lens, tgt_sentences, tgt_sentence_lens = zip(*batch)
# 2、调用tokenizer中的编码方法encode_src,将 src 转 id
# 通过max方法计算src_max_len
src_sentence_len = max(src_sentence_lens)
src_idxes = []
for src_sentence in src_sentences:
src_idxes.append(tokenizer.encode_src(src_sentence, src_sentence_len))
# 3、调用tokenizer中的编码方法encode_tgt,将 tgt 转 id
# 通过max方法计算tgt_max_len
tgt_sentence_len = max(tgt_sentence_lens)
tgt_idxes = []
for tgt_sentence in tgt_sentences:
tgt_idxes.append(tokenizer.encode_tgt(tgt_sentence, tgt_sentence_len))
# 4、所有数据转张量 torch.long
# src [batch_size, seq_len]
src_idxes = torch.LongTensor(src_idxes)
# src_mask [batch_size, 1, seq_len]
src_mask = (src_idxes != tokenizer.src_token2idx.get("<PAD>")).unsqueeze(-2)
# tgt [batch_size, seq_len]
tgt_idxes = torch.LongTensor(tgt_idxes)
# tgt [batch_size, seq_len - 1] 去掉最后的 EOS
tgt_idxes_in = tgt_idxes[:, :-1]
# tgt_y [batch_size, seq_len - 1] 去掉开头的 SOS
tgt_idxes_out = tgt_idxes[:, 1:]
# tgt_mask [batch_size, seq_len-1, seq_len-1]
tgt_mask = tokenizer.make_std_mask(tgt_idxes_in, pad=tokenizer.tgt_token2idx.get("<PAD>"))
# 5、记录生成的有效字符数量
ntokens = (tgt_idxes_out != tokenizer.tgt_token2idx.get("<PAD>")).data.sum()
# 6、返回src, src_mask, tgt, tgt_mask, tgt_y, ntokens
return src_idxes, src_mask, tgt_idxes_in, tgt_mask, tgt_idxes_out, ntokens
获取数据集
# 传入回调函数,获取训练集
train_dataset = TransformerDataset(data_file="data.txt", tokenizer=tokenizer, part="train")
train_dataloader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32, collate_fn=lambda batch: collate_fn(batch, tokenizer))
# 传入回调函数,获取测试集
test_dataset = TransformerDataset(data_file="data.txt", tokenizer=tokenizer, part="test")
test_dataloader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=32, collate_fn=lambda batch: collate_fn(batch, tokenizer))
3.4 模型构建
设备判断
# 判断用GPU还是CPU
device = "cuda:0" if torch.cuda.is_available() else "cpu"
位置编码
class PositionalEncoding(nn.Module):
"""
根据原始论文,生成固定的位置编码(不可学习的死码)
"""
def __init__(self, d_model, dropout, max_len=5000):
"""
初始化方法:
d_model:模型的维度,即每个位置编码的向量长度
dropout:Dropout的概率,用于防止过拟合
max_len:位置编码的最大长度,默认为5000,这表示可以处理的最大序列长度为5000
"""
super().__init__()
# 1、在位置编码后添加一个Dropout层,用于随机丢弃部分神经元,增强模型的泛化能力
self.dropout = nn.Dropout(p=dropout)
# 2、生成位置编码
# 2.1 初始化一个形状为(max_len, d_model)的零张量,用于存储位置编码
pe = torch.zeros(max_len, d_model)
# 2.2 生成一个从0到max_len-1的序列,并通过unsqueeze(1)将其扩展为二维张量,形状为(max_len, 1),这表示每个位置的索引
position = torch.arange(0, max_len).unsqueeze(1)
# 2.3 计算分母部分,用于调整正弦和余弦函数的频率
div_term = torch.exp(
torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
)
# 3、计算正弦和余弦编码
# 3.1 pe[:, 0::2]:选择pe中所有行的偶数列(即第0列、第2列、第4列等),填充为正弦值
# 3.2 pe[:, 1::2]:选择pe中所有行的奇数列(即第1列、第3列、第5列等),填充为余弦值
# 3.3 正弦和余弦函数的输入是position * div_term,其中position表示位置索引,div_term用于调整频率,这种设计使得位置编码能够捕捉到不同位置的唯一信息
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
# 4、将pe的形状从(max_len, d_model)扩展为(1, max_len, d_model),这样可以方便后续与输入张量x相加
pe = pe.unsqueeze(0)
# 5、将pe注册为模块的缓冲区(buffer)
# 缓冲区不会被优化器更新,但会随着模型的保存和加载而保存和加载,这使得pe在训练过程中保持固定,不会被学习(以此得到不可学习的死码)
self.register_buffer("pe", pe)
def forward(self, x):
"""
前向传播方法
x:输入张量,形状为(batch_size, seq_len, d_model)
"""
# 1、给输入添加位置编码
# self.pe[:, : x.size(1)]:从预计算的位置编码中提取与输入序列长度相同的部分(例如,如果输入序列长度为seq_len,则提取pe[:, :seq_len, :])
# requires_grad_(False):明确指定这部分位置编码不需要梯度计算,节省计算资源
x = x + self.pe[:, : x.size(1)].requires_grad_(False)
# 2、对添加了位置编码的张量应用Dropout,防止过拟合
return self.dropout(x)
向量化层
class Embedding(nn.Module):
"""
向量化层
"""
def __init__(self, d_model, dict_len, padding_idx):
"""
初始化方法:
d_model:嵌入向量的维度(即每个单词或符号的嵌入大小)
dict_len:字典的长度(即词汇表的大小),这表示嵌入矩阵中可以容纳的最大索引数
padding_idx:填充符号的索引(在处理序列数据时,通常会用一个特殊的符号(如 <pad>)来填充较短的序列,使其长度一致,padding_idx指定了这个填充符号的索引)
"""
super().__init__()
self.embed = nn.Embedding(num_embeddings=dict_len, embedding_dim=d_model, padding_idx=padding_idx)
self.d_model = d_model
def forward(self, x):
"""
前向传播方法
1、self.embed(x):将输入的索引张量 x 通过嵌入层 self.embed 处理,得到对应的嵌入向量
输出的嵌入张量形状为 (batch_size, seq_len, d_model)
2、self.d_model ** 0.5:将嵌入向量乘以“根号d_model”,目的是确保在点积操作中,嵌入向量的方差保持稳定,从而避免梯度消失问题
"""
return self.embed(x) * self.d_model ** 0.5
前馈网络层
class FeedForward(nn.Module):
"""
前馈网络层
先抽取注意力特征,再对特征做后处理(先升,再降)
"""
def __init__(self, d_model, d_ff, dropout=0.1):
"""
初始化方法
d_model:模型的维度,即输入和输出特征的维度
d_ff:前馈网络中间层的维度,通常比d_model大得多(例如,d_model=512,d_ff=2048)
dropout:Dropout的概率,用于防止过拟合,这里默认为0.1
"""
super().__init__()
# 1、升维:d_model-->d_ff
self.up_proj = nn.Linear(in_features=d_model, out_features=d_ff)
# 2、降维:d_ff-->d_model
self.down_proj = nn.Linear(in_features=d_ff, out_features=d_model)
# 3、dropout
self.dropout = nn.Dropout(dropout)
def forward(self, x):
"""
前向传播方法
"""
# 1、升维
x = self.up_proj(x)
# 2、激活
x = torch.relu(x)
# 3、降维
x = self.dropout(x)
# 4、dropout
x = self.down_proj(x)
return x
缩放点积注意力计算
def attention(query, key, value, mask=None, dropout=None):
"""
缩放点积注意力的计算
query: 查询向量
key: 键向量
value: 值向量
mask: 掩码
dropout:Dropout的概率
"""
# 1、获取 query 的最后一个维度的大小(即每个头的特征维度)
d_k = query.size(-1)
# 2、计算点积分数
# query.size(-1):获取 query 的最后一个维度的大小,即每个头的特征维度 d_k
# 例如,如果 query 的形状是 (batch_size, h, seq_len, d_k),那么 query.size(-1) 的值是 d_k
# key.transpose(-2, -1):将 key 的最后两个维度交换,将形状从 (batch_size, h, seq_len, d_k) 调整为 (batch_size, h, d_k, seq_len)
# 这样可以方便地进行矩阵乘法操作
# torch.matmul(query, key.transpose(-2, -1)):计算 query 和 key 的点积,得到注意力分数矩阵 scores
# 形状为 (batch_size, h, seq_len, seq_len),表示每个位置与其他位置的相似度
# / math.sqrt(d_k):对点积结果进行缩放,除以 sqrt(d_k)
# 这是为了防止点积结果过大,导致梯度消失或爆炸问题。缩放后的分数范围更加稳定
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
# 3、应用掩码
if mask is not None:
# 3.1 数据搬家
mask = mask.to(device=device)
# 3.2 使用 masked_fill 方法,将掩码中为 False 的位置的分数设置为一个非常小的值(如 -1e9),这样在计算 softmax 时,这些位置的注意力权重会接近于 0,从而实现屏蔽效果
scores = scores.masked_fill(mask == False, -1e9)
# 4、对每个头的分数矩阵 scores 应用 softmax 函数,计算注意力权重
p_attn = scores.softmax(dim=-1)
# 5、应用Dropout
if dropout is not None:
p_attn = dropout(p_attn)
# 6、返回加权值(包括加权特征和注意力权重)
# torch.matmul(p_attn, value):使用注意力权重 p_attn 对 value 进行加权求和,形状为 (batch_size, h, seq_len, d_k),表示每个位置的加权特征
# return torch.matmul(p_attn, value), p_attn:将加权后的特征和注意力权重都进行返回(加权后的特征用于后续的计算,注意力权重可以用于可视化或其他分析)
return torch.matmul(p_attn, value), p_attn
模块克隆
def clones(module, N):
"""
克隆
module: 一个 PyTorch 模块(nn.Module),表示要复制的单个模块。
N: 需要复制的模块数量。
"""
# copy.deepcopy(module):深复制 module(这意味着每个复制的模块都是独立的,它们的参数不会共享)
# 如果不使用深复制,则所有模块将共享相同的参数,这在大多数情况下不是我们想要的行为
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
多头注意力
class MultiHeadedAttention(nn.Module):
"""
多头注意力
"""
def __init__(self, h, d_model, dropout=0.1):
"""
初始化方法
h:头的数量(number of heads),多头注意力机制将输入分成多个头,每个头处理一部分特征,最后再将它们合并
d_model:模型的维度,即输入和输出特征的维度
dropout:Dropout的概率,用于防止过拟合,这里默认为0.1
"""
super().__init__()
# 1、确保模型的维度可以被头的数量整除(如果不能,则抛出异常,不执行下面代码)
assert d_model % h == 0
# 2、每个头处理的特征维度(假设 d_model = 512,h = 8,那么每个头处理的特征维度为 512 // 8 = 64)
self.d_k = d_model // h
# 3、获取头的数量
self.h = h
# 4、参数层(Q, K, V, O)
# 4.1 qkv包含三个线性变换层,分别用于生成查询(Query)、键(Key)和值(Value),这里使用上面定义的clones函数来创建三个相同的线性层(N=3)
self.qkv = clones(module=nn.Linear(in_features=d_model, out_features=d_model), N=3)
# 4.2 最终的输出线性层,用于将多头注意力的结果合并后再次进行线性变换
self.out = nn.Linear(in_features=d_model, out_features=d_model)
# 5、初始化注意力权重为空
self.attn = None
# 6、dropout
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None):
"""
前向传播方法
query: 查询向量,形状为 (batch_size, seq_len, d_model)
key: 键向量,形状为 (batch_size, seq_len, d_model)
value: 值向量,形状为 (batch_size, seq_len, d_model)
mask: 掩码,用于屏蔽某些位置的注意力权重,形状为 (batch_size, seq_len) 或 (batch_size, seq_len, seq_len)
"""
# 如果掩码不为空,则将掩码的维度扩展为 (batch_size, 1, seq_len),以便在多头注意力中应用到每个头
if mask is not None:
mask = mask.unsqueeze(1)
# 1、获取批量大小
nbatches = query.size(0)
# 2、将输入的 query、key 和 value 分别进行线性变换,并将它们分成多个头,以便在多头注意力机制中分别计算每个头的注意力权重
# (1)用for循环,通过zip(self.qkv, (query, key, value)) 生成一个迭代器,每次迭代返回一个 (lin, x) 对
# (2)对每个 lin 和 x 的组合进行操作:
# ①view将线性变换后的张量重塑为 (batch_size, seq_len, h, d_k)
# ②transpose将张量的第二个维度和第三个维度交换【将形状从 (batch_size, seq_len, h, d_k) 调整为 (batch_size, h, seq_len, d_k)】
query, key, value = [
lin(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
for lin, x in zip(self.qkv, (query, key, value))
]
# 3、调用上面定义的attention函数,计算注意力权重和加权后的值
x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
# 4、合并头
# x.transpose(1, 2): 将张量的维度从 (batch_size, h, seq_len, d_k) 调整为 (batch_size, seq_len, h, d_k)
# contiguous().view(nbatches, -1, self.h * self.d_k): 将张量重塑为 (batch_size, seq_len, d_model),即将所有头的特征合并
# self.out(x): 最终通过输出线性层,得到多头注意力的结果
x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
# 5、释放内存
del query
del key
del value
# 6、通过线性转换后输出
return self.out(x)
层归一化模块
class LayerNorm(nn.Module):
"""
层归一化模块
"""
def __init__(self, features, eps=1e-6):
"""
初始化方法
features: 模型的维度,即 d_model,表示每个位置的特征维度
eps: 一个非常小的值,用于防止除以零的情况。默认值为 1e-6
"""
super().__init__()
# 创建一个形状为 features 的可训练参数,初始值为1,作为权重参数
self.weight = nn.Parameter(torch.ones(features))
# 创建一个形状为 features 的可训练参数,初始值为0,作为偏置参数
self.bias = nn.Parameter(torch.zeros(features))
self.eps = eps
def forward(self, x):
"""
前向传播方法
"""
# 计算输入张量 x 在特征维度(最后一个维度)上的均值
mean = x.mean(-1, keepdim=True)
# 计算输入张量 x 在特征维度(最后一个维度)上的标准差
std = x.std(-1, keepdim=True)
# 做标准化
x = (x - mean) / (std + self.eps)
# 对标准化后的张量 x 进行缩放和偏移(先乘权重,再加偏置),然后进行输出
return self.weight * x + self.bias
残差模块
class SublayerConnection(nn.Module):
"""
残差连接模块
"""
def __init__(self, size, dropout):
"""
初始化方法
size: 模型的维度,即 d_model,表示每个位置的特征维度
dropout: Dropout概率,用于正则化
"""
super().__init__()
# 层归一化(LayerNorm)模块,用于对输入进行归一化处理
self.norm = LayerNorm(size)
# dropout
self.dropout = nn.Dropout(dropout)
def forward(self, x, sublayer):
"""
前向传播方法
x: 输入张量,形状为 (batch_size, seq_len, d_model)
sublayer: 一个函数,表示子层的操作。这个函数可以是自注意力(Self-Attention)或前馈网络(Feed-Forward Network)等
"""
# 残差连接及输出
return x + self.dropout(sublayer(self.norm(x)))
编码器
class EncoderLayer(nn.Module):
"""
编码器层
"""
def __init__(self, size, self_attn, feed_forward, dropout):
"""
初始化方法
size: 模型的维度,即 d_model,表示每个位置的特征维度
self_attn: 自注意力机制(Self-Attention),用于解码器内部的注意力计算
feed_forward: 前馈网络(Feed-Forward Network),用于对解码器的输出进行进一步处理
dropout: Dropout概率,用于正则化
"""
super().__init__()
self.self_attn = self_attn
self.feed_forward = feed_forward
# sublayer_cons包含2个子层连接模块(SublayerConnection),用于在每个子层后添加残差连接和层归一化
self.sublayer_cons = clones(SublayerConnection(size, dropout), 2)
self.size = size
def forward(self, x, mask):
"""
前向传播方法
x: 解码器的输入,形状为 (batch_size, tgt_seq_len, d_model)
mask: 掩码,用于屏蔽某些位置的注意力权重,形状为 (batch_size, 1, seq_len) 或 (batch_size, seq_len, seq_len)
"""
# 1、自注意力计算
x = self.sublayer_cons[0](x, lambda x: self.self_attn(x, x, x, mask))
# 2、前馈网络计算及输出
return self.sublayer_cons[1](x, self.feed_forward)
class Encoder(nn.Module):
"""
编码器
"""
def __init__(self, layer, N):
"""
初始化方法
layer: 一个编码器层(EncoderLayer)的实例,表示单个编码器层的结构
N: 编码器中包含的编码器层的数量
"""
super().__init__()
# 包含 N 个编码器层的模块列表
self.layers = clones(layer, N)
# 层归一化模块,用于对编码器的最终输出进行归一化处理
self.norm = LayerNorm(layer.size)
def forward(self, x, mask):
"""
前向传播方法
x: 解码器的输入,形状为 (batch_size, tgt_seq_len, d_model)
mask: 掩码,用于屏蔽某些位置的注意力权重,形状为 (batch_size, 1, seq_len) 或 (batch_size, seq_len, seq_len)
"""
# 1、遍历layers
for layer in self.layers:
# 将当前的编码器输入 x 和掩码 mask 传递给当前的编码器层
# 每个编码器层的输出会作为下一个编码器层的输入
x = layer(x, mask)
# 2、在所有编码器层处理完成后,对最终的输出 x 进行层归一化处理,并将处理后的结果进行输出
return self.norm(x)
解码器
class DecoderLayer(nn.Module):
"""
解码器层
"""
def __init__(self, size, self_attn, cross_attn, feed_forward, dropout):
"""
初始化方法
size: 模型的维度,即 d_model,表示每个位置的特征维度
self_attn: 自注意力机制(Self-Attention),用于解码器内部的注意力计算
cross_attn: 交叉注意力机制(Cross-Attention),用于解码器和编码器之间的注意力计算
feed_forward: 前馈网络(Feed-Forward Network),用于对解码器的输出进行进一步处理
dropout: Dropout概率,用于正则化
"""
super().__init__()
self.size = size
self.self_attn = self_attn
self.cross_attn = cross_attn
self.feed_forward = feed_forward
# sublayer_cons包含3个子层连接模块(SublayerConnection),用于在每个子层后添加残差连接和层归一化
self.sublayer_cons = clones(SublayerConnection(size, dropout), 3)
def forward(self, x, memory, src_mask, tgt_mask):
"""
前向传播方法
x: 解码器的输入,形状为 (batch_size, tgt_seq_len, d_model)
memory: 编码器的输出,形状为 (batch_size, src_seq_len, d_model)
src_mask: 编码器的掩码,用于屏蔽编码器中的某些位置
tgt_mask: 解码器的掩码,用于屏蔽解码器中的某些位置(如未来的序列位置)
"""
m = memory
# 1、自注意力计算
x = self.sublayer_cons[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
# 2、交叉注意力计算
x = self.sublayer_cons[1](x, lambda x: self.cross_attn(x, m, m, src_mask))
# 3、前馈网络计算及输出
return self.sublayer_cons[2](x, self.feed_forward)
class Decoder(nn.Module):
"""
解码器
"""
def __init__(self, layer, N):
"""
初始化方法
layer: 一个解码器层(DecoderLayer)的实例,表示单个解码器层的结构
N: 解码器中包含的解码器层的数量
"""
super().__init__()
# 包含 N 个解码器层的模块列表
self.layers = clones(layer, N)
# 层归一化模块,用于对解码器的最终输出进行归一化处理
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, src_mask, tgt_mask):
"""
前向传播方法
x: 解码器的输入,形状为 (batch_size, tgt_seq_len, d_model)
memory: 编码器的输出,形状为 (batch_size, src_seq_len, d_model)
src_mask: 编码器的掩码,用于屏蔽编码器中的某些位置
tgt_mask: 解码器的掩码,用于屏蔽解码器中的某些位置(如未来的序列位置)
"""
# 1、遍历layers
for layer in self.layers:
# 将当前的解码器输入 x、编码器的输出 memory、编码器掩码 src_mask 和解码器掩码 tgt_mask 传递给当前的解码器层
# 备注:每个解码器层的输出会作为下一个解码器层的输入
x = layer(x, memory, src_mask, tgt_mask)
# 2、在所有解码器层处理完成后,对最终的输出 x 进行层归一化处理,并将处理后的结果进行输出
return self.norm(x)
掩码函数
def subsequent_mask(size):
"""
定义生成掩码的函数,用于生成掩码,以屏蔽(mask out)后续位置的注意力权重
这种掩码通常用于解码器(Decoder)中,以确保在解码过程中,模型只能看到当前位置及之前的位置,而不能看到未来的位置
size: 序列的长度(seq_len),表示掩码的大小
"""
# 定义掩码的形状为 (1, size, size)。这里的 1 表示批量大小(batch size),size 表示序列长度,掩码的形状与注意力权重矩阵的形状一致
attn_shape = (1, size, size)
# 生成由0和1组成的掩码
# torch.ones(attn_shape): 创建一个形状为 (1, size, size) 的全1张量
# torch.triu(..., diagonal=1): 提取上三角矩阵(包括对角线上的元素),并将上三角部分的值设置为1,其余部分设置为0(diagonal=1 表示从对角线的下一个元素开始提取)
# .type(torch.uint8): 将张量的数据类型转换为 torch.uint8(无符号8位整数)(这一步是为了节省内存,因为掩码只需要表示0和1)
subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8)
# 将上三角矩阵中的值转换为布尔值并进行输出【上三角部分(包括对角线上的元素)为 False,其余部分为 True】
return subsequent_mask == 0
概率分布输出
class Generator(nn.Module):
"""
将解码器的输出转换为概率分布
"""
def __init__(self, d_model, dict_len):
"""
初始化方法
d_model: 模型的维度,即解码器输出的特征维度
dict_len: 字典的长度,即输出词汇表的大小,这决定了最终输出的概率分布的大小
"""
super().__init__()
# proj是线性层,用于将解码器的输出投影到词汇表的大小上
self.proj = nn.Linear(in_features=d_model, out_features=dict_len)
def forward(self, x):
"""
前向传播方法
"""
# 对线性层的输出应用 log_softmax 函数,计算每个位置的概率分布,并进行输出
return log_softmax(self.proj(x), dim=-1)
编码器-解码器架构
class EncoderDecoder(nn.Module):
"""
定义一个标准的编码器-解码器(Encoder-Decoder)架构
"""
def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
"""
初始化方法
encoder: 编码器模块,用于处理源序列
decoder: 解码器模块,用于生成目标序列
src_embed: 源序列嵌入模块,用于将源序列的单词或标记转换为向量表示
tgt_embed: 目标序列嵌入模块,用于将目标序列的单词或标记转换为向量表示
generator: 生成器模块,用于将解码器的输出转换为概率分布,通常用于生成最终的输出
"""
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = src_embed
self.tgt_embed = tgt_embed
self.generator = generator
def encode(self, src, src_mask):
"""
编码方法
src: 源序列,形状为 (batch_size, src_seq_len)
src_mask: 源序列的掩码
"""
# self.src_embed(src):将源序列 src 通过源序列嵌入模块 self.src_embed,将单词或标记转换为向量表示
# 输出的形状为 (batch_size, src_seq_len, d_model)
# self.encoder(...):将嵌入后的源序列和掩码传递给编码器模块 self.encoder,生成上下文表示(memory)
# 输出的形状为 (batch_size, src_seq_len, d_model)
return self.encoder(self.src_embed(src), src_mask)
def decode(self, memory, src_mask, tgt, tgt_mask):
"""
解码方法
memory: 编码器的输出,形状为 (batch_size, src_seq_len, d_model)
src_mask: 源序列的掩码
tgt: 目标序列,形状为 (batch_size, tgt_seq_len)
tgt_mask: 目标序列的掩码
"""
# self.tgt_embed(tgt):将目标序列 tgt 通过目标序列嵌入模块 self.tgt_embed,将单词或标记转换为向量表示
# 输出的形状为 (batch_size, tgt_seq_len, d_model)
# self.decoder(...):将嵌入后的目标序列、编码器的输出(memory)、源序列掩码和目标序列掩码传递给解码器模块 self.decoder,生成最终的输出
# 输出的形状为 (batch_size, tgt_seq_len, d_model)
return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
def forward(self, src, tgt, src_mask, tgt_mask):
"""
前向传播方法
src: 源序列,形状为 (batch_size, src_seq_len)
tgt: 目标序列,形状为 (batch_size, tgt_seq_len)
src_mask: 源序列的掩码,用于屏蔽某些位置的注意力权重
tgt_mask: 目标序列的掩码,用于屏蔽某些位置的注意力权重
"""
# self.encode(src, src_mask):调用 encode 方法,对源序列 src 进行编码,生成上下文表示(memory)
# src_mask 用于在编码过程中屏蔽某些位置的注意力权重
# self.decode(memory, src_mask, tgt, tgt_mask):调用 decode 方法,将编码器的输出(memory)和目标序列 tgt 传递给解码器,生成最终的输出
# src_mask 和 tgt_mask 分别用于在解码过程中屏蔽源序列和目标序列中的某些位置的注意力权重
# 输出的形状为 (batch_size, tgt_seq_len, dict_len),表示每个位置对词汇表中每个词的对数概率分布
return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)
transformer模型
def get_model(tokenizer, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
"""
构建一个完整的Transformer模型
tokenizer: 一个Tokenizer对象,包含源语言和目标语言的词典大小以及特殊标记(如<PAD>)的索引
N: 编码器和解码器的层数,默认为6
d_model: 模型的维度,默认为512
d_ff: 前馈网络(Feed-Forward Network)的维度,默认为2048
h: 多头注意力(Multi-Head Attention)的头数,默认为8
dropout: Dropout概率,用于正则化,默认为0.1
"""
# 1、定义一个深拷贝函数 c,用于创建模块的副本,这在构建编码器和解码器时非常有用,因为它们需要多个相同的层
c = copy.deepcopy
# 2、调用上面定义的MultiHeadedAttention类,创建一个多头注意力模块 attn,用于编码器和解码器中的自注意力和交叉注意力
attn = MultiHeadedAttention(h, d_model, dropout)
# 3、调用上面定义的FeedForward类,创建一个前馈网络模块 ff,用于编码器和解码器中的非线性变换
ff = FeedForward(d_model, d_ff, dropout)
# 4、调用上面定义的PositionalEncoding类,创建一个位置编码模块 position(不可学习的死码),用于将位置信息添加到嵌入向量中
position = PositionalEncoding(d_model, dropout)
# 5、模型构建
model = EncoderDecoder(
# 5.1 调用上面定义的Encoder类,创建一个编码器模块,包含 N 个编码器层,每个编码器层由自注意力模块和前馈网络模块组成
# 使用 c(attn) 和 c(ff) 创建多个相同的自注意力和前馈网络模块
encoder=Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
# 5.2 调用上面定义的Decoder类,创建一个解码器模块,包含 N 个解码器层,每个解码器层由自注意力模块、交叉注意力模块和前馈网络模块组成
# 使用 c(attn) 和 c(ff) 创建多个相同的自注意力和前馈网络模块
decoder=Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),
# 5.3 创建源序列嵌入模块,包含嵌入层和位置编码模块
# 嵌入层的维度为 d_model,词汇表大小为 tokenizer.src_dict_len
# 使用 padding_idx 参数指定 <PAD> 标记的索引,以便在嵌入时忽略这些位置
src_embed=nn.Sequential(Embedding(d_model=d_model,
dict_len=tokenizer.src_dict_len,
padding_idx=tokenizer.src_token2idx.get("<PAD>")),
position),
# 5.4 创建目标序列嵌入模块,包含嵌入层和位置编码模块
# 嵌入层的维度为 d_model,词汇表大小为 tokenizer.tgt_dict_len
# 使用 padding_idx 参数指定 <PAD> 标记的索引,以便在嵌入时忽略这些位置
tgt_embed=nn.Sequential(Embedding(d_model=d_model,
dict_len=tokenizer.tgt_dict_len,
padding_idx=tokenizer.tgt_token2idx.get("<PAD>")),
position),
# 5.5 调用上面定义的Generator类,创建一个生成器模块,用于将解码器的输出转换为概率分布
generator=Generator(d_model=d_model, dict_len=tokenizer.tgt_dict_len)
)
# 6、输出模型
return model
推理测试
def inference_test(tokenizer):
"""
推理测试
"""
# 1、函数定义和模型初始化
# 1.1 使用 get_model 函数创建一个Transformer模型,这里设置编码器和解码器的层数为2
model = get_model(tokenizer=tokenizer, N=2)
# 1.2 数据搬家
model.to(device=device)
# 1.3 模型开启评估模式
model.eval()
# 1.4 打印查看模型结构
print(model)
# 2、准备输入数据
# 2.1 禁用梯度计算,减少内存占用,提高计算速度
with torch.no_grad():
# 2.2 创建一个形状为 [1, 10] 的源序列张量,表示一个长度为10的序列,并进行数据搬家
src = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]).to(device=device)
# 2.3 创建一个形状为 [1, 1, 10] 的掩码张量,所有位置都为1,表示没有位置被屏蔽
src_mask = torch.ones(1, 1, 10)
# 2.4 调用模型的 encode 方法,将源序列和掩码传递给编码器,生成上下文表示(memory)
memory = model.encode(src, src_mask)
# 3、逐步生成目标序列
# 3.1 创建一个形状为 [1, 1] 的目标序列张量,初始值为0,并进行数据搬家
ys = torch.zeros(1, 1).type_as(src).to(device=device)
# 3.2 循环9次,逐步生成目标序列的每个单词
# 目标序列 ys 的初始长度是1(torch.zeros(1, 1)),而目标序列的最终长度是10,所以需要循环9次,生成9个额外的单词(10-1=9)
for _ in range(9):
# 3.2.1 调用模型的 decode 方法,将编码器的输出(memory)、源序列掩码、当前目标序列(ys)和目标序列掩码传递给解码器,生成解码器的输出
out = model.decode(
memory=memory,
src_mask=src_mask,
tgt=ys,
# 创建一个形状为 [1, ys.size(1), ys.size(1)] 的目标序列掩码,用于屏蔽后续位置的注意力权重
# 使用上面定义的 subsequent_mask 函数生成掩码,并将其数据类型转换为与 src 相同
tgt_mask=subsequent_mask(ys.size(1)).type_as(src.data)
)
# 3.2.2 调用生成器模块,将解码器的输出转换为概率分布(out[:, -1] 表示取解码器输出的最后一个时间步的特征)
prob = model.generator(out[:, -1])
# 3.2.3 使用 torch.max 找到概率分布中概率最大的单词索引(next_word 是下一个单词的索引)
_, next_word = torch.max(prob, dim=1)
# 3.2.4 获取下一个单词的索引值
next_word = next_word.data[0]
# 3.2.5 将下一个单词的索引添加到目标序列 ys 中(使用 torch.cat 将新的单词索引拼接到 ys 的末尾)
ys = torch.cat(
# torch.empty(1, 1):创建一个形状为 [1, 1] 的空张量
# .type_as(src.data):将该张量的数据类型和设备设置为与 src 相同。这确保了新生成的张量与 ys 的数据类型和设备一致
# .fill_(next_word):将新生成的单词索引 next_word 填充到上述创建的张量中
# next_word 是一个标量(0维张量),表示下一个单词的索引
# torch.cat(..., dim=1):使用 torch.cat 沿着第二个维度(dim=1)将列表中的张量拼接起来
# 这样,新生成的单词索引会被添加到 ys 的末尾,形成一个新的目标序列
[ys, torch.empty(1, 1).type_as(src.data).fill_(next_word)], dim=1
)
# 4、打印生成的结果
print("结果:", ys)
# 调用推理测试函数
inference_test(tokenizer=tokenizer)
3.5 utils
学习率调度函数
def rate(step, model_size, factor, warmup):
"""
学习率调度函数
step: 当前训练步数
model_size: 模型的维度(d_model),通常是一个整数,例如512
factor: 一个缩放因子,用于调整学习率的大小
warmup: 预热步数,表示学习率线性增加的阶段长度
"""
# 在训练的初始阶段,step 可能为0,为了避免0的负幂运算导致错误,将 step 的最小值设置为1
if step == 0:
step = 1
# 计算学习率并进行输出
# model_size ** (-0.5):计算模型维度的负平方根。这部分是一个常数,表示模型大小对学习率的影响
# min(step ** (-0.5), step * warmup ** (-1.5)):计算两个值中的较小值
# step ** (-0.5): 当前步数的负平方根
# step * warmup ** (-1.5): 当前步数乘以预热步数的负1.5次幂
# 这种设计使得学习率在预热阶段线性增加,之后逐渐减小
# factor * ...:使用一个缩放因子 factor 调整学习率的大小。这个因子可以根据需要进行调整,以控制学习率的整体水平
return factor * (
model_size ** (-0.5) * min(step ** (-0.5), step * warmup ** (-1.5))
)
标签平滑
class LabelSmoothing(nn.Module):
"""
标签平滑类
标签平滑是一种正则化技术,通过在训练过程中对目标标签进行平滑处理,使模型对目标标签的预测更加平滑,从而提高模型的泛化能力
"""
def __init__(self, size, padding_idx, smoothing=0.0):
"""
初始化方法
size: 词汇表的大小,即目标标签的类别数
padding_idx: 填充标记(如 <PAD>)的索引,用于在计算损失时忽略这些位置
smoothing: 平滑参数,控制标签平滑的程度。默认值为0.0,表示不进行平滑
"""
super().__init__()
# 使用 nn.KLDivLoss 作为损失函数,计算预测分布与目标分布之间的KL散度
self.criterion = nn.KLDivLoss(reduction="sum")
# 填充标记的索引
self.padding_idx = padding_idx
# 目标标签的置信度,计算为 1.0 - smoothing
self.confidence = 1.0 - smoothing
# 平滑参数
self.smoothing = smoothing
# 词汇表的大小
self.size = size
# 用于存储目标分布的变量
self.true_dist = None
def forward(self, x, target):
"""
前向传播方法
x: 模型的输出,形状为 (batch_size, seq_len, size),表示每个位置对词汇表中每个词的预测概率分布
target: 目标标签,形状为 (batch_size, seq_len),表示每个位置的真实标签索引
"""
# 1、确保模型的输出维度与词汇表大小一致
assert x.size(1) == self.size
# 2、创建一个与模型输出 x 形状相同的张量,用于存储目标分布
true_dist = x.data.clone()
# 3、将目标分布的所有元素初始化为 self.smoothing / (self.size - 2),这里减去2是因为要去掉填充标记和可能的其他特殊标记(如 <SOS> 和 <EOS>)
true_dist.fill_(self.smoothing / (self.size - 2))
# 4、将目标标签的置信度设置到目标分布中的特定位置
# scatter_(dim, index, src) 方法的工作原理是:根据 index 中的索引,将 src 中的值更新到目标张量的指定位置
# 即:
# 对于目标张量 target,scatter_ 会遍历 index 中的每个索引值
# 根据 index 中的索引值,将 src 中对应的值更新到 target 的指定位置
true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
# 5、将目标分布中填充标记的位置设置为0,忽略这些位置的损失
true_dist[:, self.padding_idx] = 0
# 6、找到目标标签中填充标记的位置
mask = torch.nonzero(target.data == self.padding_idx)
# 7、如果存在填充标记,将目标分布中这些位置的值设置为0
if mask.dim() > 0:
true_dist.index_fill_(0, mask.squeeze(), 0.0)
# 8、将目标分布存储在 self.true_dist 中,便于后续调试或分析
self.true_dist = true_dist
# 9、使用 nn.KLDivLoss 计算模型输出 x 与目标分布 true_dist 之间的KL散度损失【true_dist.clone().detach() 确保目标分布不会参与梯度计算】
return self.criterion(x, true_dist.clone().detach())
执行训练步骤及损失计算
class SimpleLossCompute(object):
"""
损失计算及训练步骤执行
"""
def __init__(self, generator, criterion):
"""
初始化方法
generator: 一个生成器模块,通常是一个线性层后接softmax函数,用于将模型的输出转换为概率分布
criterion: 一个损失函数,用于计算模型输出与目标标签之间的损失
"""
self.generator = generator
self.criterion = criterion
def __call__(self, x, y, norm):
"""
__call__ 方法的主要作用是使类的实例可以像函数一样被调用(调用时就执行__call__ 方法)
x: 模型的输出,形状为 (batch_size, seq_len, d_model)
y: 目标标签,形状为 (batch_size, seq_len)
norm: 一个标量,用于标准化损失值,通常用于处理不同批次大小或序列长度的情况
"""
# 1、将模型的输出 x 通过生成器模块 self.generator,将特征维度从 d_model 转换为词汇表大小,生成概率分布
# 输出的形状为 (batch_size, seq_len, vocab_size)
x = self.generator(x)
# 2、标准化损失值
# x.contiguous().view(-1, x.size(-1)):
# 使用 contiguous() 确保张量在内存中是连续的,这对于某些操作(如 view)是必要的
# 使用 view(-1, x.size(-1)) 将张量重塑为 (batch_size * seq_len, vocab_size),以便计算损失
# y.contiguous().view(-1):将目标标签 y 重塑为 (batch_size * seq_len),以便与模型输出的形状匹配
# self.criterion(...):使用损失函数 self.criterion 计算模型输出与目标标签之间的损失。
# 损失函数的输入是模型输出的概率分布和目标标签的索引。
# / norm:将计算得到的损失值除以 norm,进行标准化处理。
sloss = (
self.criterion(x.contiguous().view(-1, x.size(-1)), y.contiguous().view(-1))
/ norm
)
# 3、返回最终的损失值和标准化后的损失张量
# loss.data * norm:将标准化后的损失值乘以 norm,得到最终的损失值。sloss.data 是一个标量,表示损失值。
return sloss.data * norm, sloss
贪心解码
def greedy_decode(model, src, src_mask, max_len, start_symbol):
"""
贪心解码(每次选择当前步骤中概率最大的单词作为下一个单词)
model: Transformer模型实例,包含编码器和解码器
src: 源序列,形状为 (batch_size, src_seq_len)
src_mask: 源序列的掩码,形状为 (batch_size, 1, src_seq_len)
max_len: 生成的目标序列的最大长度
start_symbol: 起始符号的索引,通常是一个特殊的标记(如 <SOS>)
"""
# 1、调用模型的 encode 方法,将源序列 src 和源序列掩码 src_mask 传递给编码器,生成中间表示(memory)。
# memory 的形状为 (batch_size, src_seq_len, d_model)
memory = model.encode(src, src_mask)
# 2、创建一个形状为 (1, 1) 的张量,填充为 start_symbol,表示目标序列的起始符号
# 使用 type_as(src.data) 确保 ys 的数据类型和设备与 src 一致
ys = torch.zeros(1, 1).fill_(start_symbol).type_as(src.data)
# 3、自回归式生成(目标序列的初始长度已经为1(包含起始符号 <SOS>),因此,只需要再生成 max_len - 1 个单词,以达到目标序列的最大长度 max_len即可)
for _ in range(max_len - 1):
# 3.1 调用模型的 decode 方法,将编码器的输出(memory)、源序列掩码(src_mask)、当前目标序列(ys)和目标序列掩码传递给解码器。
# 目标序列掩码使用 subsequent_mask(ys.size(1)).type_as(src.data) 生成,以屏蔽后续位置的注意力权重
out = model.decode(
memory, src_mask, ys, subsequent_mask(ys.size(1)).type_as(src.data)
)
# 3.2 使用生成器模块将解码器的输出转换为概率分布,out[:, -1] 表示取解码器输出的最后一个时间步的特征结果
prob = model.generator(out[:, -1])
# 3.3 使用 torch.max 找到概率分布中概率最大的单词索引
_, next_word = torch.max(prob, dim=1)
# 3.4 获取下一个单词的索引值
next_word = next_word.data[0]
# 3.5 将新生成的单词索引 next_word 添加到目标序列 ys 的末尾
ys = torch.cat(
[ys, torch.zeros(1, 1).type_as(src.data).fill_(next_word)], dim=1
)
# 4、返回生成的目标序列
return ys
预测结果转真实结果
def get_real_output(y, tokenizer):
"""
将模型的预测结果(索引形式)转换为真实的结果(单词或标记形式),并去除特殊标记(如 <EOS> 和 <PAD>)
y: 模型的预测结果,形状为 (batch_size, seq_len),表示每个位置的单词索引
tokenizer: 一个Tokenizer对象,包含目标语言的词典和特殊标记的索引
"""
# 1、将预测结果 y 转换为 Python 列表
y = y.tolist()
# 2、初始化一个空列表,用于存储原始的预测结果(包括 <EOS> 和 <PAD>)
raw_results = []
# 3、初始化一个空列表,用于存储最终结果(去除 <EOS> 和 <PAD>)
final_results = []
# 4、遍历每个预测序列,将预测结果转换为单词
for s in y:
# 使用列表推导式,将每个索引 idx 转换为对应的单词或标记,并将转换后的单词列表添加到 raw_results 中
raw_results.append(
[tokenizer.tgt_idx2token.get(idx) for idx in s]
)
# 5、遍历每个预测序列,去除 <EOS> <PAD>
for s in y:
# 5.1 初始化一个空列表,用于存储当前序列的最终结果
result = []
# 5.2 从序列中遍历每个索引 idx
for idx in s:
# 5.2.1 如果当前索引是 <EOS>,则直接结束当前序列的处理
if idx == tokenizer.tgt_token2idx.get("<EOS>"):
break
# 5.2.2 如果当前索引是 <PAD>,跳过当前索引
elif idx == tokenizer.tgt_token2idx.get("<PAD>"):
continue
# 5.2.3 将当前索引对应的单词或标记添加到 result 中
result.append(tokenizer.tgt_idx2token.get(idx))
# 5.3 将当前序列的最终结果添加到 final_results 中
final_results.append(result)
# 6、返回原始的预测结果 raw_results 和去除 <EOS> 和 <PAD> 后的最终结果 final_results
return raw_results, final_results
跟踪监控
class TrainState(object):
"""
提供一个简单的结构,用于跟踪训练过程中的各种状态信息
这些信息对于监控训练进度、调整训练策略(如学习率调度)和记录训练日志非常有用
"""
# step:当前训练步骤(在当前 epoch 中的步骤数),每次调用模型的前向传播和反向传播时,step 会增加
step: int = 0
# accum_step:梯度累积步骤数,在某些情况下,为了减少内存占用或提高训练稳定性,会使用梯度累积技术,accum_step 用于跟踪这些步骤
accum_step: int = 0
# samples:总共使用的样本数,每次处理一个批次的样本时,samples 会增加
samples: int = 0
# tokens:总共处理的标记(tokens)数,在自然语言处理任务中,每个单词或子词(subword)通常被视为一个标记,tokens 用于跟踪这些标记的总数
tokens: int = 0
评估/训练一轮
def run_epoch(
data_iter,
model,
loss_compute,
optimizer,
scheduler,
mode="train",
accum_iter=1,
train_state=TrainState(),
device="cpu"
):
"""
训练/评估一轮
data_iter: 数据迭代器,用于生成训练或评估数据
model: 模型实例,用于前向传播和生成输出
loss_compute: 损失计算函数,用于计算模型输出与目标之间的损失
optimizer: 优化器,用于更新模型参数
scheduler: 学习率调度器,用于调整学习率
mode: 模式,可以是 "train"、"train+log" 或 "eval",这里默认为 "train"
accum_iter: 梯度累积步数,默认为1(用于在多个前向传播步骤中累积梯度,然后进行一次反向传播)
train_state: 训练状态对象,用于跟踪训练过程中的各种状态信息
device: 设备,可以是 "cpu" 或 "cuda",这里默认为 "cpu"
处理逻辑:
初始化变量:记录训练开始的时间,初始化总标记数、总损失和当前批次标记数
数据迭代:遍历数据迭代器,获取每个批次的数据
数据移动到指定设备:将数据移动到指定的设备
前向传播:调用模型的 forward 方法,生成输出
损失计算:计算模型输出与目标标签之间的损失
反向传播和优化:在训练模式下,进行反向传播、优化器更新和学习率调整
记录和打印日志:定期打印训练日志,记录训练进度
清理变量:删除损失值和损失张量,释放内存
返回结果:返回平均损失和训练状态对象
"""
# 记录训练开始的时间
start = time.time()
# 初始化总共处理的标记数
total_tokens = 0
# 初始化总损失
total_loss = 0
# 初始化当前批次处理的标记数
tokens = 0
# 初始化梯度累积的步数
n_accum = 0
for i, (src, src_mask, tgt, tgt_mask, tgt_y, ntokens) in enumerate(data_iter):
"""
遍历数据迭代器 data_iter,获取每个批次的数据
每个批次的数据包括
src: 源序列
src_mask: 源序列的掩码
tgt: 目标序列
tgt_mask: 目标序列的掩码
tgt_y: 目标序列的真实标签
ntokens: 当前批次的标记数
"""
# 将源序列、目标序列和目标标签移动到指定的设备(CPU/GPU)
src = src.to(device=device)
tgt = tgt.to(device=device)
tgt_y = tgt_y.to(device=device)
# 调用模型的 forward 方法,将源序列、目标序列、源序列掩码和目标序列掩码传递给模型,得到模型的输出
out = model.forward(src, tgt, src_mask, tgt_mask)
# 使用损失计算函数 loss_compute 计算模型输出与目标标签之间的损失(loss 是损失值,loss_node 是损失张量,用于反向传播)
loss, loss_node = loss_compute(out, tgt_y, ntokens)
# 反向传播和优化
if mode == "train" or mode == "train+log":
# 对损失张量进行反向传播,计算梯度
loss_node.backward()
# 更新训练步骤数
train_state.step += 1
# 更新处理的样本数
train_state.samples += src.shape[0]
# 更新处理的标记数
train_state.tokens += ntokens
# 如果当前步数是梯度累积步数的倍数,执行优化器的更新步骤
if i % accum_iter == 0:
# 更新模型参数
optimizer.step()
# 清零梯度
optimizer.zero_grad(set_to_none=True)
# 更新梯度累积步数
n_accum += 1
# 更新梯度累积步骤数
train_state.accum_step += 1
# 调整学习率
scheduler.step()
# 总共处理的标记数
total_tokens += ntokens
# 总损失
total_loss += loss
# 当前批次处理的标记数
tokens += ntokens
# 每40个批次打印一次训练日志,包括:
# 当前步数
# 梯度累积步数
# 当前批次的损失
# 每秒处理的标记数
# 当前学习率
if i % 40 == 1 and (mode == "train" or mode == "train+log"):
# 当前学习率
lr = optimizer.param_groups[0]["lr"]
# 每秒处理的标记数
elapsed = time.time() - start
print(
(
"Epoch Step: %6d | Accumulation Step: %3d | Loss: %6.2f "
+ "| Tokens / Sec: %7.1f | Learning Rate: %6.1e"
)
% (i, n_accum, loss / ntokens, tokens / elapsed, lr)
)
# 记录训练开始的时间
start = time.time()
# 初始化当前批次处理的标记数
tokens = 0
# 删除损失值和损失张量,释放内存
del loss
del loss_node
# 返回平均损失和训练状态对象
return total_loss / total_tokens, train_state
3.6 模型调用
class Translation(object):
"""
定义一个 Translation 类,用于处理机器翻译任务(包括模型的初始化、训练和推理功能)
"""
def __init__(self, data_file, device, tokenizer, train_dataloader, test_dataloader):
"""
初始化方法
data_file:数据文件路径
device:设备(CPU/GPU)
tokenizer:分词器实例
"""
# 数据文件路径
self.file = data_file
# 设备(CPU/GPU)
self.device = device
# 分词器实例
self.tokenizer = tokenizer
# 训练集
self.train_dataloader = train_dataloader
# 测试集
self.test_dataloader = test_dataloader
# Transformer模型实例
self.model = self._get_model(tokenizer=self.tokenizer, N=2)
# 训练的总轮次
self.epochs = 20
# 优化器,用于更新模型参数
self.optimizer = torch.optim.Adam(
params=self.model.parameters(),
lr=0.5,
betas=(0.9, 0.98),
eps=1e-9
)
# 学习率调度器,用于调整学习率
self.lr_scheduler = LambdaLR(
optimizer=self.optimizer,
lr_lambda=lambda step: rate(
step, model_size=self.model.src_embed[0].d_model, factor=1.0, warmup=400
),
)
# 损失函数,用于计算模型输出与目标之间的损失
self.criterion = LabelSmoothing(size=self.tokenizer.tgt_dict_len,
padding_idx=self.tokenizer.tgt_token2idx.get("<PAD>"),
smoothing=0.1)
# 损失计算对象,封装了生成器和损失函数
self.loss_compute = SimpleLossCompute(self.model.generator, self.criterion)
def _get_model(self, tokenizer, N):
"""
模型获取
"""
# 调用上面定义的Transformer模型方法(get_model)获取模型
model = get_model(tokenizer=tokenizer, N=N)
# 如果存在模型参数文件,则进行加载
if os.path.exists(path="./.cache/model.pt"):
print("加载预训练模型...")
model.load_state_dict(state_dict=torch.load(f="./.cache/model.pt", weights_only=True))
print("加载本地模型成功")
# 数据搬家
model.to(device=self.device)
# 返回获取好的模型
return model
def infer(self, sentence="Am I wrong?"):
"""
预测过程
"""
# 打印原文
print("原文:", sentence)
# 打印分词
sentence = self.tokenizer.split_src(sentence=sentence)
print("分词:", sentence)
# 打印编码
sentence = self.tokenizer.encode_src(src_sentence=sentence, src_sentence_len=len(sentence))
print("编码:", sentence)
# 打印张量
src = torch.LongTensor([sentence]).to(device=self.device)
print("张量:", src)
# 获取源序列的长度(即张量的第二个维度)
max_len = src.size(1)
# 创建一个形状为 (1, 1, max_len) 的掩码张量,所有位置的值为1,表示没有位置被屏蔽
src_mask = torch.ones(1, 1, max_len)
# 将模型设置为评估模式
self.model.eval()
# 禁用梯度计算,减少内存占用,提高计算速度
with torch.no_grad():
# 调用上面定义的贪心解码函数,获取预测结果
y_pred = greedy_decode(self.model,
src,
src_mask,
max_len=self.tokenizer.tgt_max_len,
start_symbol=self.tokenizer.tgt_token2idx.get("<SOS>"))
# 去除启动信号 <SOS>
y_pred = y_pred[:, 1:]
# 生成预测结果
raw_results, final_results = get_real_output(y_pred.cpu(), self.tokenizer)
print("原始预测:", raw_results[0])
print("最终预测:", final_results[0])
def train(self):
"""
训练过程
"""
# 获取训练集
train_dataloader = self.train_dataloader
# 每一轮的训练
for epoch in range(self.epochs):
# 打印当前轮次
print(f"######################## Epoch: {epoch + 1} ########################")
# 调用模型的训练方法
self.model.train()
# 调用每一轮的处理函数
loss, state = run_epoch(data_iter=train_dataloader,
model=self.model,
loss_compute=self.loss_compute,
optimizer=self.optimizer,
scheduler=self.lr_scheduler,
mode="train",
device=self.device)
# 获取当前轮次的损失
loss = loss.item()
# 保存模型
if not os.path.exists(path="./.cache"):
os.mkdir(path="./.cache")
torch.save(obj=self.model.state_dict(), f="./.cache/model.pt")
# 测试效果
self.infer(sentence="Am I wrong?")
self.infer(sentence="I went to school by bike!")
self.infer(sentence="Are you OK?")
# 设置早停规则
if loss < 0.5:
print(f"训练提前结束, 当前损失为:{loss}")
break
训练
translation = Translation(data_file="./data.txt", device=device, tokenizer=tokenizer, train_dataloader=train_dataloader, test_dataloader=test_dataloader)
translation.train()
测试
translation.infer(sentence="This is my car!")