深度学习----------------------文本预处理
目录
- 文本预处理
- 读取数据集
- 词源化
- 词表
- 该部分总代码
- 该部分总代码
- 整合所有功能
- 该部分总代码
文本预处理
文本预处理:把文本当作一个时序序列
将解析文本的常见预处理步骤。 这些步骤通常包括:
①将文本作为字符串加载到内存中。
②将字符串拆分为词元(如单词和字符)。
③建立一个词表,将拆分的词元映射到数字索引。
④将文本转换为数字索引序列,方便模型操作。
import collections
import re
from d2l import torch as d2l
读取数据集
将数据集读取到由多条文本行组成的列表中
import re
from d2l import torch as d2l
# @save
# 字典的值是一个元组,包含数据集的URL(d2l.DATA_URL + 'timemachine.txt')和数据的哈希值('090b5e7e70c295757f55df93cb0a180b9691891a')
# 用于验证下载数据的完整性。
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
def read_time_machine(): # @save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
# 打开文件取所有行到lines列表中。
lines = f.readlines()
# re.sub('[^A-Za-z]+', ' ', line)是将非字母字符替换为空格,这里是为了去掉标点符号
# .strip()是去掉提取的字符串两端的空白
# .lower()是将所有字母都设置为小写字母
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
# 看一下列表中的第1行内容和第11行内容
print(lines[0])
print(lines[10])
词源化
每个文本序列又被拆分成一个标记列表
文本行列表(lines)作为输入,列表中的每个元素是一个文本序列(如一条文本行)。每个文本序列又被拆分成一个词元列表,词元(token)是文本的基本单位。最后,返回一个由词元列表组成的列表,其中的每个词元都是一个字符串(string)。
import re
from d2l import torch as d2l
# @save
# 字典的值是一个元组,包含数据集的URL(d2l.DATA_URL + 'timemachine.txt')和数据的哈希值('090b5e7e70c295757f55df93cb0a180b9691891a')
# 用于验证下载数据的完整性。
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
def read_time_machine(): # @save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
# 打开文件取所有行,并将每行存储在列表 lines 中
lines = f.readlines()
# re.sub('[^A-Za-z]+', ' ', line)是将非字母字符替换为空格,这里是为了去掉标点符号
# .strip()是去掉提取的字符串两端的空白
# .lower()是将所有字母都设置为小写字母
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
def tokenize(lines, token='word'): # @save
"""将文本行拆分为单词或字符词元"""
if token == 'word':
# 以空格为分隔符将每行字符串拆分为单词列表
return [line.split() for line in lines]
elif token == 'char':
# 将每行字符串拆分为字符列表
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
lines = read_time_machine()
# 对 lines 进行分词处理,使用默认的 'word' 令牌类型
tokens = tokenize(lines)
# 打印前11行的分词结果
for i in range(11):
# 空列表表示空行
print(tokens[i])
词表
词元的类型是字符串,而模型需要的输入是数字,因此这种类型不方便模型使用。所以构建一个字典,通常也叫做词表(vocabulary),用来将字符串类型的词元映射到从0开始的数字索引中。
我们先将训练集中的所有文档合并在一起,对它们的唯一词元进行统计,得到的统计结果称之为语料(corpus)。然后根据每个唯一词元的出现频率,为其分配一个数字索引。很少出现的词元通常被移除,这可以降低复杂性。 另外,语料库中不存在或已删除的任何词元都将映射到一个特定的未知词元。我们可以选择增加一个列表,用于保存那些被保留的词元。
例如:填充词元(“<pad>”); 序列开始词元(“<bos>”); 序列结束词元(“<eos>”)
class Vocab: #@save
"""文本词表"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
reverse=True)
# 未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
# 出现频率< min_freq的token去掉,min_freq=0时表示包括所有token
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
return len(self.idx_to_token)
# 返回给定tokens的indexs
def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
# 返回给定indexs对于的tokens
def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
return self._token_freqs
def count_corpus(tokens): #@save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
# Counter 对象内部使用字典来存储元素及其对应的计数,元素作为键,计数作为值
return collections.Counter(tokens)
首先使用时光机器数据集作为语料库来构建词表,然后打印前几个高频词元及其索引。
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10])
该部分总代码
import collections
import re
from d2l import torch as d2l
class Vocab: # @save
"""文本词表"""
# 接受三个参数:tokens(词元列表,默认为空列表)、min_freq(最小词频,默认为0)、reserved_tokens(保留词元列表,默认为空列表)。
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
# 统计tokens中每个词元的频率,并将结果存储在counter中。
counter = count_corpus(tokens)
# 对counter中的项按频率降序排序,并将排序后的列表存储在_token_freqs属性中。
self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)
# 未知词元的索引为0
# 初始化一个列表,其中包含未知词元(<unk>)和保留词元(reserved_tokens),该列表用于将索引映射到词元。
self.idx_to_token = ['<unk>'] + reserved_tokens
# 创建一个字典,将idx_to_token列表中的词元映射到其索引。
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
# 出现频率< min_freq的token去掉,min_freq=0时表示包括所有token
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
# 返回idx_to_token列表的长度,即词表的大小。
return len(self.idx_to_token)
# 返回给定tokens的indexs
def __getitem__(self, tokens):
# 检查tokens是否是列表或元组,如果不是,则假设它是一个单个词元。
if not isinstance(tokens, (list, tuple)):
# 如果tokens是单个词元,尝试从token_to_idx中获取其索引,如果未找到,则返回未知词元的索引(self.unk)。
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
# 返回给定indexs对于的tokens
def to_tokens(self, indices):
# 将单个索引转换为单个词元。
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
# 将索引列表转换为对应的词元列表
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
# 返回按频率排序的词元频率列表。
return self._token_freqs
# 统计给定词元列表中每个词元的频率。
def count_corpus(tokens): # @save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
# 检查tokens是否为空或第一个元素是否为列表,以确定是否需要展平。
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
# 统计展平后词元列表中每个词元的频率
# Counter 对象内部使用字典来存储元素及其对应的计数,元素作为键,计数作为值
return collections.Counter(tokens)
def read_time_machine(): # @save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
# 打开文件取所有行,并将每行存储在列表 lines 中
lines = f.readlines()
# re.sub('[^A-Za-z]+', ' ', line)是将非字母字符替换为空格,这里是为了去掉标点符号
# .strip()是去掉提取的字符串两端的空白
# .lower()是将所有字母都设置为小写字母
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
def tokenize(lines, token='word'): # @save
"""将文本行拆分为单词或字符词元"""
if token == 'word':
# 以空格为分隔符将每行字符串拆分为单词列表
return [line.split() for line in lines]
elif token == 'char':
# 将每行字符串拆分为字符列表
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
# @save
# 字典的值是一个元组,包含数据集的URL(d2l.DATA_URL + 'timemachine.txt')和数据的哈希值('090b5e7e70c295757f55df93cb0a180b9691891a')
# 用于验证下载数据的完整性。
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
lines = read_time_machine()
# 对 lines 进行分词处理,使用默认的 'word' 令牌类型
tokens = tokenize(lines)
vocab = Vocab(tokens)
# vocab.token_to_idx是字典属性,键是词汇(tokens),值是它们对应的索引(indices)
# 将items()返回的视图对象转换成一个列表(list)
print(list(vocab.token_to_idx.items())[:10])
将每一条文本行转换成一个数字索引列表
for i in [0, 10]:
print('文本:', tokens[i])
print('索引:', vocab[tokens[i]])
该部分总代码
import collections
import re
from d2l import torch as d2l
class Vocab: # @save
"""文本词表"""
# 接受三个参数:tokens(词元列表,默认为空列表)、min_freq(最小词频,默认为0)、reserved_tokens(保留词元列表,默认为空列表)。
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
# 统计tokens中每个词元的频率,并将结果存储在counter中。
counter = count_corpus(tokens)
# 对counter中的项按频率降序排序,并将排序后的列表存储在_token_freqs属性中。
self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)
# 未知词元的索引为0
# 初始化一个列表,其中包含未知词元(<unk>)和保留词元(reserved_tokens),该列表用于将索引映射到词元。
self.idx_to_token = ['<unk>'] + reserved_tokens
# 创建一个字典,将idx_to_token列表中的词元映射到其索引。
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
# 出现频率< min_freq的token去掉,min_freq=0时表示包括所有token
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
# 返回idx_to_token列表的长度,即词表的大小。
return len(self.idx_to_token)
# 返回给定tokens的indexs
def __getitem__(self, tokens):
# 检查tokens是否是列表或元组,如果不是,则假设它是一个单个词元。
if not isinstance(tokens, (list, tuple)):
# 如果tokens是单个词元,尝试从token_to_idx中获取其索引,如果未找到,则返回未知词元的索引(self.unk)。
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
# 返回给定indexs对于的tokens
def to_tokens(self, indices):
# 将单个索引转换为单个词元。
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
# 将索引列表转换为对应的词元列表
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
# 返回按频率排序的词元频率列表。
return self._token_freqs
# 统计给定词元列表中每个词元的频率。
def count_corpus(tokens): # @save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
# 检查tokens是否为空或第一个元素是否为列表,以确定是否需要展平。
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
# 统计展平后词元列表中每个词元的频率
# Counter 对象内部使用字典来存储元素及其对应的计数,元素作为键,计数作为值
return collections.Counter(tokens)
def read_time_machine(): # @save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
# 打开文件取所有行,并将每行存储在列表 lines 中
lines = f.readlines()
# re.sub('[^A-Za-z]+', ' ', line)是将非字母字符替换为空格,这里是为了去掉标点符号
# .strip()是去掉提取的字符串两端的空白
# .lower()是将所有字母都设置为小写字母
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
def tokenize(lines, token='word'): # @save
"""将文本行拆分为单词或字符词元"""
if token == 'word':
# 以空格为分隔符将每行字符串拆分为单词列表
return [line.split() for line in lines]
elif token == 'char':
# 将每行字符串拆分为字符列表
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
# @save
# 字典的值是一个元组,包含数据集的URL(d2l.DATA_URL + 'timemachine.txt')和数据的哈希值('090b5e7e70c295757f55df93cb0a180b9691891a')
# 用于验证下载数据的完整性。
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
lines = read_time_machine()
# 对 lines 进行分词处理,使用默认的 'word' 令牌类型
tokens = tokenize(lines)
vocab = Vocab(tokens)
for i in [0, 10]:
print('文本:', tokens[i])
print('索引:', vocab[tokens[i]])
整合所有功能
将所有功能打包到load_corpus_time_machine函数中,该函数返回corpus(词元索引列表)和vocab(时光机器语料库的词表)。 我们在这里所做的改变是:
为了简化后面章节中的训练,我们使用字符(而不是单词)实现文本词元化;
时光机器数据集中的每个文本行不一定是一个句子或一个段落,还可能是一个单词,因此返回的corpus仅处理为单个列表,而不是使用多词元列表构成的一个列表。
def load_corpus_time_machine(max_tokens=-1): #@save
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
# 所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
# max_tokens>0意为限制返回的词元数量,为-1即为返回所有词元
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)
该部分总代码
import collections
import re
from d2l import torch as d2l
class Vocab: # @save
"""文本词表"""
# 接受三个参数:tokens(词元列表,默认为空列表)、min_freq(最小词频,默认为0)、reserved_tokens(保留词元列表,默认为空列表)。
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
# 统计tokens中每个词元的频率,并将结果存储在counter中。
counter = count_corpus(tokens)
# 对counter中的项按频率降序排序,并将排序后的列表存储在_token_freqs属性中。
self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)
# 未知词元的索引为0
# 初始化一个列表,其中包含未知词元(<unk>)和保留词元(reserved_tokens),该列表用于将索引映射到词元。
self.idx_to_token = ['<unk>'] + reserved_tokens
# 创建一个字典,将idx_to_token列表中的词元映射到其索引。
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
# 出现频率< min_freq的token去掉,min_freq=0时表示包括所有token
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
# 返回idx_to_token列表的长度,即词表的大小。
return len(self.idx_to_token)
# 返回给定tokens的indexs
def __getitem__(self, tokens):
# 检查tokens是否是列表或元组,如果不是,则假设它是一个单个词元。
if not isinstance(tokens, (list, tuple)):
# 如果tokens是单个词元,尝试从token_to_idx中获取其索引,如果未找到,则返回未知词元的索引(self.unk)。
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
# 返回给定indexs对于的tokens
def to_tokens(self, indices):
# 将单个索引转换为单个词元。
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
# 将索引列表转换为对应的词元列表
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
# 返回按频率排序的词元频率列表。
return self._token_freqs
# 统计给定词元列表中每个词元的频率。
def count_corpus(tokens): # @save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
# 检查tokens是否为空或第一个元素是否为列表,以确定是否需要展平。
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
# 统计展平后词元列表中每个词元的频率
# Counter 对象内部使用字典来存储元素及其对应的计数,元素作为键,计数作为值
return collections.Counter(tokens)
def read_time_machine(): # @save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
# 打开文件取所有行,并将每行存储在列表 lines 中
lines = f.readlines()
# re.sub('[^A-Za-z]+', ' ', line)是将非字母字符替换为空格,这里是为了去掉标点符号
# .strip()是去掉提取的字符串两端的空白
# .lower()是将所有字母都设置为小写字母
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
def tokenize(lines, token='word'): # @save
"""将文本行拆分为单词或字符词元"""
if token == 'word':
# 以空格为分隔符将每行字符串拆分为单词列表
return [line.split() for line in lines]
elif token == 'char':
# 将每行字符串拆分为字符列表
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
def load_corpus_time_machine(max_tokens=-1): # @save
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
# 所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
# max_tokens>0意为限制返回的词元数量,为-1即为返回所有词元
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
corpus, vocab = load_corpus_time_machine()
print(len(corpus))
print(len(vocab))