【NLP 16、实践 ③ 找出特定字符在字符串中的位置】
看着父亲苍老的白发和渐渐老态的面容
希望时间再慢一些
—— 24.12.19
一、定义模型
1.初始化模型
① 初始化父类
super(TorchModel, self).__init__(): 调用父类 nn.Module 的初始化方法,确保模型能够正确初始化。
② 创建嵌入层
self.embedding = nn.Embedding(len(vocab), vector_dim): 创建一个嵌入层,将词汇表中的每个词映射到一个 vector_dim 维度的向量。
③ 创建RNN层
self.rnn = nn.RNN(vector_dim, vector_dim, batch_first=True): 创建一个 RNN 层,输入和输出的特征维度均为 vector_dim,并且输入数据的第一维是批量大小。
④ 创建线性分类层
self.classify = nn.Linear(vector_dim, sentence_length + 1): 创建一个线性层,将 RNN 输出的特征向量映射到 sentence_length + 1 个分类标签。+1 是因为可能有某个词不存在的情况,此时的真实标签被设为 sentence_length。
⑤ 定义损失函数
self.loss = nn.functional.cross_entropy: 定义交叉熵损失函数,用于计算模型预测值与真实标签之间的差异。
class TorchModel(nn.Module):
def __init__(self, vector_dim, sentence_length, vocab):
super(TorchModel, self).__init__()
self.embedding = nn.Embedding(len(vocab), vector_dim) #embedding层
# self.pool = nn.AvgPool1d(sentence_length) #池化层
#可以自行尝试切换使用rnn
self.rnn = nn.RNN(vector_dim, vector_dim, batch_first=True)
# +1的原因是可能出现a不存在的情况,那时的真实label在构造数据时设为了sentence_length
self.classify = nn.Linear(vector_dim, sentence_length + 1)
self.loss = nn.functional.cross_entropy
2、前向传播定义
① 输入嵌入
x = self.embedding(x):将输入 x 通过嵌入层转换为向量表示
② RNN处理
rnn_out, hidden = self.rnn(x):将嵌入后的向量输入到RNN层,得到每个时间步的输出 rnn_out 和最后一个时间步的隐藏状态 hidden。
③ 提取特征
x = rnn_out[:, -1, :]:从RNN的输出中提取最后一个时间步(最后一维)的特征向量。
④ 分类
y_pred = self.classify(x):将提取的特征向量通过线性层进行分类,得到预测值 y_pred。
⑤ 损失计算
如果提供了真实标签 y,则计算并返回损失值;否则,返回预测值。
#当输入真实标签,返回loss值;无真实标签,返回预测值
def forward(self, x, y=None):
x = self.embedding(x)
#使用pooling的情况,先使用pooling池化层会丢失模型语句的时序信息
# x = x.transpose(1, 2)
# x = self.pool(x)
# x = x.squeeze()
#使用rnn的情况
# rnn_out:每个字对应的向量 hidden:最后一个输出的隐含层对应的向量
rnn_out, hidden = self.rnn(x)
# 中间维度改变,变成(batch_size数据样本数量, sentence_length文本长度, vector_dim向量维度)
x = rnn_out[:, -1, :] #或者写hidden.squeeze()也是可以的,因为rnn的hidden就是最后一个位置的输出
#接线性层做分类
y_pred = self.classify(x)
if y is not None:
return self.loss(y_pred, y) #预测值和真实值计算损失
else:
return y_pred
二、数据
1.建立词表
① 定义字符集
定义一个字符集 chars,包含字母 'a' 到 'k'。
② 定义字典
初始化一个字典 vocab,其中键为 'pad',值为 0。
③ 遍历字符集
使用 enumerate 遍历字符集 chars,为每个字符分配一个唯一的序号,从 1 开始。
④ 定义unk键
添加一个特殊的键 'unk',其值为当前字典的长度(即 26)。
⑤ 返回词汇表
将生成的词汇表返回
#字符集随便挑了一些字,实际上还可以扩充
#为每个字生成一个标号
#{"a":1, "b":2, "c":3...}
#abc -> [1,2,3]
def build_vocab():
chars = "abcdefghijk" #字符集
vocab = {"pad":0}
for index, char in enumerate(chars):
vocab[char] = index+1 #每个字对应一个序号
vocab['unk'] = len(vocab) #26
return vocab
2.随机生成样本
① 采样
random.sample(list(vocab.keys()), sentence_length):从词汇表 vocab 的键中随机选择 sentence_length 个不同的字符,生成列表 x
② 标签生成
index('a'):检查列表 x 中是否包含字符 "a",如果包含,记录 "a" 在列表中的索引位置为 y,否则,设置 y 为 sentence_length。
③ 转换
将列表 x 中的每个字符转换为其在词汇表中的序号,如果字符不在词汇表中,则使用 unk 的序号
④ 返回结果
返回转换后的列表 x 和标签 y
#随机生成一个样本
def build_sample(vocab, sentence_length):
#注意这里用sample,是不放回的采样,每个字母不会重复出现,但是要求字符串长度要小于词表长度
x = random.sample(list(vocab.keys()), sentence_length)
#指定哪些字出现时为正样本
if "a" in x:
y = x.index("a")
else:
y = sentence_length
x = [vocab.get(word, vocab['unk']) for word in x] #将字转换成序号,为了做embedding
return x, y
3.建立数据集
① 初始化数据集
创建两个空列表 dataset_x 和 dataset_y,用于存储生成的样本和对应的标签
② 生成样本
使用 for 循环,循环次数为 sample_length,即需要生成的样本数量。在每次循环中,调用 build_sample 函数生成一个样本 (x, y),其中 x 是输入数据,y 是标签
③ 存储样本
将生成的样本 x 添加到 dataset_x 列表中。将生成的标签 y 添加到 dataset_y 列表中
④ 返回数据集
将 dataset_x 和 dataset_y 转换为 torch.LongTensor 类型,以便在 PyTorch 中使用。返回转换后的数据集。
#建立数据集
#输入需要的样本数量。需要多少生成多少
def build_dataset(sample_length, vocab, sentence_length):
dataset_x = []
dataset_y = []
for i in range(sample_length):
x, y = build_sample(vocab, sentence_length)
dataset_x.append(x)
dataset_y.append(y)
return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)
三、模型测试、训练、评估
1.建立模型
① 参数:
vocab:词汇表,通常是一个包含所有字符或单词的列表或字典
char_dim:字符的维度,即每个字符在嵌入层中的向量长度
sentence_length:句子的最大长度
② 过程:
使用传入的参数 char_dim、sentence_length 和 vocab 实例化一个 TorchModel 对象并返回
#建立模型
def build_model(vocab, char_dim, sentence_length):
model = TorchModel(char_dim, sentence_length, vocab)
return model
2.测试模型
① 设置模型为评估模式
model.eval():将模型设置为评估模式,禁用 dropout 等训练时的行为
② 生成测试数据集
调用 build_dataset 函数生成 200 个用于测试的样本
③ 打印样本数量
输出当前测试集中样本的数量
④ 模型预测
使用 torch.no_grad() 禁用梯度计算,提高推理速度并减少内存消耗,然后对生成的测试数据进行预测
⑤ 计算准确率
遍历预测结果和真实标签,统计正确和错误的预测数量,并计算准确率
⑥ 输出结果
打印正确预测的数量和准确率,并返回准确率
#测试代码
#用来测试每轮模型的准确率
def evaluate(model, vocab, sample_length):
model.eval()
x, y = build_dataset(200, vocab, sample_length) #建立200个用于测试的样本
print("本次预测集中共有%d个样本"%(len(y)))
correct, wrong = 0, 0
with torch.no_grad():
y_pred = model(x) #模型预测
for y_p, y_t in zip(y_pred, y): #与真实标签进行对比
if int(torch.argmax(y_p)) == int(y_t):
correct += 1
else:
wrong += 1
print("正确预测个数:%d, 正确率:%f"%(correct, correct/(correct+wrong)))
return correct/(correct+wrong)
3.模型训练
① 配置参数
设置训练轮数epoch_num、批量大小batch_size、训练样本数train_sample、字符维度char_dim、句子长度sentence_length和学习率learning_rate
② 建立字表
调用 build_vocab 函数生成字符到索引的映射。
③ 建立模型
调用 build_model 函数创建模型。
④ 选择优化器
torch.optim.Adam(model.parameters(), lr=learning_rate):使用 Adam 优化器
⑤ 训练过程
model.train():模型进入训练模式。每个 epoch 中,按批量生成训练数据,计算损失,反向传播并更新权重。记录每个 epoch 的平均损失。
⑥ 评估模型
每个 epoch 结束后,调用 evaluate 函数评估模型性能。
⑦ 记录日志
记录每个 epoch 的准确率和平均损失。
⑧ 绘制图表
绘制准确率和损失的变化曲线。
⑨ 保存模型和词表
保存模型参数和词表
def main():
#配置参数
epoch_num = 20 #训练轮数
batch_size = 40 #每次训练样本个数
train_sample = 1000 #每轮训练总共训练的样本总数
char_dim = 30 #每个字的维度
sentence_length = 10 #样本文本长度
learning_rate = 0.001 #学习率
# 建立字表
vocab = build_vocab()
# 建立模型
model = build_model(vocab, char_dim, sentence_length)
# 选择优化器
optim = torch.optim.Adam(model.parameters(), lr=learning_rate)
log = []
# 训练过程
for epoch in range(epoch_num):
model.train()
watch_loss = []
for batch in range(int(train_sample / batch_size)):
x, y = build_dataset(batch_size, vocab, sentence_length) #构造一组训练样本
optim.zero_grad() #梯度归零
loss = model(x, y) #计算loss
loss.backward() #计算梯度
optim.step() #更新权重
watch_loss.append(loss.item())
print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
acc = evaluate(model, vocab, sentence_length) #测试本轮模型结果
log.append([acc, np.mean(watch_loss)])
#画图
plt.plot(range(len(log)), [l[0] for l in log], label="acc") #画acc曲线
plt.plot(range(len(log)), [l[1] for l in log], label="loss") #画loss曲线
plt.legend()
plt.show()
#保存模型
torch.save(model.state_dict(), "model.pth")
# 保存词表
writer = open("vocab.json", "w", encoding="utf8")
writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
writer.close()
return
四、模型预测
1.训练并保存模型
if __name__ == "__main__":
main()
2.预测数据
用保存的训练好的模型进行预测
① 初始化参数
设置每个字的维度 char_dim 和样本文本长度 sentence_length
② 加载字符表
从指定路径加载字符表 vocab
③ 建立模型
调用 build_model 函数构建模型
④ 加载模型权重
从指定路径加载预训练的模型权重
⑤ 序列化输入
将输入字符串转换为模型所需的输入格式
⑥ 模型预测
将输入数据传递给模型进行预测
⑦ 输出结果
打印每个输入字符串的预测类别和概率值
#使用训练好的模型做预测
def predict(model_path, vocab_path, input_strings):
char_dim = 30 # 每个字的维度
sentence_length = 10 # 样本文本长度
vocab = json.load(open(vocab_path, "r", encoding="utf8")) #加载字符表
model = build_model(vocab, char_dim, sentence_length) #建立模型
model.load_state_dict(torch.load(model_path,weights_only=True)) #加载训练好的权重
x = []
for input_string in input_strings:
x.append([vocab[char] for char in input_string]) #将输入序列化
model.eval() #测试模式
with torch.no_grad(): #不计算梯度
result = model.forward(torch.LongTensor(x)) #模型预测
for i, input_string in enumerate(input_strings):
print("输入:%s, 预测类别:%s, 概率值:%s" % (input_string, torch.argmax(result[i]), result[i])) #打印结果
3.调用函数进行预测
if __name__ == "__main__":
# main()
test_strings = ["kijabcdefh", "gijkbcdeaf", "gkijadfbec", "kijhdefacb"]
predict("model.pth", "vocab.json", test_strings)