当前位置: 首页 > article >正文

【NLP251】命名实体识别常用模块(基于Transformer分类)

1. 从JSON格式的数据中加载并预处理样本供Ner任务训练和推理使用

class JsonNerDataset(Dataset):
    """
    定义一个加载json格式原始命名实体识别格式数据的Dataset
    一行一条样本(json字符串),包含: originalText、entities
    """

    def __init__(self,
                 data_path, tokenizer, categories, target_padding_idx=-100,
                 add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
                 ):
        super(JsonNerDataset, self).__init__()
        self.tokenizer: BertTokenizer = tokenizer
        # self.sentence_max_len = self.tokenizer.max_len_single_sentence  # 510
        self.sentence_max_len = 126
        self.categories = categories
        self.token_padding_idx = self.tokenizer.convert_tokens_to_ids('[PAD]')
        self.target_padding_idx = target_padding_idx
        self.add_special_token = add_special_token
        self.first_token_id = self.tokenizer.convert_tokens_to_ids(first_special_token)
        self.last_token_id = self.tokenizer.convert_tokens_to_ids(last_special_token)

        self.records = self.load_records(data_path)

    def load_records(self, data_path) -> list:
        records = []
        with open(data_path, "r", encoding="utf-8") as reader:
            for line in reader:
                # 1. 获取原始数据
                record = json.loads(line.strip())

                # 2. 原始的文本数据转换
                entities = record['entities']
                text = special_token_processor(record['originalText'])
                chars = list(text)  # 分字,就是每个字对应一个token

                # 3. 标签数据转换
                labels = ['O'] * len(chars)
                for entity in entities:
                    label_type = entity['label_type']
                    start_pos = entity['start_pos']  # 包含开始
                    end_pos = entity['end_pos']  # 不包含结尾
                    if end_pos - start_pos == 1:
                        # 只有一个字形成实体
                        labels[start_pos] = f'S-{label_type}'
                    elif end_pos - start_pos > 1:
                        # 多个字形成实体
                        labels[start_pos] = f'B-{label_type}'
                        labels[end_pos - 1] = f'E-{label_type}'
                        for i in range(start_pos + 1, end_pos - 1):
                            labels[i] = f'M-{label_type}'
                    else:
                        raise ValueError(f"数据异常:{record}")

                if self.add_special_token:
                    # 需要对chars、labels进行split分割, 单个样本的长度不能超过510
                    for sub_chars, sub_labels in split_chars(chars, labels, self.sentence_max_len):
                        x = self.tokenizer.convert_tokens_to_ids(sub_chars)  # 针对每个字获取对应的id
                        assert len(sub_chars) == len(x), "bert进行token id转换后,更改了列表长度."
                        y = [self.categories[c] for c in sub_labels]

                        x.insert(0, self.first_token_id)
                        x.append(self.last_token_id)
                        y.insert(0, self.categories['O'])
                        y.append(self.categories['O'])

                        assert len(x) == len(y), f"输入token的长度必须和标签的长度一致,当前长度为:{len(x)} - {len(y)} - {record}"
                        records.append((x, y, len(x)))
                else:
                    x = self.tokenizer.convert_tokens_to_ids(chars)  # 针对每个字获取对应的id
                    assert len(chars) == len(x), "bert进行token id转换后,更改了列表长度."
                    y = [self.categories[c] for c in labels]

                    assert len(x) == len(y), f"输入token的长度必须和标签的长度一致,当前长度为:{len(x)} - {len(y)} - {record}"
                    records.append((x, y, len(x)))
        return records

    def __getitem__(self, index):
        """
        获取index对应的样本信息,包括x和y
        :param index: 样本的索引id
        :return: x,y
        """
        x, y, num_tokens = self.records[index]
        return copy.deepcopy(x), copy.deepcopy(y), num_tokens

    def __len__(self):
        return len(self.records)

    def collate(self, batch):
        max_length = max([t[2] for t in batch])
        x, y, mask = [], [], []
        for i in range(len(batch)):
            _x, _y, _len_current_record = copy.deepcopy(batch[i])
            _mask = [1] * _len_current_record
            if _len_current_record < max_length:
                _x.extend([self.token_padding_idx] * (max_length - _len_current_record))
                _y.extend([self.target_padding_idx] * (max_length - _len_current_record))
                _mask.extend([0] * (max_length - _len_current_record))
            x.append(_x)
            y.append(_y)
            mask.append(_mask)

        token_ids = torch.tensor(x, dtype=torch.long)
        # 1表示实际token,0表示填充位置
        token_masks = torch.tensor(mask, dtype=torch.long)
        target_ids = torch.tensor(y, dtype=torch.long)
        return token_ids, token_masks, target_ids

2. Ner(命名实体识别)任务创建数据加载器createNERDataLoader

def create_dataloader(
        data_path, tokenizer, label_categories, batch_size, shuffle,
        num_workers=0, prefetch_factor=2, target_padding_idx=-100,
        add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):
    # 创建Dataset对象
    dataset = JsonNerDataset(
        data_path=data_path, tokenizer=tokenizer,
        categories=label_categories,
        target_padding_idx=target_padding_idx,
        add_special_token=add_special_token,
        first_special_token=first_special_token,
        last_special_token=last_special_token
    )
    print(f"当前dataset的总样本数目为:{data_path} - {len(dataset)}")

    # dataloader实际上是一个批次的处理器,因为dataset可以返回一条一条的样本,dataloader就负责将多条样本组合成一个批次对象返回
    prefetch_factor = prefetch_factor if num_workers <= 0 else num_workers * batch_size
    dataloader = DataLoader(
        dataset=dataset,  # 给定单条样本加载的对象
        batch_size=batch_size,  # 给定批次大小
        shuffle=shuffle,  # 获取批次数据的时候是否打乱顺序
        num_workers=num_workers,  # 加载数据的时候是否用多进程,大于0表示使用num_workers个进程
        collate_fn=dataset.collate,  # 给定批次数据合并的方式
        prefetch_factor=prefetch_factor  # 多进程加载的时候,每个进程的预加载的样本数目,一般为num_workers * batch_size
    )
    return dataloader

3. trainNERDataLoaderevalNERDataLoader

def create_train_dataloader(
        data_path, tokenizer, label_categories, batch_size, target_padding_idx=-100,
        add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):
    return create_dataloader(
        data_path, tokenizer, label_categories, batch_size,
        shuffle=True, num_workers=0, prefetch_factor=2,
        target_padding_idx=target_padding_idx,
        add_special_token=add_special_token,
        first_special_token=first_special_token,
        last_special_token=last_special_token
    )


def create_eval_dataloader(
        data_path, tokenizer, label_categories, batch_size, target_padding_idx=-100,
        add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):
    return create_dataloader(
        data_path, tokenizer, label_categories, batch_size,
        shuffle=False, num_workers=0, prefetch_factor=2,
        target_padding_idx=target_padding_idx,
        add_special_token=add_special_token,
        first_special_token=first_special_token,
        last_special_token=last_special_token
    )

4.  text_special_char_replace

def special_token_processor(text):
    for old, new in [('”', '"'), ("“", '"'), ("’", "'"), ("‘", "'"), ("`", "'"), ('—', '-')]:
        text = text.replace(old, new)
    return text

5. 提取所有标签(不重名)

def extract_labels_per_file(in_file):
    """
    从单个 JSON 格式的数据文件中提取所有唯一的标签(label_type)。

    :param in_file: 输入的 JSON 数据文件路径。
    :return: 一个包含所有唯一标签的集合。
    """
    # 初始化一个空集合用于存储标签,集合自动去重
    labels = set()

    # 打开输入文件进行读取,指定编码为 UTF-8
    with open(in_file, "r", encoding="utf-8") as reader:
        # 逐行读取文件内容
        for line in reader:
            # 将每行的 JSON 字符串解析为 Python 字典
            record = json.loads(line.strip())
            # 获取当前记录中的实体列表
            entities = record['entities']
            # 遍历每个实体
            for entity in entities:
                # 提取实体的标签类型(label_type),并将其添加到标签集合中
                labels.add(entity['label_type'])

    # 返回包含所有唯一标签的集合
    return labels

6. 生成标签映射字典

def extract_labels(data_paths, categories_out_file):
    """
    提取数据集中所有可能的标签,并生成标签到索引的映射文件。
    
    :param data_paths: 数据文件路径(可以是单个路径或路径列表)。
    :param categories_out_file: 生成的标签映射文件路径。
    """
    # 如果输入的 data_paths 是字符串,则将其转换为列表
    if isinstance(data_paths, str):
        data_paths = [data_paths]

    # 初始化一个空集合用于存储所有标签
    labels = set()
    
    # 遍历每个数据文件路径
    for data_path in data_paths:
        # 提取当前文件中的标签
        tmp_labels = extract_labels_per_file(data_path)
        # 将当前文件的标签与已有的标签集合合并
        labels = labels.union(tmp_labels)
    
    # 将标签集合转换为排序后的列表
    labels = sorted(list(labels))
    # 打印所有提取到的标签
    print(f"所有的标签:{labels}")

    # 初始化标签到索引的映射字典,'O' 表示非实体标签,索引为 0
    categories = {
        'O': 0
    }
    # 遍历每个标签,生成 B-M-E-S 格式的标签
    for label in labels:
        for prefix in ['B', 'M', 'E', 'S']:
            # 为每个标签生成对应的 B-M-E-S 格式,并分配索引
            categories[f'{prefix}-{label}'] = len(categories)
    
    # 打印生成的标签映射字典
    print(categories)
    
    # 将标签映射字典写入指定的输出文件
    with open(categories_out_file, 'w', encoding='utf-8') as writer:
        # 使用 json.dump 将字典保存为 JSON 格式,设置缩进为 2,确保中文字符正常显示
        json.dump(categories, writer, indent=2, ensure_ascii=False)

7. 加载标签类别映射

def load_label_categories(file_path):
    """
    从指定的 JSON 文件中加载标签类别映射(标签到索引的字典)。

    :param file_path: 包含标签类别映射的 JSON 文件路径。
    :return: 一个字典,包含标签到索引的映射。
    """
    # 打开指定路径的 JSON 文件进行读取,指定编码为 UTF-8
    with open(file_path, "r", encoding="utf-8") as reader:
        # 使用 json.load 从文件中加载 JSON 数据,将其解析为 Python 字典
        categories = json.load(reader)
    
    # 返回解析后的标签类别映射字典
    return categories

8. 基于 Transformer 编码器的分类模型

# -*- coding: utf-8 -*-
import copy

import torch
import torch.nn as nn
from transformers import BertModel


class TransformerEncoderSoftmaxNerModel(nn.Module):
    def __init__(self, vocab_size, hidden_size, encoder_num_head, encoder_num_layers, num_class):
        super(TransformerEncoderSoftmaxNerModel, self).__init__()
        # 确保 hidden_size 能被 encoder_num_head 整除
        assert hidden_size % encoder_num_head == 0, "参数中的hidden_size必须能够整除encoder_num_head"

        # 词嵌入层
        self.emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden_size)

        # Transformer 编码器层
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=encoder_num_head,
            dim_feedforward=hidden_size * 4,
            batch_first=True
        )
        self.encoder = nn.TransformerEncoder(
            encoder_layer=encoder_layer, num_layers=encoder_num_layers
        )

        # 分类层(多层感知机)
        self.classify = nn.Sequential(
            nn.Dropout(p=0.1),
            nn.Linear(hidden_size, hidden_size * 4),
            nn.ReLU(),
            nn.Dropout(p=0.1),
            nn.Linear(hidden_size * 4, hidden_size * 4),
            nn.ReLU(),
            nn.Linear(hidden_size * 4, num_class),
        )
        self.num_class = num_class
        self.hidden_size = hidden_size

    def forward(self, token_ids, token_masks=None):
        """
        前向过程
        :param token_ids: [N,T] N个样本,每个样本有T个token
        :param token_masks: [N,T] N个样本,每个样本的每个token是否是真实的实际token,True表示填充,False表示实际值
        :return: [N,T,num_class] N个样本,每个样本有T个token,每个token属于num_class个类别的置信度
        """
        if token_masks is not None:
            mask_dtype = token_masks.dtype
            if mask_dtype in [torch.int, torch.int64, torch.int32, torch.int8, torch.int16]:
                token_masks = (1 - token_masks).to(dtype=torch.bool)

        # 1. 获取每个token对应的静态词向量 [N,T] -> [N,T,E]
        token_x = self.emb_layer(token_ids)

        # 2. token交叉获取每个token对应的动态词向量 [N,T,E] -> [N,T,E]
        token_x = self.encoder(token_x, src_key_padding_mask=token_masks)

        # 3. 针对每个token进行全连接,判断属于各个类别的置信度
        # 最终的矩阵乘法操作为: [N,T,?] * [?,num_class] -> [N, T, num_class]
        score = self.classify(token_x)

        return score

9.评估指标-准确率函数

def calc_token_accuracy_number(score, target_labels, target_masks):
    # 1. 获取模型预测标签id [N,T,num_class] -> [N,T]
    pred_labels = torch.argmax(score, dim=-1)
    pred_labels = pred_labels.to(dtype=target_labels.dtype, device=target_labels.device)
    # print(f"预测为0的标签占比:{torch.mean((pred_labels == 0).to(dtype=torch.float))}")

    # 2. 比较实际标签和预测标签,查看是否相等
    is_equal = (pred_labels == target_labels).to(dtype=torch.float)

    # 3. 计算均值的分子和分母
    numerator = torch.sum(is_equal).cpu().item()
    denominator = torch.sum(target_masks.to(dtype=is_equal.dtype)).cpu().item()

    return numerator, denominator


def token_accuracy(score, target_labels, target_masks):
    """
    计算token的准确率
    :param score: [N,T,num_class] 模型输出的置信度
    :param target_labels: [N,T] 实际样本的标签id
    :param target_masks: [N,T] 填充mask,实际值的地方为1,填充值的地方为0
    :return: float 准确率
    """
    numerator, denominator = calc_token_accuracy_number(score, target_labels, target_masks)
    if denominator <= 0.0:
        return 0.0
    return numerator / denominator

9. 训练函数

# -*- coding: utf-8 -*-
import os

import torch
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

from medical_ner.datas.bmeso_ner_labels import load_label_categories
from medical_ner.datas.json_ner_dataloader import create_train_dataloader, create_eval_dataloader
from medical_ner.losses.loss import CustomCrossEntropyLoss
from medical_ner.metrics.accuracy import token_accuracy, token_accuracy_v2, \
    calc_token_accuracy_number_v2, div
from medical_ner.models.ner_bert_softmax import TransformerEncoderSoftmaxNerModel
from transformers import BertTokenizer


def train(
        train_data_path, eval_data_path, bert_tokenizer_path, label_path,
        model_save_path, summary_log_dir,
        batch_size=8, total_epoch=10,
        hidden_size=128, encoder_num_head=4, encoder_num_layers=2,
        target_padding_idx=-100,
        lr=0.01
):
    """
    训练一个基于 Transformer 编码器的命名实体识别(NER)模型。

    :param train_data_path: 训练数据文件路径。
    :param eval_data_path: 评估数据文件路径。
    :param bert_tokenizer_path: BERT 分词器的预训练模型路径。
    :param label_path: 标签类别映射文件路径。
    :param model_save_path: 模型保存路径。
    :param summary_log_dir: TensorBoard 日志保存路径。
    :param batch_size: 每个批次的样本数量,默认为 8。
    :param total_epoch: 总训练轮数,默认为 10。
    :param hidden_size: 隐藏层维度,默认为 128。
    :param encoder_num_head: Transformer 编码器的头数,默认为 4。
    :param encoder_num_layers: Transformer 编码器的层数,默认为 2。
    :param target_padding_idx: 目标标签的填充索引,默认为 -100。
    :param lr: 学习率,默认为 0.01。
    """

    # 创建模型保存路径和日志路径的目录,如果不存在则创建
    os.makedirs(model_save_path, exist_ok=True)
    os.makedirs(summary_log_dir, exist_ok=True)

    # 依赖信息的加载
    '''
    使用 BertTokenizer 加载预训练的 BERT 分词器。
    加载标签类别映射(从文件中读取)。
    定义保存最优模型和最后一个模型的路径。
    '''
    tokenizer = BertTokenizer.from_pretrained(bert_tokenizer_path)
    label_categories = load_label_categories(label_path)
    best_dump_path = os.path.join(model_save_path, "best.pkl")
    last_dump_path = os.path.join(model_save_path, "last.pkl")

    # 1. 数据加载及解析
    train_dataloader = create_train_dataloader(
        train_data_path, tokenizer, label_categories, batch_size,
        target_padding_idx=target_padding_idx
    )
    eval_dataloader = create_eval_dataloader(
        eval_data_path, tokenizer, label_categories, batch_size * 2,
        target_padding_idx=target_padding_idx
    )

    # 2. 网络构造、损失函数构造、优化器构造
    #初始化 NER 模型(基于 Transformer 编码器)
    net = TransformerEncoderSoftmaxNerModel(
        vocab_size=tokenizer.vocab_size,
        hidden_size=hidden_size,
        encoder_num_head=encoder_num_head,
        encoder_num_layers=encoder_num_layers,
        num_class=len(label_categories)
    )

    # 需要安装一个tensorboard库,一般情况下安装tensorflow的时候,会自动安装这个库
    # pip install tensorboard==2.12.3
    # 查看可视化页面,在命令行,执行以下命令:
    # tensorboard --logdir log_dir
    # tensorboard --logdir D:\workspaces\study\NLP04\projects\NamedEntityRecognition\medical_ner\test\output\medical\ner_softmax\logs

    # 初始化 TensorBoard 日志记录器
    writer = SummaryWriter(log_dir=summary_log_dir)
    # writer.add_graph(net, input_to_model=torch.randint(0, 100, (4, 20)))

    # 定义损失函数和优化器
    loss_fn = CustomCrossEntropyLoss(ignore_index=target_padding_idx, summary_writer=writer)
    train_fn = optim.SGD(params=net.parameters(), lr=lr)

    # 3/4/5. 遍历训练、评估、持久化
    # 初始化最佳准确率
    best_acc = float('-inf')
    #开始训练循环,遍历指定的总轮数。
    for epoch in range(total_epoch):
        # 当前epoch模型训练
        net.train()
        train_fn.zero_grad()
        for batch, (x, mask, y) in enumerate(train_dataloader):
            # a. 前向过程
            score = net(token_ids=x, token_masks=mask)  # score: [N,T,num_class]
            loss = loss_fn(torch.permute(score, dims=(0, 2, 1)), y)
            acc1 = token_accuracy(score, y, target_masks=mask)
            negative_acc, positive_acc, acc2 = token_accuracy_v2(score, y, target_masks=mask)

            # b. 反向更新
            loss.backward()
            train_fn.step()
            train_fn.zero_grad()

            # c. 日志的输出/运行过程的输出
            writer.add_scalar('train_loss', loss.item())
            writer.add_scalars('train_acc', {
                'acc': acc2,
                'positive': positive_acc,
                'negative': negative_acc
            })
            print(f"Epoch {epoch}/{total_epoch} Batch {batch} Loss {loss.item():.5f} "
                  f"Token Accuracy {acc1:.3f} - {negative_acc:.3f} - {positive_acc:.3f} - {acc2:.3f}")

        # 当前epoch的模型评估&持久化
        net.eval()
        with torch.no_grad():
            eval_acc_number = [0.0, 0.0, 0.0, 0.0]
            for _, (x, mask, y) in enumerate(eval_dataloader):
                score = net(token_ids=x, token_masks=mask)  # score: [N,?,num_class]
                acc_number = calc_token_accuracy_number_v2(
                    score, y, mask
                )
                for i in range(4):
                    eval_acc_number[i] += acc_number[i]

            eval_negative_acc = div(eval_acc_number[0], eval_acc_number[1])
            eval_positive_acc = div(eval_acc_number[2], eval_acc_number[3])
            eval_acc = div(eval_acc_number[0] + eval_acc_number[2], eval_acc_number[1] + eval_acc_number[3])
            print(f"Epoch {epoch}/{total_epoch} "
                  f"Eval Token Accuracy {eval_negative_acc:.3f} - {eval_positive_acc:.3f} - {eval_acc:.3f}")
            writer.add_scalars('eval_acc', {
                'acc': eval_acc,
                'positive': eval_positive_acc,
                'negative': eval_negative_acc
            }, global_step=epoch)

        # 模型持久化
        save_obj = {
            'net': net.state_dict(),
            'epoch': epoch,
            'best_accuracy': eval_acc
        }
        if eval_acc > best_acc:
            print(f"保存最优模型为 {epoch} {eval_acc} {best_dump_path}")
            torch.save(save_obj, best_dump_path)
            best_acc = eval_acc
        torch.save(save_obj, last_dump_path)

    # 训练完成后,关闭相关资源
    writer.close()

10. 主函数

if __name__ == '__main__':
    bert_tokenizer_path = r"D:\cache\huggingface\hub\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f"
    train_data_path = "./datas/medical/training.txt"
    eval_data_path = "./datas/medical/test.json"
    label_path = "./datas/medical/categories.json"
    model_save_path = "./output/medical/ner_softmax/models"
    summary_log_dir = "./output/medical/ner_softmax/logs"

    train(
        train_data_path, eval_data_path, bert_tokenizer_path, label_path,
        model_save_path, summary_log_dir
    )


http://www.kler.cn/a/546345.html

相关文章:

  • 从驾驶员到智能驾驶:汽车智能化进程中的控制与仿真技术
  • 【JavaScript】《JavaScript高级程序设计 (第4版) 》笔记-Chapter12-BOM
  • HBASE面试技巧
  • 洛谷 acwing刷题 有关图的存储形式和djstra算法的例题
  • C语言进阶习题(4结构体)【1】通讯录的实现
  • 从无序到有序:上北智信通过深度数据分析改善会议室资源配置
  • 企业网站设计HTML源码模板
  • 【认证授权FAQ】HP Anyware LLS服务器常用命令
  • minio在上传pdf文件时设置Content-Type: application/pdf有什么作用
  • 硬件-电源-隔离与非隔离的区别
  • 如何评估云原生GenAI应用开发中的安全风险(上)
  • 寻找两个有序数组的中位数
  • 【OJ项目】深入剖析 JudgeServiceImpl 类:题目的判题逻辑详解
  • 基于javaweb的SpringBootoa办公自动化系统设计和实现(源码+文档+部署讲解)
  • 【油猴脚本/Tampermonkey】DeepSeek 服务器繁忙无限重试(20250214优化)
  • CZML 格式详解,javascript加载导出CZML文件示例
  • 图数据库neo4j进阶(一):csv文件导入节点及关系
  • Vue 2 — 配置请求转发
  • qt + opengl 给立方体增加阴影
  • 08模拟法 + 技巧 + 数学 + 缓存(D3_数学)