Wrapping a Simple RNN Model in PyTorch for Chinese-Text Training and Text Prediction
Overview
Wrap a simple RNN model in PyTorch, implemented with a single-layer nn.RNN, nn.Linear, etc., and then use it for simple text prediction.
Dataset
The code follows Mu Li's version: https://zh-v2.d2l.ai/chapter_recurrent-neural-networks/rnn-concise.html, which trains on an English novel. Here it is changed to use two files from the Chinese COIG-CQIA dataset: douban_book_introduce.jsonl and ruozhiba_ruozhiba_ruozhiba.jsonl. The goal of this post is to learn RNNs, so the dataset is kept simple. However, because this dataset consists entirely of question-answer pairs and lacks the thematic continuity of a novel, the learning results are not great; ideally you would use a long Chinese novel or something similar.
COIG-CQIA: https://huggingface.co/datasets/m-a-p/COIG-CQIA
Since the COIG-CQIA data comes as instruction/answer JSON files, it is lightly preprocessed here into a plain-text txt format with one question + answer per line, dropping the other JSON fields and various symbols.
The code is as follows:
import json
import os
import re

def jsonl_to_txt(dir_path):
    jsonl_list = os.listdir(dir_path)
    qa_list = list()
    # Characters to strip: punctuation, digits, Latin letters and other symbols, keeping only the Chinese text
    chars_to_remove = r'[,。?;、:“”:!~()『』「」【】\"\[\]➕〈〉/<>()‰\%《》\*\?\-\.…·○01234567890123456789•\n\t abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ—*]'
    for jsonl in jsonl_list:
        path = os.path.join(dir_path, jsonl)
        print(path)
        with open(path, 'r', encoding='utf-8') as f:
            jsonl_data = f.readlines()
        for line in jsonl_data:
            line_dict = json.loads(line)
            # Concatenate the question (instruction) and the answer (output) into one line
            qa = line_dict['instruction'] + line_dict['output']
            qa = re.sub(chars_to_remove, '', qa).strip()
            qa_list.append(qa)
    path = os.path.join(dir_path, 'chengyu_qa.txt')
    with open(path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(qa_list))

if __name__ == '__main__':
    dir_path = '../data/COIG-CQIA'
    jsonl_to_txt(dir_path)
After this preprocessing, you still need to tokenize the text, build a vocabulary, and wrap the data as a time-series dataset (see the sketch below). For details, see: Simple Chinese tokenization, vocabulary construction and time-series dataset wrapping in Python - CSDN blog.
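These helpers (load_corpus, a Vocab with word2idx/idx2word, and make_dataset) are reused in the training and prediction code below. Their real implementation is in the referenced post; the following is only a minimal character-level sketch of the assumed interface, so the class and function bodies are illustrative rather than the original code.

import torch
from torch.utils import data

class Vocab:
    # Minimal character-level vocabulary (assumed interface: __len__, word2idx, idx2word)
    def __init__(self, text):
        self._idx2word = sorted(set(text))
        self._word2idx = {w: i for i, w in enumerate(self._idx2word)}
    def __len__(self):
        return len(self._idx2word)
    def word2idx(self, word):
        return self._word2idx[word]
    def idx2word(self, idx):
        return self._idx2word[idx]

def load_corpus(path):
    # Read the txt produced above and tokenize at the character level
    with open(path, 'r', encoding='utf-8') as f:
        text = f.read().replace('\n', '')
    vocab = Vocab(text)
    corpus = [vocab.word2idx(ch) for ch in text]
    return corpus, vocab

def make_dataset(corpus, time_size):
    # Each sample: time_size consecutive tokens as input X,
    # and the same window shifted right by one token as target y
    xs, ys = [], []
    for i in range(0, len(corpus) - time_size - 1, time_size):
        xs.append(corpus[i:i + time_size])
        ys.append(corpus[i + 1:i + 1 + time_size])
    return data.TensorDataset(torch.tensor(xs), torch.tensor(ys))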
Model Wrapping
RNN — PyTorch 2.4 documentation
You can first take a look at the model structure visualized with TensorBoard's add_graph function (the add_graph call appears in the training code later).
Here we use a single-layer RNN (nn.RNN defaults to num_layers=1). nn.functional.one_hot turns each token into a vector representation; this could later be replaced by nn.Embedding for learned word vectors. Before nn.functional.one_hot, x is transposed, which may seem a bit abstract, but it becomes clear once you look at the input requirements of nn.RNN.
The initial shape of x is (batch_size, time_size); after transposing and one-hot encoding it becomes (time_size, batch_size, vocab_size). Without the transpose, one-hot encoding would give (batch_size, time_size, vocab_size). nn.RNN actually supports both layouts, but for the (batch_size, time_size, vocab_size) form you must set batch_first=True when creating the nn.RNN instance; the default batch_first=False expects the (time_size, batch_size, vocab_size) layout used here.
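As a quick, illustrative shape check (the sizes below are made up for the example, not the training settings):

import torch
from torch import nn

batch_size, time_size, vocab_size = 2, 3, 5
x = torch.randint(0, vocab_size, (batch_size, time_size))   # (batch_size, time_size)
x_t = nn.functional.one_hot(x.T, num_classes=vocab_size)    # (time_size, batch_size, vocab_size)
x_b = nn.functional.one_hot(x, num_classes=vocab_size)      # (batch_size, time_size, vocab_size)
print(x_t.shape)  # torch.Size([3, 2, 5]) -> matches nn.RNN's default batch_first=False
print(x_b.shape)  # torch.Size([2, 3, 5]) -> would require nn.RNN(..., batch_first=True)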
In addition, an initial hidden state has to be provided; here this is done with the init_state function.
import torch
from torch import nn

class SimpleRNNModel(nn.Module):
    def __init__(self, vocab_size, hidden_size):
        super(SimpleRNNModel, self).__init__()
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(vocab_size, hidden_size)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden_state=None):
        # x: (batch_size, time_size); transpose, then one-hot encode to (time_size, batch_size, vocab_size)
        x = nn.functional.one_hot(x.T.long(), num_classes=self.vocab_size)
        x = x.to(torch.float32)
        outputs, hidden_state = self.rnn(x, hidden_state)
        # With batch_first=False, the rnn outputs have shape (L, N, D*H_out) = (time_size, batch_size, hidden_size)
        outputs = outputs.reshape(-1, self.hidden_size)
        outputs = self.linear(outputs)
        return outputs, hidden_state

    def init_state(self, device, batch_size=1):
        return torch.zeros((self.rnn.num_layers, batch_size, self.hidden_size), device=device)
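To confirm the end-to-end shapes, here is a small sanity check with toy sizes (again illustrative, not the training configuration):

import torch

vocab_size, hidden_size = 10, 16
batch_size, time_size = 4, 5
model = SimpleRNNModel(vocab_size, hidden_size)
x = torch.randint(0, vocab_size, (batch_size, time_size))
state = model.init_state(device=torch.device('cpu'), batch_size=batch_size)
outputs, state = model(x, state)
print(outputs.shape)  # torch.Size([20, 10]) -> (time_size * batch_size, vocab_size)
print(state.shape)    # torch.Size([1, 4, 16]) -> (num_layers, batch_size, hidden_size)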
Gradient Clipping
From Mu Li: https://zh-v2.d2l.ai/chapter_recurrent-neural-networks/rnn-scratch.html
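The idea: compute the L2 norm over all parameter gradients; if it exceeds max_norm, rescale every gradient by max_norm / norm so that the global gradient norm is capped at max_norm. This keeps the gradients that tend to explode when backpropagating through many time steps under control.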
import torch
from torch import nn

def grad_clipping(net, max_norm):
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    # Global L2 norm over all parameter gradients
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > max_norm:
        # Scale every gradient in place so the global norm equals max_norm
        for param in params:
            param.grad[:] *= max_norm / norm
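For reference, PyTorch also ships an equivalent built-in utility; instead of the hand-written function above, the training loop could clip gradients like this (using the same net and max_grad_max_norm as in the training code below):

from torch import nn

# Built-in global-norm gradient clipping; modifies net's gradients in place
nn.utils.clip_grad_norm_(net.parameters(), max_norm=max_grad_max_norm)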
Model Training
import numpy as np
import torch
from torch import nn, optim
from torch.utils import data
from torch.utils.tensorboard import SummaryWriter
# SimpleRNNModel and grad_clipping are defined above;
# load_corpus and make_dataset come from the tokenization/vocabulary post referenced earlier

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'\ndevice: {device}')
corpus, vocab = load_corpus("../data/COIG-CQIA/qa_list.txt")
vocab_size = len(vocab)
hidden_size = 256
epochs = 5
batch_size = 50
learning_rate = 0.01
time_size = 4
max_grad_max_norm = 0.5
dataset = make_dataset(corpus=corpus, time_size=time_size)
data_loader = data.DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)
net = SimpleRNNModel(vocab_size, hidden_size)
net.to(device)
# print(net.state_dict())
criterion = nn.CrossEntropyLoss()
criterion.to(device)
optimizer = optim.Adam(net.parameters(), lr=learning_rate)
writer = SummaryWriter('./train_logs')
# Define an arbitrary input just so add_graph can trace the model
tmp = torch.rand((batch_size, time_size)).to(device)
writer.add_graph(net, tmp)
loss_counter = 0
total_loss = 0
ppl_list = list()
total_train_step = 0
for epoch in range(epochs):
    print('------------Epoch {}/{}'.format(epoch + 1, epochs))
    for X, y in data_loader:
        X, y = X.to(device), y.to(device)
        # If consecutive batches were temporally contiguous, the previous hidden_state could be carried over;
        # otherwise it has to be reset, as done here for every batch.
        # batch_size=X.shape[0] because the DataLoader does not drop the last incomplete batch,
        # so the actual batch size may be smaller than the configured batch_size.
        hidden_state = net.init_state(batch_size=X.shape[0], device=device)
        outputs, hidden_state = net(X, hidden_state=hidden_state)
        optimizer.zero_grad()
        # Reshape y to time_size * batch_size rows so it lines up with outputs
        y = y.T.reshape(-1)
        # CrossEntropyLoss expects the target tensor to be of type long
        loss = criterion(outputs, y.long())
        loss.backward()
        # After computing the gradients, optionally clip them before the optimizer step
        grad_clipping(net, max_grad_max_norm)
        optimizer.step()
        total_loss += loss.item()
        loss_counter += 1
        total_train_step += 1
        if total_train_step % 10 == 0:
            print(f'Epoch: {epoch + 1}, total training steps: {total_train_step}, current loss: {loss.item():.4f}')
            writer.add_scalar('train_loss', loss.item(), total_train_step)
    # Perplexity is the exponential of the average cross-entropy loss over the epoch
    ppl = np.exp(total_loss / loss_counter)
    ppl_list.append(ppl)
    print(f'Epoch {epoch + 1} finished, batch_loss_average: {total_loss / loss_counter}, perplexity: {ppl}')
    writer.add_scalar('ppl', ppl, epoch + 1)
    total_loss = 0
    loss_counter = 0
    torch.save(net.state_dict(), './save/epoch_{}_ppl_{}.pth'.format(epoch + 1, ppl))
writer.close()
Observing the Training Process with TensorBoard
Perplexity curve: the horizontal axis is the training epoch.
Training-loss curve: the horizontal axis is the number of training steps.
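To view these curves, launch TensorBoard against the log directory written by the SummaryWriter above, e.g. tensorboard --logdir ./train_logs, and open the printed URL in a browser.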
Text Prediction
First, complete the model's prediction function (this method goes inside the model class):
def predict(self, prefix, num_preds, vocab, device):
    state = self.init_state(batch_size=1, device=device)
    # prefix is a string of characters; convert the first one to its index
    outputs = [vocab.word2idx(prefix[0])]
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    # Warm-up: run through the prefix character by character so the hidden state
    # builds up the relationships between the given characters
    for y in prefix[1:]:
        _, state = self.forward(get_input(), state)
        outputs.append(vocab.word2idx(y))
    # Prediction: each step conveniently feeds the previous prediction back in as the next input
    for _ in range(num_preds):
        y, state = self.forward(get_input(), state)
        outputs.append(int(y.argmax(dim=1).reshape(1)))
    return ''.join([vocab.idx2word(i) for i in outputs])
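Note that during the warm-up loop the model's outputs are discarded and the known prefix characters themselves are appended to outputs; only the hidden state is carried forward. From the prediction loop onwards, the argmax of each output is fed back in as the next input.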
Next, implement prompt handling and the call to the prediction function.
Note: the corpus used here must be the same as the one used for training.
import torch
# SimpleRNNModel and load_corpus are the same as in the training code

def predict(state_dict_path, vocab, prefix=None, num_preds=3):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    vocab_size = len(vocab)
    hidden_size = 256
    net = SimpleRNNModel(vocab_size, hidden_size).to(device)
    net.load_state_dict(torch.load(state_dict_path, map_location=device, weights_only=True))
    net.eval()
    with torch.no_grad():
        outputs = net.predict(prefix=prefix, num_preds=num_preds, vocab=vocab, device=device)
    return outputs

if __name__ == '__main__':
    corpus, vocab = load_corpus("../data/COIG-CQIA/qa_list.txt")
    # corpus, vocab = load_corpus("../data/COIG-CQIA/chengyu_qa.txt")
    # print(len(vocab))
    # idx = [vocab.word2idx(ch) for ch in prefix]
    path = "../save/Simple/新建文件夹/state_dict-time_size_30-ppl_1.pth"
    prefix = "有什么超赞的诗句"
    print(f'Prompt: {prefix}')
    outputs = predict(path, vocab, prefix=prefix, num_preds=22)
    print(f'Predicted output: {outputs}\n')