当前位置: 首页 > article >正文

机器学习-使用大规模的平行语料

下面将详细介绍如何使用 Python 和深度学习框架 PyTorch,结合大规模平行语料库(以 WMT 英德数据集为例)实现一个基于注意力机制的神经机器翻译模型。

1. 安装必要的库

首先,确保你已经安装了以下库:

pip install torch torchtext spacy
python -m spacy download en_core_web_sm
python -m spacy download de_core_news_sm

2. 代码实现

import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator
import spacy
import random
import math
import time

# 设置随机种子以保证结果可复现
SEED = 1234
random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# 加载英语和德语的分词器
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')

# 定义分词函数
def tokenize_de(text):
    return [tok.text for tok in spacy_de.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

# 定义字段
SRC = Field(tokenize = tokenize_de,
            init_token = '<sos>',
            eos_token = '<eos>',
            lower = True)

TRG = Field(tokenize = tokenize_en,
            init_token = '<sos>',
            eos_token = '<eos>',
            lower = True)

# 加载数据集
train_data, valid_data, test_data = Multi30k.splits(exts = ('.de', '.en'),
                                                    fields = (SRC, TRG))

# 构建词汇表
SRC.build_vocab(train_data, min_freq = 2)
TRG.build_vocab(train_data, min_freq = 2)

# 定义设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 创建迭代器
BATCH_SIZE = 128
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size = BATCH_SIZE,
    device = device)

# 定义编码器
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional = True)
        self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        embedded = self.dropout(self.embedding(src))
        outputs, hidden = self.rnn(embedded)
        hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)))
        return outputs, hidden

# 定义注意力机制
class Attention(nn.Module):
    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()
        self.attn = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim)
        self.v = nn.Parameter(torch.rand(dec_hid_dim))
        stdv = 1. / math.sqrt(self.v.size(0))
        self.v.data.uniform_(-stdv, stdv)

    def forward(self, hidden, encoder_outputs):
        batch_size = encoder_outputs.shape[1]
        src_len = encoder_outputs.shape[0]
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim = 2)))
        energy = energy.permute(0, 2, 1)
        v = self.v.repeat(batch_size, 1).unsqueeze(1)
        attention = torch.bmm(v, energy).squeeze(1)
        return torch.softmax(attention, dim = 1)

# 定义解码器
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)
        self.fc_out = nn.Linear((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, encoder_outputs):
        input = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(input))
        a = self.attention(hidden, encoder_outputs)
        a = a.unsqueeze(1)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        weighted = torch.bmm(a, encoder_outputs)
        weighted = weighted.permute(1, 0, 2)
        rnn_input = torch.cat((embedded, weighted), dim = 2)
        output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))
        assert (output == hidden).all()
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted = weighted.squeeze(0)
        prediction = self.fc_out(torch.cat((output, weighted, embedded), dim = 1))
        return prediction, hidden.squeeze(0)

# 定义序列到序列模型
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio = 0.5):
        batch_size = src.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src)
        input = trg[0,:]
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden, encoder_outputs)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[t] if teacher_force else top1
        return outputs

# 初始化模型
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
ENC_HID_DIM = 512
DEC_HID_DIM = 512
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

attn = Attention(ENC_HID_DIM, DEC_HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_DROPOUT, attn)

model = Seq2Seq(enc, dec, device).to(device)

# 定义优化器和损失函数
optimizer = optim.Adam(model.parameters())
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)

# 训练模型
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src = batch.src
        trg = batch.trg
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

# 评估模型
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg
            output = model(src, trg, 0)
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

# 训练模型
N_EPOCHS = 10
CLIP = 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time()
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)
    end_time = time.time()
    epoch_mins, epoch_secs = divmod(end_time - start_time, 60)
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut3-model.pt')
    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs:.2f}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')

# 在测试集上评估模型
model.load_state_dict(torch.load('tut3-model.pt'))
test_loss = evaluate(model, test_iterator, criterion)
print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')

3. 代码解释

  • 数据预处理
    • 使用torchtext加载 WMT 英德平行语料库。
    • 定义分词函数和字段,构建词汇表。
    • 使用BucketIterator创建数据迭代器。
  • 模型构建
    • 编码器:使用双向 GRU 处理输入序列,并将最后一个隐藏状态映射到解码器的隐藏状态维度。
    • 注意力机制:计算解码器隐藏状态与编码器输出之间的注意力权重。
    • 解码器:结合注意力权重和嵌入输入,使用 GRU 生成输出序列。
    • 序列到序列模型:将编码器和解码器组合在一起,实现整个翻译过程。
  • 训练和评估
    • 使用 Adam 优化器和交叉熵损失函数进行训练。
    • 在训练过程中使用教师强制策略,以提高模型的学习效率。
    • 在验证集上评估模型,并保存最佳模型。
    • 最后在测试集上评估模型的性能。

4. 注意事项

  • 此代码示例可以在 GPU 上运行,前提是你的机器上安装了 CUDA 并且 PyTorch 支持 CUDA。
  • 训练时间可能较长,你可以根据需要调整训练轮数和模型参数。
  • 可以进一步优化模型,如使用更复杂的注意力机制、更大的模型容量等,以提高翻译质量。

http://www.kler.cn/a/542842.html

相关文章:

  • 最新的构建Jenkins插件的方式
  • IntelliJ IDEA Console控制台输出成json的配置方式
  • Linux(Ubuntu)安装pyenv和pyenv-virtualenv
  • Linux源码包安装MySQL数据库
  • IPD项目管理是什么?
  • 【Flink源码分析】5. Flink1.19源码分析-异步编程(CompletableFuture)
  • mysql学习笔记-锁
  • 畅聊deepseek-r1,SiliconFlow 硅基流动注册+使用
  • 基于YoloV11和驱动级鼠标模拟实现Ai自瞄
  • 鸿蒙oh-package.json版本号
  • 【数据结构】_堆的向上调整和向下调整建堆法
  • Lucene 中的并发错误:如何修复乐观并发失败
  • 工业4.0时代,3D开发工具HOOPS如何赋能塑计量行业自动化与数据可视化?
  • Visual Studio Code中文出现黄色框子的解决办法
  • C语言中常见关键字(static,extern)
  • 【含文档+PPT+源码】基于python爬虫的豆瓣电影、音乐、图书数据分析系统
  • 妙用Pytest内置request Fixture 监控测试执行过程
  • Spring boot中实现字典管理
  • Vue解决父子组件传值,子组件改变值后父组件的值也改变的问题
  • deepseek:三个月备考高级系统架构师
  • 【并发控制、更新、版本控制】.NET开源ORM框架 SqlSugar 系列
  • ASP.NET Core DDD
  • C++多态性之重载多态(二)—学习记录
  • 图像处理篇---基本Python图像处理
  • Linux查看硬件常用命令
  • 美​团​一​二​面​​东​方​财​富​一​面