当前位置: 首页 > article >正文

大模型开发(四):PET项目——新零售决策评价系统(上)

PET项目——新零售决策评价系统(上)

  • 0 前言
  • 1 项目介绍
    • 1.1 PET简介
    • 1.2 项目背景
    • 1.3 项目结构
    • 1.4 硬件配置
  • 2 数据处理
    • 2.1 数据介绍
    • 2.2 提示词模板与标签映射
    • 2.3 BERT模型的输入格式
    • 2.4 硬模板类
    • 2.5 函数式编程
    • 2.6 datasets模块
      • 主要功能:
      • 在本项目中的使用:
    • 2.7 数据集加载与批处理
    • 2.8 数据集导入器
  • 3 模型配置与推理
    • 3.1 配置文件
    • 3.2 模型推理
    • 3.3 convert_logits_to_ids函数
    • 3.4 标签映射类(第一部分)
  • 4 模型训练(微调)
    • 4.1 标签映射类(第二部分)
    • 4.2 损失函数
    • 4.3 模型的评估指标类
    • 4.4 模型微调

0 前言

上篇文章我们介绍了使用全量微调构建医疗问诊机器人,比较常用的微调还有提示词微调(Prompt微调)、PEFT,而Prompt微调用的比较多有PET和P-Tuning,而我们今天介绍的项目,就是使用PET的方式微调大模型。

1 项目介绍

1.1 PET简介

PET的全称是Pattern-Exploiting Training,主要用来做文本分类(Prompt微调基本都是文本分两类),它是通过构建提示词模板(硬模版,需要我们自己指定模板)的方式,将下游任务转化为一个完形填空任务,这样就可以用BERT的MLM模型来进行预测了。

1.2 项目背景

智能推荐系统是AI技术在新零售中 最为常见且有效的应用之一,通过分析用户的购买历史、浏览行为以及喜好偏好,推荐系统可以根据个人特征给用户进行个性化商品推荐。这种个性化推荐不仅可以提高用户购买意愿,减少信息过载,还可以带来更高的用户满意度和销量。

在智能推荐系统中,文本分类的应用属于重要的应用环节。比如:某电商网站都允许用户为商品填写评论,这些文本评论能够体现出用户的偏好以及商品特征信息。因此,本次项目我们将以“电商平台用户评论”为背景,基于深度学习方法实现评论文本的准确分类,这样做的目的是通过用户对不同商品或服务的评价,平台能够快速回应用户需求,改进产品和服务。同时,自动分类也为个性化推荐奠定基础,帮助用户更轻松地找到符合其偏好的商品。

本项目要做的就是对用户的评论进行分类,当输入用户的评论后,输出这是对什么商品的评论,比如输入为“包装不错,苹果挺甜的,个头也大。”,输出“苹果”。

本项目我们使用BERT模型作为基础模型。

1.3 项目结构

我们把代码和预训练模型放到两个不同的目录中
在这里插入图片描述
其中代码在PET目录下面,结构如下:
在这里插入图片描述

1.4 硬件配置

硬件我们使用FunHPC云算力市场上的RTX 3080显卡,显存为12G,关于FunHPC云算力的使用,可以参考这篇文章。

2 数据处理

个人认为,本项目最难、最繁琐的部分就是数据处理了,特别是之前没有开发过和BERT相关的项目。这里需要铺垫的知识有点多,我尽量在用到之前,对其进行讲解。

2.1 数据介绍

训练集和验证集分别在data目录的train.txt与dev.txt中,其中 train.txt 内容如下:
在这里插入图片描述
train.txt一共包含63条样本数据,dev.txt一共包含590条样本数据,每一行用\t分开,前半部分为标签(label),后半部分为原始输入 (用户评论),如果想使用自定义数据训练,只需要仿照上述示例数据构建数据集即可。

训练集数据少是由prompt微调的思想决定的,微调的时候更多地是使用预训练模型地能力,验证集数据多是为了保证评估结果的可信度,因此需要比较多的数据对模型进行充分评估。

2.2 提示词模板与标签映射

data目录下的prompt.txt中保存的是提示词模板,内容如下:

这是一条{MASK}评论:{textA}

data目录下的verbalizer.txt中保存的是主标签与子标签的对应关系,内容如下:

电脑	电脑
水果	水果,苹果,香蕉,西瓜,西瓜
平板	平板
衣服	衣服,裙子,西装
酒店	酒店
洗浴	洗浴
书籍	书籍
蒙牛	蒙牛
手机	手机
电器	电器

这里第一列是主标签,第二列是子标签,我们的项目中,主标签和子标签是一致的。但很多情况下,两者是不一致的。

例如,以新闻分类为例:

某条新闻的标题为:中国女排再夺冠!
我们构建的PET模板为:下面是[MASK][MASK]新闻:textA,Label:体育/财经/时政/军事。
如果使用标签去填充,那么构建的提示词为:下面是[MASK][MASK]新闻:中国女排再夺冠!

如果让模型去预测[MASK][MASK]遮盖的内容,那么很可能得到排球、女排、体育等词汇,它有可能不在我们要预测的范围内(体育/财经/时政/军事),所以需要对标签进行映射,将排球、女排、体育都映射成体育。

训练时,以子标签作为目标值,损失函数的计算也是通过对比预测值与子标签的差距来进行;计算各种评估指标时(例如准确率、召回率等)则是对比主标签(即把预测值转换为主标签,然后计算)。

2.3 BERT模型的输入格式

如果不了解BERT模型的输入格式,那么很难理解本项目的数据处理过程。

BERT(Bidirectional Encoder Representations from Transformers)模型的输入通常包含以下几个部分,这些部分共同构成了一个完整的输入格式:

  1. input_ids

    • 含义:这是输入文本经过分词器(如 AutoTokenizer)编码后的 token ID 列表。
    • 作用:用于表示输入文本中的每个 token(单词或子词)。BERT 模型通过这些 IDs 来查找对应的词向量。
  2. attention_mask

    • 含义:这是一个与 input_ids 长度相同的二进制列表,用于标识哪些位置是有效的 token(值为 1),哪些位置是填充的 token(值为 0)。
    • 作用:帮助模型忽略填充的部分,只关注实际的输入内容。这对于处理不同长度的输入序列非常重要。
  3. token_type_ids

    • 含义:这是一个与 input_ids 长度相同的整数列表,用于区分不同的句子或文本片段。BERT 支持处理多句输入(例如:[CLS] 句子A [SEP] 句子B [SEP]),此时需要通过 token_type_ids 来标识每个 token 属于哪个句子。
    • 作用:在多句输入的情况下,帮助模型理解哪些 token 属于哪个句子。例如,在问答任务中,token_type_ids 可以区分问题和上下文。
    • 如果输入是单句 "这是一条[MASK]评论:这个手机也太卡了",那么 token_type_ids 将全部为 0。
    • 如果输入是两个句子 [CLS]句子A[SEP]句子B[SEP],那么 token_type_ids 的前半部分为 0,后半部分为 1。
  4. position_ids(可选):

    • 含义:这是每个 token 在输入序列中的位置索引。
    • 例如"这是一条[MASK]评论:这个手机也太卡了",假设经过分词器处理后,输入文本被编码为以下 token 列表(包括特殊 token [CLS][SEP]),对应的 position_ids 就是这些 token 在输入序列中的位置索引:
input_tokens = ['[CLS]', '这', '是', '一', '条', '[MASK]', '评', '论', ':', '这', '个', '手', '机', '也', '太', '卡', '了', '[SEP]']
position_ids = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
  1. mask_position(特定任务使用):
    • 含义:这是一个标记 [MASK] 位置的列表,用于指示哪些位置是被遮盖的 token。
    • 作用:在 MLM(Masked Language Model)任务中,模型需要知道哪些位置是被遮盖的,以便进行预测。
    • [MASK] 出现在第 6 个位置(从 0 开始计数),因此 mask_position 为:
mask_position = [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

对于大多数 BERT 应用场景,输入至少需要包含 input_idsattention_mask。如果涉及多句输入,则还需要 token_type_ids,而 mask_position 则是特定任务(如 MLM)中使用的辅助信息。我们当前的分类任务,就是被 [MASK] 遮盖的商品类别,所以是MLM任务,需要mask_position

2.4 硬模板类

所谓硬模板,指的是人工设计的模板,或者说开发人员/产品经理设计的模板,除了硬模板,还有软模板,软模版是大模型生成的模板,关于软模版,我们这里不展开,后面的文章会介绍。

data_handle目录下的template.py中写了一个名为HardTemplate的类,它有两个功能:一是将样本数据填充到模板里面,构成提示词;二是将提示词按照BERT模型的输入格式进行转化。

template.py的内容如下:

# -*- coding:utf-8 -*-
from transformers import AutoTokenizer
import numpy as np


class HardTemplate(object):
    """
    硬模板,人工定义句子和[MASK]之间的位置关系。
    """

    def __init__(self, prompt: str):
        """
        Args:
            prompt (str): prompt格式定义字符串,它是提示词模板
            e.g. -> "这是一条{MASK}评论:{textA}。",两个花括号中的内容是自定义字段,目前用 MASK 和 textA 进行占位
        """
        self.prompt = prompt
        self.inputs_list = []               # 用于填充prompt的输入列表,获取模板分词后的结果
        self.custom_tokens = set(['MASK'])  # 从prompt中解析出的自定义token,比如'MASK'和'textA',即花括号中的内容
        self.prompt_analysis()              # 解析prompt模板

    def prompt_analysis(self):
        """
        将prompt文字模板拆解为可映射的数据结构。

        Examples:
            prompt -> "这是一条{MASK}评论:{textA}。"

            该函数执行后
            inputs_list -> ['这', '是', '一', '条', 'MASK', '评', '论', ':', 'textA', '。']
            custom_tokens -> {'textA', 'MASK'},这是一个集合
        """
        idx = 0
        while idx < len(self.prompt):
            str_part = ''
            if self.prompt[idx] not in ['{', '}']:
                self.inputs_list.append(self.prompt[idx])
            if self.prompt[idx] == '{':                  # 进入自定义字段
                idx += 1
                while self.prompt[idx] != '}':              # 右边的花括号是在这个分支中遍历完
                    str_part += self.prompt[idx]            # 拼接该自定义字段的值
                    idx += 1
            elif self.prompt[idx] == '}':                   # 右边花括号如果出现在这个分支中,则说明花括号不匹配,报错
                raise ValueError("Unmatched bracket '}', check your prompt.")
            if str_part:
                self.inputs_list.append(str_part)
                self.custom_tokens.add(str_part)            # 将所有自定义字段存储,后续会检测输入信息是否完整
            idx += 1

    def __call__(self,
                 inputs_dict: dict,
                 tokenizer,
                 mask_length,
                 max_seq_len=512):
        """
        输入一个样本,将其填充到硬模板当中,并将其转化为Bert模型的输入格式
        Bert模型的输入包含 input_ids、attention_mask、token_type_ids、position_ids(可选)、mask_position(特定任务使用)

        Args:
            inputs_dict (dict): prompt中的参数字典, e.g. -> {"textA": "这个手机也太卡了", "MASK": "[MASK]"}
            tokenizer: 用于encoding文本
            mask_length (int): MASK token 的长度,即掩码的字符长度

        Returns:
            dict -> {
                'text': '[CLS]这是一条[MASK]评论:这个手机也太卡了。[SEP]',
                'input_ids': [1, 47, 10, 7, 304, 3, 480, 279, 74, 47, 27, 247, 98, 105, 512, 777, 15, 12043, 2],
                'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
                'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],    # 填充后的提示词,哪些位置是需要注意的,对于不需要注意的,即用[PAD]填充的,则置0
                'mask_position': [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]      # 在[MASK]填充的位置置1
            }
        """
        # 定义输出格式(Bert模型的接收格式)
        outputs = {
            'text': '', 
            'input_ids': [],
            'token_type_ids': [],
            'attention_mask': [],
            'mask_position': []
        }

        # 构建输入到模型的字符串,即将样本信息填充到模板
        str_formated = ''
        for value in self.inputs_list:
            if value in self.custom_tokens:
                if value == 'MASK':
                    # 用inputs_dict中,'MASK'键对应的值(本项目对应的值为'[MASK]'),替换模板中的'MASK'字段
                    # 并且根据mask_length,将MASK字段重复mask_length次,拼接起来
                    str_formated += inputs_dict[value] * mask_length
                else:
                    # 这个分支中,value == 'textA',即将样本信息填充到模板中
                    str_formated += inputs_dict[value]
            else:
                str_formated += value
        # print(f'str_formated-->{str_formated}')

        # 将填充后的字符串,用 tokenizer 进行编码,得到模型的输入格式
        encoded = tokenizer(text=str_formated,
                            truncation=True,
                            max_length=max_seq_len,
                            padding='max_length')
        # print(f'encoded--->{encoded}')
        outputs['input_ids'] = encoded['input_ids']     # encoded['input_ids']有起始符、分隔符和填充符,分词器给加上的
        outputs['token_type_ids'] = encoded['token_type_ids']
        outputs['attention_mask'] = encoded['attention_mask']

        # 将input_ids从词表中的索引转化为对应的token,方便阅读
        outputs['text'] = ''.join(tokenizer.convert_ids_to_tokens(encoded['input_ids']))

        # 将'[MASK]'转化为词表中的索引,方便后续使用
        mask_token_id = tokenizer.convert_tokens_to_ids(['[MASK]'])[0]

        # 获取mask_position,即[MASK]的位置
        mask_position = np.where(np.array(outputs['input_ids']) == mask_token_id)[0].tolist()
        outputs['mask_position'] = mask_position

        return outputs


if __name__ == '__main__':
    # 创建分词器
    tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')
    # print(tokenizer.convert_ids_to_tokens([3819, 3352, 3819, 3352]))
    # print(tokenizer.convert_tokens_to_ids(['网', '球']))

    # 创建硬模板对象
    hard_template = HardTemplate(prompt='这是一条{MASK}评论:{textA}')
    # print(hard_template.inputs_list)
    # print(hard_template.custom_tokens)

    # 往硬模板内填充内容
    tep = hard_template(
                inputs_dict={'textA': '包装不错,苹果挺甜的,个头也大。', 'MASK': '[MASK]'},
                tokenizer=tokenizer,
                max_seq_len=30,
                mask_length=2)

    # 打印
    for key, value in tep.items():
        print(f'{key}: {value}')

对BERT模型的输入格式有所了解后,这个脚本的代码理解起来就比较容易。输出:

text: [CLS]这是一条[MASK][MASK]评论:包装不错,苹果挺甜的,个头也大。[SEP][PAD][PAD][PAD]
input_ids: [101, 6821, 3221, 671, 3340, 103, 103, 6397, 6389, 8038, 1259, 6163, 679, 7231, 8024, 5741, 3362, 2923, 4494, 4638, 8024, 702, 1928, 738, 1920, 511, 102, 0, 0, 0]
token_type_ids: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
attention_mask: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0]
mask_position: [5, 6]

2.5 函数式编程

按理说,函数式编程和数据处理关系并不大,但data_handle目录下的data_preprocess.py用到了,所以这里讲一下。

functools.partial 是 Python 标准库 functools 模块中的一个工具,用于部分应用函数(partial function application),它允许你固定某些参数的值,从而创建一个新的函数。

以下是 functools.partial 的用法和特点:

  1. 基本语法
from functools import partial

new_function = partial(function, *args, **kwargs)
  • function: 要部分应用的原始函数。
  • *args: 固定的位置参数。
  • **kwargs: 固定的关键字参数。
  • new_function: 返回的新函数,其中部分参数已经被固定。

通过 partial,你可以将一个函数的部分参数预先绑定,生成一个新的函数。新函数在调用时只需传递剩余的参数即可。

  1. 示例1:固定位置参数
from functools import partial

def add(x, y):
    return x + y

# 创建一个新函数,固定第一个参数为5
add_five = partial(add, 5)

print(add_five(10))  # 输出: 15
  1. 示例2:固定关键字参数
from functools import partial

def greet(name, greeting="Hello"):
    return f"{greeting}, {name}!"

# 创建一个新函数,固定greeting为"Hi"
hi_greet = partial(greet, greeting="Hi")

print(hi_greet("Alice"))  # 输出: Hi, Alice!
  1. 示例 3:结合位置参数和关键字参数
from functools import partial

def multiply(x, y, z):
    return x * y * z

# 创建一个新函数,固定x=2, y=3
multiply_by_2_and_3 = partial(multiply, 2, 3)

print(multiply_by_2_and_3(4))  # 输出: 24

2.6 datasets模块

同样的,data_handle目录下的data_preprocess.py也用到了datasets模块,它是 Hugging Face 提供的一个用于处理机器学习和深度学习任务中数据集的 Python 库。它简化了从各种来源加载、预处理和迭代数据的过程,支持多种格式的数据集,并且与 Hugging Face 的 transformers 库无缝集成。

安装命令:pip install datasets

主要功能:

  1. 加载数据集

    • 可以从本地文件或远程资源加载数据集。
    • 支持多种格式,如 CSV、JSON、文本文件等。
    • 内置了许多常用数据集,可以直接通过名称加载。
  2. 数据预处理

    • 提供了丰富的 API 来对数据进行预处理,如分词、编码、填充等。
    • 支持批量处理(batched),可以显著提高处理效率。
    new_dataset = dataset.map(function, batched=False, batch_size=None, num_proc=1, ...)
    
    • 参数说明如下:
      • function: 要应用到每个样本上的函数。该函数应该接受一个字典作为输入,并返回一个字典。
      • batched (bool): 是否以批次的方式处理数据。如果为 True,则 function 将接收一批样本(即包含多个样本的字典),而不是单个样本。
      • batch_size (int): 如果 batched=True,则指定每批次的样本数量。默认情况下,batch_size=1000。
      • num_proc (int): 并行处理的进程数。设置为大于 1 的值可以加速处理过程。
      • 其他参数: 还有其他一些参数,如 remove_columns、load_from_cache_file 等,可以根据需要使用。
  3. 数据迭代(了解即可)

    • 提供了灵活的迭代器,方便在训练过程中逐批次获取数据。
    • 支持分布式训练中的数据分片。
  4. 数据增强(了解即可)

    • 可以通过自定义函数对数据进行增强或转换。

在本项目中的使用:

在本项目的代码中,datasets 模块主要用于以下几个方面:

  1. 加载数据集
train_dataset = load_dataset('text', data_files=r'../data/train.txt')

这里使用了 load_dataset 函数从本地文本文件加载数据集。load_dataset 返回一个 DatasetDict 对象,其中包含不同分割(如训练集、验证集、测试集)的数据。

  1. 映射数据处理函数
new_dataset = train_dataset.map(convert_func, batched=True)

使用 map 方法将 convert_func 函数应用到数据集的每个样本上,并返回一个新的数据集对象,batched=True 参数表示将整个数据集作为一个批次进行处理,这个方法非常高效,因为它可以在底层进行优化,例如并行处理和批处理。

至于convert_func是什么,我们稍后介绍。

2.7 数据集加载与批处理

data_handle目录下的data_preprocess.py,有了前面的知识铺垫,里面的代码理解起来会容易很多。内容如下:

import torch
import numpy as np
from functools import partial
from datasets import load_dataset
from transformers import AutoTokenizer
from template import HardTemplate


def convert_example(
        examples: dict,
        tokenizer,
        max_seq_len: int,
        max_label_len: int,
        hard_template: HardTemplate,
        train_mode=True,
        return_tensor=False) -> dict:
    """
    将样本数据转换为模型接收的输入数据。

    Args:
        examples (dict): 训练数据样本, e.g. -> {
                                                "text": [
                                                            '手机	这个手机也太卡了。',
                                                            '体育	世界杯为何迟迟不见宣传',
                                                            ...
                                                ]
                                            }
        max_seq_len (int): 句子的最大长度,若没有达到最大长度,则padding为最大长度
        max_label_len (int): 最大label长度,若没有达到最大长度,则padding为最大长度
        hard_template (HardTemplate): 模板类。
        train_mode (bool): 训练阶段 or 推理阶段。
        return_tensor (bool): 是否返回tensor类型,如不是,则返回numpy类型。

    Returns:
        dict (str: np.array) -> tokenized_output = {
                            'input_ids': [[1, 47, 10, 7, 304, 3, 3, 3, 3, 47, 27, 247, 98, 105, 512, 777, 15, 12043, 2], ...],
                            'token_type_ids': [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ...],
                            'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], ...],
                            'mask_positions': [[5, 6, 7, 8], ...],
                            'mask_labels': [[2372, 3442, 0, 0], [2643, 4434, 2334, 0], ...]
                        }
    """
    # 定义输出格式(Bert模型的接收格式)
    tokenized_output = {
        'input_ids': [],
        'token_type_ids': [],
        'attention_mask': [],
        'mask_positions': [],
        'mask_labels': []
    }

    # 遍历样本数据,将样本填充到模板中,并转化为Bert模型的输入格式
    for i, example in enumerate(examples['text']):
        if train_mode:
            # 如果是训练模式,则既有样本的label,也有样本的文本内容
            label, content = example.strip().split('\t')
        else:
            # 如果是评估(推理)模式,则只有样本的文本内容
            content = example.strip()

        # 需要将每个样本整合成一个字典,这样硬模板对象才方便处理
        inputs_dict = {
            'textA': content,
            'MASK': '[MASK]'
        }

        # 将样本填充到硬模板当中,并将其转化为Bert模型的输入格式
        encoded_inputs = hard_template(
            inputs_dict=inputs_dict,
            tokenizer=tokenizer,
            max_seq_len=max_seq_len,
            mask_length=max_label_len)

        # 收集硬模板的处理结果
        tokenized_output['input_ids'].append(encoded_inputs["input_ids"])
        tokenized_output['token_type_ids'].append(encoded_inputs["token_type_ids"])
        tokenized_output['attention_mask'].append(encoded_inputs["attention_mask"])
        tokenized_output['mask_positions'].append(encoded_inputs["mask_position"])

        # 对于训练模式,则需要将label转化为Bert模型的输入格式
        if train_mode:
            # 将标签转化为id
            label_encoded = tokenizer(text=[label])

            # 因为分词器处理的结果是以[CLS]开头和[SEP]结尾的,所以需要将[CLS]和[SEP]去除
            label_encoded = label_encoded['input_ids'][0][1:-1] # 这里中间有个索引[0],是因为分词器输入标签时,也用了一层列表包装

            # 去掉[CLS]和[SEP]后,如果标签的长度大于max_label_len,则截断
            label_encoded = label_encoded[:max_label_len]

            # 如果标签的长度小于max_label_len,则填充pad_token_id
            label_encoded = label_encoded + [tokenizer.pad_token_id] * (max_label_len - len(label_encoded))

            # 收集处理后的标签
            tokenized_output['mask_labels'].append(label_encoded)

    # 将数据转化为torch.tensor或者numpy.array格式,方便后续处理
    for k, v in tokenized_output.items():
        if return_tensor:
            tokenized_output[k] = torch.LongTensor(v)
        else:
            tokenized_output[k] = np.array(v)

    return tokenized_output


if __name__ == '__main__':
    # 导入数据
    train_dataset = load_dataset('text', data_files=r'../data/train.txt')
    # load_dataset会返回一个datasets.dataset_dict.DatasetDict对象,该对象有一个名为train的键,train_dataset['train']是一个datasets.arrow_dataset.Dataset对象
    # train_dataset['train']['text']是一个列表,里面存储着每条样本的文本内容,原来txt文件中,每个非空行都会被当成一个样本

    # 创建分词器
    tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')

    # 创建硬模板对象
    hard_template = HardTemplate(prompt='这是一条{MASK}评论:{textA}')

    # 函数式编程
    convert_func = partial(convert_example,
                           tokenizer=tokenizer,
                            hard_template=hard_template,
                            max_seq_len=30,
                            max_label_len=2,
                           )
	# 这里将 `convert_example` 函数的部分参数(如 `tokenizer`, `hard_template`, `max_seq_len`, `max_label_len`)预先绑定,
	# 生成了一个新的函数 `convert_func`。这样,在后续调用 `convert_func` 时,只需要传递剩余的参数(如 `examples` 和其他可选参数)。

    # batched=True相当于将train_dataset看成一个批次的样本直接对数据进行处理,节省时间
    new_dataset = train_dataset.map(convert_func, batched=True)     # dataset是一个datasets.dataset_dict.DatasetDict对象

    for value in new_dataset['train']:
        # value将是一个字典,其中包含输入的id、token_type_id、attention_mask、mask_position和mask_label
        print(type(value))
        for k, v in value.items():
            print(k, v)

        print(len(value['input_ids']))
        break

输出:

<class 'dict'>
text 电脑	(1)这款笔记本外观感觉挺漂亮的,分量吗,对我来说不算沉。 (2)安装了WindowsXP系统后,运行的速度挺快。发热量没有想象中那么大。可能尚未运行很耗资源的程序,没有感到内存的弊病。不过,1G的内存确实有点小。 (3)附赠的包很不错,挺有手感的。但是附赠的鼠标实在是太小了,幸好同时订了一个双飞燕的鼠标哟。
input_ids [101, 6821, 3221, 671, 3340, 103, 103, 6397, 6389, 8038, 113, 122, 114, 6821, 3621, 5011, 6381, 3315, 1912, 6225, 2697, 6230, 2923, 4023, 778, 4638, 8024, 1146, 7030, 102]
token_type_ids [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
attention_mask [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
mask_positions [5, 6]
mask_labels [4510, 5554]
30

2.8 数据集导入器

data_handle目录下的data_loader.py只有一个函数,即get_data,它的目的是获得数据导入器,只要前面的template.py和data_preprocess.py掌握之后,这里的代码理解起来很容易。

# coding:utf-8
from torch.utils.data import DataLoader
from transformers import default_data_collator
from data_preprocess import *


def get_data(prompt_file, train_path, dev_path, max_seq_len, max_label_len, batch_size, tokenizer):
    """
    准备数据集函数。

    该函数从指定的文件中读取提示词模板,加载训练集和验证集,并将它们转换为模型可以接受的格式。
    参数:
    - prompt_file: 提示词模板文件的路径。
    - train_path: 训练集文件的路径。
    - dev_path: 验证集文件的路径。
    - max_seq_len: 输入序列的最大长度。
    - max_label_len: 标签序列的最大长度。
    - batch_size: 每个批次的大小。
    - tokenizer: 分词器。

    返回:
    - train_dataloader: 训练集的数据加载器。
    - dev_dataloader: 验证集的数据加载器。
    """
    # 从文件prompt.txt从读取提示词模板
    prompt = open(prompt_file, 'r', encoding='utf8').readlines()[0].strip()

    # 创建硬模板对象
    hard_template = HardTemplate(prompt=prompt)

    # 导入数据
    dataset = load_dataset('text', data_files={'train': train_path, 'dev': dev_path})
    # load_dataset会返回一个datasets.dataset_dict.DatasetDict对象,字典里包含 train 键和 dev 键

    # 将将样本数据转换为模型接收的输入数据。
    # 函数式编程
    new_func = partial(convert_example,
                       tokenizer=tokenizer,
                       hard_template=hard_template,
                       max_seq_len=max_seq_len,
                       max_label_len=max_label_len)

    # 将数据集转换为模型接收的输入数据
    dataset = dataset.map(new_func, batched=True)

    # 从数据集对象中获取训练集与验证集
    train_dataset = dataset["train"]
    dev_dataset = dataset["dev"]
    # print('train_dataset', train_dataset[:2])
    # print('*'*80)
    # default_data_collator作用,转换为tensor数据类型

    # 创建DataLoader对象,用于加载数据集
    train_dataloader = DataLoader(train_dataset,
                                  shuffle=True,
                                  collate_fn=default_data_collator,
                                  batch_size=batch_size)
    dev_dataloader = DataLoader(dev_dataset,
                                collate_fn=default_data_collator,
                                batch_size=batch_size)
    return train_dataloader, dev_dataloader


if __name__ == '__main__':
    # 创建分词器
    tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')

    # 调用get_data函数,获取训练集和验证集的数据加载器
    train_dataloader, dev_dataloader = get_data(prompt_file='../data/prompt.txt',
                                                train_path='../data/train.txt',
                                                dev_path='../data/dev.txt',
                                                max_seq_len=512,
                                                max_label_len=2,
                                                batch_size=8,
                                                tokenizer=tokenizer)
    print('训练集batch数量:', len(train_dataloader))
    print('验证集batch数量:', len(dev_dataloader))
    print('-'*80)

    for i, value in enumerate(train_dataloader):
        print('type(value): ', type(value))
        print('value.keys(): ', value.keys())
        print('len(value): ', len(value))
        print(r"value['input_ids'].shape: ", value['input_ids'].shape)
        print(r"value['input_ids'].dtype: ", value['input_ids'].dtype)

输出

训练集batch数量: 8
验证集batch数量: 74
--------------------------------------------------------------------------------
type(value):  <class 'dict'>
value.keys():  dict_keys(['input_ids', 'token_type_ids', 'attention_mask', 'mask_positions', 'mask_labels'])
len(value):  5
value['input_ids'].shape:  torch.Size([8, 512])
value['input_ids'].dtype:  torch.int64

从输出中可以看到,value['input_ids'].shapetorch.Size([8, 512]),8表示batch_size,512表示句子的最大长度。

3 模型配置与推理

3.1 配置文件

在当前目录(PET)下有个名为pet_config.py的文件,它是模型的配置文件,即把相关的配置参数都写到了一个类中,代码如下:

# coding:utf-8
import torch


class ProjectConfig(object):
    def __init__(self):
        self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu' # windows电脑/linux服务器
        self.pre_model = r'../预训练模型/bert-base-chinese'
        self.train_path = r'data/train.txt'
        self.dev_path = r'data/dev.txt'
        self.prompt_file = r'data/prompt.txt'
        self.verbalizer = r'data/verbalizer.txt'
        self.max_seq_len = 512      # 输入文本的最大长度
        self.batch_size = 8
        self.learning_rate = 5e-5   # 学习率,在NLP中,一般1e-6 ~ 1e-5之间
        self.weight_decay = 0       # L2正则化系数
        self.warmup_ratio = 0.06	# 学习率预热参数
        self.max_label_len = 2      # 标签的最大长度(字符个数)
        self.epochs = 2
        self.logging_steps = 10     # 日志相关设置
        self.valid_steps = 20       # 每隔多少个step做一次验证
        self.save_dir = r'checkpoints'


if __name__ == '__main__':
    pc = ProjectConfig()
    print(pc.prompt_file)
    print(pc.pre_model)

输出

data/prompt.txt
../预训练模型/bert-base-chinese

有了上一篇文章(全量微调项目——基于GPT2 搭建医疗问诊机器人)和刚刚介绍的内容为基础,这个脚本看懂并不难,这里不做赘述。

我们这边只是为了简单演示,epoch配置的是2,实际工作中,不可能用那么小的。

3.2 模型推理

在当前目录下,有个名为inference.py的脚本,内容如下:

import time
from typing import List

import torch
from rich import print
from transformers import AutoTokenizer, AutoModelForMaskedLM

import sys
sys.path.append('data_handle')

from utils.verbalizer import Verbalizer
from data_handle.template import HardTemplate
from data_handle.data_preprocess import convert_example
from utils.common_utils import convert_logits_to_ids


def inference(contents: List[str]):
    """
    推理函数,输入原始句子,输出mask label的预测值。

    Args:
        contents (List[str]): 描原始句子列表。
    """
    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    model_path = 'checkpoints/model_best_old'

    # 创建分词器
    tokenizer = AutoTokenizer.from_pretrained(model_path)

    # 创建模型
    model = AutoModelForMaskedLM.from_pretrained(model_path)
    model.to(device).eval()

    # 标签最大长度
    max_label_len = 2

	# 创建标签映射器
    verbalizer = Verbalizer(
        verbalizer_file='data/verbalizer.txt',
        tokenizer=tokenizer,
        max_label_len=max_label_len
    )

    # prompt定义
    prompt = open('data/prompt.txt', 'r', encoding='utf8').readlines()[0].strip()

    # 创建硬模板
    hard_template = HardTemplate(prompt=prompt)

    with torch.no_grad():
        start_time = time.time()

        # 将样本数据转换为模型接收的输入数据。
        examples = {'text': contents}
        tokenized_output = convert_example(
            examples, 
            tokenizer, 
            hard_template=hard_template,
            max_seq_len=128,
            max_label_len=max_label_len,
            train_mode=False,
            return_tensor=True
        )

        # 模型推理
        logits = model(input_ids=tokenized_output['input_ids'].to(device),
                        token_type_ids=tokenized_output['token_type_ids'].to(device),
                        attention_mask=tokenized_output['attention_mask'].to(device)).logits
		# logits的形状为(batch_size, seq_len, vocab_size)

        # 推理结果后处理
        predictions = convert_logits_to_ids(logits, tokenized_output['mask_positions']).cpu().numpy().tolist()  # (batch, label_num)
        predictions = verbalizer.batch_find_main_label(predictions)		# 找到子label属于的主label
        predictions = [ele['label'] for ele in predictions]
        used = time.time() - start_time
        print(f'Used {used}s.')
        return predictions


if __name__ == '__main__':
    # 测试数据
    contents = [
        '天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出行,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐',
        '环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不错,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐',
        "物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小",
        "福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了,之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多来点其他小点,饼干一直也是大爱,那天好像也没看到",
        "服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒适,晚上休息很安逸,隔音效果不错赞,下次还会来"
    ]
    print("针对下面的文本评论,请分别给出对应所属类别:")

    # 推理
    res = inference(contents)
    #print('inference label(s):', res)

    # 将测试数据与推理结果拼接成字典
    new_dict = {}
    for i in range(len(contents)):
        new_dict[contents[i]] = res[i]
    print(new_dict)
    

输出

针对下面的文本评论,请分别给出对应所属类别:
Used 0.44545769691467285s.
{
    '天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出行
,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐': '酒店',
    '环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不错
,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐': '酒店',
    '物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小': 
'衣服',
    '福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了,
之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做
柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多
来点其他小点,饼干一直也是大爱,那天好像也没看到': '衣服',
    '服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒适
,晚上休息很安逸,隔音效果不错赞,下次还会来': '酒店'
}

代码中的模型路径,是在训练之后得到的最优模型,当然,这份代码是训练完之后讲解的,因此有这个。
在推理结果后处理的部分,使用了convert_logits_to_ids函数,以及标签映射器的batch_find_main_label,我们来逐个介绍。

3.3 convert_logits_to_ids函数

这个函数的代码在 PET/utils/common_utils.py 中,它将mask位置的预测结果转化为词表中的token_id,代码如下:

import torch
def convert_logits_to_ids(logits: torch.tensor, mask_positions: torch.tensor):
    """
    输入Language Model的词表概率分布(LMModel的logits,即模型的输出),将mask_position位置的
    token logits转换为token的id。

    Args:
        logits (torch.tensor): model output -> (batch, seq_len, vocab_size)
        mask_positions (torch.tensor): mask token的位置 -> (batch, max_label_len)

    Returns:
        torch.LongTensor: 对应mask position上最大概率的推理token -> (batch, max_label_len)
    """
    label_length = mask_positions.size()[1]  # 标签长度
    batch_size, seq_len, vocab_size = logits.size()

    # 输出的逻辑值打平,方便后续从中获取最大概率的token id
    logits = logits.reshape(batch_size * seq_len, -1)  # (batch_size * seq_len, vocab_size)

    # 将mask_positions转换为逻辑值打平后的索引
    mask_positions_after_reshaped = []
    for batch, mask_pos in enumerate(mask_positions.detach().cpu().numpy().tolist()):
        for pos in mask_pos:
            mask_positions_after_reshaped.append(batch * seq_len + pos)

    # 找到mask位置的预测概率分布
    mask_logits = logits[mask_positions_after_reshaped]  # (batch_size * label_num, vocab_size)

    # 找到每个mask位置的最大概率所在索引(字典中的索引),即预测的token_id
    predict_tokens = mask_logits.argmax(dim=-1)         # (batch_size * max_label_len)

    # 将获得的token_id转化成batch的形式
    predict_tokens = predict_tokens.reshape(-1, label_length)  # (batch_size, max_label_len)

    return predict_tokens


if __name__ == '__main__':
    logits = torch.randn(2, 20, 21193)
    mask_positions = torch.LongTensor([
        [5, 6],
        [5, 6],
    ])
    predict_tokens = convert_logits_to_ids(logits, mask_positions)
    print(predict_tokens)

输出

tensor([[20063, 20244],
        [ 6725,  7702]])

当然,PET/utils/common_utils.py中还有另一个计算损失的函数,这个我们后面会讲。

3.4 标签映射类(第一部分)

推理的时候,创建了一个标签映射器,因为推理推出来的很可能是子标签,这里需要将其转化为主标签。
该类的代码在PET/utils/verbalizer.py中,这个类的方法太多,代码太长,我这里只讲推理中用到的,剩下的需要到训练的时候再介绍

# -*- coding:utf-8 -*-
from typing import Union, List  # Union 是 typing 模块中定义的一个类,用于表示多个类型中的任意一种类型


class Verbalizer(object):
    def __init__(self, verbalizer_file: str, tokenizer, max_label_len: int):
        """
        Args:
            verbalizer_file (str): verbalizer文件存放地址。
            tokenizer: 用于文本和id之间的转换。
            max_label_len (int): 标签长度,若大于则截断,若小于则补齐
        """
        self.tokenizer = tokenizer
        self.label_dict = self.load_label_dict(verbalizer_file)
        self.max_label_len = max_label_len

    def load_label_dict(self, verbalizer_file: str):
        """
        读取本地文件,构建verbalizer字典(标签映射字典)。

        Args:
            verbalizer_file (str): verbalizer文件存放地址。

        Returns:
            dict -> {
                '体育': ['篮球', '足球','网球', '排球',  ...],
                '酒店': ['宾馆', '旅馆', '旅店', '酒店', ...],
                ...
            }
        """
        label_dict = {}
        with open(verbalizer_file, 'r', encoding='utf8') as f:
            # 逐行读取
            for line in f.readlines():
                label, sub_labels = line.strip().split('\t')
                label_dict[label] = list(set(sub_labels.split(',')))    # 这里使用集合来去重
        return label_dict

    def find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):
        """
        通过子标签找到父标签。

        Args:
            sub_label (List[Union[list, str]]): 子标签, 文本型 或 id_list, e.g. -> '苹果' or [5741, 3362]
            hard_mapping (bool): 当生成的词语不在子标签中,是否一定要匹配到一个最相似的label。

        Returns:
            dict -> {
                'label': '水果', 
                'token_ids': [3717, 3362]
            }
        """
        # 如果传入为id_list, 则通过tokenizer转回来
        if type(sub_label) == list:
            # 移除[PAD]token(如果有)
            pad_token_id = self.tokenizer.pad_token_id
            while pad_token_id in sub_label:
                sub_label.remove(pad_token_id)

            # 将id_list转换为字符串
            sub_label = ''.join(self.tokenizer.convert_ids_to_tokens(sub_label))

        # 遍历标签映射字典,看标签是否在映射字典的子标签中
        main_label = '无'
        for label, s_labels in self.label_dict.items():
            if sub_label in s_labels:
                main_label = label
                break

        # 如果标签不在映射字典的子标签中,那么main_label将是'无‘
        # 此时若hard_mapping为True,则通过最大公共子串匹配最相似的label
        if main_label == '无' and hard_mapping:
            main_label = self.hard_mapping(sub_label)

        # 构建返回结果样式
        ret = {
            'label': main_label,
            'token_ids': self.tokenizer(main_label)['input_ids'][1:-1]  # 通过分词器转为id_list,并去掉[CLS]和[SEP]
        }
        return ret

    def batch_find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):
        """
        批量通过子标签找父标签。
        """
        return [self.find_main_label(l, hard_mapping) for l in sub_label]

inference.py中调用的是batch_find_main_label函数,而batch_find_main_label则是通过列表生成式调用find_main_label函数,通过代码中的注释,能比较好地理解这个函数在坐什么。

对于衣物类的评论,有可能模型生成的词汇是“裤子”,而它不在“衣服”的子标签中,这个时候需要使用强匹配策略。

find_main_label函数中,如果大模型生成的词汇(预测的子标签),不在映射字典的子标签中,则调用hard_mapping函数,该函数是通过寻找与生成的词汇有最大公共子串的子标签(映射字典中的子标签)来确定主标签,因此调用了寻找最大公共子串的函数。(当然,如果是我的话,我会使用词嵌入的方式寻找最接近的子标签,进而推导出主标签,因为找公共子串不太靠谱)

代码如下:

    def hard_mapping(self, sub_label: str):
        """
        强匹配函数,当模型生成的子label不存在时,通过最大公共子串找到重合度最高的主label。

        Args:
            sub_label (str): 子label。

        Returns:
            str: 主label。
        """
        label, max_overlap_str = '', 0

        # 遍历映射字典,寻找重合度最高的主label
        for main_label, sub_labels in self.label_dict.items():
            overlap_num = 0
            # 求所有子label与当前推理label之间的公共子串长度之和
            for s_label in sub_labels:
                overlap_num += self.get_common_sub_str(sub_label, s_label)[1]
            if overlap_num >= max_overlap_str:
                max_overlap_str = overlap_num
                label = main_label
        return label

	def get_common_sub_str(self, str1: str, str2: str):
        """
        寻找最大公共子串。
            str1:abcd
            str2:abadbcdba

        返回公共字串和最大匹配长度
        """
        lstr1, lstr2 = len(str1), len(str2)
        # 生成0矩阵,为方便后续计算,比字符串长度多了一列
        record = [[0 for i in range(lstr2 + 1)] for j in range(lstr1 + 1)]
        p = 0  # 最长匹配对应在str1中的最后一位
        maxNum = 0  # 最长匹配长度

        for i in range(lstr1):
            for j in range(lstr2):
                if str1[i] == str2[j]:
                    record[i+1][j+1] = record[i][j] + 1
                    if record[i+1][j+1] > maxNum:
                        maxNum = record[i+1][j+1]
                        p = i + 1

        return str1[p-maxNum:p], maxNum

再写段测试代码看看功能:

if __name__ == '__main__':
    from rich import print
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')
    verbalizer = Verbalizer(
        verbalizer_file='../data/verbalizer.txt',
        tokenizer=tokenizer,
        max_label_len=2
    )

    sub_label = ['西瓜', '裤子', '牛奶']
    ret = verbalizer.batch_find_main_label(sub_label, hard_mapping=True)
    print(ret)

输出

[
    {'label': '水果', 'token_ids': [3717, 3362]},
    {'label': '衣服', 'token_ids': [6132, 3302]},
    {'label': '蒙牛', 'token_ids': [5885, 4281]}
]

4 模型训练(微调)

4.1 标签映射类(第二部分)

模型在训练时,是以子标签作为目标值,但我们拿到的样本数据,对应的标签却是主标签,所以在计算损失之前,需要根据主标签获取子标签。
前面我们介绍标签映射类的时候,只讲了由子标签找主标签,其实还有一部分代码,即由主标签找子标签,代码如下:

    def find_sub_labels(self, label: Union[list, str]):
        """
        通过标签找到所有的子标签。

        Args:
            label (Union[list, str]): 主标签, 文本型 或 id_list, e.g. -> '体育' or [860, 5509]
        
        Returns:
            dict -> {
                'sub_labels': ['足球', '网球'],
                'token_ids': [[6639, 4413], [5381, 4413]]
            }
        """
        # 如果传入为id_list, 则通过tokenizer转回来
        if type(label) == list:
            # 移除[PAD]token(如果有)
            while self.tokenizer.pad_token_id in label:
                label.remove(self.tokenizer.pad_token_id)

            # 将id_list转换为字符串
            label = ''.join(self.tokenizer.convert_ids_to_tokens(label))

        # 如果主标签不在标签映射字典中,则报错
        if label not in self.label_dict:
            raise ValueError(f'Lable Error: "{label}" not in label_dict {list(self.label_dict)}.')

        # 能执行到这里,说明没有报错,则直接从标签映射字典中获取子标签
        sub_labels = self.label_dict[label]


        # 将每个子标签转化为token id,并去掉[CLS]和[SEP]
        token_ids = [_id[1:-1] for _id in self.tokenizer(sub_labels)['input_ids']]

        # 遍历每个子标签,并进行截断与补齐
        for i in range(len(token_ids)):
            # 截断
            token_ids[i] = token_ids[i][:self.max_label_len]
            # 补齐
            if len(token_ids[i]) < self.max_label_len:
                token_ids[i] = (token_ids[i] + [self.tokenizer.pad_token_id] *
                                (self.max_label_len - len(token_ids[i])))

        # 返回子标签和token id
        ret = {'sub_labels': sub_labels, 'token_ids': token_ids}
        return ret
    
    def batch_find_sub_labels(self, label: List[Union[list, str]]):
        """
        批量找到子标签。
        """
        return [self.find_sub_labels(l) for l in label]

看懂了标签映射类(第一部分),这里的两个函数也很容易看懂。

4.2 损失函数

计算损失的函数在PET/utils/common_utils.py中,这里需要注意的一个点是,假如某个样本的主标签为衣服,则它的主标签有三个,分别为衣服、裙子、西装,那么计算损失的时候,是计算预测值与这三个子标签的平均损失。

代码如下:

def mlm_loss(logits, mask_positions, sub_mask_labels, cross_entropy_criterion, device):
    """
    计算指定位置的mask token的output与label之间的cross entropy loss。

    Args:
        logits (torch.tensor): 模型原始输出 -> (batch, seq_len, vocab_size)
        mask_positions (torch.tensor): mask token的位置  -> (batch, mask_label_num)
        sub_mask_labels (list): mask token的sub label, 子标签已转为token id,列表中的元素个数为 batch
                                    e.g. -> [
                                        [[2398, 3352]],
                                        [[2398, 3352], [3819, 3861]]
                                    ]
        cross_entropy_criterion (CrossEntropyLoss): CE Loss计算器
        device (str): cpu还是gpu

    Returns:
        torch.tensor: CE Loss
    """
    batch_size, seq_len, vocab_size = logits.size()
    loss = None
    for single_value in zip(logits, sub_mask_labels, mask_positions):
        # 获取单个样本的相关信息
        single_logits = single_value[0]             # (seq_len, vocab_size)
        single_sub_mask_labels = single_value[1]    # (mask_label_num,)
        single_mask_positions = single_value[2]     # 列表,因为主标签对应的子标签有可能有多个

        # 从模型输出中获取mask位置的logits
        single_mask_logits = single_logits[single_mask_positions]  # (mask_label_num, vocab_size)

        # 广播重复,使第一个维度与子标签的数量相同,方便后续计算损失
        single_mask_logits = single_mask_logits.repeat(len(single_sub_mask_labels), 1, 1)  # (sub_label_num, mask_label_num, vocab_size)

        # 修改维度
        single_mask_logits = single_mask_logits.reshape(-1, vocab_size)  # (sub_label_num * mask_label_num, vocab_size)

        # 将子标签转换为torch张量,并迁移设备
        single_sub_mask_labels = torch.LongTensor(single_sub_mask_labels).to(device)  # (sub_label_num, mask_label_num)

        # 调整标签维度
        single_sub_mask_labels = single_sub_mask_labels.reshape(-1, 1).squeeze()  # (sub_label_num * mask_label_num)

        # 计算损失
        cur_loss = cross_entropy_criterion(single_mask_logits, single_sub_mask_labels)

        # 除以标签数量,得到相对于每个子标签的损失
        cur_loss = cur_loss / len(single_sub_mask_labels)

        # 累加损失
        if not loss:
            loss = cur_loss
        else:
            loss += cur_loss

    # 计算每个样本的平均损失
    loss = loss / batch_size  # (1,)
    return loss

4.3 模型的评估指标类

计算模型评估指标的代码在PET/utils/metirc_utils.py中,这里写了一个名为ClassEvaluator的类,该类用来计算模型的各个评估指标,代码如下:

# coding='utf-8'
import numpy as np
from typing import List
from sklearn.metrics import accuracy_score, precision_score, f1_score, recall_score, confusion_matrix


class ClassEvaluator(object):
    def __init__(self):
        self.goldens = []       # 真实标签(主标签)
        self.predictions = []   # 预测标签(主标签)

    def add_batch(self, pred_batch: List[List], gold_batch: List[List]):
        """
        添加一个batch中的prediction和gold列表,用于后续统一计算
        这里的 pred_batch 和 gold_batch 都是子标签

        Args:
            pred_batch (list): 模型预测标签列表, e.g. ->  [['体', '育'], ['财', '经'], ...]
            gold_batch (list): 真实标签标签列表, e.g. ->  [['体', '育'], ['财', '经'], ...]
        """
        assert len(pred_batch) == len(gold_batch)

        # 批量拼接
        # [['体', '育'], ['财', '经'], ...] -> ['体育', '财经', ...]
        if type(gold_batch[0]) in [list, tuple]:
            # 将所有的label拼接为一个整label: ['体', '育'] -> '体育'
            pred_batch = [''.join([str(e) for e in ele]) for ele in pred_batch]
            # ele是单个样本,例如 ['体', '育'], ''.join([str(e) for e in ele])则是批量拼接
            gold_batch = [''.join([str(e) for e in ele]) for ele in gold_batch]

        # 扩展真实值与预测值的列表
        self.goldens.extend(gold_batch)
        self.predictions.extend(pred_batch)

    def compute(self, round_num=2) -> dict:
        """
        根据当前类中累积的变量值,计算当前的P, R, F1。

        Args:
            round_num (int): 计算结果保留小数点后几位, 默认小数点后2位。

        Returns:
            dict -> {
                'accuracy': 准确率,
                'precision': 精准率,
                'recall': 召回率,
                'f1': f1值,
                'class_metrics': {
                    '0': {
                            'precision': 该类别下的precision,
                            'recall': 该类别下的recall,
                            'f1': 该类别下的f1
                        },
                    ...
                }
            }
        """
        classes, class_metrics, res = sorted(list(set(self.goldens) | set(self.predictions))), {}, {}
        # 先将self.goldens和self.predictions两个列表转换为集合(set),然后对它们求并集,随后转列表并排序,得到classes
        # classes是由字符串构成的列表,其中包含所有真实值与预测值

        # 求总体的精度,准确率,召回率,F1
        res['accuracy'] = round(accuracy_score(self.goldens, self.predictions), round_num)
        res['precision'] = round(precision_score(self.goldens, self.predictions, average='weighted'), round_num)
        res['recall'] = round(recall_score(self.goldens, self.predictions, average='weighted'), round_num)
        res['f1'] = round(f1_score(self.goldens, self.predictions, average='weighted'), round_num)

        try:
            # 求混淆矩阵,并计算每个类别的指标
            conf_matrix = np.array(confusion_matrix(self.goldens, self.predictions))  # (n_class, n_class)

            assert conf_matrix.shape[0] == len(classes)

            # 构建每个类别的评价指标
            for i in range(conf_matrix.shape[0]):
                precision = 0 if sum(conf_matrix[:, i]) == 0 else conf_matrix[i, i] / sum(conf_matrix[:, i])
                recall = 0 if sum(conf_matrix[i, :]) == 0 else conf_matrix[i, i] / sum(conf_matrix[i, :])
                f1 = 0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)
                class_metrics[classes[i]] = {
                    'precision': round(precision, round_num),
                    'recall': round(recall, round_num),
                    'f1': round(f1, round_num)
                }

            # 将每个类别的评价指标加入到返回结果中,这里class_metrics是包含各个类别的指标的字典
            res['class_metrics'] = class_metrics

        except Exception as e:
            print(f'[Warning] Something wrong when calculate class_metrics: {e}')
            print(f'-> goldens: {set(self.goldens)}')
            print(f'-> predictions: {set(self.predictions)}')
            print(f'-> diff elements: {set(self.predictions) - set(self.goldens)}')
            res['class_metrics'] = {}

        return res

    def reset(self):
        """
        重置积累的数值,即清空真实值与预测值的列表
        """
        self.goldens = []
        self.predictions = []


if __name__ == '__main__':
    from rich import print

    metric = ClassEvaluator()
    metric.add_batch(
        [['财', '经'], ['财', '经'], ['体', '育'], ['体', '育'], ['计', '算', '机']],
        [['体', '育'], ['财', '经'], ['体', '育'], ['计', '算', '机'], ['计', '算', '机']],
    )
    print(metric.compute())

输出:

{
    'accuracy': 0.6,
    'precision': 0.7,
    'recall': 0.6,
    'f1': 0.6,
    'class_metrics': {
        '体育': {'precision': 0.5, 'recall': 0.5, 'f1': 0.5},
        '计算机': {'precision': 1.0, 'recall': 0.5, 'f1': 0.67},
        '财经': {'precision': 0.5, 'recall': 1.0, 'f1': 0.67}
    }
}

这个类的用法:创建ClassEvaluator对象后,模型每预测一个batch的样本,就把真实标签(主标签)与预测值(同样是主标签)通过add_batch方法加入进去,当所有用来评估的样本计算完之后,再调用compute方法计算。

4.4 模型微调

模型的微调代码在PET/train.py中,这里有两个函数,我们先讲model2train:

import os
import time
from transformers import AutoModelForMaskedLM, AutoTokenizer, get_scheduler

import sys
sys.path.append('data_handle')

from utils.metirc_utils import ClassEvaluator
from utils.common_utils import *
from data_handle.data_loader import *
from utils.verbalizer import Verbalizer

from pet_config import *
pc = ProjectConfig()


def model2train():
    # 创建分词器、训练模型、标签映射器
    tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
    model = AutoModelForMaskedLM.from_pretrained(pc.pre_model)
    verbalizer = Verbalizer(verbalizer_file=pc.verbalizer,
                            tokenizer=tokenizer,
                            max_label_len=pc.max_label_len)

    # 指定不需要权重衰减(L2正则化)的参数,通过指定参数名所包含的字符串来进行
    no_decay = ["bias", "LayerNorm.weight"] # 如果参数名包含"bias"或者"LayerNorm.weight",则对应参数不进行L2正则化
    optimizer_grouped_parameters = [
        {
            "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
            "weight_decay": pc.weight_decay,
        },
        {
            "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
            "weight_decay": 0.0,
        },
    ]
    # [nd in n for nd in no_decay],遍历no_decay,若nd在n中则为True,否则False,最后得到一个由布尔值构成的列表
    # any(List),判断列表中是否有True,如果有则返回True,否则返回False
    # 如果[nd in n for nd in no_decay]为[False, False],则说明参数的名字中,既不包含bias和LayerNorm.weight
    # 若参数的名字中,既不包含bias和LayerNorm.weight,则any(...)为False,相关参数需要进行权重衰减

    # 创建优化器
    optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=pc.learning_rate)
    model.to(pc.device)

    # 创建训练集和验证集的数据导入器
    train_dataloader, dev_dataloader = get_data(pc.prompt_file, pc.train_path, pc.dev_path,
                                                pc.max_seq_len, pc.max_label_len,
                                                pc.batch_size, tokenizer)

    # 根据训练轮数计算最大训练步数,以便于scheduler动态调整lr
    num_update_steps_per_epoch = len(train_dataloader)

    # 计算总的训练步数与预热阶段的训练步数,确保学习率在整个训练过程中得以合理地调节
    max_train_steps = pc.epochs * num_update_steps_per_epoch
    warm_steps = int(pc.warmup_ratio * max_train_steps)

    # 创建学习率调度器
    lr_scheduler = get_scheduler(
        name='linear',
        optimizer=optimizer,
        num_warmup_steps=warm_steps,
        num_training_steps=max_train_steps,
    )

    # 创建评估指标计算器与损失函数计算器
    metric = ClassEvaluator()
    criterion = torch.nn.CrossEntropyLoss()

    # 训练
    loss_list = []
    global_step, best_f1 = 0, 0
    print('开始训练:')
    tic_train = time.time()
    for epoch in range(pc.epochs):
        for batch in train_dataloader:
            # 模型预测
            logits = model(input_ids=batch['input_ids'].to(pc.device),
                           token_type_ids=batch['token_type_ids'].to(pc.device),
                           attention_mask=batch['attention_mask'].to(pc.device)).logits

            # 真实标签(主标签)
            mask_labels = batch['mask_labels'].numpy().tolist()

            # 根据主标签找子标签
            sub_labels = verbalizer.batch_find_sub_labels(mask_labels)

            # 获取子标签的id
            sub_labels = [ele['token_ids'] for ele in sub_labels]

            # 计算损失
            loss = mlm_loss(logits, batch['mask_positions'].to(pc.device), sub_labels,
                            criterion, pc.device)

            # 梯度清零、反向传播、更新模型参数、更新学习率
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            lr_scheduler.step()

            # 记录损失
            loss_list.append(float(loss.cpu().detach()))

            # 全局步数+1
            global_step += 1

            # 打印信息
            if global_step % pc.logging_steps == 0:
                time_diff = time.time() - tic_train
                loss_avg = sum(loss_list) / len(loss_list)
                print("global step %d, epoch: %d, loss: %.5f, speed: %.2f step/s"
                      % (global_step, epoch, loss_avg, pc.logging_steps / time_diff))
                tic_train = time.time()

            # 保存与评估模型
            if global_step % pc.valid_steps == 0:
                cur_save_dir = os.path.join(pc.save_dir, "model_%d" % global_step)
                if not os.path.exists(cur_save_dir):
                    os.makedirs(cur_save_dir)

                # 保存模型
                model.save_pretrained(os.path.join(cur_save_dir))
                tokenizer.save_pretrained(os.path.join(cur_save_dir))

                # 评估模型
                acc, precision, recall, f1, class_metrics = evaluate_model(model,
                                                                           metric,
                                                                           dev_dataloader,
                                                                           tokenizer,
                                                                           verbalizer)
                print("Evaluation precision: %.5f, recall: %.5f, F1: %.5f" % (precision, recall, f1))

                # 保存最好的模型
                if f1 > best_f1:
                    print(f"best F1 performence has been updated: {best_f1:.5f} --> {f1:.5f}")
                    print(f'Each Class Metrics are: {class_metrics}')
                    best_f1 = f1
                    cur_save_dir = os.path.join(pc.save_dir, "model_best")
                    if not os.path.exists(cur_save_dir):
                        os.makedirs(cur_save_dir)
                    model.save_pretrained(os.path.join(cur_save_dir))
                    tokenizer.save_pretrained(os.path.join(cur_save_dir))
                tic_train = time.time()
    print('训练结束')

在评估模型时,调用了evaluate_model函数,该函数正是PET/train.py脚本中的另一个函数:

def evaluate_model(model, metric, data_loader, tokenizer, verbalizer):
    """
    在测试集上评估当前模型的训练效果。

    Args:
        model: 当前模型
        metric: 评估指标类(metric)
        data_loader: 测试集的dataloader
        global_step: 当前训练步数
    """
    model.eval()
    metric.reset()

    with torch.no_grad():
        for step, batch in enumerate(data_loader):
            # 模型预测
            logits = model(input_ids=batch['input_ids'].to(pc.device),
                           token_type_ids=batch['token_type_ids'].to(pc.device),
                           attention_mask=batch['attention_mask'].to(pc.device)).logits

            # 真实标签(主标签)
            mask_labels = batch['mask_labels'].numpy().tolist()  # (batch, label_num)

            # 去掉label中的[PAD]
            for i in range(len(mask_labels)):
                while tokenizer.pad_token_id in mask_labels[i]:
                    mask_labels[i].remove(tokenizer.pad_token_id)

            # token id转文字
            mask_labels = [''.join(tokenizer.convert_ids_to_tokens(t)) for t in mask_labels]

            # 将mask_position位置的token logits转换为token的id
            predictions = convert_logits_to_ids(logits, batch['mask_positions'])  # (batch, label_num)
            predictions = predictions.cpu().numpy().tolist()

            # 找到子label属于的主label
            predictions = verbalizer.batch_find_main_label(predictions)
            predictions = [ele['label'] for ele in predictions]     # 获取字符串

            # 将预测值与真实值加入到评估指标计算器中(暂不计算)
            metric.add_batch(pred_batch=predictions, gold_batch=mask_labels)

    # 计算评估指标
    eval_metric = metric.compute()

    # 模型切换回训练模式
    model.train()

    # 返回评估指标
    return eval_metric['accuracy'], eval_metric['precision'], \
           eval_metric['recall'], eval_metric['f1'], \
           eval_metric['class_metrics']

我们再在这个脚本的最下面写一段测试代码:


if __name__ == '__main__':
    model2train()

输出:

Some weights of the model checkpoint at ../预训练模型/bert-base-chinese were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Map: 100%|██████████| 63/63 [00:00<00:00, 941.55 examples/s]
Map: 100%|██████████| 590/590 [00:00<00:00, 1885.03 examples/s]
开始训练:
global step 10, epoch: 1, loss: 0.77675, speed: 0.07 step/s
训练结束

由于epoch配置的是2,所以训练两轮就结束了,实际训练时,需要把epoch改大。

好的,至此,该项目的代码讲解完毕。


http://www.kler.cn/a/576584.html

相关文章:

  • Java基础关键_018_集合(二)
  • 数据结构——单调栈
  • 腾讯云大模型知识引擎驱动DeepSeek满血版能源革命大模型:架构、优势与产业变革
  • 文档进行embedding,Faiss向量检索
  • Facebook 与文化多样性:社交平台中的语言与文化差异
  • 基于Spring Boot的校园失物招领系统的设计与实现(LW+源码+讲解)
  • 一站式3D虚拟展厅搭建方案,让企业展示更高效
  • 无人机灯光原理与应用解析!
  • 深入解析动态住宅IP
  • 六十天前端强化训练之第十二天之闭包深度解析
  • Docker安装milvus及其基本使用说明
  • Manus如何应对数据安全与合规风险?
  • 新版全开源短剧平台上百案例,带支付收益模式,支持媒资管理/广告回传
  • docker oracle11
  • k8s1.30 监控并限制节点使用资源(kubelet+metrics-server)
  • 深入解析网络协议:从OSI七层模型到HTTP与TCP/IP的关系
  • 使用PHP实现异步编程:挑战与解决方案
  • DeepSeek-R1:使用KTransformers实现高效部署指南
  • 面试java做了一道逻辑题,人麻了
  • 零售交易流程相关知识(top-down拆解)