NLP深度学习 DAY4:Word2Vec详解:两种模式(CBOW与Skip-gram)
用稀疏向量表示文本,即所谓的词袋模型在 NLP 有着悠久的历史。正如上文中介绍的,早在 2001年就开始使用密集向量表示词或词嵌入。Mikolov等人在2013年提出的创新技术是通过去除隐藏层,逼近目标,进而使这些单词嵌入的训练更加高效。虽然这些技术变更本质上很简单,但它们与高效的word2vec配合使用,便能使大规模的词嵌入训练成为可能。
0 前言:词袋模型
首先解释一下什么是词袋模型:
词袋模型(Bag-of-Words, BOW) 是最早、最经典的文本表示方法之一(1940左右出现,甚至比n-gram还要早,我愿称之为原始祖师爷)。它将一段文本看作一个「袋子」:里面盛放了这个文本所含的所有单词,但忽略了单词的顺序、句法结构等信息,只关注单词是否出现以及出现次数(或频率)。这就好比把句子里的单词都抓出来扔进一个袋子,摇匀后再数一数这些单词都有什么、各出现几次。
1.1 典型示例
假设我们的词表是 "I","like","apples","banana",大小 V=4(指的就是一共有4个词)。
有两句话:
- “I like apples”
- “I like banana apples”
那么在词袋表示下:
-
对于 “I like apples”:
- "I" 出现 1 次
- "like" 出现 1 次
- "apples"出现 1 次
- "banana"出现 0 次
向量表示可写作[1, 1, 1, 0]
-
对于 “I like banana apples”:
- "I" 出现 1 次
- "like" 出现 1 次
- "apples"出现 1 次
- "banana"出现 1 次
向量表示可写作[1, 1, 1, 1]
可以看到,这种表示只管词的出现情况,不会去记录 “banana”是在“apples”前还是后出现,也不会记录它们之间的距离。这样就得到最纯粹的词袋表示。
一、词向量引入
先来考虑一个问题:如何能将文本向量化呢?听起来比较抽象,我们可以先从人的角度来思考。
如何用一个数值向量描述一个人呢?只用身高或者体重,还是综合其各项指标呢?当然是综合各项指标能更加准确的描述一个人啦,具体什么指标还得看你能收集到什么指标。比如除了常规的身高、体重外,我们还可以用人的性格,从内向到外向设置为从-1到+1,人的性格让“专家”去打分,从而获取人性格的数值化数据。
只要有了向量,就可以用不同方法(欧氏距离、曼哈顿距离、切比雪夫距离、余弦相似度等)来计算两个向量之间的相似度了!
通常来说,向量的维度越高,能提供的信息也就越多,从而计算结果的可靠性就更值得信赖
现在回到正题,如何描述词的特征?通常都是在词的层面上来构建特征。Word2Vec就是要把词转化为向量。
下图展示了一个50维的词向量:
假设现在已经拿到了一份训练好的词向量,其中每一个词都表示为50维的向量,如下图所示:
如果将它们在热度图中显示,结果如下:
在结果中可以发现,相似的词在特征表达中比较相似,也就是说明词的特征是有实际意义的!
二、词向量模型
在词向量模型中输入和输出是什么?中间这个黑盒又是什么?
如下图所示,在词向量模型中,输入可以是多个词。例如下面所示的,输入是 Thou 和 shalt,模型的任务是预测它们的下一个词是什么。
早期的神经网络的词嵌入方法(word2vec出现之前使用n-gram去训练原始词表里面的300维向量 差不多2001年)
最后一层连接了 SoftMax,所以网络的输出是所有词可能是下一个词的概率。
那么有人就会问了,输入是文字,文字怎么输入到神经网络中啊 ?这个问题很好,我们通常会用一个 Embedding 层来解决这个问题。如下图所示,在神经网络初始化的时候,我们会随机初始化一个 N×K 的矩阵,其中 N 是 词典的大小,K 是词向量的维数(一个自行设定的超参数)。然后,我们会用一个 N×N 的矩阵和 N×K 的矩阵相乘,得到一个新的 N×K的矩阵向下进行前向传播。其中,N×N 的矩阵会在输入的文字的对应对角线上设置为1,其余位置均为0。N×K 的矩阵是随机初始化的,通过反向传播进行更新调整。
下面展示了一个例子(假设输入的两个词在词典中的位置是2和3处):
三、训练数据构建(还是早期的n-gram模型)
问:我们的训练数据应该从哪找呢?
答:一切具有正常逻辑的语句都可以作为训练数据。如小说、论文等。
如果我们有一个句子,那么我们可以按照下面你的方式构建数据集,选出前三个词,用前两个作为词模型的输入,最后一个词作为词模型输出的目标,继而进行训练。如下图所示:
然后,我们还可以将”窗口“往右平移一个词,如下图所示,构造一个新的训练数据
当然,这个”窗口“的宽度也是可以自己设置的,在上例中,窗口宽度设置为 3,也可以设置为 4、5、6 等等
四、Word2vec(2013年)不同模型对比
4.1 CBOW
CBOW的全称是continuous bag of words(连续词袋模型)。其本质也是通过context word(背景词)来预测target word(目标词)。
CBOW之所以叫连续词袋模型,是因为在每个窗口内它也不考虑词序信息,因为它是直接把上下文的词向量相加了,自然就损失了词序信息。CBOW抛弃了词序信息,指的就是在每个窗口内部上下文直接相加而没有考虑词序。
用 CBOW 构造数据集的例子1如下图所示:
例子2:
假设我们有一个简单的句子:
“The cat sits on the mat”
-
选择“sits”作为目标词(w_t ),窗口大小设置为 2,意味着它的上下文是:
- 左边两个词: “The”, “cat”
- 右边两个词: “on”, “the” (如果只算到 “on the” 可能这样)
-
构建训练样本时,就会出现:
- 输入:上下文词“[The, cat, on, the]”各自的向量,合并/平均后得到 vcontext。
- 输出:目标词“sits”在词表上的概率。
对于句子中的其他位置,也会类似地滑动窗口,把每个词当作目标词,然后获取它的上下文词,构建训练样本。例如,把“cat”当目标词时,上下文就是 [The], [sits, on]
(在超出边界时,窗口不够可特殊处理)。
最大缺点:为什么说 “CBOW 不考虑词序”(之所以叫连续词袋模型)
- 在同一个窗口里,CBOW 只是把上下文词向量进行相加/平均。
- 这样做后,我们无法区分“cat sits on the mat” 和 “on the cat sits the mat” 这样的词序变化,因为最终得到的上下文向量是一样的。
- 这就是“Bag of Words”的含义:视上下文词为一个无序的集合。
尽管 CBOW 在窗口内部抛弃了词序信息,但它仍然是“连续”地按照窗口来遍历整篇文本(不会跨句子或任意地远距离取词),所以叫 “Continuous Bag of Words”。
4.2 Skip-gram 模型
Skip-gram 模型和 CBOW 相反,Skip-gram 模型的输入是一个词汇,输出则是该词汇的上下文。如下图所示:
下面举一个例子,设”窗口“宽度为5,每次用”窗口“的第三个也就是中的词汇作为输入,其余上下文作为输出,分别构建数据集,如下图所示:
然后用构建好的数据集丢给词模型进行训练,如下图所示:
如果一个语料库稍微大一点,可能的结果就太多了,最后一层 SoftMax 的计算就会很耗时,有什么办法来解决吗?
下面提出了一个初始解决方案:假设,传统模型中,我们输入 not ,希望输出是 thou,但是由于语料库庞大,最后一层 SoftMax 太过耗时,所以我们可以改为:将 not 和 thou 同时作为输入,做一个二分类问题,类别 1 表示 not 和 thou 是邻居,类别 0 表示它们不是邻居。
上面提到的解决方案出发点非常好,但是由于训练集本来就是用上下文构建出来的,所以训练集构建出来的标签全为 1 ,无法较好的进行训练,如下图所示:
改进方案:加入一些负样本(负采样模型),一般负采样个数为 5 个就好,负采样示意图如下图所示:
最大缺点:和上面的CBOW一样无法考虑词序
-
Skip-gram输入是目标词,输出是预测其上下文词。与 CBOW 类似,上下文词也被视为独立的个体,不关心它们的顺序或位置。
-
例如,对于目标词 "sat",模型会尝试预测周围的词(如 "The", "cat", "on", "the"),但预测过程中这些词的顺序无关紧要。
4.3 CBOW 和 Skip-gram 对比
五、词向量训练过程
5.1 初始化词向量矩阵
5.2 训练模型
通过神经网络反向传播来计算更新,此时不光更新权重参数矩阵W,也会更新输入数据
训练完成后,我们就得到了比较准确的 Word Embeddings,从而得到了每个词的向量表示!!!
六、Python 代码实战
6.1 Model
from torch import nn
class DNN(nn.Module):
def __init__(self, vocabulary_size, embedding_dim):
super(DNN, self).__init__()
self.embedding = nn.Linear(vocabulary_size, embedding_dim, bias=False)
print("embedding_size:", list(self.embedding.weight.size()))
self.layers = nn.Sequential(
nn.Linear(vocabulary_size * embedding_dim, embedding_dim // 2),
nn.LeakyReLU(),
nn.Linear(embedding_dim // 2, 4),
nn.LeakyReLU(),
nn.Linear(4, 1),
)
# Mean squared error loss
self.criterion = nn.MSELoss()
# self.criterion = nn.CrossEntropyLoss()
def forward(self, x):
x = self.embedding(x)
x = x.view(x.size()[0], -1)
x = self.layers(x)
x = x.squeeze(1)
return x
def cal_loss(self, pred, target):
""" Calculate loss """
return self.criterion(pred, target)
6.2 DataSet
import random
import numpy as np
from torch.utils.data import Dataset
class MyDataSet(Dataset):
def __init__(self, features, labels):
self.features = features
self.labels = labels
def __getitem__(self, index):
return self.features[index], self.labels[index]
def __len__(self):
return len(self.features)
def get_data_set(data_path, window_width, window_step, negative_sample_num):
with open(data_path, 'r', encoding='utf-8') as file:
document = file.read()
document = document.replace(",", "").replace("?", "").replace(".", "").replace('"', '')
data = document.split(" ")
print(f"数据中共有 {len(data)} 个单词")
# 构造词典
vocabulary = set()
for word in data:
vocabulary.add(word)
vocabulary = list(vocabulary)
print(f"词典大小为 {len(vocabulary)}")
# index_dict
index_dict = dict()
for index, word in enumerate(vocabulary):
index_dict[word] = index
# 开始滑动窗口,构造数据
features = []
labels = []
neighbor_dict = dict()
for start_index in range(0, len(data), window_step):
if start_index + window_width - 1 < len(data):
mid_index = int((start_index + start_index + window_width - 1) / 2)
for index in range(start_index, start_index + window_width):
if index != mid_index:
feature = np.zeros((len(vocabulary), len(vocabulary)))
feature[index_dict[data[index]]][index_dict[data[index]]] = 1
feature[index_dict[data[mid_index]]][index_dict[data[mid_index]]] = 1
features.append(feature)
labels.append(1)
if data[mid_index] in neighbor_dict.keys():
neighbor_dict[data[mid_index]].add(data[index])
else:
neighbor_dict[data[mid_index]] = {data[index]}
# 负采样
for _ in range(negative_sample_num):
random_word = vocabulary[random.randint(0, len(vocabulary))]
for word in vocabulary:
if random_word not in neighbor_dict.keys() or word not in neighbor_dict[random_word]:
feature = np.zeros((len(vocabulary), len(vocabulary)))
feature[index_dict[random_word]][index_dict[random_word]] = 1
feature[index_dict[word]][index_dict[word]] = 1
features.append(feature)
labels.append(0)
break
# 返回dataset和词典
return MyDataSet(features, labels), vocabulary, index_dict
6.3 Main
import random
from math import sqrt
import numpy as np
import torch
from torch.utils.data import DataLoader
from Python.DataSet import get_data_set
from Python.Model import DNN
def same_seed(seed):
"""
Fixes random number generator seeds for reproducibility
固定时间种子。由于cuDNN会自动从几种算法中寻找最适合当前配置的算法,为了使选择的算法固定,所以固定时间种子
:param seed: 时间种子
:return: None
"""
torch.backends.cudnn.deterministic = True # 解决算法本身的不确定性,设置为True 保证每次结果是一致的
torch.backends.cudnn.benchmark = False # 解决了算法选择的不确定性,方便复现,提升训练速度
np.random.seed(seed) # 按顺序产生固定的数组,如果使用相同的seed,则生成的随机数相同, 注意每次生成都要调用一次
torch.manual_seed(seed) # 手动设置torch的随机种子,使每次运行的随机数都一致
random.seed(seed)
if torch.cuda.is_available():
# 为GPU设置唯一的时间种子
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
def train(model, train_loader, config):
# Setup optimizer
optimizer = getattr(torch.optim, config['optimizer'])(
model.parameters(), **config['optim_hyper_paras'])
device = config['device']
epoch = 0
while epoch < config['n_epochs']:
model.train() # set model to training mode
loss_arr = []
for x, y in train_loader: # iterate through the dataloader
optimizer.zero_grad() # set gradient to zero
x, y = x.to(device).to(torch.float32), y.to(device).to(torch.float32) # move data to device (cpu/cuda)
pred = model(x) # forward pass (compute output)
mse_loss = model.cal_loss(pred, y) # compute loss
mse_loss.backward() # compute gradient (backpropagation)
optimizer.step() # update model with optimizer
loss_arr.append(mse_loss.item())
print(f"epoch: {epoch}/{config['n_epochs']} , loss: {np.mean(loss_arr)}")
epoch += 1
print('Finished training after {} epochs'.format(epoch))
def find_min_distance_word_vector(cur_i, vector, embeddings, vocabulary):
def calc_distance(v1, v2):
# 计算欧式距离
distance = 0
for i in range(len(v1)):
distance += sqrt(pow(v1[i] - v2[i], 2))
return distance
min_distance = None
min_i = -1
for i, word in enumerate(vocabulary):
if cur_i != i:
distance = calc_distance(vector, embeddings[i].tolist())
if min_distance is None or min_distance > distance:
min_distance = distance
min_i = i
return min_i
if __name__ == '__main__':
data_path = './data/data.txt'
config = {
'seed': 3407, # Your seed number, you can pick your lucky number. :)
'device': 'cuda' if torch.cuda.is_available() else 'cpu',
'n_epochs': 20, # Number of epochs.
'batch_size': 64,
'optimizer': 'Adam',
'optim_hyper_paras': { # hyper-parameters for the optimizer (depends on which optimizer you are using)
'lr': 0.001, # learning rate of optimizer
},
'embedding_dim': 6, # 词向量长度
'window_width': 5, # 窗口的宽度
'window_step': 2, # 窗口滑动的步长
'negative_sample_num': 10 # 要增加的负样本个数
}
same_seed(config['seed'])
data_set, vocabulary, index_dict = get_data_set(data_path, config['window_width'], config['window_step'],
config['negative_sample_num'])
train_loader = DataLoader(data_set, config['batch_size'], shuffle=True, drop_last=False, pin_memory=True)
model = DNN(len(vocabulary), config['embedding_dim']).to(config['device'])
train(model, train_loader, config)
# 训练完,看看embeddings,展示部分词的词向量,并找到离它最近的词的词向量
embeddings = torch.t(model.embedding.weight)
for i in range(10):
print('%-50s%s' % (f"{vocabulary[i]} 的词向量为 :", str(embeddings[i].tolist())))
min_i = find_min_distance_word_vector(i, embeddings[i].tolist(), embeddings, vocabulary)
print('%-45s%s' % (
f"离 {vocabulary[i]} 最近的词为 {vocabulary[min_i]} , 它的词向量为 :", str(embeddings[min_i].tolist())))
print('-' * 200)
七、根据上面的代码举个例子
一、数据处理流程(以示例文本说明)
假设输入文件 data.txt
内容为:
I love machine learning because it is interesting.
1. 文本预处理
-
清洗:移除标点(
, ? . "
),处理后得到:I love machine learning because it is interesting
-
分词:按空格切分为单词列表:
data = ["I", "love", "machine", "learning", "because", "it", "is", "interesting"]
2. 构建词汇表
-
去重后得到词汇表(假设顺序固定):
vocabulary = ["I", "love", "machine", "learning", "because", "it", "is", "interesting"]
-
索引映射
index_dict
:{"I":0, "love":1, "machine":2, "learning":3, "because":4, "it":5, "is":6, "interesting":7}
3. 滑动窗口生成正样本
假设窗口宽度 window_width=5
,步长 window_step=2
:
-
窗口划分:
-
窗口1:
["I", "love", "machine", "learning", "because"]
,中心词是第2个(machine
,索引2)。 -
窗口内上下文词:
I
(索引0)、love
(索引1)、learning
(索引3)、because
(索引4)。 -
每个上下文词与中心词生成一个正样本。
-
-
特征矩阵示例(中心词
machine
和上下文词love
):# 特征矩阵形状 [8,8](词汇表大小=8) feature = np.zeros((8,8)) feature[1][1] = 1 # 上下文词 "love" 的对角线置1 feature[2][2] = 1 # 中心词 "machine" 的对角线置1
该样本标签为
1
。
4. 负采样生成负样本
随机选择不共现的词对(例如 machine
和 it
):
feature = np.zeros((8,8)) feature[2][2] = 1 # "machine" feature[5][5] = 1 # "it"
标签为 0
。
二、模型架构详解(DNN 类)
1. Embedding 层
-
定义:
nn.Linear(vocab_size, embed_dim, bias=False)
-
作用:将输入矩阵的每个词索引转换为嵌入向量。
-
输入形状:
[batch_size, vocab_size, vocab_size]
-
输出形状:
[batch_size, vocab_size, embed_dim]
-
矩阵乘法等价于对每个词进行线性变换。
-
2. 全连接层
-
结构:
nn.Sequential( nn.Linear(vocab_size * embed_dim, embed_dim//2), # 展平后输入 nn.LeakyReLU(), nn.Linear(embed_dim//2, 4), nn.LeakyReLU(), nn.Linear(4, 1) )
-
作用:将展平后的嵌入向量映射到标量输出。
3. 前向传播流程
以单个样本 [8,8]
(词汇表大小=8)为例:
-
输入矩阵:
[1, 8, 8]
(batch_size=1) -
Embedding 层:与
[8, embed_dim]
权重矩阵相乘,得到[1, 8, embed_dim]
。 -
Reshape:展平为
[1, 8*embed_dim]
。 -
全连接层:逐步降维到标量输出。
三、训练过程(以示例说明)
1. 数据加载
-
Dataset:包含特征矩阵和标签。
-
DataLoader:按批次加载数据(
batch_size=64
)。
2. 损失函数与优化器
-
损失函数:
MSELoss
,优化目标使正样本输出接近1,负样本接近0。 -
优化器:
Adam
,学习率0.001
。
3. 训练循环
-
前向传播:输入特征矩阵,计算预测值。
-
反向传播:根据 MSE 损失更新参数。
四、词向量可视化与最近邻查找
1. 提取词向量
-
权重矩阵:
embeddings = torch.t(model.embedding.weight)
,形状[vocab_size, embed_dim]
。 -
示例输出:
"machine" 的词向量为 : [0.12, -0.45, 0.78, ...]
2. 计算最近邻
-
欧式距离:遍历所有词向量,找到距离最小的非自身词。
-
示例结果:
离 "machine" 最近的词为 "learning" ,距离 0.89