当前位置: 首页 > article >正文

【NLP 42、实践 ⑪ 用Bert模型结构实现自回归语言模型的训练】

目录

数据文件

一、模型定义

1.模型初始化

代码运行流程

2.前向传播,计算损失 ⭐

代码运行流程

二、加载语料

代码运行流程

三、 随机生成样本

代码运行流程

四、建立模型

五、采样策略选择

代码运行流程

六、模型效果测试

代码运行流程

七、模型训练

代码运行流程

八、main函数

九、整体代码


如果结局早已注定,那么过程就将大于结局

                                                        —— 25.3.18

自回归语言模型:由前文预测后文的语言模型

特点:单向

训练方式:利用前n个字预测第n+1个字,实现一个mask矩阵,送入Bert模型,让其前文看不到后文,作一个生成式训练 

数据文件

通过网盘分享的文件:文本生成
链接: https://pan.baidu.com/s/1Az9WLH1LfEyk_5ih8db7jw?pwd=6uv6 提取码: 6uv6 
--来自百度网盘超级会员v3的分享

一、模型定义

1.模型初始化

代码运行流程

# LanguageModel初始化流程树状图
├── 1. 父类初始化
│   └── `super().__init__()` → 继承父类(如`nn.Module`)的属性和方法
├── 2. 加载预训练BERT模型
│   ├── `self.bert = BertModel.from_pretrained(...)`
│   │   ├── `pretrain_model_path`: 预训练模型路径(如`bert-base-uncased`)
│   │   ├── `return_dict=False`: 强制返回元组而非字典(兼容旧版代码)
│   │   └── `attn_implementation='eager'`: 使用标准注意力实现(非Flash Attention等优化)
│   └── 输出特征维度:`hidden_size`(与BERT模型配置一致)
├── 3. 定义分类层
│   └── `self.classify = nn.Linear(hidden_size, vocab_size)`
│       ├── `hidden_size`: BERT输出的隐藏层维度(如768)
│       └── `vocab_size`: 目标词汇表大小(如30522)
└── 4. 定义损失函数
    └── `self.loss = nn.functional.cross_entropy`
        └── 计算预测logits与真实标签的交叉熵损失

hidden_size:与BERT模型的隐藏层维度一致(例如BERT-base为768),用于连接分类层输入维度。

vocab_size:输出层词汇表大小,通常与预训练模型的词汇表匹配

pretrain_model_path: 预训练模型的本地路径或HuggingFace模型标识符

self.bert:加载预训练的BERT模型,用于文本编码

self.classify:将BERT输出的隐藏状态映射到词汇表空间,生成预测logits

self.loss:计算预测logits与真实标签的交叉熵损失

BertModel.from_pretrained():从预训练模型(如BERT、RoBERTa等)加载模型权重和配置,支持自定义加载路径或HuggingFace模型标识符

        attn_implementation:用于选择注意力机制的实现方式

                eager:使用PyTorch原生的注意力实现,基于标准的矩阵乘法(如torch.matmul)和softmax操作。

                sdpa:该实现针对GPU进行了优化,利用Flash Attention等高效计算技术,显著提升了计算速度和内存效率。

                flash_attention_2:相比"sdpa",Flash Attention 2进一步优化了计算和内存使用,特别适合处理非常长的序列。

参数类型必选默认值说明
pretrained_model_name_or_pathstr预训练模型名称(如bert-base-uncased)或本地路径
configPretrainedConfigNone自定义模型配置(覆盖默认配置)
cache_dirstrNone模型缓存目录(避免重复下载)
force_downloadboolFalse强制重新下载模型文件
resume_downloadboolFalse断点续传下载
output_loading_infoboolFalse是否返回加载过程的详细信息
attn_implementationstr"eager"注意力实现方式(如"flash_attention_2"加速)

nn.Linear():定义全连接层,对输入数据执行线性变换(y = xA^T + b),适用于特征映射和分类层

参数类型必选默认值说明
in_featuresint输入特征维度(如BERT隐藏层维度768)
out_featuresint输出特征维度(如词汇表大小30522)
biasboolTrue是否启用偏置项(b

nn.functional.cross_entropy:计算交叉熵损失,结合log_softmaxnll_loss,适用于分类任务(如情感分析、文本分类)

参数类型必选默认值说明
inputTensor未归一化的预测值(形状[batch, num_classes]
targetTensor真实标签(类别索引或概率分布)
weightTensorNone类别权重(平衡样本不均衡问题)
ignore_indexint-100忽略指定索引的标签计算
reductionstr"mean"损失聚合方式("none""mean""sum"
label_smoothingfloat0.0标签平滑系数(缓解过拟合)
    def __init__(self, hidden_size, vocab_size, pretrain_model_path):
        super(LanguageModel, self).__init__()
        # self.embedding = nn.Embedding(len(vocab), input_dim)
        # self.layer = nn.LSTM(input_dim, input_dim, num_layers=1, batch_first=True)

        self.bert = BertModel.from_pretrained(pretrain_model_path, return_dict=False, attn_implementation='eager')

        self.classify = nn.Linear(hidden_size, vocab_size)
        self.loss = nn.functional.cross_entropy

2.前向传播,计算损失 ⭐

代码运行流程

# forward方法流程
├── ​**输入判断**  
│   └── 根据`y`是否存在选择训练或推理模式  
│       ├── ​**训练模式** (`y is not None`)  
│       │   ├── 1. 构建注意力掩码  
│       │   │   └── `mask = torch.tril(torch.ones((x.shape[0], x.shape[1], x.shape[1])))` [1](@ref)  
│       │   │       └── 生成下三角矩阵(允许当前token关注过去token,禁止关注未来token)  
│       │   ├── 2. BERT编码  
│       │   │   └── `x, _ = self.bert(x, attention_mask=mask)`  
│       │   │       ├── 输入`x`: 输入序列的token IDs [1](@ref)  
│       │   │       └── `attention_mask`: 限制自注意力范围(防止模型“偷看”未来信息) [1](@ref)  
│       │   ├── 3. 分类层预测  
│       │   │   └── `y_pred = self.classify(x)`  
│       │   │       └── 将BERT输出映射到词汇表空间(`vocab_size`维度)[4,6](@ref)  
│       │   └── 4. 计算损失  
│       │       └── `return self.loss(y_pred.view(-1, ...), y.view(-1))`  
│       │           └── 交叉熵损失计算(展平维度适配`[batch*seq_len, vocab_size]`)[7,9](@ref)  
│       └── ​**推理模式** (`y is None`)  
│           ├── 1. BERT编码(无掩码)  
│           │   └── `x, _ = self.bert(x)`  
│           │       └── 全注意力模式(允许所有token互相关注)  
│           ├── 2. 分类层预测  
│           │   └── `y_pred = self.classify(x)`  
│           └── 3. 返回概率分布  
│               └── `return torch.softmax(y_pred, dim=-1)` [8](@ref)  
│                   └── 输出每个token的类别概率(维度:`[batch_size, seq_len, vocab_size]`)  

x:输入序列的token ID张量

y:真实标签张量(训练模式时存在)

y_pred:分类层输出的预测logits

mask:生成下三角注意力掩码矩阵

self.loss:计算预测与标签的交叉熵损失

torch.trill():生成一个下三角矩阵,主对角线以下的元素保留,其余置零。常用于因果掩码(如自注意力机制中防止模型“偷看”未来信息)

参数类型必选默认值说明
inputTensor输入张量
diagonalint0对角线偏移量:
0:主对角线
>0:主对角线上方第k条对角线
<0:主对角线下方第k条对角线

torch.ones():生成指定形状的全1张量,常用于初始化或占位符

参数类型必选默认值说明
sizeint或tuple输出张量的形状,如(2,3)
dtypetorch.dtypeNone数据类型(如torch.float32
devicetorch.deviceCPU设备(如"cuda:0"
requires_gradboolFalse是否需计算梯度

.shape():返回张量的维度信息(类型为torch.Size),非函数而是属性

torch.cuda.is_available():检测当前系统是否支持CUDA(即是否有可用GPU),返回布尔值

cuda():将张量或模型从CPU移动到GPU,加速计算

参数类型必选默认值说明
deviceint或torch.device当前默认GPU目标GPU设备索引(如0"cuda:0"

.view():改变张量的形状(类似reshape),但需张量内存连续

参数类型必选默认值说明
*shapeint或-1目标形状,-1表示自动计算维度大小

torch.softmax():对张量在指定维度上计算Softmax,输出概率分布(总和为1)

参数类型必选默认值说明
inputTensor输入张量
dimint计算Softmax的维度(如dim=1表示按行计算)
    # 当输入真实标签,返回loss值;无真实标签,返回预测值
    def forward(self, x, y=None):
        if y is not None:
            # 训练时,构建一个下三角的mask矩阵,让上下文之间没有交互
            mask = torch.tril(torch.ones((x.shape[0], x.shape[1], x.shape[1])))
            if torch.cuda.is_available():
                mask = mask.cuda()
            x, _ = self.bert(x, attention_mask=mask)
            y_pred = self.classify(x)  # output shape:(batch_size, vocab_size)
            return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
        else:
            # 预测时,可以不使用mask
            x, _ = self.bert(x)
            y_pred = self.classify(x)  # output shape:(batch_size, vocab_size)
            return torch.softmax(y_pred, dim=-1)

二、加载语料

代码运行流程

load_corpus(path)
├── 1. 初始化corpus字符串
│   └── corpus = ""
├── 2. 打开文件
│   ├── with open(path, encoding="gbk") as f:
│   │   └── 以"gbk"编码只读模式打开文件(自动处理文件关闭)
│   └── 异常处理:若文件不存在或编码错误,抛出IOError/UnicodeDecodeError(隐式)
├── 3. 逐行读取文件内容
│   └── for line in f:
│       ├── line.strip(): 移除行首尾的空白字符(包括换行符"\n")
│       └── corpus += line.strip(): 拼接处理后的行到corpus字符串
└── 4. 返回结果
    └── return corpus → 合并后的完整文本字符串

path:需要加载的文本文件的路径(如"./data.txt"),支持绝对或相对路径。

corpus:初始为空字符串,用于存储逐行读取并处理后的完整文本。

f:文件对象,通过open()打开的文件句柄,支持迭代逐行读取。

line:每次循环读取的一行原始内容(包含换行符)

open():打开文件并返回文件对象,用于文件的读写操作。支持文本或二进制模式,是Python中文件I/O的核心函数

参数类型必选默认值说明
​**name**str文件路径(如"data.txt"),支持绝对或相对路径

2

​**mode**str'r'文件打开模式:
'r':只读
'w':写入(覆盖)
'a':追加写入
'b':二进制模式(如'rb'
'+':读写模式(如'r+')。
​**buffering**int-1缓冲策略:
0:无缓冲(仅限二进制模式)
1:行缓冲(文本模式)
>1:指定缓冲区大小(字节)

strip():移除字符串开头和结尾的指定字符序列,默认删除空白符(如空格、换行符\n、制表符\t等)

参数类型必选默认值说明
​**chars**strNone指定要删除的字符集合:
- 若提供,则删除开头和结尾中所有属于chars的字符,直到遇到不在其中的字符为止;
- 若未提供,默认删除空白符。
# 加载语料
def load_corpus(path):
    corpus = ""
    with open(path, encoding="gbk") as f:
        for line in f:
            corpus += line.strip()
    return corpus

三、 随机生成样本

代码运行流程

build_sample(tokenizer, window_size, corpus)
├── 1. 随机生成起始位置
│   └── start = random.randint(0, len(corpus)-1-window_size)
│       ├── 生成范围:0 ≤ start ≤ len(corpus)-window_size-1
│       └── 确保窗口末尾不越界
├── 2. 截取输入窗口与目标序列
│   ├── end = start + window_size → 窗口结束位置
│   ├── window = corpus[start:end] → 输入文本(前n字)
│   └── target = corpus[start+1:end+1] → 目标文本(后n字,输入右移1位)
├── 3. 文本编码为模型输入
│   ├── x = tokenizer.encode(window, ...)
│   │   ├── add_special_tokens=False → 不添加特殊标记(如[CLS]/[SEP])
│   │   ├── padding='max_length' → 填充至固定长度(max_length=10)
│   │   └── truncation=True → 超长时截断
│   └── y = tokenizer.encode(target, ...) → 同x逻辑
└── 4. 返回样本
    └── return x, y → 输入序列x与目标序列y

tokenizer:分词器对象,将文本转换为模型可处理的数字序列

window_size:输入窗口的长度(字符数)

corpis:原始文本语料

start:窗口起始位置

end:窗口结束位置

window:输入文本片段

target:目标文本片段

x:输入序列的编码

y:目标序列的编码

random.randint():生成一个在闭区间 [a, b] 内的随机整数(包含 a 和 b),常用于模拟离散随机事件(如掷骰子、随机抽样等)

参数类型必选默认值说明示例
​**a**int随机数的最小值(包含)random.randint(1, 10) → 7
​**b**int随机数的最大值(包含)random.randint(1, 10) → 3

tokenizer.encode():将文本转换为模型可处理的数字序列(Token ID 列表),常用于自然语言处理任务(如BERT、GPT等模型的输入预处理)

参数类型必选默认值说明
​**text**str 或 List[str]输入文本(支持单句或批量文本)
​**add_special_tokens**boolTrue是否添加特殊标记(如BERT的[CLS][SEP]
​**padding**str 或 boolFalse填充策略:True/"max_length"填充至max_length长度,"longest"填充到批次最长序列长度
​**truncation**str 或 boolFalse截断策略:True/"longest_first"优先截断较长部分,"only_first"仅截断第一个句子
​**max_length**int模型默认(如512)序列最大长度(超出部分截断或填充)
​**return_tensors**strNone返回张量类型:"pt"(PyTorch)或"tf"(TensorFlow)
# 随机生成一个样本
# 从文本中截取随机窗口,前n个字作为输入,最后一个字作为输出
def build_sample(tokenizer, window_size, corpus):
    start = random.randint(0, len(corpus) - 1 - window_size)
    end = start + window_size
    window = corpus[start:end]
    target = corpus[start + 1:end + 1]  # 输入输出错开一位

    x = tokenizer.encode(window, add_special_tokens=False, padding='max_length', truncation=True,
                         max_length=10)  # 将字转换成序号
    y = tokenizer.encode(target, add_special_tokens=False, padding='max_length', truncation=True, max_length=10)

    return x, y

四、建立模型

vocab:词汇表的大小或词汇表对象。通常表示模型中词汇的数量,用于定义模型的输入维度。

char_dim:字符嵌入的维度,表示每个字符的向量表示的长度。

pretrain_model_path:预训练模型的路径,用于加载预训练权重或配置。

model:返回一个 LanguageModel 实例,该实例已经初始化并加载了预训练权重

# 建立模型
def build_model(vocab, char_dim, pretrain_model_path):
    model = LanguageModel(768, 21128, pretrain_model_path)
    return model

五、采样策略选择

代码运行流程

sampling_strategy(prob_distribution)
├── 1. 随机选择采样策略
│   ├── if random.random() > 0.1:
│   │   └── strategy = "greedy" → 90%概率使用贪婪搜索
│   └── else:
│       └── strategy = "sampling" → 10%概率使用随机采样
├── 2. 根据策略选择token
│   ├── if strategy == "greedy":
│   │   └── return int(torch.argmax(prob_distribution)) → 选择概率最大的token
│   └── elif strategy == "sampling":
│       ├── prob_distribution = prob_distribution.cpu().numpy() → 将概率分布转换为NumPy数组
│       └── return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution) → 根据概率分布随机采样
└── 3. 返回选择的token ID
    └── return token_id → 返回生成的token ID

prob_distribution:模型输出的概率分布,表示每个token的未归一化概率(logits)

strategy:采样策略

random.random():生成一个 [0.0, 1.0) 之间的随机浮点数,常用于需要随机值的场景

torch.argmax():返回张量中最大值的索引,常用于分类任务中获取最大概率的类别索引

参数类型必选默认值说明
inputtorch.Tensor输入张量。
dimintNone指定沿哪个维度计算最大值索引。
keepdimboolFalse是否保留原维度。

cpu():将张量从 GPU 转移到 CPU,适用于需要在 CPU 上处理数据的场景

numpy():将 PyTorch 张量转换为 NumPy 数组,便于与 NumPy 库进行交互

np.random.choice():从给定数组中随机选择元素,支持指定采样概率和是否允许重复

参数类型必选默认值说明
aint 或 array-like输入数组或整数。
sizeint 或 tupleNone输出数组的形状。
replaceboolTrue是否允许重复采样。
parray-likeNone每个元素的采样概率。

list():将可迭代对象(如元组、字符串、集合等)转换为列表,便于修改和操作

参数类型必选默认值说明
iterable可迭代对象将可迭代对象转换为列表。
def sampling_strategy(prob_distribution):
    if random.random() > 0.1:
        strategy = "greedy"
    else:
        strategy = "sampling"
    if strategy == "greedy":
        return int(torch.argmax(prob_distribution))
    elif strategy == "sampling":
        prob_distribution = prob_distribution.cpu().numpy()
        return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution)

六、模型效果测试

代码运行流程

generate_sentence(openings, model, tokenizer, window_size)
├── 1. 设置模型为评估模式
│   └── model.eval() → 禁用dropout和batchnorm等训练专用行为
├── 2. 禁用梯度计算
│   └── with torch.no_grad() → 减少内存占用,加速推理
├── 3. 初始化生成字符
│   └── pred_char = "" → 用于存储当前生成的字符
├── 4. 文本生成循环
│   ├── 终止条件:
│   │   ├── pred_char == "\n" → 生成换行符
│   │   └── len(openings) > 30 → 生成文本超过30字
│   ├── 更新生成文本
│   │   └── openings += pred_char → 将新生成的字符追加到文本中
│   ├── 编码输入文本
│   │   ├── x = tokenizer.encode(openings, add_special_tokens=False) → 将文本转换为数字序列
│   │   └── x = torch.LongTensor([x]) → 转换为PyTorch张量
│   ├── 设备转移
│   │   └── if torch.cuda.is_available(): x = x.cuda() → 将输入数据移至GPU(若可用)
│   ├── 模型推理
│   │   └── y = model(x)[0][-1] → 获取模型输出的最后一个token的logits
│   ├── 采样策略
│   │   └── index = sampling_strategy(y) → 根据logits选择下一个token(如贪婪搜索或随机采样)
│   └── 解码生成字符
│       └── pred_char = ''.join(tokenizer.decode(index)) → 将token ID转换为字符
└── 5. 返回生成文本
    └── return openings → 返回生成的完整文本

openings:生成文本的起始片段(如"今天天气")

model:用于生成文本的预训练或微调模型

tokenizer:分词器对象,将文本转换为模型输入的数字序列,并将模型输出解码为文本

window_size:输入窗口大小,限制模型输入的长度

pred_char:当前生成的字符

x:编码后的输入文本(数字序列)

y:模型输出,模型预测的logits(未归一化的概率分布)

index:通过采样策略从logits中选择的token ID

eval():在测试或验证时使用,确保模型行为一致,不会随机丢弃神经元或更新 BatchNorm 的统计量

torch.no_grad():在推理阶段使用,避免不必要的梯度计算和存储

tokenizer.encode():将文本转换为模型输入的数字序列

参数类型必选默认值说明
textstr 或 List[str]输入文本,支持单句或批量文本。
add_special_tokensboolTrue是否添加特殊标记(如 [CLS] 和 [SEP])。
max_lengthint模型默认(如 512)最大序列长度,超出部分截断或填充。
truncationstr 或 boolFalse截断策略(如 "longest_first")。
return_tensorsstrNone返回张量类型(如 "pt" 或 "tf")。

torch.LongTensor():创建 64 位整型张量,常用于索引或标签数据

参数类型必选默认值说明
datalist 或 numpy.ndarray输入数据,转换为 64 位整型张量。

torch.cuda.is_available():返回布尔值,指示是否可以使用 GPU

cuda():将张量或模型从CPU移动到GPU,加速计算

参数类型必选默认值说明
deviceint或torch.device当前默认GPU目标GPU设备索引(如0"cuda:0"

join():将可迭代对象中的元素连接为字符串,使用指定分隔符

参数类型必选默认值说明
iterable可迭代对象包含要连接的字符串或字符。
sepstr""分隔符,默认为空字符串。

tokenizer.decode():将 token ID 序列解码为文本

参数类型必选默认值说明
token_idsList[int] 或 torch.Tensor要解码的 token ID 序列。
skip_special_tokensboolFalse是否跳过特殊标记(如 [CLS] 和 [SEP])。
clean_up_tokenization_spacesboolTrue是否清理多余的空格。
# 文本生成测试代码
def generate_sentence(openings, model, tokenizer, window_size):
    # reverse_vocab = dict((y, x) for x, y in vocab.items())
    model.eval()
    with torch.no_grad():
        pred_char = ""
        # 生成了换行符,或生成文本超过30字则终止迭代
        while pred_char != "\n" and len(openings) <= 30:
            openings += pred_char
            x = tokenizer.encode(openings, add_special_tokens=False)
            x = torch.LongTensor([x])
            if torch.cuda.is_available():
                x = x.cuda()
            y = model(x)[0][-1]
            index = sampling_strategy(y)
            pred_char = ''.join(tokenizer.decode(index))
    return openings

七、模型训练

代码运行流程

train(corpus_path, save_weight=True)
├── 1. 初始化训练参数
│   ├── epoch_num = 20 → 训练轮数
│   ├── batch_size = 128 → 每次训练样本个数
│   ├── train_sample = 10000 → 每轮训练样本总数
│   ├── char_dim = 768 → 每个字的维度
│   ├── window_size = 10 → 样本文本长度
│   ├── vocab_size = 21128 → 字表大小
│   └── learning_rate = 0.001 → 学习率
├── 2. 加载预训练模型和分词器
│   ├── pretrain_model_path = r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese" → 预训练模型路径
│   └── tokenizer = BertTokenizer.from_pretrained(pretrain_model_path) → 加载分词器
├── 3. 加载语料和构建模型
│   ├── corpus = load_corpus(corpus_path) → 加载语料
│   ├── model = build_model(vocab_size, char_dim, pretrain_model_path) → 构建模型
│   └── if torch.cuda.is_available(): model = model.cuda() → 将模型移至GPU(若可用)
├── 4. 初始化优化器
│   └── optim = torch.optim.Adam(model.parameters(), lr=learning_rate) → 使用Adam优化器
├── 5. 训练循环
│   ├── for epoch in range(epoch_num): → 遍历每轮训练
│   │   ├── model.train() → 设置模型为训练模式
│   │   ├── watch_loss = [] → 初始化损失列表
│   │   ├── for batch in range(int(train_sample / batch_size)): → 遍历每个批次
│   │   │   ├── x, y = build_dataset(batch_size, tokenizer, window_size, corpus) → 构建训练样本
│   │   │   ├── if torch.cuda.is_available(): x, y = x.cuda(), y.cuda() → 将数据移至GPU(若可用)
│   │   │   ├── optim.zero_grad() → 梯度归零
│   │   │   ├── loss = model(x, y) → 计算损失
│   │   │   ├── loss.backward() → 反向传播计算梯度
│   │   │   └── optim.step() → 更新模型参数
│   │   │   └── watch_loss.append(loss.item()) → 记录损失值
│   │   └── print("第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss))) → 打印每轮平均损失
│   │   └── print(generate_sentence("让他在半年之前,就不能做出", model, tokenizer, window_size)) → 生成示例句子
│   │   └── print(generate_sentence("李慕站在山路上,深深的呼吸", model, tokenizer, window_size)) → 生成示例句子
├── 6. 保存模型权重
│   ├── if not save_weight: return → 不保存权重
│   └── else:
│       ├── base_name = os.path.basename(corpus_path).replace("txt", "pth") → 生成模型文件名
│       ├── model_path = os.path.join("model", base_name) → 生成模型保存路径
│       └── torch.save(model.state_dict(), model_path) → 保存模型权重
└── 7. 返回
    └── return → 函数结束

corpus_path:语料文件的路径,用于加载训练数据。

save_weight:控制是否保存训练后的模型权重,默认为 True

epoch_num:训练轮数

batch_size:每次训练的样本数量

train_sample:每轮训练样本总数

char_dim:每个字符的向量维度

window_size:窗口长度大小

vocab_size:词汇表中字符的数量

learning_rate:优化器的学习率

pretrain_model_path:预训练模型的路径,用于加载分词器和模型权重

tokenizer:将文本转换为模型输入的数字序列

corpus:加载的语料数据,用于训练模型

model:用于训练的语言模型

optim:用于更新模型参数的优化器

watch_loss:记录每轮训练中每个批次的损失值

x,y:输入数据和目标数据,用于训练模型。

base_name:根据语料路径生成的模型文件名。

model_path:模型权重的保存路径。

BertTokenizer.from_pretrained():加载预训练的分词器

参数类型必选默认值说明
pretrained_model_name_or_pathstr 或 os.PathLike预训练模型的名称或路径。
*init_inputs任意额外的初始化参数。
**kwargs任意其他可选参数,如 do_lower_casecls_token_id 等。

torch.cuda.is_available():返回布尔值,指示是否可以使用 GPU。

model.cuda():将模型移至 GPU。

参数类型必选默认值说明
deviceint 或 str"cuda:0"指定 GPU 设备,如 "cuda:0" 或 "cuda:1"

torch.optim.Adam():创建 Adam 优化器

参数类型必选默认值说明
paramsIterable需要优化的参数,通常是 model.parameters()
lrfloat0.001学习率。
betasTuple[float, float](0.9, 0.999)Adam 算法中的 beta1 和 beta2 参数。
epsfloat1e-8数值稳定性参数。
weight_decayfloat0权重衰减(L2 正则化)。
amsgradboolFalse是否使用 AMSGrad 变体。

model.parameters():获取模型的可训练参数

model.train():启用 Dropout 和 BatchNorm 等训练专用行为

optim.zero_grad():将模型参数的梯度归零,避免梯度累积。

loss.backward():反向传播计算梯度

参数类型必选默认值说明
retain_graphboolFalse是否保留计算图以供后续反向传播。
create_graphboolFalse是否创建计算图以支持高阶导数计算。

optim.step():根据梯度更新模型参数

append():将元素添加到列表末尾

参数类型必选默认值说明
item任意要添加到列表末尾的元素。

item():返回张量的 Python 标量值

np.mean():计算数组的均值

参数类型必选默认值说明
aarray_like输入数组。
axisint 或 tupleNone沿指定轴计算均值。
dtypedtypeNone输出数组的数据类型。
outndarrayNone输出数组。

os.path.basename():返回路径中的文件名部分

参数类型必选默认值说明
pathstr文件路径。

replace():替换字符串中的子字符串

参数类型必选默认值说明
oldstr要替换的子字符串。
newstr替换后的新字符串。
countint-1替换次数,默认替换所有。

os.path.join():将多个路径部分连接成一个完整路径

参数类型必选默认值说明
*pathsstr要连接的路径部分。

state_dict():获取模型的状态字典,包含所有可训练参数。

def train(corpus_path, save_weight=True):
    epoch_num = 20  # 训练轮数
    batch_size = 128  # 每次训练样本个数
    train_sample = 10000  # 每轮训练总共训练的样本总数
    char_dim = 768  # 每个字的维度
    window_size = 10  # 样本文本长度
    vocab_size = 21128  # 字表大小
    learning_rate = 0.001  # 学习率

    pretrain_model_path = r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese"
    tokenizer = BertTokenizer.from_pretrained(pretrain_model_path)

    corpus = load_corpus(corpus_path)  # 加载语料
    model = build_model(vocab_size, char_dim, pretrain_model_path)  # 建立模型
    if torch.cuda.is_available():
        model = model.cuda()
    optim = torch.optim.Adam(model.parameters(), lr=learning_rate)  # 建立优化器
    print("文本词表模型加载完毕,开始训练")
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, tokenizer, window_size, corpus)  # 构建一组训练样本
            if torch.cuda.is_available():
                x, y = x.cuda(), y.cuda()
            optim.zero_grad()  # 梯度归零
            loss = model(x, y)  # 计算loss
            loss.backward()  # 计算梯度
            optim.step()  # 更新权重
            watch_loss.append(loss.item())
        print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
        print(generate_sentence("让他在半年之前,就不能做出", model, tokenizer, window_size))
        print(generate_sentence("李慕站在山路上,深深的呼吸", model, tokenizer, window_size))
    if not save_weight:
        return
    else:
        base_name = os.path.basename(corpus_path).replace("txt", "pth")
        model_path = os.path.join("model", base_name)
        torch.save(model.state_dict(), model_path)
        return

八、main函数

if __name__ == "__main__":
    # build_vocab_from_corpus("corpus/all.txt")
    train(r"F:\人工智能NLP\NLP\HomeWork\demo9.1_Bert语言模型生成文本\corpus.txt", False)


九、整体代码

# coding:utf8

import torch
import torch.nn as nn
import numpy as np
import math
import random
import os
import re
from transformers import BertTokenizer, BertModel

"""
基于pytorch的Bert语言模型
"""


class LanguageModel(nn.Module):
    def __init__(self, hidden_size, vocab_size, pretrain_model_path):
        super(LanguageModel, self).__init__()
        # self.embedding = nn.Embedding(len(vocab), input_dim)
        # self.layer = nn.LSTM(input_dim, input_dim, num_layers=1, batch_first=True)

        self.bert = BertModel.from_pretrained(pretrain_model_path, return_dict=False, attn_implementation='eager')

        self.classify = nn.Linear(hidden_size, vocab_size)
        self.loss = nn.functional.cross_entropy

    # 当输入真实标签,返回loss值;无真实标签,返回预测值
    def forward(self, x, y=None):
        if y is not None:
            # 训练时,构建一个下三角的mask矩阵,让上下文之间没有交互
            mask = torch.tril(torch.ones((x.shape[0], x.shape[1], x.shape[1])))
            if torch.cuda.is_available():
                mask = mask.cuda()
            x, _ = self.bert(x, attention_mask=mask)
            y_pred = self.classify(x)  # output shape:(batch_size, vocab_size)
            return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
        else:
            # 预测时,可以不使用mask
            x, _ = self.bert(x)
            y_pred = self.classify(x)  # output shape:(batch_size, vocab_size)
            return torch.softmax(y_pred, dim=-1)


# 加载字表
# def build_vocab(vocab_path):
#     vocab = {"<pad>":0}
#     with open(vocab_path, encoding="utf8") as f:
#         for index, line in enumerate(f):
#             char = line[:-1]       #去掉结尾换行符
#             vocab[char] = index + 1 #留出0位给pad token
#     return vocab

# 加载语料
def load_corpus(path):
    corpus = ""
    with open(path, encoding="gbk") as f:
        for line in f:
            corpus += line.strip()
    return corpus


# 随机生成一个样本
# 从文本中截取随机窗口,前n个字作为输入,最后一个字作为输出
def build_sample(tokenizer, window_size, corpus):
    start = random.randint(0, len(corpus) - 1 - window_size)
    end = start + window_size
    window = corpus[start:end]
    target = corpus[start + 1:end + 1]  # 输入输出错开一位

    x = tokenizer.encode(window, add_special_tokens=False, padding='max_length', truncation=True,
                         max_length=10)  # 将字转换成序号
    y = tokenizer.encode(target, add_special_tokens=False, padding='max_length', truncation=True, max_length=10)

    return x, y


# 建立数据集
# sample_length 输入需要的样本数量。需要多少生成多少
# vocab 词表
# window_size 样本长度
# corpus 语料字符串
def build_dataset(sample_length, tokenizer, window_size, corpus):
    dataset_x = []
    dataset_y = []
    for i in range(sample_length):
        x, y = build_sample(tokenizer, window_size, corpus)
        dataset_x.append(x)
        dataset_y.append(y)
    return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)


# 建立模型
def build_model(vocab, char_dim, pretrain_model_path):
    model = LanguageModel(768, 21128, pretrain_model_path)
    return model

def sampling_strategy(prob_distribution):
    if random.random() > 0.1:
        strategy = "greedy"
    else:
        strategy = "sampling"
    if strategy == "greedy":
        return int(torch.argmax(prob_distribution))
    elif strategy == "sampling":
        prob_distribution = prob_distribution.cpu().numpy()
        return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution)


# 文本生成测试代码
def generate_sentence(openings, model, tokenizer, window_size):
    # reverse_vocab = dict((y, x) for x, y in vocab.items())
    model.eval()
    with torch.no_grad():
        pred_char = ""
        # 生成了换行符,或生成文本超过30字则终止迭代
        while pred_char != "\n" and len(openings) <= 30:
            openings += pred_char
            x = tokenizer.encode(openings, add_special_tokens=False)
            x = torch.LongTensor([x])
            if torch.cuda.is_available():
                x = x.cuda()
            y = model(x)[0][-1]
            index = sampling_strategy(y)
            pred_char = ''.join(tokenizer.decode(index))
    return openings


def train(corpus_path, save_weight=True):
    epoch_num = 20  # 训练轮数
    batch_size = 128  # 每次训练样本个数
    train_sample = 10000  # 每轮训练总共训练的样本总数
    char_dim = 768  # 每个字的维度
    window_size = 10  # 样本文本长度
    vocab_size = 21128  # 字表大小
    learning_rate = 0.001  # 学习率

    pretrain_model_path = r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese"
    tokenizer = BertTokenizer.from_pretrained(pretrain_model_path)

    corpus = load_corpus(corpus_path)  # 加载语料
    model = build_model(vocab_size, char_dim, pretrain_model_path)  # 建立模型
    if torch.cuda.is_available():
        model = model.cuda()
    optim = torch.optim.Adam(model.parameters(), lr=learning_rate)  # 建立优化器
    print("文本词表模型加载完毕,开始训练")
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, tokenizer, window_size, corpus)  # 构建一组训练样本
            if torch.cuda.is_available():
                x, y = x.cuda(), y.cuda()
            optim.zero_grad()  # 梯度归零
            loss = model(x, y)  # 计算loss
            loss.backward()  # 计算梯度
            optim.step()  # 更新权重
            watch_loss.append(loss.item())
        print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
        print(generate_sentence("让他在半年之前,就不能做出", model, tokenizer, window_size))
        print(generate_sentence("李慕站在山路上,深深的呼吸", model, tokenizer, window_size))
    if not save_weight:
        return
    else:
        base_name = os.path.basename(corpus_path).replace("txt", "pth")
        model_path = os.path.join("model", base_name)
        torch.save(model.state_dict(), model_path)
        return


if __name__ == "__main__":
    # build_vocab_from_corpus("corpus/all.txt")
    train(r"F:\人工智能NLP\NLP\HomeWork\demo9.1_Bert语言模型生成文本\corpus.txt", False)


http://www.kler.cn/a/598864.html

相关文章:

  • 人脸表情识别系统分享(基于深度学习+OpenCV+PyQt5)
  • Spring Boot框架中常用注解
  • 【mysql】同一个字段,字符串相加
  • 从Oracle到OceanBase数据库迁移:全方位技术解析
  • 如何让Go 的regexp包支持 (?!...) 这样的 Perl 语法?
  • PHP转GO Day3 函数定义与包管理实践(创建数学工具包)
  • 2.1.项目管理前言
  • 【Linux】:守护进程化
  • 【JavaEE】Mybatis基础使用注解 增删改查操作
  • DeepSeek政务应用场景与解决方案【清华大学最新版】
  • 996引擎-接口测试:背包
  • 开源AI大模型赋能私域流量:S2B2C场景下品牌文化建构的智能路径研究
  • webscoket爬虫之某旺(1)分析篇
  • 深度学习闭环检测算法实现与优化指南
  • 学术型ppt制作经验分享 - 如何美化科研ppt?
  • Linux下JDK1.8安装配置
  • 深度学习 Deep Learning 第5章 机器学习基础
  • Cloudfare内网穿透配置
  • 自动化测试中使用的设计模式
  • C++ vcpkg下载rttr等类库