pytorch +torchtext transform
https://github.com/k2393937499/Transformer
1.数据加载
import torchtext;
# torchtext.disable_torchtext_deprecation_warning()
import spacy
import torch
import torchtext.datasets as datasets
from torch.nn.functional import pad
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.functional import to_map_style_dataset
class Multi30k():
def __init__(self, dataset, tokenizer_de, tokenizer_en, vocab_de, vocab_en, language_pair, max_padding=128,
pad_id=2):
assert dataset in ['train', 'val', 'test'], 'dataset must be in [train, val, test]'
assert language_pair in [('de', 'en'),
('en', 'de')], 'language pair must be (\'en\', \'de\') or (\'de\', \'en\') '
self.tokenizer_de = tokenizer_de
self.tokenizer_en = tokenizer_en
self.vocab_de = vocab_de
self.vocab_en = vocab_en
self.language_pair = language_pair
self.max_padding = max_padding
self.pad_id = pad_id
if dataset == 'train':
self.orign_dataset, _, _ = datasets.Multi30k(language_pair=language_pair)
elif dataset == 'val':
_, self.orign_dataset, _ = datasets.Multi30k(language_pair=language_pair)
else:
_, _, self.orign_dataset = datasets.Multi30k(language_pair=language_pair)
self.iter_map = to_map_style_dataset(self.orign_dataset)
def __getitem__(self, idx):
bs_id = torch.tensor([0], ) # <s> token id
eos_id = torch.tensor([1], ) # </s> token id
if self.language_pair == ('de', 'en'):
origin_src = self.iter_map[idx][0]
origin_tgt = self.iter_map[idx][1]
src = torch.cat([bs_id, torch.tensor(
self.vocab_de([token.text for token in self.tokenizer_de.tokenizer(origin_src)]), dtype=torch.int64,
), eos_id], dim=0)
tgt = torch.cat([bs_id, torch.tensor(
self.vocab_en([token.text for token in self.tokenizer_en.tokenizer(origin_tgt)]), dtype=torch.int64,
), eos_id], dim=0)
src = pad(src, (0, self.max_padding - len(src)), value=self.pad_id)
tgt = pad(tgt, (0, self.max_padding - len(tgt)), value=self.pad_id)
else:
origin_src = self.iter_map[idx][1]
origin_tgt = self.iter_map[idx][0]
src = torch.cat([bs_id, torch.tensor(
self.vocab_de([token.text for token in self.tokenizer_en.tokenizer(origin_src)]), dtype=torch.int64,
), eos_id], dim=0)
tgt = torch.cat([bs_id, torch.tensor(
self.vocab_en([token.text for token in self.tokenizer_de.tokenizer(origin_tgt)]), dtype=torch.int64,
), eos_id], dim=0)
src = pad(src, (0, self.max_padding - len(src)), value=self.pad_id)
tgt = pad(tgt, (0, self.max_padding - len(tgt)), value=self.pad_id)
return src, tgt
def __len__(self):
return len(self.iter_map)
class Batch:
"""Object for holding a batch of data with mask during training."""
def __init__(self, src, tgt=None, pad=2): # 2 = <blank>
self.src = src
self.src_mask = (src != pad).unsqueeze(-2)
if tgt is not None:
self.tgt = tgt[:, :-1]
self.tgt_y = tgt[:, 1:]
self.tgt_mask = self.make_std_mask(self.tgt, pad)
self.ntokens = (self.tgt_y != pad).data.sum()
@staticmethod
def make_std_mask(tgt, pad):
"Create a mask to hide padding and future words."
tgt_mask = (tgt != pad).unsqueeze(-2)
attn_shape = (1, tgt.size(-1), tgt.size(-1))
subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8)
subsequent_mask = (subsequent_mask == 0).type_as(tgt_mask.data)
tgt_mask = tgt_mask & subsequent_mask
return tgt_mask
if __name__ == "__main__":
def yield_tokens(iter, tokenizer, index):
for text in iter:
yield [token.text for token in tokenizer.tokenizer(text[index])]
spacy_de = spacy.load("de_core_news_sm")
spacy_en = spacy.load("en_core_web_sm")
train, val, test = datasets.Multi30k(language_pair=("de", "en"))
src_vocab = build_vocab_from_iterator(yield_tokens(train + val + test, spacy_de, 0),
specials=["<s>", "</s>", "<blank>", "<unk>"])
tgt_vocab = build_vocab_from_iterator(yield_tokens(train + val + test, spacy_en, 1),
specials=["<s>", "</s>", "<blank>", "<unk>"])
src_vocab.set_default_index(src_vocab["<unk>"])
tgt_vocab.set_default_index(tgt_vocab["<unk>"])
origin_data = Multi30k(dataset="train", tokenizer_de=spacy_de, tokenizer_en=spacy_en, vocab_de=src_vocab,
vocab_en=tgt_vocab, language_pair=('de', 'en'))
data_iter = DataLoader(origin_data, batch_size=32)
data = (Batch(b[0], b[1], pad=2) for b in data_iter)
for i in data:
print(i)
print(i.src)
print(i.src_mask)
print(i.tgt)
print(i.tgt_mask)
使用 德文转英文为例
src ‘德文句子’
target ‘英文句子’
1.spacy模型分词,把一个句子分成很多个单词。
2.单词变成词典的索引,字符转数字
3.padding,保证每个句子的长度一致。例如128,句子开始是0,句子结束时1,其他填充2 ,所以一个句子就是0,。。。。1,2。。。 这个样子。
4.src的mask不用遮挡下一个预测,所以mask就是句子中不为2的就是true
5.target,需要遮挡,所以是一个下三角,每次遮挡下一个。
6.因为src是全部放入encode中,计算这个句子的相关性,然后把这个相关系和target的第一个输入送入decode中,预测target第二个单词。所以target的输入长度是 target[:-1] ,target去除最后一个。 target的预测结果就是target[1:],target去除第一个。
2.模型
import math
import copy
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax
def clones(module, N):
"Produce N identical layers."
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
# 词向量映射到固定维度空间""
class Embeddings(nn.Module):
def __init__(self, d_model, vocab):
super(Embeddings, self).__init__()
self.lut = nn.Embedding(vocab, d_model)
self.d_model = d_model
def forward(self, x):
x = self.lut(x)
x *= math.sqrt(self.d_model)
return x
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout_rate, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout_rate)
# Compute the positional encodings once in log space.
position = torch.arange(0, max_len).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
pe = torch.zeros(max_len, d_model)
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer("pe", pe)
def forward(self, x):
x = x + self.pe[:, : x.size(1)].requires_grad_(False)
x = self.dropout(x)
return x
class MultiHeadedAttention(nn.Module):
def __init__(self, h, d_model, dropout_rate=0.1):
"Take in model size and number of heads."
super(MultiHeadedAttention, self).__init__()
assert d_model % h == 0
# We assume d_v always equals d_k
self.d_k = d_model // h
self.h = h
self.linears = clones(nn.Linear(d_model, d_model), 4)
self.attn = None
self.dropout = nn.Dropout(p=dropout_rate)
def _attention(self, query, key, value, mask=None, dropout=None):
"Compute 'Scaled Dot Product Attention'"
d_k = query.size(-1)
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
p_attn = scores.softmax(dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
return torch.matmul(p_attn, value), p_attn
def forward(self, query, key, value, mask=None):
if mask is not None:
mask = mask.unsqueeze(1)
nbatches = query.size(0)
# 1) 调整为度 d_model -> [batch, head, seq_length, d_k]
query, key, value = [
linear(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
for linear, x in zip(self.linears, (query, key, value))
]
# 2) Apply attention on all the projected vectors in batch.
x, self.attn = self._attention(
query, key, value, mask, dropout=self.dropout
)
# 3) "Concat" using a view and apply a final linear.
x = (
x.transpose(1, 2)
.contiguous()
.view(nbatches, -1, self.h * self.d_k)
)
x = self.linears[-1](x)
del query
del key
del value
return x
class PositionwiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff, dropout_rate=0.1):
super(PositionwiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff)
self.w_2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout_rate)
def forward(self, x):
x = self.w_1(x)
x = x.relu()
x = self.w_2(x)
return x
class LayerNorm(nn.Module):
"Construct a layernorm module (See citation for details)."
def __init__(self, features, eps=1e-6):
super(LayerNorm, self).__init__()
self.a_2 = nn.Parameter(torch.ones(features))
self.b_2 = nn.Parameter(torch.zeros(features))
self.eps = eps
def forward(self, x):
mean = x.mean(-1, keepdim=True)
std = x.std(-1, keepdim=True)
return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
class ResidualConnection(nn.Module):
def __init__(self, size, dropout_rate):
super(ResidualConnection, self).__init__()
self.norm = LayerNorm(size, eps=1e-6)
self.dropout = nn.Dropout(dropout_rate)
def forward(self, x, sublayer):
x = x + self.dropout(sublayer(self.norm(x)))
return x
class EncoderSublayer(nn.Module):
def __init__(self, size, self_attn, feed_forward, dropout_rate):
super(EncoderSublayer, self).__init__()
self.self_attn = self_attn
self.feed_forward = feed_forward
self.sublayer = clones(ResidualConnection(size, dropout_rate), 2)
self.size = size
def forward(self, x, mask):
"Follow Figure 1 (left) for connections."
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
return self.sublayer[1](x, self.feed_forward)
class Encoder(nn.Module):
"Core encoder is a stack of N layers"
def __init__(self, size, self_attn, feed_forward, dropout_rate, N):
super(Encoder, self).__init__()
self.layers = clones(EncoderSublayer(size, self_attn, feed_forward, dropout_rate), N)
self.norm = LayerNorm(size, eps=1e-6)
def forward(self, x, mask):
"Pass the input (and mask) through each layer in turn."
for layer in self.layers:
x = layer(x, mask)
x = self.norm(x)
return x
class DecoderSublayer(nn.Module):
def __init__(self, size, self_attn, src_attn, feed_forward, dropout_rate):
super(DecoderSublayer, self).__init__()
self.size = size
self.self_attn = self_attn
self.src_attn = src_attn
self.feed_forward = feed_forward
self.sublayer = clones(ResidualConnection(size, dropout_rate), 3)
def forward(self, x, memory, src_mask, tgt_mask):
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
x = self.sublayer[1](x, lambda x: self.src_attn(x, memory, memory, src_mask))
x = self.sublayer[2](x, self.feed_forward)
return x
class Decoder(nn.Module):
"Generic N layer decoder with masking."
def __init__(self, size, self_attn, src_attn, feed_forward, dropout_rate, N):
super(Decoder, self).__init__()
self.layers = clones(DecoderSublayer(size, self_attn, src_attn, feed_forward, dropout_rate), N)
self.norm = LayerNorm(size, eps=1e-6)
def forward(self, x, memory, src_mask, tgt_mask):
for layer in self.layers:
x = layer(x, memory, src_mask, tgt_mask)
x = self.norm(x)
return x
class Generator(nn.Module):
"Define standard linear + softmax generation step."
def __init__(self, d_model, len_vocab):
super(Generator, self).__init__()
self.proj = nn.Linear(d_model, len_vocab)
def forward(self, x):
return log_softmax(self.proj(x), dim=-1)
class Transformer(nn.Module):
def __init__(self, len_src_vocab, len_tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout_rate=0.1):
super(Transformer, self).__init__()
c = copy.deepcopy
pe = PositionalEncoding(d_model, dropout_rate)
attn = MultiHeadedAttention(h, d_model, dropout_rate)
ffn = PositionwiseFeedForward(d_model, d_ff, dropout_rate)
self.encoder = Encoder(d_model, c(attn), c(ffn), dropout_rate, N)
self.decoder = Decoder(d_model, c(attn), c(attn), c(ffn), dropout_rate, N)
self.src_embed = nn.Sequential(Embeddings(d_model, len_src_vocab), c(pe))
self.tgt_embed = nn.Sequential(Embeddings(d_model, len_tgt_vocab), c(pe))
self.generator = Generator(d_model, len_tgt_vocab)
def encode(self, src, src_mask):
src = self.src_embed(src)
out = self.encoder(src, src_mask)
return out
def decode(self, memory, src_mask, tgt, tgt_mask):
tgt = self.tgt_embed(tgt)
out = self.decoder(tgt, memory, src_mask, tgt_mask)
return out
def forward(self, src, tgt, src_mask, tgt_mask):
"Take in and process masked src and target sequences."
out = self.encode(src, src_mask)
out = self.decode(out, src_mask, tgt, tgt_mask)
return out
if __name__ == "__main__":
def subsequent_mask(size):
"Mask out subsequent positions."
attn_shape = (1, size, size)
subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(
torch.uint8
)
return subsequent_mask == 0
test_model = Transformer(11, 11, 2)
test_model.eval()
src = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
src_mask = torch.ones(1, 1, 10)
memory = test_model.encode(src, src_mask)
ys = torch.zeros(1, 1).type_as(src)
for i in range(9):
out = test_model.decode(
memory, src_mask, ys, subsequent_mask(ys.size(1)).type_as(src.data)
)
prob = test_model.generator(out[:, -1])
_, next_word = torch.max(prob, dim=1)
next_word = next_word.data[0]
ys = torch.cat(
[ys, torch.empty(1, 1).type_as(src.data).fill_(next_word)], dim=1
)
print("Example Untrained Model Prediction:", ys)
def get_parameter_number(model):
total_num = sum(p.numel() for p in model.parameters())
trainable_num = sum(p.numel() for p in model.parameters() if p.requires_grad)
return {'Total': total_num, 'Trainable': trainable_num}
print(get_parameter_number(test_model))
2.1 encode
1.词向量编码
2.位置编码
3.词向量编码+位置编码->x
4.x->selfattention->x
5.x->残差网络->x
6.4和5 执行N次
7.norm->output
2.2decode
1.词向量编码
2.位置编码
3.词向量编码+位置编码->target
4.target->selfattention->target
5.target->残差网络->target
6.target、encode的输出作为selfattention的输入、encode的输出作为selfattention的输入->selfattention
7.4和5、6 执行N次
8.norm->output
3.loss
import torch
import torch.nn as nn
class LabelSmoothing(nn.Module):
"Implement label smoothing."
def __init__(self, size, padding_idx, smoothing=0.0):
super(LabelSmoothing, self).__init__()
self.criterion = nn.KLDivLoss(reduction="sum")
self.padding_idx = padding_idx
self.confidence = 1.0 - smoothing
self.smoothing = smoothing
self.size = size
self.true_dist = None
def forward(self, x, target):
assert x.size(1) == self.size
true_dist = x.data.clone()
true_dist.fill_(self.smoothing / (self.size - 2))
true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
true_dist[:, self.padding_idx] = 0
mask = torch.nonzero(target.data == self.padding_idx)
if mask.dim() > 0:
true_dist.index_fill_(0, mask.squeeze(), 0.0)
self.true_dist = true_dist
return self.criterion(x, true_dist.clone().detach())
class SimpleLossCompute:
"A simple loss compute and train function."
def __init__(self, generator, criterion):
self.generator = generator
self.criterion = criterion
def __call__(self, x, y, norm):
x = self.generator(x)
sloss = (
self.criterion(
x.contiguous().view(-1, x.size(-1)), y.contiguous().view(-1)
)
/ norm
)
return sloss.data * norm, sloss
4.训练
# import torchtext; torchtext.disable_torchtext_deprecation_warning()
import os
import spacy
import torch
import torchtext.datasets as datasets
from dataset import Multi30k
from model import Transformer
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import LambdaLR
from loss import LabelSmoothing, SimpleLossCompute
from torchtext.vocab import build_vocab_from_iterator
from train_eval_utils import train_one_epoch, val_one_epoch
def yield_tokens(iter, tokenizer, index):
for text in iter:
yield [token.text for token in tokenizer.tokenizer(text[index])]
try:
spacy_de = spacy.load("de_core_news_sm")
except IOError:
os.system("python -m spacy download de_core_news_sm")
spacy_de = spacy.load("de_core_news_sm")
try:
spacy_en = spacy.load("en_core_web_sm")
except IOError:
os.system("python -m spacy download en_core_web_sm")
spacy_en = spacy.load("en_core_web_sm")
train, val, test = datasets.Multi30k(language_pair=("de", "en"))
src_vocab = build_vocab_from_iterator(yield_tokens(train+val+test, spacy_de, 0), specials=["<s>", "</s>", "<blank>", "<unk>"], min_freq=2)
tgt_vocab = build_vocab_from_iterator(yield_tokens(train+val+test, spacy_en, 1), specials=["<s>", "</s>", "<blank>", "<unk>"], min_freq=2)
src_vocab.set_default_index(src_vocab["<unk>"])
tgt_vocab.set_default_index(tgt_vocab["<unk>"])
train_data = Multi30k(dataset="train", tokenizer_de=spacy_de, tokenizer_en=spacy_en, vocab_de=src_vocab, vocab_en=tgt_vocab, language_pair=('de', 'en'))
valid_data = Multi30k(dataset="val", tokenizer_de=spacy_de, tokenizer_en=spacy_en, vocab_de=src_vocab, vocab_en=tgt_vocab, language_pair=('de', 'en'))
train_dataloader = DataLoader(train_data, batch_size=16)
valid_dataloader = DataLoader(valid_data, batch_size=16)
config = {
"num_epochs": 10,
"accum_iter": 10,
"base_lr": 1.0,
"max_padding": 72,
"warmup": 3000,
"file_prefix": "multi30k_model_",
}
model = Transformer(len(src_vocab), len(tgt_vocab), N=6)
# model.to("cuda")
pad_idx = tgt_vocab["<blank>"]
criterion = LabelSmoothing(size=len(tgt_vocab), padding_idx=pad_idx, smoothing=0.1)
# criterion.to("cuda")
def rate(step, model_size, factor, warmup):
"""
we have to default the step to 1 for LambdaLR function to avoid zero raising to negative power.
"""
if step == 0:
step = 1
return factor * (model_size ** (-0.5) * min(step ** (-0.5), step * warmup ** (-1.5)))
optimizer = torch.optim.Adam(model.parameters(), lr=config["base_lr"], betas=(0.9, 0.98), eps=1e-9)
lr_scheduler = LambdaLR(
optimizer=optimizer,
lr_lambda=lambda step: rate(step, model_size=512, factor=1, warmup=config["warmup"]),
)
for epoch in range(config["num_epochs"]):
model.train()
print(f"Epoch {epoch} Training ====", flush=True)
sloss = train_one_epoch(
train_dataloader,
model,
SimpleLossCompute(model.generator, criterion),
optimizer,
lr_scheduler,
accum_iter=config["accum_iter"],
)
loss, bleu_score = val_one_epoch(valid_dataloader, model, src_vocab, tgt_vocab, SimpleLossCompute(model.generator, criterion),)
with open('log.txt', 'a', encoding='utf-8') as file:
text_to_append = "epochs:{}, loss:{}, bleu_score:{}\n".format(epoch, loss.data, bleu_score)
file.write(text_to_append)
if epoch % 10 == 0:
file_path = "saves/%s%.2d.pt" % (config["file_prefix"], epoch)
torch.save(model.state_dict(), file_path)
torch.cuda.empty_cache()
file_path = "saves/%sfinal.pt" % config["file_prefix"]
torch.save(model.state_dict(), file_path)