深度学习----------------------语言模型
目录
- 语言模型
- 使用计数来建模
- N元语法
- 总结
- 语言模型和数据集
- 自然语言统计
- 一元语法
- 该部分总代码
- 二元语法
- 该部分总代码
- 三元语法
- 该部分总代码
- 直观地对比三种模型中的标记频率
- 一元
- 二元
- 三元
- 该部分总代码
- 随机采样(方法一)
- 该部分总代码
- 顺序分区(方法二)
- 该部分总代码
- 将上面的两个采样函数包装到一个类中
语言模型
给定文本序列 x 1 x_1 x1,…, x T x_T xT,语言模型的目标是估计联合概率p( x 1 x_1 x1,…, x T x_T xT)
它的应用包括
①做预训练模型
②生成本文,给定前面几个词,不断的使用来生成后续文本。
③判断多个序列中哪个最常见
使用计数来建模
N元语法
当序列很长时,因为文本量不够大,很可能n( x 1 x_1 x1,…, x T x_T xT)≤1
可以使用马尔可夫假设可以缓解这个问题
τ=0时,是一元语法:
τ=1时,是二元语法:
τ=2时,是三元语法:
总结
语言模型估计文本序列的联合概率
使用统计方法时采用n元语法
语言模型和数据集
自然语言统计
一元语法
from d2l import torch as d2l
# 使用d2l库中的函数读取时间机器文本,并进行分词处理
tokens = d2l.tokenize(d2l.read_time_machine())
# 将tokens中的所有单词连接成一个列表,生成语料库corpus
corpus = [token for line in tokens for token in line]
# 使用语料库corpus构建词汇表vocab
vocab = d2l.Vocab(corpus)
# 输出词汇表中出现频率最高的前10个单词和对应的频率
print(vocab.token_freqs[:10])
# 最流行的词被称为停用词画出的词频图
# 从词汇表的token_freqs中提取频率信息,存储在列表freqs中
freqs = [freq for token, freq in vocab.token_freqs]
# 使用d2l库中的plot函数绘制词频图
# 设置横轴为token,纵轴为对应的频率,横轴使用对数刻度,纵轴也使用对数刻度
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)', xscale='log', yscale='log')
该部分总代码
import random
import torch
from d2l import torch as d2l
# 使用d2l库中的函数读取时间机器文本,并进行分词处理
tokens = d2l.tokenize(d2l.read_time_machine())
# 将tokens中的所有单词连接成一个列表,生成语料库corpus
corpus = [token for line in tokens for token in line]
# 使用语料库corpus构建词汇表vocab,并且在这进行了排序
vocab = d2l.Vocab(corpus)
# 最流行的词被称为停用词画出的词频图
# 从词汇表的token_freqs中提取频率信息,存储在列表freqs中
# token是词汇表中的词,frequency是该词在数据集中出现的频率。然后提取频率信息,丢弃对应的词
freqs = [freq for token, freq in vocab.token_freqs]
# 使用d2l库中的plot函数绘制词频图
# 设置横轴为token,纵轴为对应的频率,横轴使用对数刻度,纵轴也使用对数刻度
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)', xscale='log', yscale='log')
d2l.plt.show()
二元语法
# 使用列表推导式将corpus中的相邻两个词组成二元语法的词元组合,存储在bigram_tokens列表中
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])] # 二元语法
# 使用bigram_tokens构建二元语法的词汇表bigram_vocab
bigram_vocab = d2l.Vocab(bigram_tokens)
# 输出二元语法词汇表中出现频率最高的前10个词元组合和对应的频率
bigram_vocab.token_freqs[:10]
该部分总代码
from d2l import torch as d2l
# 使用d2l库中的函数读取时间机器文本,并进行分词处理
tokens = d2l.tokenize(d2l.read_time_machine())
# 将tokens中的所有单词连接成一个列表,生成语料库corpus
corpus = [token for line in tokens for token in line]
# 使用列表推导式将corpus中的相邻两个词组成二元语法的词元组合,存储在bigram_tokens列表中
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])] # 二元语法
# 使用bigram_tokens构建二元语法的词汇表bigram_vocab
bigram_vocab = d2l.Vocab(bigram_tokens)
# 输出二元语法词汇表中出现频率最高的前10个词元组合和对应的频率
print(bigram_vocab.token_freqs[:10])
三元语法
# 使用列表推导式将corpus中的相邻三个词组成三元语法的词元组合,存储在trigram_tokens列表中
trigram_tokens = [triple for triple in zip(corpus[:-2],corpus[1:-1],corpus[2:])]
# 使用trigram_tokens构建三元语法的词汇表trigram_vocab
trigram_vocab = d2l.Vocab(trigram_tokens)
# 输出三元语法词汇表中出现频率最高的前10个词元组合和对应的频率
print(trigram_vocab.token_freqs[:10])
该部分总代码
from d2l import torch as d2l
# 使用d2l库中的函数读取时间机器文本,并进行分词处理
tokens = d2l.tokenize(d2l.read_time_machine())
# 将tokens中的所有单词连接成一个列表,生成语料库corpus
corpus = [token for line in tokens for token in line]
# 使用列表推导式将corpus中的相邻三个词组成三元语法的词元组合,存储在trigram_tokens列表中
trigram_tokens = [triple for triple in zip(corpus[:-2],corpus[1:-1],corpus[2:])]
# 使用trigram_tokens构建三元语法的词汇表trigram_vocab
trigram_vocab = d2l.Vocab(trigram_tokens)
# 输出三元语法词汇表中出现频率最高的前10个词元组合和对应的频率
print(trigram_vocab.token_freqs[:10])
直观地对比三种模型中的标记频率
一元
# 将tokens中的所有单词连接成一个列表,生成语料库corpus
# 首先是外循环,遍历tokens中的每一个元素,line代表tokens中的一个元素(列表)
# 然后是内循环:对于外层循环中的每个line(是列表)进行遍历其中的所有元素,line中的每个元素都被称为token
# 然后内层循环的每个token都会被添加到最终生成的列表中。
corpus = [token for line in tokens for token in line]
示列:有以下嵌套列表作为tokens:
执行该代码后,corpus将会是:
这个列表包含了tokens中所有子列表的元素,所有子列表的元素都被扁平化到了一个单一的列表中。
二元
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
三元
corpus = ['I', 'am', 'learning', 'natural', 'language', 'processing', 'with', 'Python']
经过
trigram_tokens = [triple for triple in zip(corpus[:-2], corpus[1:-1], corpus[2:])]
则
corpus[:-2]=['I', 'am', 'learning', 'natural', 'language']
corpus[1:-1]=['am', 'learning', 'natural', 'language', 'processing']
corpus[2:]=['learning', 'natural', 'language', 'processing', 'with']
故:
trigram_tokens = [('I', 'am', 'learning'), ('am', 'learning', 'natural'), ('learning', 'natural', 'language'), ('natural', 'language', 'processing')]
该部分总代码
from d2l import torch as d2l
# 使用d2l库中的函数读取时间机器文本,并进行分词处理
tokens = d2l.tokenize(d2l.read_time_machine())
# 将tokens中的所有单词连接成一个列表,生成语料库corpus
# 首先是外循环,遍历tokens中的每一个元素,line代表tokens中的一个元素(列表)
# 然后是内循环:对于外层循环中的每个line(是列表)进行遍历其中的所有元素,line中的每个元素都被称为token
# 然后内层循环的每个token都会被添加到最终生成的列表中。
corpus = [token for line in tokens for token in line]
# 使用语料库corpus构建词汇表vocab
# 1、遍历了corpus,统计每个单词的出现次数
# 2、对单词排序
# 3、创建了词汇表对象,该对象包含单词到索引的映射、索引到单词的逆映射、词汇表大小以及可选的嵌入矩阵。
vocab = d2l.Vocab(corpus)
# 使用列表推导式将corpus中的相邻两个词组成二元语法的词元组合,存储在bigram_tokens列表中
# corpus[:-1]从第一个元素开始取到倒数第二个元素(不包括最后一个元素)目的:为这些元素找到它们各自的下一个元素来形成bigram
# corpus[1:]从第二个元素开始取到最后一个元素(包括最后一个元素)这些元素将作为bigram中的第二个元素。
# zip函数将两个列表作为输入,并将它们对应的元素打包成一个个元组然后返回一个迭代器,该迭代器生成这些元组。
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_vocab = d2l.Vocab(bigram_tokens)
# 使用列表推导式将corpus中的相邻三个词组成三元语法的词元组合,存储在trigram_tokens列表中
# corpus[:-2]从corpus列表中获取除了最后两个元素之外的所有元素
# corpus[1:-1]从corpus列表中获取除了第一个和最后一个元素之外的所有元素
# corpus[2:]从corpus列表中获取从第三个元素开始到列表末尾的所有元素
# zip函数将这三个列表作为参数,并生成一个迭代器,该迭代器包含来自每个列表的对应位置的元组。
trigram_tokens = [triple for triple in zip(corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = d2l.Vocab(trigram_tokens)
# 从词汇表的token_freqs中提取频率信息,存储在列表freqs中
freqs = [freq for token, freq in vocab.token_freqs]
bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x',
ylabel='frequency: n(x)', xscale='log', yscale='log',
legend=['unigram', 'bigram', 'trigram'])
d2l.plt.show()
随机采样(方法一)
随机地生成一个小批量数据的特征和标签以供读取。在随机采样中,每个样本都是在原始的长序列上任意捕获的子序列。
在迭代过程中,来自两个相邻的、随机的、小批量中的子序列
不一定在原始序列上相邻。对于语言建模,目标是基于到目前为止我们看到的词元来预测下一个词元, 因此标签是移位了一个词元的原始序列。
下面的代码每次可以从数据中随机生成一个小批量。在这里,参数batch_size指定了每个小批量中子序列样本的数目,参数num_steps是每个子序列中预定义的时间步数。
# num_steps的作用等价于tau
def seq_data_iter_random(corpus, batch_size, num_steps): #@save
"""使用随机抽样生成一个小批量子序列"""
# 从随机偏移量开始对序列进行分区,随机范围(0, num_steps - 1)
corpus = corpus[random.randint(0, num_steps - 1):]
# 减去1,是因为我们需要考虑标签
# num_subseqs是可以生成的子序列数量,子序列长度为num_steps
num_subseqs = (len(corpus) - 1) // num_steps
# 长度为num_steps的子序列的起始索引,以num_steps为单位跳转
initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
# 在随机抽样的迭代过程中,
# 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
random.shuffle(initial_indices)
def data(pos):
# 返回从pos位置开始的长度为num_steps的序列
return corpus[pos: pos + num_steps]
num_batches = num_subseqs // batch_size
for i in range(0, batch_size * num_batches, batch_size):
# 在这里,initial_indices包含子序列的随机起始索引
initial_indices_per_batch = initial_indices[i: i + batch_size]
X = [data(j) for j in initial_indices_per_batch]
Y = [data(j + 1) for j in initial_indices_per_batch]
yield torch.tensor(X), torch.tensor(Y)
之前讲的是把所有东西抽出来在序列模型中
缺点:
每变了一次数据,每一个数据被用过很多次
例如:前四个数据预测第五个
然后第二个到第五个预测第六个,其中2、3、4用了两次。(如果序列够长,那么就与τ有关,τ=4,则τ中第一个数使用一次,第二个数使用二次,第三个数使用三次,第四个数使用四次)
现在遍历一次数据,每个数据就用一次
把整个序列切成长为τ的序列,然后每一次随机取从里面取一段。(图中的是τ)
缺点:
如果下次还是这么做的话,那么可能遍历不到红色部分。
就是说:每次切法是一定的。
解决方法:
每一次开始切的时候,在[0,τ)之间随机取一个值,比如取K,那么从K元素开始切成如下图长为τ的序列,前面K个元素在这一轮就不要了,这样的话,然后在里面每一次随机取挑选一个序列拿出来做到mini batch。
这样的好处:遍历一次数据,每个数据就是用过一次,而不是在序列模型中每个模型的数据会用梯子。
该部分总代码
import random
import torch
# num_steps的作用等价于tau,长为tau的数(即:X)是一个样本,来预测下一个词,然后滑动这个窗口
def seq_data_iter_random(corpus, batch_size, num_steps): # @save
"""使用随机抽样生成一个小批量子序列"""
# 从随机偏移量K开始对序列进行分区并把K前面的丢掉,随机范围(0, num_steps - 1)
# 这个random的函数包括两端的值(即:0到4共五个)
# corpus[0,4]也就是说从 0到最后、1到最后、2到最后、3到最后、4到最后
# 随机选择一个起始点并截取到末尾
corpus = corpus[random.randint(0, num_steps - 1):]
# 减去1,是因为我们需要考虑标签
# num_subseqs是可以生成的子序列数量,子序列长度为num_steps
#这里假设corpus的长度为35( 35-1)//5=6,无论是从0开始还是4开始最后都是6
num_subseqs = (len(corpus) - 1) // num_steps
# 长度为num_steps的子序列的起始索引,以num_steps为单位跳转
# range(0, num_subseqs * num_steps, num_steps)0到30(不包含,子序列数量*子序列长度=30)步长为5
# 最终initial_indices=[0, 5, 10, 15, 20, 25]
initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
# 在随机抽样的迭代过程中,
# 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
# 比如可以打乱成[20, 15, 10, 25, 5, 0]
random.shuffle(initial_indices)
print(initial_indices)
def data(pos):
# 返回从pos位置开始的长度为num_steps的序列
return corpus[pos: pos + num_steps]
# 6//2=3(num_batches)
num_batches = num_subseqs // batch_size
# (0,2*3=6)即:0到6(不包含)
for i in range(0, batch_size * num_batches, batch_size):
# i的取值是0、2、4
# 在这里,initial_indices包含子序列的随机起始索引
# 例如i=0时,initial_indices的索引是[0,2],也就是0和1的索引
# 比如initial_indices=[20, 15, 10, 25, 5, 0]那么就是20和15,在下面的X则是20、21、22、23、24和15、16、17、18、19
initial_indices_per_batch = initial_indices[i: i + batch_size]
print(initial_indices_per_batch)
# 这个data函数接收到一个位置并返回该位置开始的长度为num_steps 的子序列。
X = [data(j) for j in initial_indices_per_batch]
Y = [data(j + 1) for j in initial_indices_per_batch]
yield torch.tensor(X), torch.tensor(Y)
my_seq = list(range(35))
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
print('X: ', X, '\nY:', Y)
顺序分区(方法二)
保证两个向量的小批量中的子序列在原始序列上也是相邻的。(即:小批量中第i个样本和下一个小批量中的第i个样本是相邻的,这里因为batchsize=2,所以小批量中只有2个样本。) 这种策略在基于小批量的迭代过程中保留了拆分的子序列的顺序,因此称为顺序分区。
def seq_data_iter_sequential(corpus, batch_size, num_steps): #@save
"""使用顺序分区生成一个小批量子序列"""
# 从随机偏移量开始划分序列
offset = random.randint(0, num_steps)
# num_tokens可生成的总子序列长度
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
Xs = torch.tensor(corpus[offset: offset + num_tokens])
Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
# 按batch_size划分Xs和Ys,每一列为一个batch的data
Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
num_batches = Xs.shape[1] // num_steps
for i in range(0, num_steps * num_batches, num_steps):
X = Xs[:, i: i + num_steps]
Y = Ys[:, i: i + num_steps]
yield X, Y
该部分总代码
import random
import torch
# num_steps的作用等价于tau,长为tau的数(即:X)是一个样本,来预测下一个词,然后滑动这个窗口
def seq_data_iter_sequential(corpus, batch_size, num_steps): #@save
"""使用顺序分区生成一个小批量子序列"""
# 从随机偏移量开始划分序列
# 生成一个0到num_steps之间的随机整数作为起始偏移量
offset = random.randint(0, num_steps)
print('起始偏移量:',offset)
# 35-offset-1=34-offset,然后整除批量大小可以得到最多有多少个子序列的数量。然后乘以批量大小就可以得到总子序列数量num_tokens
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
#总序列为从偏移量开始到总序列的长度,即:num_tokens
Xs = torch.tensor(corpus[offset: offset + num_tokens])
Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
# 按batch_size划分Xs和Ys,每一列为一个batch的data
# 两行xx列,两行是一个小批量的数量,列的总数除以τ(即:num_steps)为可以分为几个batch_size
Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
num_batches = Xs.shape[1] // num_steps
# 0到5*num_batches(此时序列可以分为3个小批量的大小),此时步长为5
# 0到15是下标,不是Xs的数据
for i in range(0, num_steps * num_batches, num_steps):
X = Xs[:, i: i + num_steps]
Y = Ys[:, i: i + num_steps]
yield X, Y
my_seq = list(range(35))
for X, Y in seq_data_iter_sequential(my_seq, batch_size=2, num_steps=5):
print('X: ', X, '\nY:', Y)
将上面的两个采样函数包装到一个类中
以便将其用作数据迭代器。
class SeqDataLoader: #@save
"""加载序列数据的迭代器"""
def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
if use_random_iter:
self.data_iter_fn = d2l.seq_data_iter_random
else:
self.data_iter_fn = d2l.seq_data_iter_sequential
self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
self.batch_size, self.num_steps = batch_size, num_steps
def __iter__(self):
return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
定义函数load_data_time_machine, 它可以同时返回数据迭代器和词表, 因此可以与其他带有load_data前缀的函数 (如3.5节 中定义的d2l.load_data_fashion_mnist)类似地使用。
def load_data_time_machine(batch_size, num_steps, #@save
use_random_iter=False, max_tokens=10000):
"""返回时光机器数据集的迭代器和词表"""
data_iter = SeqDataLoader(
batch_size, num_steps, use_random_iter, max_tokens)
return data_iter, data_iter.vocab