当前位置: 首页 > article >正文

Sentence-BERT实现文本匹配【分类目标函数】

引言

从这篇文章开始进入文本系列的BERT预训练模型时代,本文实战Sentence-BERT提出的如何训练嵌入模型的方法。实现类似Huggingface的风格,不同方法的实现之间可能会有一些重复,但每个方法是独立的,降低了复杂性。

架构

image-20210922235941425

我们首先来看的是这种孪生网络结构。如上图所示,句子A和句子B经过同一个预训练好的BERT模型,然后经过池化操作得到定长向量 u \pmb u u v \pmb v v,最后对这两个向量进行一些处理和拼接喂给一个分类层得到最终的输出——对应类别的logits。

可以用于有相似标签(1)和不相似标签(0)的数据集,判断给定的语句对是相似还是不相似的,作为一个分类任务。但在推理阶段,拿到这两个语句的句嵌入后,不需要真的经过Softmax分类器,而是直接计算句嵌入之间的余弦相似度即可。

实现

实现采用类似Huggingface的形式,每个文件夹下面有一种模型。分为modelingargumentstrainer等不同的文件。不同的架构放置在不同的文件夹内。

modeling.py:

from dataclasses import dataclass

import torch
from torch import Tensor, nn

from transformers.file_utils import ModelOutput

from transformers import (
    AutoModel,
    AutoTokenizer,
)

import numpy as np
from tqdm.autonotebook import trange
from typing import Optional

# 定义输出类,继承ModelOutput
@dataclass
class BiOutput(ModelOutput):
    loss: Optional[Tensor] = None
    scores: Optional[Tensor] = None

# 基于分类目标函数训练的Sentence Bert模型
class SentenceBert(nn.Module):
    def __init__(
        self,
        model_name: str,
        trust_remote_code: bool = True,
        max_length: int = None,
        num_classes: int = 2,
        pooling_mode: str = "mean",
        normalize_embeddings: bool = False,
    ) -> None:
        super().__init__()
        self.model_name = model_name
        self.normalize_embeddings = normalize_embeddings

        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name, trust_remote_code=trust_remote_code
        )
        self.model = AutoModel.from_pretrained(
            model_name, trust_remote_code=trust_remote_code
        ).to(self.device)

        self.max_length = max_length
        self.pooling_mode = pooling_mode

        self.loss_fct = nn.CrossEntropyLoss(reduction="mean")
        # (u, v, |u - v|)
        self.classifier = nn.Linear(self.model.config.hidden_size * 3, num_classes)

    def sentence_embedding(self, last_hidden_state, attention_mask):
        # 支持平均池化和cls token
        if self.pooling_mode == "mean":
            attention_mask = attention_mask.unsqueeze(-1).float()
            return torch.sum(last_hidden_state * attention_mask, dim=1) / torch.clamp(
                attention_mask.sum(1), min=1e-9
            )
        else:
            # cls
            return last_hidden_state[:, 0]
	# 将原始文本编码成句向量,支持批处理防止OOM
    def encode(
        self,
        sentences: str | list[str],
        batch_size: int = 64,
        convert_to_tensor: bool = True,
        show_progress_bar: bool = False,
    ):
        if isinstance(sentences, str):
            sentences = [sentences]

        all_embeddings = []

        for start_index in trange(
            0, len(sentences), batch_size, desc="Batches", disable=not show_progress_bar
        ):
            batch = sentences[start_index : start_index + batch_size]

            features = self.tokenizer(
                batch,
                padding=True,
                truncation=True,
                return_tensors="pt",
                return_attention_mask=True,
                max_length=self.max_length,
            ).to(self.device)

            out_features = self.model(**features, return_dict=True)
            embeddings = self.sentence_embedding(
                out_features.last_hidden_state, features["attention_mask"]
            )
            if not self.training:
                # 非训练模式下可以脱离计算图,节省显存
                embeddings = embeddings.detach()

            if self.normalize_embeddings:
                embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)

            if not convert_to_tensor:
                embeddings = embeddings.cpu()

            all_embeddings.extend(embeddings)
		# 支持将句向量转换为Tensor或Numpy
        if convert_to_tensor:
            all_embeddings = torch.stack(all_embeddings)
        else:
            all_embeddings = np.asarray([emb.numpy() for emb in all_embeddings])

        return all_embeddings

    def concat_embedding(self, source_embeddings, target_embeddings):
        # (u, v, |u - v|)的拼接
        embeddings = torch.cat(
            [
                source_embeddings,
                target_embeddings,
                torch.abs(source_embeddings - target_embeddings),
            ],
            dim=-1,
        )
        return self.classifier(embeddings)

    def compute_loss(self, source_embed, target_embed, labels):
        logits = self.concat_embedding(source_embed, target_embed)
        labels = torch.LongTensor(labels).to(self.device)
        # 计算损失
        return self.loss_fct(logits, labels)

    def forward(self, source, target, labels) -> BiOutput:
        """
        forward仅是用于训练,推理时直接使用encode即可
        Args:
            source :
            target :
        """
        source_embed = self.encode(source)
        target_embed = self.encode(target)

        scores = torch.cosine_similarity(source_embed, target_embed)

        loss = self.compute_loss(source_embed, target_embed, labels)
        return BiOutput(loss, scores)

    def save_pretrained(self, output_dir: str):
        # 修改保存模型参数的方法
        state_dict = self.model.state_dict()
        state_dict = type(state_dict)(
            {k: v.clone().cpu().contiguous() for k, v in state_dict.items()}
        )
        self.model.save_pretrained(output_dir, state_dict=state_dict)

整个模型的实现放到modeling.py文件中。

arguments.py:

from dataclasses import dataclass, field
from typing import Optional

import os


@dataclass
class ModelArguments:
    model_name_or_path: str = field(
        metadata={
            "help": "Path to pretrained model"
        }
    )
    config_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "Pretrained config name or path if not the same as model_name"
        },
    )
    tokenizer_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "Pretrained tokenizer name or path if not the same as model_name"
        },
    )


@dataclass
class DataArguments:
    train_data_path: str = field(
        default=None, metadata={"help": "Path to train corpus"}
    )
    eval_data_path: str = field(default=None, metadata={"help": "Path to eval corpus"})
    max_length: int = field(
        default=512,
        metadata={
            "help": "The maximum total input sequence length after tokenization for input text."
        },
    )

    def __post_init__(self):
        if not os.path.exists(self.train_data_path):
            raise FileNotFoundError(
                f"cannot find file: {self.train_data_path}, please set a true path"
            )
        
        if not os.path.exists(self.eval_data_path):
            raise FileNotFoundError(
                f"cannot find file: {self.eval_data_path}, please set a true path"
            )

定义了模型和数据相关参数。

dataset.py:

from torch.utils.data import Dataset
from datasets import Dataset as dt
import pandas as pd

from utils import build_dataframe_from_csv


class PairDataset(Dataset):
    def __init__(self, data_path: str) -> None:

        df = build_dataframe_from_csv(data_path)
        self.dataset = dt.from_pandas(df, split="train")

        self.total_len = len(self.dataset)

    def __len__(self):
        return self.total_len

    def __getitem__(self, index) -> dict[str, str]:
        query1 = self.dataset[index]["query1"]
        query2 = self.dataset[index]["query2"]
        label = self.dataset[index]["label"]
        return {"query1": query1, "query2": query2, "label": label}


class PairCollator:
    def __call__(self, features) -> dict[str, list[str]]:
        queries1 = []
        queries2 = []
        labels = []

        for feature in features:
            queries1.append(feature["query1"])
            queries2.append(feature["query2"])
            labels.append(feature["label"])

        return {"source": queries1, "target": queries2, "labels": labels}

数据集类考虑了LCQMC数据集的格式,即成对的语句和一个数值标签。类似:

Hello.	Hi.	1
Nice to see you.	Nice	0

trainer.py:

import torch
from transformers.trainer import Trainer

from typing import Optional
import os
import logging

from modeling import SentenceBert

TRAINING_ARGS_NAME = "training_args.bin"
logger = logging.getLogger(__name__)


class BiTrainer(Trainer):

    def compute_loss(self, model: SentenceBert, inputs, return_outputs=False):
        outputs = model(**inputs)
        loss = outputs.loss

        return (loss, outputs) if return_outputs else loss

    def _save(self, output_dir: Optional[str] = None, state_dict=None):
        # If we are executing this function, we are the process zero, so we don't check for that.
        output_dir = output_dir if output_dir is not None else self.args.output_dir
        os.makedirs(output_dir, exist_ok=True)
        logger.info(f"Saving model checkpoint to {output_dir}")

        self.model.save_pretrained(output_dir)

        if self.tokenizer is not None:
            self.tokenizer.save_pretrained(output_dir)

        # Good practice: save your training arguments together with the trained model
        torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))

继承🤗 Transformers的Trainer类,重写了compute_loss_save方法。

这样我们就可以利用🤗 Transformers来训练我们的模型了。

utils.py:

import torch
import pandas as pd
from scipy.stats import pearsonr, spearmanr
from typing import Tuple


def build_dataframe_from_csv(dataset_csv: str) -> pd.DataFrame:
    df = pd.read_csv(
        dataset_csv,
        sep="\t",
        header=None,
        names=["query1", "query2", "label"],
    )

    return df


def compute_spearmanr(x, y):
    return spearmanr(x, y).correlation


def compute_pearsonr(x, y):
    return pearsonr(x, y)[0]


def find_best_acc_and_threshold(scores, labels, high_score_more_similar: bool):
    """Copied from https://github.com/UKPLab/sentence-transformers/tree/master"""
    assert len(scores) == len(labels)
    rows = list(zip(scores, labels))

    rows = sorted(rows, key=lambda x: x[0], reverse=high_score_more_similar)
    print(rows)

    max_acc = 0
    best_threshold = -1
    # positive examples number so far
    positive_so_far = 0
    # remain negative examples
    remaining_negatives = sum(labels == 0)

    for i in range(len(rows) - 1):
        score, label = rows[i]
        if label == 1:
            positive_so_far += 1
        else:
            remaining_negatives -= 1

        acc = (positive_so_far + remaining_negatives) / len(labels)
        if acc > max_acc:
            max_acc = acc
            best_threshold = (rows[i][0] + rows[i + 1][0]) / 2

    return max_acc, best_threshold


def metrics(y: torch.Tensor, y_pred: torch.Tensor) -> Tuple[float, float, float, float]:
    TP = ((y_pred == 1) & (y == 1)).sum().float()  # True Positive
    TN = ((y_pred == 0) & (y == 0)).sum().float()  # True Negative
    FN = ((y_pred == 0) & (y == 1)).sum().float()  # False Negatvie
    FP = ((y_pred == 1) & (y == 0)).sum().float()  # False Positive
    p = TP / (TP + FP).clamp(min=1e-8)  # Precision
    r = TP / (TP + FN).clamp(min=1e-8)  # Recall
    F1 = 2 * r * p / (r + p).clamp(min=1e-8)  # F1 score
    acc = (TP + TN) / (TP + TN + FP + FN).clamp(min=1e-8)  # Accurary
    return acc, p, r, F1


def compute_metrics(predicts, labels):
    return metrics(labels, predicts)

定义了一些帮助函数,从sentence-transformers库中拷贝了寻找最佳准确率阈值的实现find_best_acc_and_threshold

除了准确率,还计算了句嵌入的余弦相似度与真实标签之间的斯皮尔曼等级相关系数指标。

最后定义训练和测试脚本。

train.py:

from transformers import set_seed, HfArgumentParser, TrainingArguments

import logging
from pathlib import Path

from datetime import datetime

from modeling import SentenceBert
from trainer import BiTrainer
from arguments import DataArguments, ModelArguments
from dataset import PairCollator, PairDataset

logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,
)


def main():
    parser = HfArgumentParser((TrainingArguments, DataArguments, ModelArguments))
    training_args, data_args, model_args = parser.parse_args_into_dataclasses()
	# 根据当前时间生成输出目录
    output_dir = f"{training_args.output_dir}/{model_args.model_name_or_path.replace('/', '-')}-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
    training_args.output_dir = output_dir

    logger.info(f"Training parameters {training_args}")
    logger.info(f"Data parameters {data_args}")
    logger.info(f"Model parameters {model_args}")
	# 设置随机种子
    set_seed(training_args.seed)
	# 加载预训练模型
    model = SentenceBert(
        model_args.model_name_or_path,
        trust_remote_code=True,
        max_length=data_args.max_length,
    )
	
    tokenizer = model.tokenizer
	# 构建训练和测试集
    train_dataset = PairDataset(data_args.train_data_path)
    eval_dataset = PairDataset(data_args.eval_data_path)
	# 传入参数
    trainer = BiTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=PairCollator(),
        tokenizer=tokenizer,
    )
    Path(training_args.output_dir).mkdir(parents=True, exist_ok=True)
	# 开始训练
    trainer.train()
    trainer.save_model()


if __name__ == "__main__":
    main()

训练

基于train.py定义了train.sh传入相关参数:

timestamp=$(date +%Y%m%d%H%M)
logfile="train_${timestamp}.log"

# change CUDA_VISIBLE_DEVICES
CUDA_VISIBLE_DEVICES=3 nohup python train.py \
    --model_name_or_path=hfl/chinese-macbert-large \
    --output_dir=output \
    --train_data_path=data/train.txt \
    --eval_data_path=data/dev.txt \
    --num_train_epochs=3 \
    --save_total_limit=5 \
    --learning_rate=2e-5 \
    --weight_decay=0.01 \
    --warmup_ratio=0.01 \
    --bf16=True \
    --eval_strategy=epoch \
    --save_strategy=epoch \
    --per_device_train_batch_size=64 \
    --report_to="none" \
    --remove_unused_columns=False \
    --max_length=128 \
    > "$logfile" 2>&1 &


以上参数根据个人环境修改,这里使用的是哈工大的chinese-macbert-large预训练模型。

注意:

  • --remove_unused_columns是必须的。
  • 通过bf16=True可以加速训练同时不影响效果。
  • 其他参数可以自己调整。
...
100%|██████████| 11193/11193 [46:22<00:00,  4.32it/09/02/2024 20:14:12 - INFO - trainer - Saving model checkpoint to output/hfl-chinese-macbert-large-2024-09-02_19-27-42/checkpoint-11193
100%|██████████| 11193/11193 [46:40<00:00,  4.00it/s]
09/02/2024 20:14:29 - INFO - trainer - Saving model checkpoint to output/hfl-chinese-macbert-large-2024-09-02_19-27-42
{'eval_loss': 0.32857951521873474, 'eval_runtime': 56.1904, 'eval_samples_per_second': 156.646, 'eval_steps_per_second': 19.594, 'epoch': 3.0}
{'train_runtime': 2800.0605, 'train_samples_per_second': 255.815, 'train_steps_per_second': 3.997, 'train_loss': 0.1678411467916938, 'epoch': 3.0}

这里仅训练了3轮,我们拿最后保存的模型output/hfl-chinese-macbert-large-2024-09-02_19-27-42进行测试。

测试

test.py: 测试脚本见后文的完整代码。

test.sh:

# change CUDA_VISIBLE_DEVICES
CUDA_VISIBLE_DEVICES=0 python test.py \
    --model_name_or_path=output/hfl-chinese-macbert-large-2024-09-02_19-27-42/checkpoint-11193 \
    --test_data_path=data/test.txt
  

输出:

TestArguments(model_name_or_path='output/hfl-chinese-macbert-large-2024-09-02_19-27-42/checkpoint-11193', test_data_path='data/test.txt', max_length=64, batch_size=128)
Batches: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 98/98 [00:11<00:00,  8.71it/s]
Batches: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 98/98 [00:10<00:00,  8.99it/s]
max_acc: 0.8918, best_threshold: 0.898765
spearman corr: 0.7982 |  pearson_corr corr: 0.7514 | compute time: 22.19s
accuracy=0.892 precision=0.896 recal=0.886 f1 score=0.8912

正如上文所说,我们还计算了spearman。

测试集上的准确率达到89.2%,超过了之前我们看到的所有模型。

完整代码

完整代码: →点此←

本文代码是和某次提交相关的,Master分支上的代码随时可能会被优化。

参考

  1. [论文笔记]Sentence-BERT: Sentence Embeddings using Siamese BERT-Networks

http://www.kler.cn/a/293750.html

相关文章:

  • 【Rust练习】21.动态数组 Vector
  • adb shell常用命令
  • Java学习--网络编程
  • GitLab 如何跨版本升级?
  • JavaEE进阶----SpringMVC(三)---响应的获取
  • Apache ECharts
  • Gitlab-ce upgrade 16.0.1 to 17.3.1【Gitlab-ce 16.0.1 升级 17.3.1】
  • git 提交代码由原先账号密码调整为ssh
  • DevExpress WinForms v24.1新版亮点:功能区、数据编辑器全新升级
  • xxl-job分布式任务调度平台
  • 「深入理解」HTML Meta标签:网页元信息的配置
  • 在嵌入式板子上搭建和自定义live555服务器---编译问题和方法整理
  • 虚幻5|C++第三人称射击(1)添加摄像机
  • 球球大作战
  • ELK学习笔记(二)——使用K8S部署Kibana8.15.0
  • 辨别高防服务器
  • python tkinter 文本类组件
  • python 下载excel 添加水印
  • [240907] Python 标准库中鲜为人知的宝藏 | Node.js 22.8.0 发布:编译缓存 API、覆盖率阈值等新特性
  • mysql连接oceanbase数据库集群+租户
  • 企业微信群管理:构建高效沟通与协作的社群生态
  • Node.js 入门:中间件与安全性深度解析
  • LeetCode 算法:单词拆分 c++
  • 栈和队列的数据结构
  • ASP.NET Core 入门教学八 集成RocketMQ消息队列
  • json字符串CSS格式化