1. 从JSON格式的数据中加载并预处理样本供Ner任务训练和推理使用
class JsonNerDataset(Dataset):
"""
定义一个加载json格式原始命名实体识别格式数据的Dataset
一行一条样本(json字符串),包含: originalText、entities
"""
def __init__(self,
data_path, tokenizer, categories, target_padding_idx=-100,
add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):
super(JsonNerDataset, self).__init__()
self.tokenizer: BertTokenizer = tokenizer
# self.sentence_max_len = self.tokenizer.max_len_single_sentence # 510
self.sentence_max_len = 126
self.categories = categories
self.token_padding_idx = self.tokenizer.convert_tokens_to_ids('[PAD]')
self.target_padding_idx = target_padding_idx
self.add_special_token = add_special_token
self.first_token_id = self.tokenizer.convert_tokens_to_ids(first_special_token)
self.last_token_id = self.tokenizer.convert_tokens_to_ids(last_special_token)
self.records = self.load_records(data_path)
def load_records(self, data_path) -> list:
records = []
with open(data_path, "r", encoding="utf-8") as reader:
for line in reader:
# 1. 获取原始数据
record = json.loads(line.strip())
# 2. 原始的文本数据转换
entities = record['entities']
text = special_token_processor(record['originalText'])
chars = list(text) # 分字,就是每个字对应一个token
# 3. 标签数据转换
labels = ['O'] * len(chars)
for entity in entities:
label_type = entity['label_type']
start_pos = entity['start_pos'] # 包含开始
end_pos = entity['end_pos'] # 不包含结尾
if end_pos - start_pos == 1:
# 只有一个字形成实体
labels[start_pos] = f'S-{label_type}'
elif end_pos - start_pos > 1:
# 多个字形成实体
labels[start_pos] = f'B-{label_type}'
labels[end_pos - 1] = f'E-{label_type}'
for i in range(start_pos + 1, end_pos - 1):
labels[i] = f'M-{label_type}'
else:
raise ValueError(f"数据异常:{record}")
if self.add_special_token:
# 需要对chars、labels进行split分割, 单个样本的长度不能超过510
for sub_chars, sub_labels in split_chars(chars, labels, self.sentence_max_len):
x = self.tokenizer.convert_tokens_to_ids(sub_chars) # 针对每个字获取对应的id
assert len(sub_chars) == len(x), "bert进行token id转换后,更改了列表长度."
y = [self.categories[c] for c in sub_labels]
x.insert(0, self.first_token_id)
x.append(self.last_token_id)
y.insert(0, self.categories['O'])
y.append(self.categories['O'])
assert len(x) == len(y), f"输入token的长度必须和标签的长度一致,当前长度为:{len(x)} - {len(y)} - {record}"
records.append((x, y, len(x)))
else:
x = self.tokenizer.convert_tokens_to_ids(chars) # 针对每个字获取对应的id
assert len(chars) == len(x), "bert进行token id转换后,更改了列表长度."
y = [self.categories[c] for c in labels]
assert len(x) == len(y), f"输入token的长度必须和标签的长度一致,当前长度为:{len(x)} - {len(y)} - {record}"
records.append((x, y, len(x)))
return records
def __getitem__(self, index):
"""
获取index对应的样本信息,包括x和y
:param index: 样本的索引id
:return: x,y
"""
x, y, num_tokens = self.records[index]
return copy.deepcopy(x), copy.deepcopy(y), num_tokens
def __len__(self):
return len(self.records)
def collate(self, batch):
max_length = max([t[2] for t in batch])
x, y, mask = [], [], []
for i in range(len(batch)):
_x, _y, _len_current_record = copy.deepcopy(batch[i])
_mask = [1] * _len_current_record
if _len_current_record < max_length:
_x.extend([self.token_padding_idx] * (max_length - _len_current_record))
_y.extend([self.target_padding_idx] * (max_length - _len_current_record))
_mask.extend([0] * (max_length - _len_current_record))
x.append(_x)
y.append(_y)
mask.append(_mask)
token_ids = torch.tensor(x, dtype=torch.long)
# 1表示实际token,0表示填充位置
token_masks = torch.tensor(mask, dtype=torch.long)
target_ids = torch.tensor(y, dtype=torch.long)
return token_ids, token_masks, target_ids
2. Ner(命名实体识别)任务创建数据加载器createNERDataLoader
def create_dataloader(
data_path, tokenizer, label_categories, batch_size, shuffle,
num_workers=0, prefetch_factor=2, target_padding_idx=-100,
add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):
# 创建Dataset对象
dataset = JsonNerDataset(
data_path=data_path, tokenizer=tokenizer,
categories=label_categories,
target_padding_idx=target_padding_idx,
add_special_token=add_special_token,
first_special_token=first_special_token,
last_special_token=last_special_token
)
print(f"当前dataset的总样本数目为:{data_path} - {len(dataset)}")
# dataloader实际上是一个批次的处理器,因为dataset可以返回一条一条的样本,dataloader就负责将多条样本组合成一个批次对象返回
prefetch_factor = prefetch_factor if num_workers <= 0 else num_workers * batch_size
dataloader = DataLoader(
dataset=dataset, # 给定单条样本加载的对象
batch_size=batch_size, # 给定批次大小
shuffle=shuffle, # 获取批次数据的时候是否打乱顺序
num_workers=num_workers, # 加载数据的时候是否用多进程,大于0表示使用num_workers个进程
collate_fn=dataset.collate, # 给定批次数据合并的方式
prefetch_factor=prefetch_factor # 多进程加载的时候,每个进程的预加载的样本数目,一般为num_workers * batch_size
)
return dataloader
3. trainNERDataLoader
和evalNERDataLoader
def create_train_dataloader(
data_path, tokenizer, label_categories, batch_size, target_padding_idx=-100,
add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):
return create_dataloader(
data_path, tokenizer, label_categories, batch_size,
shuffle=True, num_workers=0, prefetch_factor=2,
target_padding_idx=target_padding_idx,
add_special_token=add_special_token,
first_special_token=first_special_token,
last_special_token=last_special_token
)
def create_eval_dataloader(
data_path, tokenizer, label_categories, batch_size, target_padding_idx=-100,
add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):
return create_dataloader(
data_path, tokenizer, label_categories, batch_size,
shuffle=False, num_workers=0, prefetch_factor=2,
target_padding_idx=target_padding_idx,
add_special_token=add_special_token,
first_special_token=first_special_token,
last_special_token=last_special_token
)
4. text_special_char_replace
def special_token_processor(text):
for old, new in [('”', '"'), ("“", '"'), ("’", "'"), ("‘", "'"), ("`", "'"), ('—', '-')]:
text = text.replace(old, new)
return text
5. 提取所有标签(不重名)
def extract_labels_per_file(in_file):
"""
从单个 JSON 格式的数据文件中提取所有唯一的标签(label_type)。
:param in_file: 输入的 JSON 数据文件路径。
:return: 一个包含所有唯一标签的集合。
"""
# 初始化一个空集合用于存储标签,集合自动去重
labels = set()
# 打开输入文件进行读取,指定编码为 UTF-8
with open(in_file, "r", encoding="utf-8") as reader:
# 逐行读取文件内容
for line in reader:
# 将每行的 JSON 字符串解析为 Python 字典
record = json.loads(line.strip())
# 获取当前记录中的实体列表
entities = record['entities']
# 遍历每个实体
for entity in entities:
# 提取实体的标签类型(label_type),并将其添加到标签集合中
labels.add(entity['label_type'])
# 返回包含所有唯一标签的集合
return labels
6. 生成标签映射字典
def extract_labels(data_paths, categories_out_file):
"""
提取数据集中所有可能的标签,并生成标签到索引的映射文件。
:param data_paths: 数据文件路径(可以是单个路径或路径列表)。
:param categories_out_file: 生成的标签映射文件路径。
"""
# 如果输入的 data_paths 是字符串,则将其转换为列表
if isinstance(data_paths, str):
data_paths = [data_paths]
# 初始化一个空集合用于存储所有标签
labels = set()
# 遍历每个数据文件路径
for data_path in data_paths:
# 提取当前文件中的标签
tmp_labels = extract_labels_per_file(data_path)
# 将当前文件的标签与已有的标签集合合并
labels = labels.union(tmp_labels)
# 将标签集合转换为排序后的列表
labels = sorted(list(labels))
# 打印所有提取到的标签
print(f"所有的标签:{labels}")
# 初始化标签到索引的映射字典,'O' 表示非实体标签,索引为 0
categories = {
'O': 0
}
# 遍历每个标签,生成 B-M-E-S 格式的标签
for label in labels:
for prefix in ['B', 'M', 'E', 'S']:
# 为每个标签生成对应的 B-M-E-S 格式,并分配索引
categories[f'{prefix}-{label}'] = len(categories)
# 打印生成的标签映射字典
print(categories)
# 将标签映射字典写入指定的输出文件
with open(categories_out_file, 'w', encoding='utf-8') as writer:
# 使用 json.dump 将字典保存为 JSON 格式,设置缩进为 2,确保中文字符正常显示
json.dump(categories, writer, indent=2, ensure_ascii=False)
7. 加载标签类别映射
def load_label_categories(file_path):
"""
从指定的 JSON 文件中加载标签类别映射(标签到索引的字典)。
:param file_path: 包含标签类别映射的 JSON 文件路径。
:return: 一个字典,包含标签到索引的映射。
"""
# 打开指定路径的 JSON 文件进行读取,指定编码为 UTF-8
with open(file_path, "r", encoding="utf-8") as reader:
# 使用 json.load 从文件中加载 JSON 数据,将其解析为 Python 字典
categories = json.load(reader)
# 返回解析后的标签类别映射字典
return categories
8. 基于 Transformer 编码器的分类模型
# -*- coding: utf-8 -*-
import copy
import torch
import torch.nn as nn
from transformers import BertModel
class TransformerEncoderSoftmaxNerModel(nn.Module):
def __init__(self, vocab_size, hidden_size, encoder_num_head, encoder_num_layers, num_class):
super(TransformerEncoderSoftmaxNerModel, self).__init__()
# 确保 hidden_size 能被 encoder_num_head 整除
assert hidden_size % encoder_num_head == 0, "参数中的hidden_size必须能够整除encoder_num_head"
# 词嵌入层
self.emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden_size)
# Transformer 编码器层
encoder_layer = nn.TransformerEncoderLayer(
d_model=hidden_size,
nhead=encoder_num_head,
dim_feedforward=hidden_size * 4,
batch_first=True
)
self.encoder = nn.TransformerEncoder(
encoder_layer=encoder_layer, num_layers=encoder_num_layers
)
# 分类层(多层感知机)
self.classify = nn.Sequential(
nn.Dropout(p=0.1),
nn.Linear(hidden_size, hidden_size * 4),
nn.ReLU(),
nn.Dropout(p=0.1),
nn.Linear(hidden_size * 4, hidden_size * 4),
nn.ReLU(),
nn.Linear(hidden_size * 4, num_class),
)
self.num_class = num_class
self.hidden_size = hidden_size
def forward(self, token_ids, token_masks=None):
"""
前向过程
:param token_ids: [N,T] N个样本,每个样本有T个token
:param token_masks: [N,T] N个样本,每个样本的每个token是否是真实的实际token,True表示填充,False表示实际值
:return: [N,T,num_class] N个样本,每个样本有T个token,每个token属于num_class个类别的置信度
"""
if token_masks is not None:
mask_dtype = token_masks.dtype
if mask_dtype in [torch.int, torch.int64, torch.int32, torch.int8, torch.int16]:
token_masks = (1 - token_masks).to(dtype=torch.bool)
# 1. 获取每个token对应的静态词向量 [N,T] -> [N,T,E]
token_x = self.emb_layer(token_ids)
# 2. token交叉获取每个token对应的动态词向量 [N,T,E] -> [N,T,E]
token_x = self.encoder(token_x, src_key_padding_mask=token_masks)
# 3. 针对每个token进行全连接,判断属于各个类别的置信度
# 最终的矩阵乘法操作为: [N,T,?] * [?,num_class] -> [N, T, num_class]
score = self.classify(token_x)
return score
9.评估指标-准确率函数
def calc_token_accuracy_number(score, target_labels, target_masks):
# 1. 获取模型预测标签id [N,T,num_class] -> [N,T]
pred_labels = torch.argmax(score, dim=-1)
pred_labels = pred_labels.to(dtype=target_labels.dtype, device=target_labels.device)
# print(f"预测为0的标签占比:{torch.mean((pred_labels == 0).to(dtype=torch.float))}")
# 2. 比较实际标签和预测标签,查看是否相等
is_equal = (pred_labels == target_labels).to(dtype=torch.float)
# 3. 计算均值的分子和分母
numerator = torch.sum(is_equal).cpu().item()
denominator = torch.sum(target_masks.to(dtype=is_equal.dtype)).cpu().item()
return numerator, denominator
def token_accuracy(score, target_labels, target_masks):
"""
计算token的准确率
:param score: [N,T,num_class] 模型输出的置信度
:param target_labels: [N,T] 实际样本的标签id
:param target_masks: [N,T] 填充mask,实际值的地方为1,填充值的地方为0
:return: float 准确率
"""
numerator, denominator = calc_token_accuracy_number(score, target_labels, target_masks)
if denominator <= 0.0:
return 0.0
return numerator / denominator
9. 训练函数
# -*- coding: utf-8 -*-
import os
import torch
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from medical_ner.datas.bmeso_ner_labels import load_label_categories
from medical_ner.datas.json_ner_dataloader import create_train_dataloader, create_eval_dataloader
from medical_ner.losses.loss import CustomCrossEntropyLoss
from medical_ner.metrics.accuracy import token_accuracy, token_accuracy_v2, \
calc_token_accuracy_number_v2, div
from medical_ner.models.ner_bert_softmax import TransformerEncoderSoftmaxNerModel
from transformers import BertTokenizer
def train(
train_data_path, eval_data_path, bert_tokenizer_path, label_path,
model_save_path, summary_log_dir,
batch_size=8, total_epoch=10,
hidden_size=128, encoder_num_head=4, encoder_num_layers=2,
target_padding_idx=-100,
lr=0.01
):
"""
训练一个基于 Transformer 编码器的命名实体识别(NER)模型。
:param train_data_path: 训练数据文件路径。
:param eval_data_path: 评估数据文件路径。
:param bert_tokenizer_path: BERT 分词器的预训练模型路径。
:param label_path: 标签类别映射文件路径。
:param model_save_path: 模型保存路径。
:param summary_log_dir: TensorBoard 日志保存路径。
:param batch_size: 每个批次的样本数量,默认为 8。
:param total_epoch: 总训练轮数,默认为 10。
:param hidden_size: 隐藏层维度,默认为 128。
:param encoder_num_head: Transformer 编码器的头数,默认为 4。
:param encoder_num_layers: Transformer 编码器的层数,默认为 2。
:param target_padding_idx: 目标标签的填充索引,默认为 -100。
:param lr: 学习率,默认为 0.01。
"""
# 创建模型保存路径和日志路径的目录,如果不存在则创建
os.makedirs(model_save_path, exist_ok=True)
os.makedirs(summary_log_dir, exist_ok=True)
# 依赖信息的加载
'''
使用 BertTokenizer 加载预训练的 BERT 分词器。
加载标签类别映射(从文件中读取)。
定义保存最优模型和最后一个模型的路径。
'''
tokenizer = BertTokenizer.from_pretrained(bert_tokenizer_path)
label_categories = load_label_categories(label_path)
best_dump_path = os.path.join(model_save_path, "best.pkl")
last_dump_path = os.path.join(model_save_path, "last.pkl")
# 1. 数据加载及解析
train_dataloader = create_train_dataloader(
train_data_path, tokenizer, label_categories, batch_size,
target_padding_idx=target_padding_idx
)
eval_dataloader = create_eval_dataloader(
eval_data_path, tokenizer, label_categories, batch_size * 2,
target_padding_idx=target_padding_idx
)
# 2. 网络构造、损失函数构造、优化器构造
#初始化 NER 模型(基于 Transformer 编码器)
net = TransformerEncoderSoftmaxNerModel(
vocab_size=tokenizer.vocab_size,
hidden_size=hidden_size,
encoder_num_head=encoder_num_head,
encoder_num_layers=encoder_num_layers,
num_class=len(label_categories)
)
# 需要安装一个tensorboard库,一般情况下安装tensorflow的时候,会自动安装这个库
# pip install tensorboard==2.12.3
# 查看可视化页面,在命令行,执行以下命令:
# tensorboard --logdir log_dir
# tensorboard --logdir D:\workspaces\study\NLP04\projects\NamedEntityRecognition\medical_ner\test\output\medical\ner_softmax\logs
# 初始化 TensorBoard 日志记录器
writer = SummaryWriter(log_dir=summary_log_dir)
# writer.add_graph(net, input_to_model=torch.randint(0, 100, (4, 20)))
# 定义损失函数和优化器
loss_fn = CustomCrossEntropyLoss(ignore_index=target_padding_idx, summary_writer=writer)
train_fn = optim.SGD(params=net.parameters(), lr=lr)
# 3/4/5. 遍历训练、评估、持久化
# 初始化最佳准确率
best_acc = float('-inf')
#开始训练循环,遍历指定的总轮数。
for epoch in range(total_epoch):
# 当前epoch模型训练
net.train()
train_fn.zero_grad()
for batch, (x, mask, y) in enumerate(train_dataloader):
# a. 前向过程
score = net(token_ids=x, token_masks=mask) # score: [N,T,num_class]
loss = loss_fn(torch.permute(score, dims=(0, 2, 1)), y)
acc1 = token_accuracy(score, y, target_masks=mask)
negative_acc, positive_acc, acc2 = token_accuracy_v2(score, y, target_masks=mask)
# b. 反向更新
loss.backward()
train_fn.step()
train_fn.zero_grad()
# c. 日志的输出/运行过程的输出
writer.add_scalar('train_loss', loss.item())
writer.add_scalars('train_acc', {
'acc': acc2,
'positive': positive_acc,
'negative': negative_acc
})
print(f"Epoch {epoch}/{total_epoch} Batch {batch} Loss {loss.item():.5f} "
f"Token Accuracy {acc1:.3f} - {negative_acc:.3f} - {positive_acc:.3f} - {acc2:.3f}")
# 当前epoch的模型评估&持久化
net.eval()
with torch.no_grad():
eval_acc_number = [0.0, 0.0, 0.0, 0.0]
for _, (x, mask, y) in enumerate(eval_dataloader):
score = net(token_ids=x, token_masks=mask) # score: [N,?,num_class]
acc_number = calc_token_accuracy_number_v2(
score, y, mask
)
for i in range(4):
eval_acc_number[i] += acc_number[i]
eval_negative_acc = div(eval_acc_number[0], eval_acc_number[1])
eval_positive_acc = div(eval_acc_number[2], eval_acc_number[3])
eval_acc = div(eval_acc_number[0] + eval_acc_number[2], eval_acc_number[1] + eval_acc_number[3])
print(f"Epoch {epoch}/{total_epoch} "
f"Eval Token Accuracy {eval_negative_acc:.3f} - {eval_positive_acc:.3f} - {eval_acc:.3f}")
writer.add_scalars('eval_acc', {
'acc': eval_acc,
'positive': eval_positive_acc,
'negative': eval_negative_acc
}, global_step=epoch)
# 模型持久化
save_obj = {
'net': net.state_dict(),
'epoch': epoch,
'best_accuracy': eval_acc
}
if eval_acc > best_acc:
print(f"保存最优模型为 {epoch} {eval_acc} {best_dump_path}")
torch.save(save_obj, best_dump_path)
best_acc = eval_acc
torch.save(save_obj, last_dump_path)
# 训练完成后,关闭相关资源
writer.close()
10. 主函数
if __name__ == '__main__':
bert_tokenizer_path = r"D:\cache\huggingface\hub\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f"
train_data_path = "./datas/medical/training.txt"
eval_data_path = "./datas/medical/test.json"
label_path = "./datas/medical/categories.json"
model_save_path = "./output/medical/ner_softmax/models"
summary_log_dir = "./output/medical/ner_softmax/logs"
train(
train_data_path, eval_data_path, bert_tokenizer_path, label_path,
model_save_path, summary_log_dir
)