大模型开发(四):PET项目——新零售决策评价系统(上)
PET项目——新零售决策评价系统(上)
- 0 前言
- 1 项目介绍
- 1.1 PET简介
- 1.2 项目背景
- 1.3 项目结构
- 1.4 硬件配置
- 2 数据处理
- 2.1 数据介绍
- 2.2 提示词模板与标签映射
- 2.3 BERT模型的输入格式
- 2.4 硬模板类
- 2.5 函数式编程
- 2.6 datasets模块
- 主要功能:
- 在本项目中的使用:
- 2.7 数据集加载与批处理
- 2.8 数据集导入器
- 3 模型配置与推理
- 3.1 配置文件
- 3.2 模型推理
- 3.3 convert_logits_to_ids函数
- 3.4 标签映射类(第一部分)
- 4 模型训练(微调)
- 4.1 标签映射类(第二部分)
- 4.2 损失函数
- 4.3 模型的评估指标类
- 4.4 模型微调
0 前言
上篇文章我们介绍了使用全量微调构建医疗问诊机器人,比较常用的微调还有提示词微调(Prompt微调)、PEFT,而Prompt微调用的比较多有PET和P-Tuning,而我们今天介绍的项目,就是使用PET的方式微调大模型。
1 项目介绍
1.1 PET简介
PET的全称是Pattern-Exploiting Training,主要用来做文本分类(Prompt微调基本都是文本分两类),它是通过构建提示词模板(硬模版,需要我们自己指定模板)的方式,将下游任务转化为一个完形填空任务,这样就可以用BERT的MLM模型来进行预测了。
1.2 项目背景
智能推荐系统是AI技术在新零售中 最为常见且有效的应用之一,通过分析用户的购买历史、浏览行为以及喜好偏好,推荐系统可以根据个人特征给用户进行个性化商品推荐。这种个性化推荐不仅可以提高用户购买意愿,减少信息过载,还可以带来更高的用户满意度和销量。
在智能推荐系统中,文本分类的应用属于重要的应用环节。比如:某电商网站都允许用户为商品填写评论,这些文本评论能够体现出用户的偏好以及商品特征信息。因此,本次项目我们将以“电商平台用户评论”为背景,基于深度学习方法实现评论文本的准确分类,这样做的目的是通过用户对不同商品或服务的评价,平台能够快速回应用户需求,改进产品和服务。同时,自动分类也为个性化推荐奠定基础,帮助用户更轻松地找到符合其偏好的商品。
本项目要做的就是对用户的评论进行分类,当输入用户的评论后,输出这是对什么商品的评论,比如输入为“包装不错,苹果挺甜的,个头也大。”,输出“苹果”。
本项目我们使用BERT模型作为基础模型。
1.3 项目结构
我们把代码和预训练模型放到两个不同的目录中
其中代码在PET目录下面,结构如下:
1.4 硬件配置
硬件我们使用FunHPC云算力市场上的RTX 3080显卡,显存为12G,关于FunHPC云算力的使用,可以参考这篇文章。
2 数据处理
个人认为,本项目最难、最繁琐的部分就是数据处理了,特别是之前没有开发过和BERT相关的项目。这里需要铺垫的知识有点多,我尽量在用到之前,对其进行讲解。
2.1 数据介绍
训练集和验证集分别在data目录的train.txt与dev.txt中,其中 train.txt 内容如下:
train.txt一共包含63条样本数据,dev.txt一共包含590条样本数据,每一行用\t
分开,前半部分为标签(label),后半部分为原始输入 (用户评论),如果想使用自定义数据训练,只需要仿照上述示例数据构建数据集即可。
训练集数据少是由prompt微调的思想决定的,微调的时候更多地是使用预训练模型地能力,验证集数据多是为了保证评估结果的可信度,因此需要比较多的数据对模型进行充分评估。
2.2 提示词模板与标签映射
data目录下的prompt.txt中保存的是提示词模板,内容如下:
这是一条{MASK}评论:{textA}。
data目录下的verbalizer.txt中保存的是主标签与子标签的对应关系,内容如下:
电脑 电脑
水果 水果,苹果,香蕉,西瓜,西瓜
平板 平板
衣服 衣服,裙子,西装
酒店 酒店
洗浴 洗浴
书籍 书籍
蒙牛 蒙牛
手机 手机
电器 电器
这里第一列是主标签,第二列是子标签,我们的项目中,主标签和子标签是一致的。但很多情况下,两者是不一致的。
例如,以新闻分类为例:
某条新闻的标题为:中国女排再夺冠!
我们构建的PET模板为:下面是[MASK][MASK]
新闻:textA
,Label:体育/财经/时政/军事。
如果使用标签去填充,那么构建的提示词为:下面是[MASK][MASK]
新闻:中国女排再夺冠!
如果让模型去预测[MASK][MASK]
遮盖的内容,那么很可能得到排球、女排、体育等词汇,它有可能不在我们要预测的范围内(体育/财经/时政/军事),所以需要对标签进行映射,将排球、女排、体育都映射成体育。
训练时,以子标签作为目标值,损失函数的计算也是通过对比预测值与子标签的差距来进行;计算各种评估指标时(例如准确率、召回率等)则是对比主标签(即把预测值转换为主标签,然后计算)。
2.3 BERT模型的输入格式
如果不了解BERT模型的输入格式,那么很难理解本项目的数据处理过程。
BERT(Bidirectional Encoder Representations from Transformers)模型的输入通常包含以下几个部分,这些部分共同构成了一个完整的输入格式:
-
input_ids
:- 含义:这是输入文本经过分词器(如
AutoTokenizer
)编码后的 token ID 列表。 - 作用:用于表示输入文本中的每个 token(单词或子词)。BERT 模型通过这些 IDs 来查找对应的词向量。
- 含义:这是输入文本经过分词器(如
-
attention_mask
:- 含义:这是一个与
input_ids
长度相同的二进制列表,用于标识哪些位置是有效的 token(值为 1),哪些位置是填充的 token(值为 0)。 - 作用:帮助模型忽略填充的部分,只关注实际的输入内容。这对于处理不同长度的输入序列非常重要。
- 含义:这是一个与
-
token_type_ids
:- 含义:这是一个与
input_ids
长度相同的整数列表,用于区分不同的句子或文本片段。BERT 支持处理多句输入(例如:[CLS] 句子A [SEP] 句子B [SEP]
),此时需要通过token_type_ids
来标识每个 token 属于哪个句子。 - 作用:在多句输入的情况下,帮助模型理解哪些 token 属于哪个句子。例如,在问答任务中,
token_type_ids
可以区分问题和上下文。 - 如果输入是单句
"这是一条[MASK]评论:这个手机也太卡了"
,那么token_type_ids
将全部为 0。 - 如果输入是两个句子
[CLS]句子A[SEP]句子B[SEP]
,那么token_type_ids
的前半部分为 0,后半部分为 1。
- 含义:这是一个与
-
position_ids
(可选):- 含义:这是每个 token 在输入序列中的位置索引。
- 例如"这是一条
[MASK]
评论:这个手机也太卡了",假设经过分词器处理后,输入文本被编码为以下 token 列表(包括特殊token [CLS]
和[SEP]
),对应的position_ids
就是这些 token 在输入序列中的位置索引:
input_tokens = ['[CLS]', '这', '是', '一', '条', '[MASK]', '评', '论', ':', '这', '个', '手', '机', '也', '太', '卡', '了', '[SEP]']
position_ids = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
mask_position
(特定任务使用):- 含义:这是一个标记
[MASK]
位置的列表,用于指示哪些位置是被遮盖的 token。 - 作用:在 MLM(Masked Language Model)任务中,模型需要知道哪些位置是被遮盖的,以便进行预测。
[MASK]
出现在第 6 个位置(从 0 开始计数),因此 mask_position 为:
- 含义:这是一个标记
mask_position = [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
对于大多数 BERT 应用场景,输入至少需要包含 input_ids
和 attention_mask
。如果涉及多句输入,则还需要 token_type_ids
,而 mask_position
则是特定任务(如 MLM)中使用的辅助信息。我们当前的分类任务,就是被 [MASK]
遮盖的商品类别,所以是MLM任务,需要mask_position
。
2.4 硬模板类
所谓硬模板,指的是人工设计的模板,或者说开发人员/产品经理设计的模板,除了硬模板,还有软模板,软模版是大模型生成的模板,关于软模版,我们这里不展开,后面的文章会介绍。
data_handle目录下的template.py中写了一个名为HardTemplate的类,它有两个功能:一是将样本数据填充到模板里面,构成提示词;二是将提示词按照BERT模型的输入格式进行转化。
template.py的内容如下:
# -*- coding:utf-8 -*-
from transformers import AutoTokenizer
import numpy as np
class HardTemplate(object):
"""
硬模板,人工定义句子和[MASK]之间的位置关系。
"""
def __init__(self, prompt: str):
"""
Args:
prompt (str): prompt格式定义字符串,它是提示词模板
e.g. -> "这是一条{MASK}评论:{textA}。",两个花括号中的内容是自定义字段,目前用 MASK 和 textA 进行占位
"""
self.prompt = prompt
self.inputs_list = [] # 用于填充prompt的输入列表,获取模板分词后的结果
self.custom_tokens = set(['MASK']) # 从prompt中解析出的自定义token,比如'MASK'和'textA',即花括号中的内容
self.prompt_analysis() # 解析prompt模板
def prompt_analysis(self):
"""
将prompt文字模板拆解为可映射的数据结构。
Examples:
prompt -> "这是一条{MASK}评论:{textA}。"
该函数执行后
inputs_list -> ['这', '是', '一', '条', 'MASK', '评', '论', ':', 'textA', '。']
custom_tokens -> {'textA', 'MASK'},这是一个集合
"""
idx = 0
while idx < len(self.prompt):
str_part = ''
if self.prompt[idx] not in ['{', '}']:
self.inputs_list.append(self.prompt[idx])
if self.prompt[idx] == '{': # 进入自定义字段
idx += 1
while self.prompt[idx] != '}': # 右边的花括号是在这个分支中遍历完
str_part += self.prompt[idx] # 拼接该自定义字段的值
idx += 1
elif self.prompt[idx] == '}': # 右边花括号如果出现在这个分支中,则说明花括号不匹配,报错
raise ValueError("Unmatched bracket '}', check your prompt.")
if str_part:
self.inputs_list.append(str_part)
self.custom_tokens.add(str_part) # 将所有自定义字段存储,后续会检测输入信息是否完整
idx += 1
def __call__(self,
inputs_dict: dict,
tokenizer,
mask_length,
max_seq_len=512):
"""
输入一个样本,将其填充到硬模板当中,并将其转化为Bert模型的输入格式
Bert模型的输入包含 input_ids、attention_mask、token_type_ids、position_ids(可选)、mask_position(特定任务使用)
Args:
inputs_dict (dict): prompt中的参数字典, e.g. -> {"textA": "这个手机也太卡了", "MASK": "[MASK]"}
tokenizer: 用于encoding文本
mask_length (int): MASK token 的长度,即掩码的字符长度
Returns:
dict -> {
'text': '[CLS]这是一条[MASK]评论:这个手机也太卡了。[SEP]',
'input_ids': [1, 47, 10, 7, 304, 3, 480, 279, 74, 47, 27, 247, 98, 105, 512, 777, 15, 12043, 2],
'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], # 填充后的提示词,哪些位置是需要注意的,对于不需要注意的,即用[PAD]填充的,则置0
'mask_position': [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] # 在[MASK]填充的位置置1
}
"""
# 定义输出格式(Bert模型的接收格式)
outputs = {
'text': '',
'input_ids': [],
'token_type_ids': [],
'attention_mask': [],
'mask_position': []
}
# 构建输入到模型的字符串,即将样本信息填充到模板
str_formated = ''
for value in self.inputs_list:
if value in self.custom_tokens:
if value == 'MASK':
# 用inputs_dict中,'MASK'键对应的值(本项目对应的值为'[MASK]'),替换模板中的'MASK'字段
# 并且根据mask_length,将MASK字段重复mask_length次,拼接起来
str_formated += inputs_dict[value] * mask_length
else:
# 这个分支中,value == 'textA',即将样本信息填充到模板中
str_formated += inputs_dict[value]
else:
str_formated += value
# print(f'str_formated-->{str_formated}')
# 将填充后的字符串,用 tokenizer 进行编码,得到模型的输入格式
encoded = tokenizer(text=str_formated,
truncation=True,
max_length=max_seq_len,
padding='max_length')
# print(f'encoded--->{encoded}')
outputs['input_ids'] = encoded['input_ids'] # encoded['input_ids']有起始符、分隔符和填充符,分词器给加上的
outputs['token_type_ids'] = encoded['token_type_ids']
outputs['attention_mask'] = encoded['attention_mask']
# 将input_ids从词表中的索引转化为对应的token,方便阅读
outputs['text'] = ''.join(tokenizer.convert_ids_to_tokens(encoded['input_ids']))
# 将'[MASK]'转化为词表中的索引,方便后续使用
mask_token_id = tokenizer.convert_tokens_to_ids(['[MASK]'])[0]
# 获取mask_position,即[MASK]的位置
mask_position = np.where(np.array(outputs['input_ids']) == mask_token_id)[0].tolist()
outputs['mask_position'] = mask_position
return outputs
if __name__ == '__main__':
# 创建分词器
tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')
# print(tokenizer.convert_ids_to_tokens([3819, 3352, 3819, 3352]))
# print(tokenizer.convert_tokens_to_ids(['网', '球']))
# 创建硬模板对象
hard_template = HardTemplate(prompt='这是一条{MASK}评论:{textA}')
# print(hard_template.inputs_list)
# print(hard_template.custom_tokens)
# 往硬模板内填充内容
tep = hard_template(
inputs_dict={'textA': '包装不错,苹果挺甜的,个头也大。', 'MASK': '[MASK]'},
tokenizer=tokenizer,
max_seq_len=30,
mask_length=2)
# 打印
for key, value in tep.items():
print(f'{key}: {value}')
对BERT模型的输入格式有所了解后,这个脚本的代码理解起来就比较容易。输出:
text: [CLS]这是一条[MASK][MASK]评论:包装不错,苹果挺甜的,个头也大。[SEP][PAD][PAD][PAD]
input_ids: [101, 6821, 3221, 671, 3340, 103, 103, 6397, 6389, 8038, 1259, 6163, 679, 7231, 8024, 5741, 3362, 2923, 4494, 4638, 8024, 702, 1928, 738, 1920, 511, 102, 0, 0, 0]
token_type_ids: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
attention_mask: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0]
mask_position: [5, 6]
2.5 函数式编程
按理说,函数式编程和数据处理关系并不大,但data_handle目录下的data_preprocess.py用到了,所以这里讲一下。
functools.partial
是 Python 标准库 functools
模块中的一个工具,用于部分应用函数(partial function application),它允许你固定某些参数的值,从而创建一个新的函数。
以下是 functools.partial
的用法和特点:
- 基本语法
from functools import partial
new_function = partial(function, *args, **kwargs)
function
: 要部分应用的原始函数。*args
: 固定的位置参数。**kwargs
: 固定的关键字参数。new_function
: 返回的新函数,其中部分参数已经被固定。
通过 partial
,你可以将一个函数的部分参数预先绑定,生成一个新的函数。新函数在调用时只需传递剩余的参数即可。
- 示例1:固定位置参数
from functools import partial
def add(x, y):
return x + y
# 创建一个新函数,固定第一个参数为5
add_five = partial(add, 5)
print(add_five(10)) # 输出: 15
- 示例2:固定关键字参数
from functools import partial
def greet(name, greeting="Hello"):
return f"{greeting}, {name}!"
# 创建一个新函数,固定greeting为"Hi"
hi_greet = partial(greet, greeting="Hi")
print(hi_greet("Alice")) # 输出: Hi, Alice!
- 示例 3:结合位置参数和关键字参数
from functools import partial
def multiply(x, y, z):
return x * y * z
# 创建一个新函数,固定x=2, y=3
multiply_by_2_and_3 = partial(multiply, 2, 3)
print(multiply_by_2_and_3(4)) # 输出: 24
2.6 datasets模块
同样的,data_handle目录下的data_preprocess.py也用到了datasets模块,它是 Hugging Face 提供的一个用于处理机器学习和深度学习任务中数据集的 Python 库。它简化了从各种来源加载、预处理和迭代数据的过程,支持多种格式的数据集,并且与 Hugging Face 的 transformers
库无缝集成。
安装命令:pip install datasets
主要功能:
-
加载数据集:
- 可以从本地文件或远程资源加载数据集。
- 支持多种格式,如 CSV、JSON、文本文件等。
- 内置了许多常用数据集,可以直接通过名称加载。
-
数据预处理:
- 提供了丰富的 API 来对数据进行预处理,如分词、编码、填充等。
- 支持批量处理(batched),可以显著提高处理效率。
new_dataset = dataset.map(function, batched=False, batch_size=None, num_proc=1, ...)
- 参数说明如下:
- function: 要应用到每个样本上的函数。该函数应该接受一个字典作为输入,并返回一个字典。
- batched (bool): 是否以批次的方式处理数据。如果为 True,则 function 将接收一批样本(即包含多个样本的字典),而不是单个样本。
- batch_size (int): 如果 batched=True,则指定每批次的样本数量。默认情况下,batch_size=1000。
- num_proc (int): 并行处理的进程数。设置为大于 1 的值可以加速处理过程。
- 其他参数: 还有其他一些参数,如 remove_columns、load_from_cache_file 等,可以根据需要使用。
-
数据迭代(了解即可):
- 提供了灵活的迭代器,方便在训练过程中逐批次获取数据。
- 支持分布式训练中的数据分片。
-
数据增强(了解即可):
- 可以通过自定义函数对数据进行增强或转换。
在本项目中的使用:
在本项目的代码中,datasets
模块主要用于以下几个方面:
- 加载数据集:
train_dataset = load_dataset('text', data_files=r'../data/train.txt')
这里使用了 load_dataset
函数从本地文本文件加载数据集。load_dataset
返回一个 DatasetDict
对象,其中包含不同分割(如训练集、验证集、测试集)的数据。
- 映射数据处理函数:
new_dataset = train_dataset.map(convert_func, batched=True)
使用 map
方法将 convert_func
函数应用到数据集的每个样本上,并返回一个新的数据集对象,batched=True
参数表示将整个数据集作为一个批次进行处理,这个方法非常高效,因为它可以在底层进行优化,例如并行处理和批处理。
至于convert_func是什么,我们稍后介绍。
2.7 数据集加载与批处理
data_handle目录下的data_preprocess.py,有了前面的知识铺垫,里面的代码理解起来会容易很多。内容如下:
import torch
import numpy as np
from functools import partial
from datasets import load_dataset
from transformers import AutoTokenizer
from template import HardTemplate
def convert_example(
examples: dict,
tokenizer,
max_seq_len: int,
max_label_len: int,
hard_template: HardTemplate,
train_mode=True,
return_tensor=False) -> dict:
"""
将样本数据转换为模型接收的输入数据。
Args:
examples (dict): 训练数据样本, e.g. -> {
"text": [
'手机 这个手机也太卡了。',
'体育 世界杯为何迟迟不见宣传',
...
]
}
max_seq_len (int): 句子的最大长度,若没有达到最大长度,则padding为最大长度
max_label_len (int): 最大label长度,若没有达到最大长度,则padding为最大长度
hard_template (HardTemplate): 模板类。
train_mode (bool): 训练阶段 or 推理阶段。
return_tensor (bool): 是否返回tensor类型,如不是,则返回numpy类型。
Returns:
dict (str: np.array) -> tokenized_output = {
'input_ids': [[1, 47, 10, 7, 304, 3, 3, 3, 3, 47, 27, 247, 98, 105, 512, 777, 15, 12043, 2], ...],
'token_type_ids': [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ...],
'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], ...],
'mask_positions': [[5, 6, 7, 8], ...],
'mask_labels': [[2372, 3442, 0, 0], [2643, 4434, 2334, 0], ...]
}
"""
# 定义输出格式(Bert模型的接收格式)
tokenized_output = {
'input_ids': [],
'token_type_ids': [],
'attention_mask': [],
'mask_positions': [],
'mask_labels': []
}
# 遍历样本数据,将样本填充到模板中,并转化为Bert模型的输入格式
for i, example in enumerate(examples['text']):
if train_mode:
# 如果是训练模式,则既有样本的label,也有样本的文本内容
label, content = example.strip().split('\t')
else:
# 如果是评估(推理)模式,则只有样本的文本内容
content = example.strip()
# 需要将每个样本整合成一个字典,这样硬模板对象才方便处理
inputs_dict = {
'textA': content,
'MASK': '[MASK]'
}
# 将样本填充到硬模板当中,并将其转化为Bert模型的输入格式
encoded_inputs = hard_template(
inputs_dict=inputs_dict,
tokenizer=tokenizer,
max_seq_len=max_seq_len,
mask_length=max_label_len)
# 收集硬模板的处理结果
tokenized_output['input_ids'].append(encoded_inputs["input_ids"])
tokenized_output['token_type_ids'].append(encoded_inputs["token_type_ids"])
tokenized_output['attention_mask'].append(encoded_inputs["attention_mask"])
tokenized_output['mask_positions'].append(encoded_inputs["mask_position"])
# 对于训练模式,则需要将label转化为Bert模型的输入格式
if train_mode:
# 将标签转化为id
label_encoded = tokenizer(text=[label])
# 因为分词器处理的结果是以[CLS]开头和[SEP]结尾的,所以需要将[CLS]和[SEP]去除
label_encoded = label_encoded['input_ids'][0][1:-1] # 这里中间有个索引[0],是因为分词器输入标签时,也用了一层列表包装
# 去掉[CLS]和[SEP]后,如果标签的长度大于max_label_len,则截断
label_encoded = label_encoded[:max_label_len]
# 如果标签的长度小于max_label_len,则填充pad_token_id
label_encoded = label_encoded + [tokenizer.pad_token_id] * (max_label_len - len(label_encoded))
# 收集处理后的标签
tokenized_output['mask_labels'].append(label_encoded)
# 将数据转化为torch.tensor或者numpy.array格式,方便后续处理
for k, v in tokenized_output.items():
if return_tensor:
tokenized_output[k] = torch.LongTensor(v)
else:
tokenized_output[k] = np.array(v)
return tokenized_output
if __name__ == '__main__':
# 导入数据
train_dataset = load_dataset('text', data_files=r'../data/train.txt')
# load_dataset会返回一个datasets.dataset_dict.DatasetDict对象,该对象有一个名为train的键,train_dataset['train']是一个datasets.arrow_dataset.Dataset对象
# train_dataset['train']['text']是一个列表,里面存储着每条样本的文本内容,原来txt文件中,每个非空行都会被当成一个样本
# 创建分词器
tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')
# 创建硬模板对象
hard_template = HardTemplate(prompt='这是一条{MASK}评论:{textA}')
# 函数式编程
convert_func = partial(convert_example,
tokenizer=tokenizer,
hard_template=hard_template,
max_seq_len=30,
max_label_len=2,
)
# 这里将 `convert_example` 函数的部分参数(如 `tokenizer`, `hard_template`, `max_seq_len`, `max_label_len`)预先绑定,
# 生成了一个新的函数 `convert_func`。这样,在后续调用 `convert_func` 时,只需要传递剩余的参数(如 `examples` 和其他可选参数)。
# batched=True相当于将train_dataset看成一个批次的样本直接对数据进行处理,节省时间
new_dataset = train_dataset.map(convert_func, batched=True) # dataset是一个datasets.dataset_dict.DatasetDict对象
for value in new_dataset['train']:
# value将是一个字典,其中包含输入的id、token_type_id、attention_mask、mask_position和mask_label
print(type(value))
for k, v in value.items():
print(k, v)
print(len(value['input_ids']))
break
输出:
<class 'dict'>
text 电脑 (1)这款笔记本外观感觉挺漂亮的,分量吗,对我来说不算沉。 (2)安装了WindowsXP系统后,运行的速度挺快。发热量没有想象中那么大。可能尚未运行很耗资源的程序,没有感到内存的弊病。不过,1G的内存确实有点小。 (3)附赠的包很不错,挺有手感的。但是附赠的鼠标实在是太小了,幸好同时订了一个双飞燕的鼠标哟。
input_ids [101, 6821, 3221, 671, 3340, 103, 103, 6397, 6389, 8038, 113, 122, 114, 6821, 3621, 5011, 6381, 3315, 1912, 6225, 2697, 6230, 2923, 4023, 778, 4638, 8024, 1146, 7030, 102]
token_type_ids [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
attention_mask [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
mask_positions [5, 6]
mask_labels [4510, 5554]
30
2.8 数据集导入器
data_handle目录下的data_loader.py只有一个函数,即get_data,它的目的是获得数据导入器,只要前面的template.py和data_preprocess.py掌握之后,这里的代码理解起来很容易。
# coding:utf-8
from torch.utils.data import DataLoader
from transformers import default_data_collator
from data_preprocess import *
def get_data(prompt_file, train_path, dev_path, max_seq_len, max_label_len, batch_size, tokenizer):
"""
准备数据集函数。
该函数从指定的文件中读取提示词模板,加载训练集和验证集,并将它们转换为模型可以接受的格式。
参数:
- prompt_file: 提示词模板文件的路径。
- train_path: 训练集文件的路径。
- dev_path: 验证集文件的路径。
- max_seq_len: 输入序列的最大长度。
- max_label_len: 标签序列的最大长度。
- batch_size: 每个批次的大小。
- tokenizer: 分词器。
返回:
- train_dataloader: 训练集的数据加载器。
- dev_dataloader: 验证集的数据加载器。
"""
# 从文件prompt.txt从读取提示词模板
prompt = open(prompt_file, 'r', encoding='utf8').readlines()[0].strip()
# 创建硬模板对象
hard_template = HardTemplate(prompt=prompt)
# 导入数据
dataset = load_dataset('text', data_files={'train': train_path, 'dev': dev_path})
# load_dataset会返回一个datasets.dataset_dict.DatasetDict对象,字典里包含 train 键和 dev 键
# 将将样本数据转换为模型接收的输入数据。
# 函数式编程
new_func = partial(convert_example,
tokenizer=tokenizer,
hard_template=hard_template,
max_seq_len=max_seq_len,
max_label_len=max_label_len)
# 将数据集转换为模型接收的输入数据
dataset = dataset.map(new_func, batched=True)
# 从数据集对象中获取训练集与验证集
train_dataset = dataset["train"]
dev_dataset = dataset["dev"]
# print('train_dataset', train_dataset[:2])
# print('*'*80)
# default_data_collator作用,转换为tensor数据类型
# 创建DataLoader对象,用于加载数据集
train_dataloader = DataLoader(train_dataset,
shuffle=True,
collate_fn=default_data_collator,
batch_size=batch_size)
dev_dataloader = DataLoader(dev_dataset,
collate_fn=default_data_collator,
batch_size=batch_size)
return train_dataloader, dev_dataloader
if __name__ == '__main__':
# 创建分词器
tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')
# 调用get_data函数,获取训练集和验证集的数据加载器
train_dataloader, dev_dataloader = get_data(prompt_file='../data/prompt.txt',
train_path='../data/train.txt',
dev_path='../data/dev.txt',
max_seq_len=512,
max_label_len=2,
batch_size=8,
tokenizer=tokenizer)
print('训练集batch数量:', len(train_dataloader))
print('验证集batch数量:', len(dev_dataloader))
print('-'*80)
for i, value in enumerate(train_dataloader):
print('type(value): ', type(value))
print('value.keys(): ', value.keys())
print('len(value): ', len(value))
print(r"value['input_ids'].shape: ", value['input_ids'].shape)
print(r"value['input_ids'].dtype: ", value['input_ids'].dtype)
输出
训练集batch数量: 8
验证集batch数量: 74
--------------------------------------------------------------------------------
type(value): <class 'dict'>
value.keys(): dict_keys(['input_ids', 'token_type_ids', 'attention_mask', 'mask_positions', 'mask_labels'])
len(value): 5
value['input_ids'].shape: torch.Size([8, 512])
value['input_ids'].dtype: torch.int64
从输出中可以看到,value['input_ids'].shape
为torch.Size([8, 512])
,8表示batch_size,512表示句子的最大长度。
3 模型配置与推理
3.1 配置文件
在当前目录(PET)下有个名为pet_config.py的文件,它是模型的配置文件,即把相关的配置参数都写到了一个类中,代码如下:
# coding:utf-8
import torch
class ProjectConfig(object):
def __init__(self):
self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu' # windows电脑/linux服务器
self.pre_model = r'../预训练模型/bert-base-chinese'
self.train_path = r'data/train.txt'
self.dev_path = r'data/dev.txt'
self.prompt_file = r'data/prompt.txt'
self.verbalizer = r'data/verbalizer.txt'
self.max_seq_len = 512 # 输入文本的最大长度
self.batch_size = 8
self.learning_rate = 5e-5 # 学习率,在NLP中,一般1e-6 ~ 1e-5之间
self.weight_decay = 0 # L2正则化系数
self.warmup_ratio = 0.06 # 学习率预热参数
self.max_label_len = 2 # 标签的最大长度(字符个数)
self.epochs = 2
self.logging_steps = 10 # 日志相关设置
self.valid_steps = 20 # 每隔多少个step做一次验证
self.save_dir = r'checkpoints'
if __name__ == '__main__':
pc = ProjectConfig()
print(pc.prompt_file)
print(pc.pre_model)
输出
data/prompt.txt
../预训练模型/bert-base-chinese
有了上一篇文章(全量微调项目——基于GPT2 搭建医疗问诊机器人)和刚刚介绍的内容为基础,这个脚本看懂并不难,这里不做赘述。
我们这边只是为了简单演示,epoch配置的是2,实际工作中,不可能用那么小的。
3.2 模型推理
在当前目录下,有个名为inference.py的脚本,内容如下:
import time
from typing import List
import torch
from rich import print
from transformers import AutoTokenizer, AutoModelForMaskedLM
import sys
sys.path.append('data_handle')
from utils.verbalizer import Verbalizer
from data_handle.template import HardTemplate
from data_handle.data_preprocess import convert_example
from utils.common_utils import convert_logits_to_ids
def inference(contents: List[str]):
"""
推理函数,输入原始句子,输出mask label的预测值。
Args:
contents (List[str]): 描原始句子列表。
"""
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
model_path = 'checkpoints/model_best_old'
# 创建分词器
tokenizer = AutoTokenizer.from_pretrained(model_path)
# 创建模型
model = AutoModelForMaskedLM.from_pretrained(model_path)
model.to(device).eval()
# 标签最大长度
max_label_len = 2
# 创建标签映射器
verbalizer = Verbalizer(
verbalizer_file='data/verbalizer.txt',
tokenizer=tokenizer,
max_label_len=max_label_len
)
# prompt定义
prompt = open('data/prompt.txt', 'r', encoding='utf8').readlines()[0].strip()
# 创建硬模板
hard_template = HardTemplate(prompt=prompt)
with torch.no_grad():
start_time = time.time()
# 将样本数据转换为模型接收的输入数据。
examples = {'text': contents}
tokenized_output = convert_example(
examples,
tokenizer,
hard_template=hard_template,
max_seq_len=128,
max_label_len=max_label_len,
train_mode=False,
return_tensor=True
)
# 模型推理
logits = model(input_ids=tokenized_output['input_ids'].to(device),
token_type_ids=tokenized_output['token_type_ids'].to(device),
attention_mask=tokenized_output['attention_mask'].to(device)).logits
# logits的形状为(batch_size, seq_len, vocab_size)
# 推理结果后处理
predictions = convert_logits_to_ids(logits, tokenized_output['mask_positions']).cpu().numpy().tolist() # (batch, label_num)
predictions = verbalizer.batch_find_main_label(predictions) # 找到子label属于的主label
predictions = [ele['label'] for ele in predictions]
used = time.time() - start_time
print(f'Used {used}s.')
return predictions
if __name__ == '__main__':
# 测试数据
contents = [
'天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出行,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐',
'环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不错,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐',
"物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小",
"福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了,之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多来点其他小点,饼干一直也是大爱,那天好像也没看到",
"服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒适,晚上休息很安逸,隔音效果不错赞,下次还会来"
]
print("针对下面的文本评论,请分别给出对应所属类别:")
# 推理
res = inference(contents)
#print('inference label(s):', res)
# 将测试数据与推理结果拼接成字典
new_dict = {}
for i in range(len(contents)):
new_dict[contents[i]] = res[i]
print(new_dict)
输出
针对下面的文本评论,请分别给出对应所属类别:
Used 0.44545769691467285s.
{
'天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出行
,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐': '酒店',
'环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不错
,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐': '酒店',
'物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小':
'衣服',
'福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了,
之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做
柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多
来点其他小点,饼干一直也是大爱,那天好像也没看到': '衣服',
'服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒适
,晚上休息很安逸,隔音效果不错赞,下次还会来': '酒店'
}
代码中的模型路径,是在训练之后得到的最优模型,当然,这份代码是训练完之后讲解的,因此有这个。
在推理结果后处理的部分,使用了convert_logits_to_ids
函数,以及标签映射器的batch_find_main_label
,我们来逐个介绍。
3.3 convert_logits_to_ids函数
这个函数的代码在 PET/utils/common_utils.py
中,它将mask位置的预测结果转化为词表中的token_id
,代码如下:
import torch
def convert_logits_to_ids(logits: torch.tensor, mask_positions: torch.tensor):
"""
输入Language Model的词表概率分布(LMModel的logits,即模型的输出),将mask_position位置的
token logits转换为token的id。
Args:
logits (torch.tensor): model output -> (batch, seq_len, vocab_size)
mask_positions (torch.tensor): mask token的位置 -> (batch, max_label_len)
Returns:
torch.LongTensor: 对应mask position上最大概率的推理token -> (batch, max_label_len)
"""
label_length = mask_positions.size()[1] # 标签长度
batch_size, seq_len, vocab_size = logits.size()
# 输出的逻辑值打平,方便后续从中获取最大概率的token id
logits = logits.reshape(batch_size * seq_len, -1) # (batch_size * seq_len, vocab_size)
# 将mask_positions转换为逻辑值打平后的索引
mask_positions_after_reshaped = []
for batch, mask_pos in enumerate(mask_positions.detach().cpu().numpy().tolist()):
for pos in mask_pos:
mask_positions_after_reshaped.append(batch * seq_len + pos)
# 找到mask位置的预测概率分布
mask_logits = logits[mask_positions_after_reshaped] # (batch_size * label_num, vocab_size)
# 找到每个mask位置的最大概率所在索引(字典中的索引),即预测的token_id
predict_tokens = mask_logits.argmax(dim=-1) # (batch_size * max_label_len)
# 将获得的token_id转化成batch的形式
predict_tokens = predict_tokens.reshape(-1, label_length) # (batch_size, max_label_len)
return predict_tokens
if __name__ == '__main__':
logits = torch.randn(2, 20, 21193)
mask_positions = torch.LongTensor([
[5, 6],
[5, 6],
])
predict_tokens = convert_logits_to_ids(logits, mask_positions)
print(predict_tokens)
输出
tensor([[20063, 20244],
[ 6725, 7702]])
当然,PET/utils/common_utils.py
中还有另一个计算损失的函数,这个我们后面会讲。
3.4 标签映射类(第一部分)
推理的时候,创建了一个标签映射器,因为推理推出来的很可能是子标签,这里需要将其转化为主标签。
该类的代码在PET/utils/verbalizer.py
中,这个类的方法太多,代码太长,我这里只讲推理中用到的,剩下的需要到训练的时候再介绍
# -*- coding:utf-8 -*-
from typing import Union, List # Union 是 typing 模块中定义的一个类,用于表示多个类型中的任意一种类型
class Verbalizer(object):
def __init__(self, verbalizer_file: str, tokenizer, max_label_len: int):
"""
Args:
verbalizer_file (str): verbalizer文件存放地址。
tokenizer: 用于文本和id之间的转换。
max_label_len (int): 标签长度,若大于则截断,若小于则补齐
"""
self.tokenizer = tokenizer
self.label_dict = self.load_label_dict(verbalizer_file)
self.max_label_len = max_label_len
def load_label_dict(self, verbalizer_file: str):
"""
读取本地文件,构建verbalizer字典(标签映射字典)。
Args:
verbalizer_file (str): verbalizer文件存放地址。
Returns:
dict -> {
'体育': ['篮球', '足球','网球', '排球', ...],
'酒店': ['宾馆', '旅馆', '旅店', '酒店', ...],
...
}
"""
label_dict = {}
with open(verbalizer_file, 'r', encoding='utf8') as f:
# 逐行读取
for line in f.readlines():
label, sub_labels = line.strip().split('\t')
label_dict[label] = list(set(sub_labels.split(','))) # 这里使用集合来去重
return label_dict
def find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):
"""
通过子标签找到父标签。
Args:
sub_label (List[Union[list, str]]): 子标签, 文本型 或 id_list, e.g. -> '苹果' or [5741, 3362]
hard_mapping (bool): 当生成的词语不在子标签中,是否一定要匹配到一个最相似的label。
Returns:
dict -> {
'label': '水果',
'token_ids': [3717, 3362]
}
"""
# 如果传入为id_list, 则通过tokenizer转回来
if type(sub_label) == list:
# 移除[PAD]token(如果有)
pad_token_id = self.tokenizer.pad_token_id
while pad_token_id in sub_label:
sub_label.remove(pad_token_id)
# 将id_list转换为字符串
sub_label = ''.join(self.tokenizer.convert_ids_to_tokens(sub_label))
# 遍历标签映射字典,看标签是否在映射字典的子标签中
main_label = '无'
for label, s_labels in self.label_dict.items():
if sub_label in s_labels:
main_label = label
break
# 如果标签不在映射字典的子标签中,那么main_label将是'无‘
# 此时若hard_mapping为True,则通过最大公共子串匹配最相似的label
if main_label == '无' and hard_mapping:
main_label = self.hard_mapping(sub_label)
# 构建返回结果样式
ret = {
'label': main_label,
'token_ids': self.tokenizer(main_label)['input_ids'][1:-1] # 通过分词器转为id_list,并去掉[CLS]和[SEP]
}
return ret
def batch_find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):
"""
批量通过子标签找父标签。
"""
return [self.find_main_label(l, hard_mapping) for l in sub_label]
inference.py中调用的是batch_find_main_label
函数,而batch_find_main_label
则是通过列表生成式调用find_main_label
函数,通过代码中的注释,能比较好地理解这个函数在坐什么。
对于衣物类的评论,有可能模型生成的词汇是“裤子”,而它不在“衣服”的子标签中,这个时候需要使用强匹配策略。
find_main_label
函数中,如果大模型生成的词汇(预测的子标签),不在映射字典的子标签中,则调用hard_mapping
函数,该函数是通过寻找与生成的词汇有最大公共子串的子标签(映射字典中的子标签)来确定主标签,因此调用了寻找最大公共子串的函数。(当然,如果是我的话,我会使用词嵌入的方式寻找最接近的子标签,进而推导出主标签,因为找公共子串不太靠谱)
代码如下:
def hard_mapping(self, sub_label: str):
"""
强匹配函数,当模型生成的子label不存在时,通过最大公共子串找到重合度最高的主label。
Args:
sub_label (str): 子label。
Returns:
str: 主label。
"""
label, max_overlap_str = '', 0
# 遍历映射字典,寻找重合度最高的主label
for main_label, sub_labels in self.label_dict.items():
overlap_num = 0
# 求所有子label与当前推理label之间的公共子串长度之和
for s_label in sub_labels:
overlap_num += self.get_common_sub_str(sub_label, s_label)[1]
if overlap_num >= max_overlap_str:
max_overlap_str = overlap_num
label = main_label
return label
def get_common_sub_str(self, str1: str, str2: str):
"""
寻找最大公共子串。
str1:abcd
str2:abadbcdba
返回公共字串和最大匹配长度
"""
lstr1, lstr2 = len(str1), len(str2)
# 生成0矩阵,为方便后续计算,比字符串长度多了一列
record = [[0 for i in range(lstr2 + 1)] for j in range(lstr1 + 1)]
p = 0 # 最长匹配对应在str1中的最后一位
maxNum = 0 # 最长匹配长度
for i in range(lstr1):
for j in range(lstr2):
if str1[i] == str2[j]:
record[i+1][j+1] = record[i][j] + 1
if record[i+1][j+1] > maxNum:
maxNum = record[i+1][j+1]
p = i + 1
return str1[p-maxNum:p], maxNum
再写段测试代码看看功能:
if __name__ == '__main__':
from rich import print
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')
verbalizer = Verbalizer(
verbalizer_file='../data/verbalizer.txt',
tokenizer=tokenizer,
max_label_len=2
)
sub_label = ['西瓜', '裤子', '牛奶']
ret = verbalizer.batch_find_main_label(sub_label, hard_mapping=True)
print(ret)
输出
[
{'label': '水果', 'token_ids': [3717, 3362]},
{'label': '衣服', 'token_ids': [6132, 3302]},
{'label': '蒙牛', 'token_ids': [5885, 4281]}
]
4 模型训练(微调)
4.1 标签映射类(第二部分)
模型在训练时,是以子标签作为目标值,但我们拿到的样本数据,对应的标签却是主标签,所以在计算损失之前,需要根据主标签获取子标签。
前面我们介绍标签映射类的时候,只讲了由子标签找主标签,其实还有一部分代码,即由主标签找子标签,代码如下:
def find_sub_labels(self, label: Union[list, str]):
"""
通过标签找到所有的子标签。
Args:
label (Union[list, str]): 主标签, 文本型 或 id_list, e.g. -> '体育' or [860, 5509]
Returns:
dict -> {
'sub_labels': ['足球', '网球'],
'token_ids': [[6639, 4413], [5381, 4413]]
}
"""
# 如果传入为id_list, 则通过tokenizer转回来
if type(label) == list:
# 移除[PAD]token(如果有)
while self.tokenizer.pad_token_id in label:
label.remove(self.tokenizer.pad_token_id)
# 将id_list转换为字符串
label = ''.join(self.tokenizer.convert_ids_to_tokens(label))
# 如果主标签不在标签映射字典中,则报错
if label not in self.label_dict:
raise ValueError(f'Lable Error: "{label}" not in label_dict {list(self.label_dict)}.')
# 能执行到这里,说明没有报错,则直接从标签映射字典中获取子标签
sub_labels = self.label_dict[label]
# 将每个子标签转化为token id,并去掉[CLS]和[SEP]
token_ids = [_id[1:-1] for _id in self.tokenizer(sub_labels)['input_ids']]
# 遍历每个子标签,并进行截断与补齐
for i in range(len(token_ids)):
# 截断
token_ids[i] = token_ids[i][:self.max_label_len]
# 补齐
if len(token_ids[i]) < self.max_label_len:
token_ids[i] = (token_ids[i] + [self.tokenizer.pad_token_id] *
(self.max_label_len - len(token_ids[i])))
# 返回子标签和token id
ret = {'sub_labels': sub_labels, 'token_ids': token_ids}
return ret
def batch_find_sub_labels(self, label: List[Union[list, str]]):
"""
批量找到子标签。
"""
return [self.find_sub_labels(l) for l in label]
看懂了标签映射类(第一部分),这里的两个函数也很容易看懂。
4.2 损失函数
计算损失的函数在PET/utils/common_utils.py
中,这里需要注意的一个点是,假如某个样本的主标签为衣服,则它的主标签有三个,分别为衣服、裙子、西装,那么计算损失的时候,是计算预测值与这三个子标签的平均损失。
代码如下:
def mlm_loss(logits, mask_positions, sub_mask_labels, cross_entropy_criterion, device):
"""
计算指定位置的mask token的output与label之间的cross entropy loss。
Args:
logits (torch.tensor): 模型原始输出 -> (batch, seq_len, vocab_size)
mask_positions (torch.tensor): mask token的位置 -> (batch, mask_label_num)
sub_mask_labels (list): mask token的sub label, 子标签已转为token id,列表中的元素个数为 batch
e.g. -> [
[[2398, 3352]],
[[2398, 3352], [3819, 3861]]
]
cross_entropy_criterion (CrossEntropyLoss): CE Loss计算器
device (str): cpu还是gpu
Returns:
torch.tensor: CE Loss
"""
batch_size, seq_len, vocab_size = logits.size()
loss = None
for single_value in zip(logits, sub_mask_labels, mask_positions):
# 获取单个样本的相关信息
single_logits = single_value[0] # (seq_len, vocab_size)
single_sub_mask_labels = single_value[1] # (mask_label_num,)
single_mask_positions = single_value[2] # 列表,因为主标签对应的子标签有可能有多个
# 从模型输出中获取mask位置的logits
single_mask_logits = single_logits[single_mask_positions] # (mask_label_num, vocab_size)
# 广播重复,使第一个维度与子标签的数量相同,方便后续计算损失
single_mask_logits = single_mask_logits.repeat(len(single_sub_mask_labels), 1, 1) # (sub_label_num, mask_label_num, vocab_size)
# 修改维度
single_mask_logits = single_mask_logits.reshape(-1, vocab_size) # (sub_label_num * mask_label_num, vocab_size)
# 将子标签转换为torch张量,并迁移设备
single_sub_mask_labels = torch.LongTensor(single_sub_mask_labels).to(device) # (sub_label_num, mask_label_num)
# 调整标签维度
single_sub_mask_labels = single_sub_mask_labels.reshape(-1, 1).squeeze() # (sub_label_num * mask_label_num)
# 计算损失
cur_loss = cross_entropy_criterion(single_mask_logits, single_sub_mask_labels)
# 除以标签数量,得到相对于每个子标签的损失
cur_loss = cur_loss / len(single_sub_mask_labels)
# 累加损失
if not loss:
loss = cur_loss
else:
loss += cur_loss
# 计算每个样本的平均损失
loss = loss / batch_size # (1,)
return loss
4.3 模型的评估指标类
计算模型评估指标的代码在PET/utils/metirc_utils.py
中,这里写了一个名为ClassEvaluator的类,该类用来计算模型的各个评估指标,代码如下:
# coding='utf-8'
import numpy as np
from typing import List
from sklearn.metrics import accuracy_score, precision_score, f1_score, recall_score, confusion_matrix
class ClassEvaluator(object):
def __init__(self):
self.goldens = [] # 真实标签(主标签)
self.predictions = [] # 预测标签(主标签)
def add_batch(self, pred_batch: List[List], gold_batch: List[List]):
"""
添加一个batch中的prediction和gold列表,用于后续统一计算
这里的 pred_batch 和 gold_batch 都是子标签
Args:
pred_batch (list): 模型预测标签列表, e.g. -> [['体', '育'], ['财', '经'], ...]
gold_batch (list): 真实标签标签列表, e.g. -> [['体', '育'], ['财', '经'], ...]
"""
assert len(pred_batch) == len(gold_batch)
# 批量拼接
# [['体', '育'], ['财', '经'], ...] -> ['体育', '财经', ...]
if type(gold_batch[0]) in [list, tuple]:
# 将所有的label拼接为一个整label: ['体', '育'] -> '体育'
pred_batch = [''.join([str(e) for e in ele]) for ele in pred_batch]
# ele是单个样本,例如 ['体', '育'], ''.join([str(e) for e in ele])则是批量拼接
gold_batch = [''.join([str(e) for e in ele]) for ele in gold_batch]
# 扩展真实值与预测值的列表
self.goldens.extend(gold_batch)
self.predictions.extend(pred_batch)
def compute(self, round_num=2) -> dict:
"""
根据当前类中累积的变量值,计算当前的P, R, F1。
Args:
round_num (int): 计算结果保留小数点后几位, 默认小数点后2位。
Returns:
dict -> {
'accuracy': 准确率,
'precision': 精准率,
'recall': 召回率,
'f1': f1值,
'class_metrics': {
'0': {
'precision': 该类别下的precision,
'recall': 该类别下的recall,
'f1': 该类别下的f1
},
...
}
}
"""
classes, class_metrics, res = sorted(list(set(self.goldens) | set(self.predictions))), {}, {}
# 先将self.goldens和self.predictions两个列表转换为集合(set),然后对它们求并集,随后转列表并排序,得到classes
# classes是由字符串构成的列表,其中包含所有真实值与预测值
# 求总体的精度,准确率,召回率,F1
res['accuracy'] = round(accuracy_score(self.goldens, self.predictions), round_num)
res['precision'] = round(precision_score(self.goldens, self.predictions, average='weighted'), round_num)
res['recall'] = round(recall_score(self.goldens, self.predictions, average='weighted'), round_num)
res['f1'] = round(f1_score(self.goldens, self.predictions, average='weighted'), round_num)
try:
# 求混淆矩阵,并计算每个类别的指标
conf_matrix = np.array(confusion_matrix(self.goldens, self.predictions)) # (n_class, n_class)
assert conf_matrix.shape[0] == len(classes)
# 构建每个类别的评价指标
for i in range(conf_matrix.shape[0]):
precision = 0 if sum(conf_matrix[:, i]) == 0 else conf_matrix[i, i] / sum(conf_matrix[:, i])
recall = 0 if sum(conf_matrix[i, :]) == 0 else conf_matrix[i, i] / sum(conf_matrix[i, :])
f1 = 0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)
class_metrics[classes[i]] = {
'precision': round(precision, round_num),
'recall': round(recall, round_num),
'f1': round(f1, round_num)
}
# 将每个类别的评价指标加入到返回结果中,这里class_metrics是包含各个类别的指标的字典
res['class_metrics'] = class_metrics
except Exception as e:
print(f'[Warning] Something wrong when calculate class_metrics: {e}')
print(f'-> goldens: {set(self.goldens)}')
print(f'-> predictions: {set(self.predictions)}')
print(f'-> diff elements: {set(self.predictions) - set(self.goldens)}')
res['class_metrics'] = {}
return res
def reset(self):
"""
重置积累的数值,即清空真实值与预测值的列表
"""
self.goldens = []
self.predictions = []
if __name__ == '__main__':
from rich import print
metric = ClassEvaluator()
metric.add_batch(
[['财', '经'], ['财', '经'], ['体', '育'], ['体', '育'], ['计', '算', '机']],
[['体', '育'], ['财', '经'], ['体', '育'], ['计', '算', '机'], ['计', '算', '机']],
)
print(metric.compute())
输出:
{
'accuracy': 0.6,
'precision': 0.7,
'recall': 0.6,
'f1': 0.6,
'class_metrics': {
'体育': {'precision': 0.5, 'recall': 0.5, 'f1': 0.5},
'计算机': {'precision': 1.0, 'recall': 0.5, 'f1': 0.67},
'财经': {'precision': 0.5, 'recall': 1.0, 'f1': 0.67}
}
}
这个类的用法:创建ClassEvaluator对象后,模型每预测一个batch的样本,就把真实标签(主标签)与预测值(同样是主标签)通过add_batch
方法加入进去,当所有用来评估的样本计算完之后,再调用compute
方法计算。
4.4 模型微调
模型的微调代码在PET/train.py
中,这里有两个函数,我们先讲model2train:
import os
import time
from transformers import AutoModelForMaskedLM, AutoTokenizer, get_scheduler
import sys
sys.path.append('data_handle')
from utils.metirc_utils import ClassEvaluator
from utils.common_utils import *
from data_handle.data_loader import *
from utils.verbalizer import Verbalizer
from pet_config import *
pc = ProjectConfig()
def model2train():
# 创建分词器、训练模型、标签映射器
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
model = AutoModelForMaskedLM.from_pretrained(pc.pre_model)
verbalizer = Verbalizer(verbalizer_file=pc.verbalizer,
tokenizer=tokenizer,
max_label_len=pc.max_label_len)
# 指定不需要权重衰减(L2正则化)的参数,通过指定参数名所包含的字符串来进行
no_decay = ["bias", "LayerNorm.weight"] # 如果参数名包含"bias"或者"LayerNorm.weight",则对应参数不进行L2正则化
optimizer_grouped_parameters = [
{
"params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
"weight_decay": pc.weight_decay,
},
{
"params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
"weight_decay": 0.0,
},
]
# [nd in n for nd in no_decay],遍历no_decay,若nd在n中则为True,否则False,最后得到一个由布尔值构成的列表
# any(List),判断列表中是否有True,如果有则返回True,否则返回False
# 如果[nd in n for nd in no_decay]为[False, False],则说明参数的名字中,既不包含bias和LayerNorm.weight
# 若参数的名字中,既不包含bias和LayerNorm.weight,则any(...)为False,相关参数需要进行权重衰减
# 创建优化器
optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=pc.learning_rate)
model.to(pc.device)
# 创建训练集和验证集的数据导入器
train_dataloader, dev_dataloader = get_data(pc.prompt_file, pc.train_path, pc.dev_path,
pc.max_seq_len, pc.max_label_len,
pc.batch_size, tokenizer)
# 根据训练轮数计算最大训练步数,以便于scheduler动态调整lr
num_update_steps_per_epoch = len(train_dataloader)
# 计算总的训练步数与预热阶段的训练步数,确保学习率在整个训练过程中得以合理地调节
max_train_steps = pc.epochs * num_update_steps_per_epoch
warm_steps = int(pc.warmup_ratio * max_train_steps)
# 创建学习率调度器
lr_scheduler = get_scheduler(
name='linear',
optimizer=optimizer,
num_warmup_steps=warm_steps,
num_training_steps=max_train_steps,
)
# 创建评估指标计算器与损失函数计算器
metric = ClassEvaluator()
criterion = torch.nn.CrossEntropyLoss()
# 训练
loss_list = []
global_step, best_f1 = 0, 0
print('开始训练:')
tic_train = time.time()
for epoch in range(pc.epochs):
for batch in train_dataloader:
# 模型预测
logits = model(input_ids=batch['input_ids'].to(pc.device),
token_type_ids=batch['token_type_ids'].to(pc.device),
attention_mask=batch['attention_mask'].to(pc.device)).logits
# 真实标签(主标签)
mask_labels = batch['mask_labels'].numpy().tolist()
# 根据主标签找子标签
sub_labels = verbalizer.batch_find_sub_labels(mask_labels)
# 获取子标签的id
sub_labels = [ele['token_ids'] for ele in sub_labels]
# 计算损失
loss = mlm_loss(logits, batch['mask_positions'].to(pc.device), sub_labels,
criterion, pc.device)
# 梯度清零、反向传播、更新模型参数、更新学习率
optimizer.zero_grad()
loss.backward()
optimizer.step()
lr_scheduler.step()
# 记录损失
loss_list.append(float(loss.cpu().detach()))
# 全局步数+1
global_step += 1
# 打印信息
if global_step % pc.logging_steps == 0:
time_diff = time.time() - tic_train
loss_avg = sum(loss_list) / len(loss_list)
print("global step %d, epoch: %d, loss: %.5f, speed: %.2f step/s"
% (global_step, epoch, loss_avg, pc.logging_steps / time_diff))
tic_train = time.time()
# 保存与评估模型
if global_step % pc.valid_steps == 0:
cur_save_dir = os.path.join(pc.save_dir, "model_%d" % global_step)
if not os.path.exists(cur_save_dir):
os.makedirs(cur_save_dir)
# 保存模型
model.save_pretrained(os.path.join(cur_save_dir))
tokenizer.save_pretrained(os.path.join(cur_save_dir))
# 评估模型
acc, precision, recall, f1, class_metrics = evaluate_model(model,
metric,
dev_dataloader,
tokenizer,
verbalizer)
print("Evaluation precision: %.5f, recall: %.5f, F1: %.5f" % (precision, recall, f1))
# 保存最好的模型
if f1 > best_f1:
print(f"best F1 performence has been updated: {best_f1:.5f} --> {f1:.5f}")
print(f'Each Class Metrics are: {class_metrics}')
best_f1 = f1
cur_save_dir = os.path.join(pc.save_dir, "model_best")
if not os.path.exists(cur_save_dir):
os.makedirs(cur_save_dir)
model.save_pretrained(os.path.join(cur_save_dir))
tokenizer.save_pretrained(os.path.join(cur_save_dir))
tic_train = time.time()
print('训练结束')
在评估模型时,调用了evaluate_model
函数,该函数正是PET/train.py
脚本中的另一个函数:
def evaluate_model(model, metric, data_loader, tokenizer, verbalizer):
"""
在测试集上评估当前模型的训练效果。
Args:
model: 当前模型
metric: 评估指标类(metric)
data_loader: 测试集的dataloader
global_step: 当前训练步数
"""
model.eval()
metric.reset()
with torch.no_grad():
for step, batch in enumerate(data_loader):
# 模型预测
logits = model(input_ids=batch['input_ids'].to(pc.device),
token_type_ids=batch['token_type_ids'].to(pc.device),
attention_mask=batch['attention_mask'].to(pc.device)).logits
# 真实标签(主标签)
mask_labels = batch['mask_labels'].numpy().tolist() # (batch, label_num)
# 去掉label中的[PAD]
for i in range(len(mask_labels)):
while tokenizer.pad_token_id in mask_labels[i]:
mask_labels[i].remove(tokenizer.pad_token_id)
# token id转文字
mask_labels = [''.join(tokenizer.convert_ids_to_tokens(t)) for t in mask_labels]
# 将mask_position位置的token logits转换为token的id
predictions = convert_logits_to_ids(logits, batch['mask_positions']) # (batch, label_num)
predictions = predictions.cpu().numpy().tolist()
# 找到子label属于的主label
predictions = verbalizer.batch_find_main_label(predictions)
predictions = [ele['label'] for ele in predictions] # 获取字符串
# 将预测值与真实值加入到评估指标计算器中(暂不计算)
metric.add_batch(pred_batch=predictions, gold_batch=mask_labels)
# 计算评估指标
eval_metric = metric.compute()
# 模型切换回训练模式
model.train()
# 返回评估指标
return eval_metric['accuracy'], eval_metric['precision'], \
eval_metric['recall'], eval_metric['f1'], \
eval_metric['class_metrics']
我们再在这个脚本的最下面写一段测试代码:
if __name__ == '__main__':
model2train()
输出:
Some weights of the model checkpoint at ../预训练模型/bert-base-chinese were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Map: 100%|██████████| 63/63 [00:00<00:00, 941.55 examples/s]
Map: 100%|██████████| 590/590 [00:00<00:00, 1885.03 examples/s]
开始训练:
global step 10, epoch: 1, loss: 0.77675, speed: 0.07 step/s
训练结束
由于epoch配置的是2,所以训练两轮就结束了,实际训练时,需要把epoch改大。
好的,至此,该项目的代码讲解完毕。