当前位置: 首页 > article >正文

PyTorch + torchtext 实现 Transformer

https://github.com/k2393937499/Transformer

1.数据加载

import torchtext;

# torchtext.disable_torchtext_deprecation_warning()

import spacy
import torch
import torchtext.datasets as datasets
from torch.nn.functional import pad
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.functional import to_map_style_dataset


class Multi30k():
    """Map-style wrapper around torchtext's Multi30k that yields padded id tensors.

    Each item is a ``(src, tgt)`` pair of 1-D ``int64`` tensors laid out as
    ``[0, token ids..., 1, pad_id, pad_id, ...]`` (0 = ``<s>``, 1 = ``</s>``).

    Args:
        dataset: which split to load, one of ``'train'``, ``'val'``, ``'test'``.
        tokenizer_de: object exposing ``.tokenizer(text)`` for German text; the
            returned tokens must have a ``.text`` attribute.
        tokenizer_en: same, for English text.
        vocab_de: callable mapping a list of German token strings to ids.
        vocab_en: callable mapping a list of English token strings to ids.
        language_pair: ``('de', 'en')`` or ``('en', 'de')``; the first entry is
            the source language, the second the target language.
        max_padding: length every returned sequence is padded to.
        pad_id: id used for the padding positions.
    """

    def __init__(self, dataset, tokenizer_de, tokenizer_en, vocab_de, vocab_en, language_pair, max_padding=128,
                 pad_id=2):
        assert dataset in ['train', 'val', 'test'], 'dataset must be in [train, val, test]'
        assert language_pair in [('de', 'en'),
                                 ('en', 'de')], 'language pair must be (\'en\', \'de\') or (\'de\', \'en\') '
        self.tokenizer_de = tokenizer_de
        self.tokenizer_en = tokenizer_en
        self.vocab_de = vocab_de
        self.vocab_en = vocab_en
        self.language_pair = language_pair
        self.max_padding = max_padding
        self.pad_id = pad_id

        # datasets.Multi30k returns the (train, valid, test) splits together.
        train_split, val_split, test_split = datasets.Multi30k(language_pair=language_pair)
        splits = {'train': train_split, 'val': val_split, 'test': test_split}
        # The misspelled attribute name is kept so existing users keep working.
        self.orign_dataset = splits[dataset]
        self.iter_map = to_map_style_dataset(self.orign_dataset)

    def _encode(self, text, tokenizer, vocab):
        """Tokenize ``text``, map it to ids, wrap it in <s>/</s> and pad it."""
        token_ids = torch.tensor(
            vocab([token.text for token in tokenizer.tokenizer(text)]), dtype=torch.int64,
        )
        bs_id = torch.tensor([0])  # <s> token id
        eos_id = torch.tensor([1])  # </s> token id
        seq = torch.cat([bs_id, token_ids, eos_id], dim=0)
        # NOTE(review): when len(seq) > max_padding the pad amount is negative;
        # confirm that the resulting behaviour is acceptable for long sentences.
        return pad(seq, (0, self.max_padding - len(seq)), value=self.pad_id)

    def __getitem__(self, idx):
        """Return the padded ``(src, tgt)`` id tensors for example ``idx``."""
        pair = self.iter_map[idx]
        # The dataset was built with this language_pair, so element 0 is already
        # the source sentence and element 1 the target sentence.
        origin_src, origin_tgt = pair[0], pair[1]
        if self.language_pair == ('de', 'en'):
            src = self._encode(origin_src, self.tokenizer_de, self.vocab_de)
            tgt = self._encode(origin_tgt, self.tokenizer_en, self.vocab_en)
        else:
            # ('en', 'de'): English source, German target. The previous code
            # swapped the sentences and paired each with the wrong tokenizer.
            src = self._encode(origin_src, self.tokenizer_en, self.vocab_en)
            tgt = self._encode(origin_tgt, self.tokenizer_de, self.vocab_de)
        return src, tgt

    def __len__(self):
        """Return the number of sentence pairs in the selected split."""
        return len(self.iter_map)


class Batch:
    """Bundle one batch of source/target ids with the masks used in training.

    Attributes set for every batch: ``src`` and ``src_mask`` (True where the
    source id differs from ``pad``). When ``tgt`` is given, also ``tgt`` (the
    target without its last position), ``tgt_y`` (the target without its first
    position), ``tgt_mask`` and ``ntokens`` (count of non-pad ids in ``tgt_y``).
    """

    def __init__(self, src, tgt=None, pad=2):  # 2 = <blank>
        self.src = src
        self.src_mask = (src != pad).unsqueeze(-2)
        if tgt is None:
            return
        decoder_input = tgt[:, :-1]
        labels = tgt[:, 1:]
        self.tgt = decoder_input
        self.tgt_y = labels
        self.tgt_mask = self.make_std_mask(decoder_input, pad)
        self.ntokens = (labels != pad).data.sum()

    @staticmethod
    def make_std_mask(tgt, pad):
        """Build a mask that hides both padding and later positions."""
        length = tgt.size(-1)
        not_pad = (tgt != pad).unsqueeze(-2)
        # Ones strictly above the diagonal mark the "future" positions.
        upper = torch.triu(torch.ones((1, length, length)), diagonal=1).type(torch.uint8)
        visible = (upper == 0).type_as(not_pad.data)
        return not_pad & visible


if __name__ == "__main__":
    # Demo: build both vocabularies, wrap the training split and print every batch.
    def yield_tokens(iter, tokenizer, index):
        """Yield the token strings of element ``index`` of every item in ``iter``."""
        yield from (
            [token.text for token in tokenizer.tokenizer(text[index])]
            for text in iter
        )


    special_tokens = ["<s>", "</s>", "<blank>", "<unk>"]

    spacy_de = spacy.load("de_core_news_sm")
    spacy_en = spacy.load("en_core_web_sm")
    train, val, test = datasets.Multi30k(language_pair=("de", "en"))

    # Index 0 of every pair is the German sentence, index 1 the English one.
    src_vocab = build_vocab_from_iterator(
        yield_tokens(train + val + test, spacy_de, 0),
        specials=list(special_tokens),
    )
    tgt_vocab = build_vocab_from_iterator(
        yield_tokens(train + val + test, spacy_en, 1),
        specials=list(special_tokens),
    )
    # Unknown tokens fall back to the <unk> id.
    src_vocab.set_default_index(src_vocab["<unk>"])
    tgt_vocab.set_default_index(tgt_vocab["<unk>"])

    origin_data = Multi30k(
        dataset="train",
        tokenizer_de=spacy_de,
        tokenizer_en=spacy_en,
        vocab_de=src_vocab,
        vocab_en=tgt_vocab,
        language_pair=('de', 'en'),
    )
    data_iter = DataLoader(origin_data, batch_size=32)
    data = (Batch(b[0], b[1], pad=2) for b in data_iter)

    for i in data:
        for shown in (i, i.src, i.src_mask, i.tgt, i.tgt_mask):
            print(shown)

使用 德文转英文为例
src ‘德文句子’
target ‘英文句子’

1.spacy模型分词,把一个句子分成很多个单词。
2.单词变成词典的索引,字符转数字
3.padding,保证每个句子的长度一致。例如最大长度为128:句子开头是0(<s>),句子结尾是1(</s>),其余位置用2(<blank>)填充,所以一个句子就是 0, ..., 1, 2, 2, ... 这个样子。
4.src的mask不用遮挡下一个预测,所以mask就是句子中不为2的就是true
5.target,需要遮挡,所以是一个下三角,每次遮挡下一个。
6.src是全部送入encoder,计算这个句子内部的相关性;然后把encoder的输出和target的输入一起送入decoder,逐个预测target的下一个单词。所以decoder的输入是 target[:-1](target去除最后一个),预测的标签是 target[1:](target去除第一个)。

2.模型

import math
import copy
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax


def clones(module, N):
    """Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
    copies = []
    for _ in range(N):
        copies.append(copy.deepcopy(module))
    return nn.ModuleList(copies)


# Map token ids to vectors in a fixed d_model-dimensional space.
class Embeddings(nn.Module):
    """Token embedding lookup whose output is scaled by ``sqrt(d_model)``."""

    def __init__(self, d_model, vocab):
        super().__init__()
        self.d_model = d_model
        self.lut = nn.Embedding(vocab, d_model)

    def forward(self, x):
        """Look up the embeddings of ids ``x`` and multiply them by ``sqrt(d_model)``."""
        scale = math.sqrt(self.d_model)
        return self.lut(x) * scale


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to the input, then apply dropout.

    Args:
        d_model: size of the last dimension of the inputs; even or odd.
        dropout_rate: dropout probability applied after the addition.
        max_len: number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout_rate, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout_rate)

        # Compute the positional encodings once in log space.
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        angles = position * div_term
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For an odd d_model there is one fewer cosine column than sine columns,
        # so drop the surplus frequency instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)
        # A buffer follows the module across devices but is not a parameter.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:, :x.size(1)])``; dimension 1 of ``x`` is the position axis."""
        x = x + self.pe[:, : x.size(1)]
        x = self.dropout(x)
        return x


class MultiHeadedAttention(nn.Module):
    """Multi-head scaled dot-product attention with ``h`` heads of size ``d_model // h``."""

    def __init__(self, h, d_model, dropout_rate=0.1):
        """Take in the number of heads and the model size."""
        super().__init__()
        assert d_model % h == 0
        self.h = h
        # d_v is assumed to always equal d_k.
        self.d_k = d_model // h
        # Three input projections (query, key, value) plus the output projection.
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout_rate)

    def _attention(self, query, key, value, mask=None, dropout=None):
        """Compute scaled dot-product attention; return ``(output, weights)``."""
        scale = math.sqrt(query.size(-1))
        scores = torch.matmul(query, key.transpose(-2, -1)) / scale
        if mask is not None:
            # Positions where the mask is 0 get a large negative score.
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = scores.softmax(dim=-1)
        if dropout is not None:
            weights = dropout(weights)
        context = torch.matmul(weights, value)
        return context, weights

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, merge the heads and project again.

        The attention weights of the last call are stored in ``self.attn``.
        """
        if mask is not None:
            # Extra dimension so the same mask broadcasts over all heads.
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)

        def split_heads(projection, tensor):
            # d_model -> [batch, head, seq_length, d_k]
            projected = projection(tensor)
            return projected.view(nbatches, -1, self.h, self.d_k).transpose(1, 2)

        # 1) Project and split into heads.
        q_heads = split_heads(self.linears[0], query)
        k_heads = split_heads(self.linears[1], key)
        v_heads = split_heads(self.linears[2], value)

        # 2) Apply attention on all the projected vectors in batch.
        context, self.attn = self._attention(
            q_heads, k_heads, v_heads, mask, dropout=self.dropout
        )

        # 3) "Concat" the heads using a view and apply the final linear.
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](merged)


class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward block: ``w_2(dropout(relu(w_1(x))))``.

    Args:
        d_model: input and output feature size.
        d_ff: size of the hidden layer.
        dropout_rate: dropout probability applied to the hidden activations.
    """

    def __init__(self, d_model, d_ff, dropout_rate=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Apply the feed-forward transformation to the last dimension of ``x``."""
        x = self.w_1(x)
        x = x.relu()
        # The dropout layer was constructed but never applied before, so
        # dropout_rate had no effect on this block.
        x = self.dropout(x)
        x = self.w_2(x)
        return x


class LayerNorm(nn.Module):
    """Normalize the last dimension, then apply a learned gain and bias."""

    def __init__(self, features, eps=1e-6):
        super().__init__()
        self.a_2 = nn.Parameter(torch.ones(features))   # gain
        self.b_2 = nn.Parameter(torch.zeros(features))  # bias
        self.eps = eps

    def forward(self, x):
        """Return ``a_2 * (x - mean) / (std + eps) + b_2`` along the last dimension."""
        centered = x - x.mean(-1, keepdim=True)
        # eps is added to the standard deviation itself, not to the variance.
        denom = x.std(-1, keepdim=True) + self.eps
        return self.a_2 * centered / denom + self.b_2


class ResidualConnection(nn.Module):
    """Residual wrapper: ``x + dropout(sublayer(norm(x)))``."""

    def __init__(self, size, dropout_rate):
        super().__init__()
        self.norm = LayerNorm(size, eps=1e-6)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, sublayer):
        """Normalize ``x``, run ``sublayer`` on it, apply dropout and add ``x`` back."""
        normed = self.norm(x)
        update = self.dropout(sublayer(normed))
        return x + update


class EncoderSublayer(nn.Module):
    """One encoder layer: self-attention then feed-forward, each inside a residual connection."""

    def __init__(self, size, self_attn, feed_forward, dropout_rate):
        super().__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(ResidualConnection(size, dropout_rate), 2)
        self.size = size

    def forward(self, x, mask):
        """Run masked self-attention on ``x`` followed by the feed-forward block."""
        attn_residual, ffn_residual = self.sublayer[0], self.sublayer[1]

        def attend(normed):
            # Query, key and value are all the same tensor for self-attention.
            return self.self_attn(normed, normed, normed, mask)

        attended = attn_residual(x, attend)
        return ffn_residual(attended, self.feed_forward)


class Encoder(nn.Module):
    """Stack of N encoder layers followed by a final layer norm."""

    def __init__(self, size, self_attn, feed_forward, dropout_rate, N):
        super().__init__()
        prototype = EncoderSublayer(size, self_attn, feed_forward, dropout_rate)
        self.layers = clones(prototype, N)
        self.norm = LayerNorm(size, eps=1e-6)

    def forward(self, x, mask):
        """Feed ``x`` (with ``mask``) through every layer in order, then normalize."""
        hidden = x
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, mask)
        return self.norm(hidden)


class DecoderSublayer(nn.Module):
    """One decoder layer: self-attention, attention over ``memory``, then feed-forward."""

    def __init__(self, size, self_attn, src_attn, feed_forward, dropout_rate):
        super().__init__()
        self.size = size
        self.self_attn = self_attn
        self.src_attn = src_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(ResidualConnection(size, dropout_rate), 3)

    def forward(self, x, memory, src_mask, tgt_mask):
        """Apply the three residual sub-layers in order and return the result."""

        def attend_self(normed):
            return self.self_attn(normed, normed, normed, tgt_mask)

        def attend_memory(normed):
            # The query comes from the decoder; key and value are both ``memory``.
            return self.src_attn(normed, memory, memory, src_mask)

        steps = (attend_self, attend_memory, self.feed_forward)
        for residual, fn in zip(self.sublayer, steps):
            x = residual(x, fn)
        return x


class Decoder(nn.Module):
    """Stack of N decoder layers followed by a final layer norm."""

    def __init__(self, size, self_attn, src_attn, feed_forward, dropout_rate, N):
        super().__init__()
        prototype = DecoderSublayer(size, self_attn, src_attn, feed_forward, dropout_rate)
        self.layers = clones(prototype, N)
        self.norm = LayerNorm(size, eps=1e-6)

    def forward(self, x, memory, src_mask, tgt_mask):
        """Feed ``x`` through every layer with ``memory`` and both masks, then normalize."""
        hidden = x
        for decoder_layer in self.layers:
            hidden = decoder_layer(hidden, memory, src_mask, tgt_mask)
        return self.norm(hidden)


class Generator(nn.Module):
    """Linear projection to the vocabulary size followed by log-softmax."""

    def __init__(self, d_model, len_vocab):
        super().__init__()
        self.proj = nn.Linear(d_model, len_vocab)

    def forward(self, x):
        """Return log-probabilities over the vocabulary along the last dimension."""
        logits = self.proj(x)
        return log_softmax(logits, dim=-1)


class Transformer(nn.Module):
    """Encoder-decoder model built from the layers above.

    ``forward`` returns the decoder output; ``self.generator`` is kept as a
    separate module and is not applied inside ``forward``.

    Args:
        len_src_vocab: size of the source vocabulary.
        len_tgt_vocab: size of the target vocabulary.
        N: number of encoder layers and of decoder layers.
        d_model: model (embedding) size.
        d_ff: hidden size of the feed-forward blocks.
        h: number of attention heads.
        dropout_rate: dropout probability used throughout.
    """

    def __init__(self, len_src_vocab, len_tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout_rate=0.1):
        super().__init__()
        # Prototype modules; every use below gets its own deep copy.
        position = PositionalEncoding(d_model, dropout_rate)
        attention = MultiHeadedAttention(h, d_model, dropout_rate)
        feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout_rate)
        fresh = copy.deepcopy

        self.encoder = Encoder(d_model, fresh(attention), fresh(feed_forward), dropout_rate, N)
        self.decoder = Decoder(
            d_model, fresh(attention), fresh(attention), fresh(feed_forward), dropout_rate, N
        )
        self.src_embed = nn.Sequential(Embeddings(d_model, len_src_vocab), fresh(position))
        self.tgt_embed = nn.Sequential(Embeddings(d_model, len_tgt_vocab), fresh(position))

        self.generator = Generator(d_model, len_tgt_vocab)

    def encode(self, src, src_mask):
        """Embed ``src`` and run it through the encoder."""
        embedded = self.src_embed(src)
        return self.encoder(embedded, src_mask)

    def decode(self, memory, src_mask, tgt, tgt_mask):
        """Embed ``tgt`` and run it through the decoder against ``memory``."""
        embedded = self.tgt_embed(tgt)
        return self.decoder(embedded, memory, src_mask, tgt_mask)

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Take in and process masked source and target sequences."""
        memory = self.encode(src, src_mask)
        return self.decode(memory, src_mask, tgt, tgt_mask)


if __name__ == "__main__":
    # Smoke test: greedy-decode nine steps with an untrained model, then count parameters.
    def subsequent_mask(size):
        """Return a (1, size, size) boolean mask that is False above the diagonal."""
        upper = torch.triu(torch.ones((1, size, size)), diagonal=1).type(torch.uint8)
        return upper == 0


    test_model = Transformer(11, 11, 2)
    test_model.eval()
    src = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
    src_mask = torch.ones(1, 1, 10)

    memory = test_model.encode(src, src_mask)
    # Decoding starts from a single token with id 0.
    ys = torch.zeros(1, 1).type_as(src)

    for i in range(9):
        tgt_mask = subsequent_mask(ys.size(1)).type_as(src.data)
        out = test_model.decode(memory, src_mask, ys, tgt_mask)
        # Only the last position is needed to pick the next token.
        prob = test_model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.data[0]
        appended = torch.empty(1, 1).type_as(src.data).fill_(next_word)
        ys = torch.cat([ys, appended], dim=1)

    print("Example Untrained Model Prediction:", ys)


    def get_parameter_number(model):
        """Return the total and the trainable parameter counts of ``model``."""
        total_num = 0
        trainable_num = 0
        for p in model.parameters():
            total_num += p.numel()
            if p.requires_grad:
                trainable_num += p.numel()
        return {'Total': total_num, 'Trainable': trainable_num}


    print(get_parameter_number(test_model))

2.1 encode

1.词向量编码
2.位置编码
3.词向量编码+位置编码->x
4.x->selfattention->x
5.x->残差网络->x
6.4和5 执行N次
7.norm->output

2.2decode

1.词向量编码
2.位置编码
3.词向量编码+位置编码->target
4.target->selfattention->target
5.target->残差网络->target
6.target作为query、encoder的输出作为key和value -> 注意力(编码器-解码器注意力,即cross attention)-> target

7.4和5、6 执行N次
8.norm->output

3.loss

import torch
import torch.nn as nn


class LabelSmoothing(nn.Module):
    """KL-divergence loss against a label-smoothed target distribution.

    Args:
        size: number of classes; must equal ``x.size(1)`` in ``forward``.
        padding_idx: class id of the padding token.
        smoothing: probability mass spread over the non-target classes.
    """

    def __init__(self, size, padding_idx, smoothing=0.0):
        super().__init__()
        self.criterion = nn.KLDivLoss(reduction="sum")
        self.padding_idx = padding_idx
        self.smoothing = smoothing
        self.confidence = 1.0 - smoothing
        self.size = size
        self.true_dist = None

    def forward(self, x, target):
        """Return the summed KL divergence between ``x`` and the smoothed targets.

        ``x`` holds one row of log-probabilities per example and ``target`` the
        matching class ids. The distribution used is kept in ``self.true_dist``.
        """
        assert x.size(1) == self.size
        # Every class starts with the smoothing share; size - 2 leaves out the
        # true class and the padding class.
        smoothed = torch.full_like(x.data, self.smoothing / (self.size - 2))
        smoothed.scatter_(1, target.data.unsqueeze(1), self.confidence)
        # The padding class never receives probability mass ...
        smoothed[:, self.padding_idx] = 0
        # ... and rows whose target is padding contribute nothing at all.
        is_pad_row = target.data == self.padding_idx
        smoothed.masked_fill_(is_pad_row.unsqueeze(1), 0.0)
        self.true_dist = smoothed
        return self.criterion(x, smoothed.clone().detach())


class SimpleLossCompute:
    """Apply the generator and the criterion, returning total and normalized loss."""

    def __init__(self, generator, criterion):
        self.generator = generator
        self.criterion = criterion

    def __call__(self, x, y, norm):
        """Return ``(normalized_loss.data * norm, normalized_loss)``.

        ``x`` is passed through the generator and flattened to one row per
        position; ``y`` is flattened to one label per position.
        """
        predictions = self.generator(x)
        flat_predictions = predictions.contiguous().view(-1, predictions.size(-1))
        flat_labels = y.contiguous().view(-1)
        sloss = self.criterion(flat_predictions, flat_labels) / norm
        # First element: detached total for logging; second: value to backpropagate.
        return sloss.data * norm, sloss

4.训练

# import torchtext; torchtext.disable_torchtext_deprecation_warning()

import os
import spacy
import torch
import torchtext.datasets as datasets

from dataset import Multi30k
from model import Transformer
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import LambdaLR
from loss import LabelSmoothing, SimpleLossCompute
from torchtext.vocab import build_vocab_from_iterator
from train_eval_utils import train_one_epoch, val_one_epoch

def yield_tokens(iter, tokenizer, index):
    """Yield, for every item of ``iter``, the token strings of its element ``index``."""
    for sample in iter:
        tokens = tokenizer.tokenizer(sample[index])
        yield [tok.text for tok in tokens]
def _load_spacy_model(name):
    """Load the spaCy pipeline ``name``, downloading it first if it is missing."""
    import subprocess
    import sys

    try:
        return spacy.load(name)
    except IOError:
        # Download with the interpreter that is running this script (a bare
        # "python" may be a different environment) and stop if it fails.
        subprocess.run([sys.executable, "-m", "spacy", "download", name], check=True)
        return spacy.load(name)


spacy_de = _load_spacy_model("de_core_news_sm")
spacy_en = _load_spacy_model("en_core_web_sm")

# Both vocabularies are built over all three splits; tokens seen fewer than
# twice are left out and therefore map to <unk>.
train, val, test = datasets.Multi30k(language_pair=("de", "en"))
src_vocab = build_vocab_from_iterator(yield_tokens(train+val+test, spacy_de, 0), specials=["<s>", "</s>", "<blank>", "<unk>"], min_freq=2)
tgt_vocab = build_vocab_from_iterator(yield_tokens(train+val+test, spacy_en, 1), specials=["<s>", "</s>", "<blank>", "<unk>"], min_freq=2)
src_vocab.set_default_index(src_vocab["<unk>"])
tgt_vocab.set_default_index(tgt_vocab["<unk>"])

train_data = Multi30k(dataset="train", tokenizer_de=spacy_de, tokenizer_en=spacy_en, vocab_de=src_vocab, vocab_en=tgt_vocab, language_pair=('de', 'en'))
valid_data = Multi30k(dataset="val", tokenizer_de=spacy_de, tokenizer_en=spacy_en, vocab_de=src_vocab, vocab_en=tgt_vocab, language_pair=('de', 'en'))
train_dataloader = DataLoader(train_data, batch_size=16)
valid_dataloader = DataLoader(valid_data, batch_size=16)

# NOTE(review): "max_padding" is never read in this script; the datasets above
# are built with Multi30k's default max_padding. Confirm which value is intended.
config = {
    "num_epochs": 10,
    "accum_iter": 10,
    "base_lr": 1.0,
    "max_padding": 72,
    "warmup": 3000,
    "file_prefix": "multi30k_model_",
}

model = Transformer(len(src_vocab), len(tgt_vocab), N=6)
# model.to("cuda")

pad_idx = tgt_vocab["<blank>"]
criterion = LabelSmoothing(size=len(tgt_vocab), padding_idx=pad_idx, smoothing=0.1)
# criterion.to("cuda")

def rate(step, model_size, factor, warmup):
    """Return the learning-rate multiplier for ``step``.

    Step 0 is treated as step 1, because LambdaLR calls this with 0 first and
    zero cannot be raised to a negative power.
    """
    effective_step = 1 if step == 0 else step
    decay = effective_step ** (-0.5)
    ramp = effective_step * warmup ** (-1.5)
    return factor * (model_size ** (-0.5) * min(decay, ramp))
# The scheduler multiplies base_lr by rate(step, ...), so the schedule is set by rate().
optimizer = torch.optim.Adam(model.parameters(), lr=config["base_lr"], betas=(0.9, 0.98), eps=1e-9)
lr_scheduler = LambdaLR(
    optimizer=optimizer,
    lr_lambda=lambda step: rate(step, model_size=512, factor=1, warmup=config["warmup"]),
)

# torch.save does not create missing directories; create the checkpoint folder
# up front so a finished epoch is not lost to a save error.
os.makedirs("saves", exist_ok=True)

for epoch in range(config["num_epochs"]):
    model.train()
    print(f"Epoch {epoch} Training ====", flush=True)
    sloss = train_one_epoch(
        train_dataloader,
        model,
        SimpleLossCompute(model.generator, criterion),
        optimizer,
        lr_scheduler,
        accum_iter=config["accum_iter"],
    )

    loss, bleu_score = val_one_epoch(valid_dataloader, model, src_vocab, tgt_vocab, SimpleLossCompute(model.generator, criterion),)

    # Append one line per epoch to the log file.
    with open('log.txt', 'a', encoding='utf-8') as file:
        text_to_append = "epochs:{}, loss:{}, bleu_score:{}\n".format(epoch, loss.data, bleu_score)
        file.write(text_to_append)

    # Checkpoint every 10th epoch, starting with epoch 0.
    if epoch % 10 == 0:
        file_path = "saves/%s%.2d.pt" % (config["file_prefix"], epoch)
        torch.save(model.state_dict(), file_path)
    torch.cuda.empty_cache()

file_path = "saves/%sfinal.pt" % config["file_prefix"]
torch.save(model.state_dict(), file_path)

http://www.kler.cn/a/453515.html

相关文章:

  • 少儿编程在线培训系统:客户服务与学习支持
  • uniapp登录
  • CI/CD是什么?
  • Flutter组件————PageView
  • 六大基础深度神经网络之CNN
  • 能省一点是一点 - 享元模式(Flyweight Pattern)
  • 大语言模型(LLM)中大数据的压缩存储及其重要性
  • Redis单线程快的原因
  • 21天掌握javaweb-->第19天:Spring Boot后端优化与部署
  • MyBatis的插件运行原理,与如何编写一个插件
  • 人、机、环境中各有其神经网络系统
  • 敏捷开发在前端团队的应用
  • Vue单页应用的配置
  • datahub 汉化
  • Go的select的运行原理
  • elasticsearch upsert 使用
  • Java全栈项目 - 汽车维修服务管理平台
  • windows相关
  • 音视频入门知识(一):基本概念篇
  • Golang 为什么没有注解?