基于LSTM的情感分析
导包
根据代码需要进行相关的导包
import os
import torch
import random
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
from tqdm import tqdm
相关路径文件的准备
# 原始数据路径
data_file = os.path.relpath(os.path.join(os.path.dirname(__file__),'data/hotel_discuss2.csv'))
#判断路径是否存在
if not os.path.exists(data_file):
print(f"{data_file}数据文件不存在!")
exit()
# 字典路径
dict_file = os.path.relpath(os.path.join(os.path.dirname(__file__),'data/dict.txt'))
# 编码后存放的路径
encoding_file = os.path.relpath(os.path.join(os.path.dirname(__file__),'data/encoding.txt'))
设备的切换
# 设备切换
device = "cuda" if torch.cuda.is_available() else "cpu"
数据准备(数据清洗)
# 需要过滤的字符(例如标点符号)
puncts = '。,'
# 词向量编码
with open(data_file, 'r', encoding='utf-8-sig') as f:
data = f.readlines()
for line in data:
# line = line.strip() 是一个在编程中常见的操作,主要用于去除字符串开头和结尾的空白字符。
# 这里的空白字符包括空格、制表符(tab)、换行符等。
line = line.strip()
# line字符串可进行遍历循环
for word in line:
# 先过滤掉不需要的字符
if word not in puncts:
# 判断遍历的词是否在字典中,不在,则加入字典中
# 达到一个词去重的目的
if word not in mydict:
# 从1开始编码
mydict[word] = code
code += 1
# 未知字符的编码
mydict['<unk>'] = code
# 将数据字典存到本地
with open(dict_file, 'w', encoding='utf-8-sig') as f:
f.write(str(mydict))
加载词向量文件
#加载字典内容
def load_dict(dict_file):
with open(dict_file, 'r', encoding='utf-8-sig') as file:
"""
file.read():这个方法会读取文件中的所有内容,并将其作为一个字符串返回。如果文件很大,这样做可能会消耗大量内存。
eval():这是一个内置函数,它会解析传入的字符串表达式,并在当前上下文中执行它。
这意味着如果文件中包含的是有效的Python表达式或语句,eval() 将执行这些代码并返回执行的结果。
重点
例如,如果文件中包含字典、列表或任何其他Python表达式,eval() 可以将它们转换成相应的对象。
"""
return eval(file.read())
new_dict = load_dict(dict_file)
定义词向量字典的长度
# 定义获取字典大小的函数
def get_dict_size(new_dict):
return len(new_dict)
打开原始数据,构建训练数据集
# 打开原始数据
with open(data_file, 'r', encoding='utf-8-sig') as f1:
# 写入编码文件
with open(encoding_file, 'w', encoding='utf-8-sig') as f2:
#将原始数据的每行数据进行编码
for line in f1.readlines():
# 去字符串两头的空格
line = line.strip()
# 获取标签和内容
label = line[0]
# 从2取是因为索引1的位置是,
content = line[2:]
# 对内容进行遍历
for ch in content:
# 剔除在puncts中的字符
if ch not in puncts:
# 写入每个词对应的编码
f2.write(str(new_dict[ch]))
# 每个词之间用逗号隔开
f2.write(',')
# 写入标签:用空格和内容进行分隔,并对下一行1数据进行换行
f2.write('\t'+str(label)+'\n')
数据长度的填充
#存放训练数据
values = []
#存放真实数据
labels = []
#预设每次输入的长度是256,不够就补0,超过的部分则不要
max_lens = 256
# 读取编码数据
with open(encoding_file, 'r', encoding='utf-8-sig') as f:
lines = f.readlines()
# random.shuffle(lines) 是 Python 中用于将列表 lines 的元素随机打乱顺序的方法。
random.shuffle(lines)
for line in lines:
# 根据空格来划分训练数据和标签
data, label = line.split('\t')
# 根据逗号进行分割,并转换为int类型,存入列表中
val = [int(i) for i in data[0:-1].split(',')]
# 如果i 大于 max_lens,则将val列表中的元素填充为0,直到 val 的长度等于 max_lens。
for i in range(max_lens - len(val)):
val.append(0)
val = val[:max_lens]
values.append(val)
labels.append(int(label))
# 将列表转换为numpy数组,并转换为int64类型
values = np.array(values, dtype='int64')
labels = np.array(labels, dtype='int64')
数据集的划分
# 数据集的划分
train_x, test_x, train_y, test_y = train_test_split(values, labels,
test_size=0.2, random_state=1)
转为Tensor数据集
本项目使用的pytorch的模型框架进行训练,所以数据集最终要转为tensor类型的数据集
# 将数据转换为tensor
train_data = TensorDataset(torch.from_numpy(train_x), torch.from_numpy(train_y))
test_data = TensorDataset(torch.from_numpy(test_x), torch.from_numpy(test_y))
数据集批次的划分
train_data = DataLoader(dataset=train_data, batch_size=64, shuffle=True)
test_data = DataLoader(dataset=test_data, batch_size=64, shuffle=True)
模型搭建
# 模型搭建
class LSTM(nn.Module):
def __init__(self, input_size, output_size, embedding_dim=512,hidden_size=512):
# 继承父类
super(LSTM, self).__init__()
# 定义隐藏层大小
self.hidden_size = hidden_size
# 词嵌入
# input_size:字典的大小
# embedding_dim:词向量的维度
# device:设备
self.embedding = nn.Embedding(input_size, embedding_dim, device=device)
# 定义LSTM层
# embedding_dim:词向量的维度
# hidden_size:隐藏层的维度
# num_layers:LSTM层的数量
self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers=4)
# 定义全连接层,输出层
# hidden_size:隐藏层的维度
# output_size:输出维度
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
# 词嵌入
embed = self.embedding(x)
# 数据形状:(seq_len, batch, input_size),因为batch_first=True,所以需要转置一下
permute_ = embed.permute(1, 0, 2)
# h0 = torch.zeros(1, permute_.size(0), self.hidden_size).to(device)
# c0 = torch.zeros(1, permute_.size(0), self.hidden_size).to(device)
# LSTM层,输出:(seq_len, batch, hidden_size)
# out: 输出,h_n:最后一个隐藏层的状态,c_n:最后一个cell的状态
out, _ = self.lstm(permute_)
# 输出层
"""
out[-1, :, :]:这个索引操作选择了out张量的最后一个时间步的所有批次和所有隐藏单元的数据。具体来说:
-1 表示选择序列中的最后一个时间步的输出。
: 表示选择该时间步下所有批次的数据。
第二个 : 表示选择这些批次数据中的所有隐藏单元。
"""
out = self.fc(out[-1, :, :])
return out
训练参数定义
# 学习率
lr = 1e-4
# 训练轮次
epochs = 20
模型、损坏函数、优化器的创建
model = LSTM(get_dict_size(new_dict),2)
# model.train()
model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)
loss_fc = nn.CrossEntropyLoss()
loss_old = 100
模型训练与测试
# 初始化保存训练和测试结果的列表
train_result = []
test_result = []
# 初始化保存训练和测试损失、精度的列表
train_losses = []
train_accuracies = []
test_losses = []
test_accuracies = []
# 循环训练每个周期
for epoch in range(1, epochs + 1):
# 创建一个进度条以显示训练数据的处理进度
pbar = tqdm(train_data)
# 初始化每个周期的总损失和总准确度
loss_all = 0
acc_all = 0
# 遍历训练数据集的每一个批次
for step, (x, y) in enumerate(pbar):
# 将标签数据移动到指定的设备(如GPU)
y = y.to(device)
# 将输入数据移动到指定的设备(如GPU)
x = x.to(device)
# 计算每个序列的实际长度(非零元素的数量)
# lengths = torch.sum(x != 0, dim=-1).cpu().long()
# 使用模型进行前向传播,得到输出
out = model(x)
# 计算损失
loss = loss_fc(out, y)
# 反向传播计算梯度
loss.backward()
# 更新模型参数
optimizer.step()
# 将梯度归零
optimizer.zero_grad()
# 累加损失
loss_all += loss.item()
# 计算当前批次的平均损失
loss_time = loss_all / (step + 1)
# 计算当前批次的准确度
# 使用 torch.mean() 计算这些 1.0 和 0.0 值的平均数,也就是所有预测正确的样本占总样本的比例,这实际上就是模型的准确率。
acc = torch.mean((y == torch.argmax(out, dim=-1)).float())
# 累加准确度
acc_all += acc
# 计算当前批次的平均准确度
acc_time = acc_all / (step + 1)
# 构建进度条的描述信息
s = "train => epoch:{} - step:{} - loss:{:.4f} - loss_time:{:.4f} - acc:{:.4f} - acc_time:{:.4f}".format(epoch, step, loss, loss_time, acc, acc_time)
# 设置进度条的描述
pbar.set_description(s)
# 将描述信息添加到训练结果列表中
train_result.append(s + "\n")
# 记录当前epoch的训练损失和精度
train_losses.append(loss_time)
train_accuracies.append(acc_time.item())
# 禁用梯度计算,因为我们只需要前向传播进行预测
with torch.no_grad():
# 创建一个进度条以显示测试数据的处理进度
pbar = tqdm(test_data)
# 初始化每个周期的总损失和总准确度
loss_all = 0
acc_all = 0
# 遍历测试数据集的每一个批次
for step, (x, y) in enumerate(pbar):
# 将标签数据移动到指定的设备(如GPU)
y = y.to(device)
# 将输入数据移动到指定的设备(如GPU)
x = x.to(device)
# 计算每个序列的实际长度(非零元素的数量)
lengths = torch.sum(x != 0, dim=-1).cpu().long()
# 使用模型进行前向传播,得到输出
out = model(x)
# 计算损失
loss = loss_fc(out, y)
# 累加损失
loss_all += loss.item()
# 计算当前批次的平均损失
test_loss_time = loss_all / (step + 1)
# 计算当前批次的准确度
acc = torch.mean((y == torch.argmax(out, dim=-1)).float())
# 累加准确度
acc_all += acc
# 计算当前批次的平均准确度
acc_time = acc_all / (step + 1)
# 构建进度条的描述信息
s = "test => epoch:{} - step:{} - loss:{:.4f} - loss_time:{:.4f} -acc:{:.4f} - acc_time:{:.4f}".format(epoch, step, loss, test_loss_time, acc, acc_time)
# 设置进度条的描述
pbar.set_description(s)
# 将描述信息添加到测试结果列表中
test_result.append(s + "\n")
# 记录当前epoch的测试损失和精度
test_losses.append(test_loss_time)
test_accuracies.append(acc_time.item())
# 将训练结果写入文件
with open(f"train_result.txt", "w") as f:
f.writelines(train_result)
# 将测试结果写入文件
with open(f"test_result.txt", "w") as f:
f.writelines(test_result)
# 保存模型,如果当前测试损失小于之前记录的最小损失
if loss_old > test_loss_time:
loss_old = test_loss_time
torch.save(model.state_dict(), f"model.pkl")
应用
import torch
from data_deal import *
from model import LSTM
# 加载字典
new_dict = load_dict(dict_file)
# 设备切换
device = "cuda" if torch.cuda.is_available() else "cpu"
# 模型加载
model = LSTM(get_dict_size(new_dict), 2)
model.load_state_dict(torch.load("model.pkl"))
model.to(device)
model.eval() # 设置为评估模式
# 用户输入
while True:
text = input("请输入文本(输入'退出'以结束):")
if text == "退出":
break
# 词编码
text = [new_dict[ch] for ch in sen]
text = torch.tensor(text).unsqueeze(0).to(device)
prediction = model(text)
prediction = torch.argmax(prediction, dim=1).item()
print(f"预测结果: {'正面' if prediction == 1 else '负面'}")