当前位置: 首页 > article >正文

机器学习系列----深入理解Transformer模型

在过去的几年里,Transformer模型彻底改变了自然语言处理(NLP)领域,并扩展到计算机视觉、语音处理等多个领域。Transformer架构自2017年由Vaswani等人提出以来,凭借其强大的性能和高效的并行计算能力,成为了现代深度学习中最重要的模型之一。在这篇文章中,我们将深入分析Transformer的核心原理,并通过代码示例帮助大家理解其实际实现。

一.Transformer模型概述

    Transformer模型的最大特点是基于“自注意力机制”(Self-Attention),它不依赖于传统的循环神经网络(RNN)或卷积神经网络(CNN),而是通过并行计算高效地处理序列数据。Transformer模型包含两个主要部分:编码器(Encoder)和解码器(Decoder)。每个部分由多个相同的层组成,每层包含不同的子组件。接下来,我们将逐一分析Transformer模型的关键组成部分。

1. 自注意力机制(Self-Attention)

自注意力机制允许模型在处理输入序列时,考虑序列中所有单词之间的关系。具体来说,给定一个输入词,它会根据该词与序列中其他所有词之间的相似度来更新自己的表示。

自注意力计算步骤
假设输入是一个词序列,其中每个词会被转换成一个向量。自注意力机制通过以下步骤计算每个词的最终表示:

步骤1:计算每个输入向量的 Query(Q)、Key(K) 和 Value(V) 向量。
步骤2:计算 Query 和 Key 之间的相似度,通常使用点积(Dot Product),然后进行 softmax 操作,得到每个词的注意力权重。
步骤3:使用这些权重对 Value 向量进行加权求和,得到新的表示。
代码示例:自注意力机制实现

import torch
import torch.nn.functional as F

def self_attention(Q, K, V):
    # 计算Q和K的点积
    scores = torch.matmul(Q, K.transpose(-2, -1)) / Q.size(-1) ** 0.5
    # 使用softmax归一化得分
    attention_weights = F.softmax(scores, dim=-1)
    # 通过注意力权重加权求和V
    output = torch.matmul(attention_weights, V)
    return output, attention_weights

# 假设Q, K, V的形状为 (batch_size, num_heads, seq_len, d_k)
Q = torch.randn(1, 8, 10, 64)  # Batch size: 1, Heads: 8, Seq_len: 10, d_k: 64
K = torch.randn(1, 8, 10, 64)
V = torch.randn(1, 8, 10, 64)

output, attention_weights = self_attention(Q, K, V)
print(output.shape)  # (1, 8, 10, 64)

2. 多头注意力机制(Multi-Head Attention)
为了让模型能够捕捉到输入数据中不同子空间的依赖关系,Transformer使用了 多头注意力机制。多头注意力机制并行计算多个自注意力头,每个头学习不同的注意力模式,最后将多个头的输出拼接起来,再进行一次线性变换。

代码示例:多头注意力实现

class MultiHeadAttention(torch.nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_k = d_model // num_heads
        
        self.Q_linear = torch.nn.Linear(d_model, d_model)
        self.K_linear = torch.nn.Linear(d_model, d_model)
        self.V_linear = torch.nn.Linear(d_model, d_model)
        self.out_linear = torch.nn.Linear(d_model, d_model)
    
    def forward(self, Q, K, V):
        batch_size = Q.size(0)
        
        # 线性变换并分成多头
        Q = self.Q_linear(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.K_linear(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.V_linear(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        
        # 自注意力计算
        output, attention_weights = self._scaled_dot_product_attention(Q, K, V)
        
        # 拼接各个头的输出
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.out_linear(output)
        return output, attention_weights

    def _scaled_dot_product_attention(self, Q, K, V):
        scores = torch.matmul(Q, K.transpose(-2, -1)) / self.d_k ** 0.5
        attention_weights = F.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)
        return output, attention_weights

# 测试多头注意力机制
multi_head_attention = MultiHeadAttention(d_model=256, num_heads=8)
Q = torch.randn(32, 10, 256)  # Batch size: 32, Sequence length: 10, d_model: 256
K = torch.randn(32, 10, 256)
V = torch.randn(32, 10, 256)

output, attention_weights = multi_head_attention(Q, K, V)
print(output.shape)  # (32, 10, 256)

3. 位置编码(Positional Encoding)
由于Transformer没有像RNN那样内建的顺序信息,因此需要通过 位置编码 来向模型提供输入词的位置信息。位置编码通常使用正弦和余弦函数生成,具有周期性,以便模型能够学习到不同词位置之间的相对关系。

代码示例:位置编码实现

import numpy as np
import torch

class PositionalEncoding(torch.nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(np.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

# 测试位置编码
pos_encoder = PositionalEncoding(d_model=256)
x = torch.randn(32, 10, 256)  # Batch size: 32, Seq length: 10, d_model: 256
encoded_x = pos_encoder(x)
print(encoded_x.shape)  # (32, 10, 256)

 4. 编码器和解码器
Transformer模型由多个相同的编码器和解码器组成。每个编码器层包括一个 多头自注意力机制 和一个 前馈神经网络,解码器则在此基础上增加了一个 多头注意力机制 用于与编码器的输出交互。我们通过堆叠多个这些层,构建一个强大的模型。

5. 编码器和解码器的堆叠

class TransformerEncoderLayer(torch.nn.Module):
    def __init__(self, d_model, num_heads, ff_hidden_dim):
        super(TransformerEncoderLayer, self).__init__()
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = torch.nn.Sequential(
            torch.nn.Linear(d_model, ff_hidden_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(ff_hidden_dim, d_model)
        )
        self.norm1 = torch.nn.LayerNorm(d_model)
        self.norm2 = torch.nn.LayerNorm(d_model)
    
    def forward(self, x):
        attn_output, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_output)
        ffn_output = self.ffn(x)
        x = self.norm2(x + ffn_output)
        return x

class Transformer(torch.nn.Module):
    def __init__(self, d_model, num_heads, num_layers, ff_hidden_dim, vocab_size, max_len=5000):
        super(Transformer, self).__init__()
        self.embedding = torch.nn.Embedding(vocab_size, d_model)
                self.pos_encoder = PositionalEncoding(d_model)
        self.encoder_layers = torch.nn.ModuleList([
            TransformerEncoderLayer(d_model, num_heads, ff_hidden_dim) for _ in range(num_layers)
        ])
        self.decoder = torch.nn.Linear(d_model, vocab_size)
    
    def forward(self, x):
        # 通过嵌入层
        x = self.embedding(x)
        
        # 加入位置编码
        x = self.pos_encoder(x)
        
        # 通过多个编码器层
        for layer in self.encoder_layers:
            x = layer(x)
        
        # 最后通过解码器层输出词汇表大小的预测
        logits = self.decoder(x)
        return logits

# 测试Transformer模型
vocab_size = 10000  # 假设词汇表大小为10,000
seq_len = 20  # 输入序列长度为20
batch_size = 32
d_model = 256  # 嵌入维度
num_heads = 8  # 注意力头数
num_layers = 6  # 编码器层数
ff_hidden_dim = 1024  # 前馈神经网络的隐藏层维度

model = Transformer(d_model, num_heads, num_layers, ff_hidden_dim, vocab_size)

# 假设输入是一个批次的词序列(词汇表中的索引)
input_sequence = torch.randint(0, vocab_size, (batch_size, seq_len))

# 模型前向传播
output = model(input_sequence)
print(output.shape)  # 输出的形状应该是 (batch_size, seq_len, vocab_size)

二.实际运用----基于PyTorch实现的情感分析模型

下面是一个基于PyTorch实现的Transformer模型的应用实例。该示例展示了如何使用Transformer模型进行文本分类任务,具体的任务是对IMDB电影评论数据集进行情感分析。

我们会构建一个文本分类模型,它的输入是一个电影评论文本,输出是该评论的情感分类(正面或负面)。在这个示例中,我们会利用PyTorch的nn.Transformer类和一些常见的深度学习技术来实现。

1. 环境配置与数据加载

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
from tqdm import tqdm
import spacy

# 使用spaCy加载英文模型,用于分词
nlp = spacy.load("en_core_web_sm")

# 加载IMDB数据集,通常可以通过torchtext或自己下载
def load_imdb_data():
    # 这里假设你已经将IMDB数据集下载到本地
    # 你可以在网上找到数据集,并将其转换为文本和标签
    # 这里只是一个模拟,实际使用时你需要具体实现数据加载
    texts = ["This movie was great!", "I hated this movie.", "Amazing acting and story!", "Worst movie ever.", ...]
    labels = [1, 0, 1, 0, ...]  # 1表示正面,0表示负面
    
    return texts, labels

# 数据预处理函数:tokenize和创建词汇表
def preprocess(texts, vocab_size=10000):
    # 使用spaCy进行文本预处理
    tokenized_texts = []
    for text in texts:
        doc = nlp(text)
        tokenized_texts.append([token.text for token in doc if not token.is_stop and not token.is_punct])

    # 创建词汇表
    word_freq = {}
    for tokens in tokenized_texts:
        for token in tokens:
            word_freq[token] = word_freq.get(token, 0) + 1

    # 排序并选择最常见的词汇
    vocab = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:vocab_size]
    vocab = {word: idx + 2 for idx, (word, _) in enumerate(vocab)}  # 给每个词分配一个索引,0和1预留给padding和未知词

    # 添加特殊的padding和未知符号
    vocab['<PAD>'] = 0
    vocab['<UNK>'] = 1

    # 将文本转换为索引
    texts_idx = []
    for tokens in tokenized_texts:
        texts_idx.append([vocab.get(token, vocab['<UNK>']) for token in tokens])

    return texts_idx, vocab

# 数据集类
class IMDBDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        return torch.tensor(self.texts[idx]), torch.tensor(self.labels[idx])

# 加载数据
texts, labels = load_imdb_data()
texts_idx, vocab = preprocess(texts)
train_texts, val_texts, train_labels, val_labels = train_test_split(texts_idx, labels, test_size=0.2)

train_dataset = IMDBDataset(train_texts, train_labels)
val_dataset = IMDBDataset(val_texts, val_labels)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

2. 定义Transformer模型

class TransformerTextClassifier(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, num_layers, ff_hidden_dim, num_classes, max_len=500):
        super(TransformerTextClassifier, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.pos_encoder = nn.TransformerEncoderLayer(embed_size, num_heads, ff_hidden_dim)
        self.encoder = nn.TransformerEncoder(self.pos_encoder, num_layers)
        self.fc = nn.Linear(embed_size, num_classes)
        self.max_len = max_len

    def forward(self, x):
        # x: (batch_size, seq_len)
        batch_size, seq_len = x.size()

        # 获取位置编码
        positions = torch.arange(0, seq_len, device=x.device).unsqueeze(0).repeat(batch_size, 1)

        # 词嵌入 + 位置编码
        x = self.embedding(x) + positions.float()

        # 转换为 (seq_len, batch_size, embed_size)
        x = x.permute(1, 0, 2)

        # 通过Transformer编码器
        x = self.encoder(x)

        # 取最后一个时间步的输出
        x = x[-1, :, :]

        # 分类层
        x = self.fc(x)

        return x

3. 训练与验证模型

# 超参数设置
vocab_size = len(vocab)
embed_size = 256
num_heads = 8
num_layers = 6
ff_hidden_dim = 512
num_classes = 2  # 正面和负面
learning_rate = 0.001
num_epochs = 10

# 实例化模型
model = TransformerTextClassifier(vocab_size, embed_size, num_heads, num_layers, ff_hidden_dim, num_classes)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 训练函数
def train(model, train_loader, optimizer, criterion):
    model.train()
    total_loss = 0
    correct_preds = 0
    total_preds = 0

    for texts, labels in tqdm(train_loader, desc="Training"):
        optimizer.zero_grad()
        
        # 转换为长整型
        texts, labels = texts.long(), labels.long()

        # 前向传播
        outputs = model(texts)
        
        # 计算损失
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()

        # 计算准确度
        _, predicted = torch.max(outputs, 1)
        correct_preds += (predicted == labels).sum().item()
        total_preds += labels.size(0)
    
    avg_loss = total_loss / len(train_loader)
    accuracy = correct_preds / total_preds * 100
    return avg_loss, accuracy

# 验证函数
def evaluate(model, val_loader, criterion):
    model.eval()
    total_loss = 0
    correct_preds = 0
    total_preds = 0

    with torch.no_grad():
        for texts, labels in tqdm(val_loader, desc="Evaluating"):
            texts, labels = texts.long(), labels.long()

            outputs = model(texts)
            loss = criterion(outputs, labels)

            total_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            correct_preds += (predicted == labels).sum().item()
            total_preds += labels.size(0)
    
    avg_loss = total_loss / len(val_loader)
    accuracy = correct_preds / total_preds * 100
    return avg_loss, accuracy

# 训练与验证
for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_loader, optimizer, criterion)
    val_loss, val_accuracy = evaluate(model, val_loader, criterion)
    
    print(f"Epoch {epoch + 1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")
    print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

4. 测试与推理

# 使用训练好的模型进行预测
def predict(model, text, vocab, max_len=500):
    model.eval()
    
    # 预处理输入文本
    tokens = [token.text for token in nlp(text) if not token.is_stop and not token.is_punct]
    text_idx = [vocab.get(token, vocab['<UNK>']) for token in tokens]
    text_idx = text_idx[:max_len]  # 限制最大长度
    text_idx += [vocab['<PAD>']] * (max_len - len(text_idx))  # 填充到最大长度

    input_tensor = torch.tensor(text_idx).unsqueeze(0)  # 增加批次维度

    # 进行预测
    with torch.no_grad():
        output = model(input_tensor)
        prediction = torch.argmax(output, dim=1).item()
    
    return "Positive" if prediction == 1 else "Negative"

# 示例预测
sample_text = "The movie was absolutely fantastic! I loved it."
print(predict(model, sample_text, vocab))

三.总结

Transformer模型是一种基于自注意力(Self-Attention)机制的深度学习架构,最早由Vaswani等人在2017年提出,主要用于自然语言处理(NLP)任务。与传统的循环神经网络(RNN)和长短期记忆网络(LSTM)不同,Transformer不依赖于序列的顺序处理,而是通过自注意力机制来捕捉输入序列中任意位置之间的依赖关系。该模型的核心思想是,通过计算输入序列中每个位置与其他所有位置的相似度,来动态地加权每个位置的信息,从而有效地捕捉长距离依赖。

Transformer的结构分为两部分:编码器(Encoder)和解码器(Decoder)。编码器负责对输入序列进行处理,提取出输入的高阶特征;解码器则将这些特征转化为输出序列。在许多任务中(如文本分类和机器翻译的编码部分),我们通常只使用Transformer的编码器部分。Transformer的最大特点是其完全并行化的计算方式,这使得它比传统的RNN/LSTM模型在训练时更高效,尤其在大规模数据集上表现更为突出。

Transformer的关键组件包括自注意力层(Self-Attention Layer)和位置编码(Positional Encoding)。自注意力层通过计算每个输入单元与其他单元的相似度,生成加权的表示。而位置编码则解决了序列顺序问题,因为Transformer本身并没有递归结构,无法显式地捕捉位置信息。通过位置编码,可以为每个输入单元添加位置信息,使得模型能够理解序列中的相对位置关系。

Transformer的优势不仅体现在高效的计算和长距离依赖建模上,还使得它成为多种NLP任务(如机器翻译、文本生成、情感分析等)和跨领域应用(如计算机视觉和生物信息学)的基石。许多后续的模型,如BERT、GPT等,都是在Transformer基础上发展而来,推动了自然语言处理技术的飞跃。


http://www.kler.cn/a/387843.html

相关文章:

  • Flink三种集群部署模型
  • 二、智能体强化学习——深度强化学习核心算法
  • android源码编译后,为什么emulator一直黑屏或者停止android界面
  • 【Java项目】基于SpringBoot的【校园交友系统】
  • MT6706BL 同步整流 规格书
  • 缓存-Redis-常见问题-缓存击穿-永不过期+逻辑过期(全面 易理解)
  • C++顶层const与底层const
  • 【需求变更】使用 Redis 和 Lua 脚本实现变更后方案编号的生成
  • Linux下通过sqlplus连Oracle提示字符是乱码▒▒▒[
  • 什么是 eCPRI,它对 5G 和 Open RAN 有何贡献?
  • 设计模式-七个基本原则之一-迪米特法则 + 案例
  • 【WRF模拟】全过程总结:WPS预处理及WRF运行
  • Mybatis拦截器中获取@RequestBody表单的值修改查询SQL
  • redis的部署方式详解
  • 「QT」几何数据类 之 QLine 整型直线类
  • IP可用端口扫描器工具(bun + typescript)
  • Debezium系列之:Debezium 中的增量快照
  • 数据结构之排序--选择排序详解
  • MATLAB和Python多语高维转录分析
  • 在 Vue 中实现与优化轮询技术
  • JMeter基础篇
  • react useRef
  • 水库汛限水位是什么?如何进行安全监测
  • Android HandlerThread 基础
  • 修复 Ubuntu中 “Command ‘python’ not found” 的错误
  • 软件工程概论项目(一),git环境的配置和平台代码的拉取