【Python爬虫(38)】解锁Scrapy - Redis:构建高效分布式爬虫
【Python爬虫】专栏简介:本专栏是 Python 爬虫领域的集大成之作,共 100 章节。从 Python 基础语法、爬虫入门知识讲起,深入探讨反爬虫、多线程、分布式等进阶技术。以大量实例为支撑,覆盖网页、图片、音频等各类数据爬取,还涉及数据处理与分析。无论是新手小白还是进阶开发者,都能从中汲取知识,助力掌握爬虫核心技能,开拓技术视野。
目录
- 一、引言
- 二、Scrapy - Redis 扩展安装与配置
- 2.1 安装 Redis
- 2.2 安装 Scrapy - Redis
- 2.3 配置 Scrapy 项目
- 三、改造 Scrapy 项目为分布式爬虫
- 3.1 修改爬虫类继承
- 3.2 配置 Redis 相关信息
- 3.3 编写爬取逻辑
- 四、分布式爬虫中的数据合并与处理
- 4.1 数据存储到 Redis
- 4.2 合并数据
- 4.3 数据处理
- 五、总结与展望
一、引言
在大数据时代,数据的价值愈发凸显,而获取大量数据的重要手段之一便是爬虫技术。对于大规模的数据爬取任务,单机爬虫往往显得力不从心,无论是从爬取速度,还是从应对目标网站反爬机制的能力来看,都难以满足需求。这时,分布式爬虫应运而生。分布式爬虫通过将爬取任务分配到多个节点上并行执行,能够充分利用集群的计算资源,显著提高数据爬取的效率,同时也增强了爬虫系统的稳定性和可靠性。
Scrapy - Redis 作为一款强大的分布式爬虫框架,它巧妙地将 Scrapy 框架与 Redis 数据库相结合,为实现分布式爬虫提供了便捷高效的解决方案。Scrapy 本身是一个功能强大的 Python 爬虫框架,具有异步处理、强大的选择器、中间件支持等特性,能够方便地进行数据抓取和处理。而 Redis 是一个高性能的内存数据库,支持多种数据结构,如字符串、哈希、列表、集合等,其读写速度快、高可用、数据持久化等特点,使其非常适合作为分布式爬虫中的任务队列和去重容器。通过 Scrapy - Redis,我们可以轻松地将爬虫任务分布到多个节点上,利用 Redis 进行任务调度和去重,实现高效、稳定的分布式爬虫系统。接下来,就让我们深入了解如何使用 Scrapy - Redis 实现分布式爬虫。
二、Scrapy - Redis 扩展安装与配置
在使用 Scrapy - Redis 实现分布式爬虫之前,我们需要先完成相关的安装和配置工作,这包括安装 Redis 以及 Scrapy - Redis 扩展,并对 Scrapy 项目进行相应的配置。
2.1 安装 Redis
Redis 在分布式爬虫中扮演着至关重要的角色,它作为任务队列,能够高效地存储和分发待爬取的 URL 任务,确保各个爬虫节点都能获取到任务;同时,它作为去重容器,利用其集合数据结构可以快速判断 URL 是否已经被爬取过,避免重复爬取,从而节省资源和时间。
Redis 的下载安装步骤会因操作系统的不同而有所差异:
- Windows 系统:可以在 Microsoft Open Tech 的 GitHub 仓库(https://github.com/microsoftarchive/redis/releases)上下载 Redis 的 Windows 版本。下载完成后,解压压缩包到指定目录,例如C:\redis。然后进入解压后的目录,打开命令提示符,执行redis-server.exe redis.windows.conf命令来启动 Redis 服务。
- Linux 系统:如果是基于 Debian 或 Ubuntu 的系统,可以使用以下命令进行安装:
sudo apt-get update
sudo apt-get install redis-server
安装完成后,使用sudo systemctl start redis命令启动 Redis 服务。
若是基于 CentOS 的系统,则可以使用以下命令安装:
sudo yum install epel-release
sudo yum install redis
安装完成后,同样使用sudo systemctl start redis命令启动 Redis 服务。
2.2 安装 Scrapy - Redis
Scrapy - Redis 是将 Scrapy 框架与 Redis 数据库进行结合的扩展库,它为 Scrapy 提供了分布式调度、去重以及数据存储等功能,使得我们能够方便地构建分布式爬虫系统。在安装 Scrapy - Redis 之前,请确保已经安装了 Scrapy 和 Python 环境。安装 Scrapy - Redis 非常简单,只需要使用 pip 命令即可:
pip install scrapy-redis
2.3 配置 Scrapy 项目
完成安装后,我们需要对 Scrapy 项目进行配置,以启用 Scrapy - Redis 的分布式功能。打开 Scrapy 项目中的settings.py文件,进行如下配置:
- 启用分布式调度器和去重类:
# 使用Scrapy - Redis的调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 使用Scrapy - Redis的去重类
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
- 配置 Redis 连接信息:
# Redis服务器地址,默认为本地localhost
REDIS_HOST = 'localhost'
# Redis服务器端口,默认为6379
REDIS_PORT = 6379
如果 Redis 设置了密码,可以通过REDIS_PARAMS来配置密码:
REDIS_PARAMS = {
'password': 'your_redis_password'
}
- 配置调度器持久化:
# 保持任务队列,不清空,即使爬虫停止,任务队列中的任务也会保留
SCHEDULER_PERSIST = True
- 配置数据存储到 Redis 的 Pipeline(可选):如果希望将爬取到的数据直接存储到 Redis 中,可以配置ITEM_PIPELINES:
ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 300,
}
此外,还可以通过配置REDIS_ITEMS_KEY来指定存储数据的键名,例如:
REDIS_ITEMS_KEY = '%(spider)s:items'
上述配置完成后,我们就成功地将 Scrapy 项目与 Redis 进行了整合,为实现分布式爬虫奠定了基础。接下来,我们将对 Scrapy 爬虫进行改造,使其能够利用 Redis 作为任务队列和去重容器。
三、改造 Scrapy 项目为分布式爬虫
在完成了 Scrapy - Redis 的安装与配置后,接下来我们需要对 Scrapy 项目进行改造,使其能够利用 Redis 作为任务队列和去重容器,实现分布式爬虫的功能。这主要涉及到修改爬虫类的继承关系、配置 Redis 相关信息以及编写爬取逻辑等步骤。
3.1 修改爬虫类继承
在使用 Scrapy - Redis 实现分布式爬虫时,我们需要修改爬虫类的继承关系。如果原爬虫类继承自CrawlSpider,则需要将其更改为继承自RedisCrawlSpider;如果原爬虫类继承自Spider,则需要将其更改为继承自RedisSpider。这是因为RedisCrawlSpider和RedisSpider是 Scrapy - Redis 提供的特殊爬虫类,它们已经集成了与 Redis 交互的功能,能够从 Redis 中获取任务并将新的任务请求放入 Redis 中。
例如,原爬虫类继承自CrawlSpider:
from scrapy.spiders import CrawlSpider
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule
class MyCrawlSpider(CrawlSpider):
name ='my_crawl_spider'
allowed_domains = ['example.com']
start_urls = ['http://example.com']
rules = (
Rule(LinkExtractor(allow=r'/page/\d+'), follow=True),
Rule(LinkExtractor(allow=r'/article/\d+'), callback='parse_item'),
)
def parse_item(self, response):
# 解析数据的逻辑
pass
修改为继承自RedisCrawlSpider:
from scrapy_redis.spiders import RedisCrawlSpider
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule
class MyRedisCrawlSpider(RedisCrawlSpider):
name ='my_redis_crawl_spider'
allowed_domains = ['example.com']
# 不需要设置start_urls,从Redis中获取任务
redis_key ='my_redis_crawl_spider:start_urls'
rules = (
Rule(LinkExtractor(allow=r'/page/\d+'), follow=True),
Rule(LinkExtractor(allow=r'/article/\d+'), callback='parse_item'),
)
def parse_item(self, response):
# 解析数据的逻辑
pass
同样地,如果原爬虫类继承自Spider:
from scrapy.spiders import Spider
class MySpider(Spider):
name ='my_spider'
allowed_domains = ['example.com']
start_urls = ['http://example.com']
def parse(self, response):
# 解析数据的逻辑
pass
则需要修改为继承自RedisSpider:
from scrapy_redis.spiders import RedisSpider
class MyRedisSpider(RedisSpider):
name ='my_redis_spider'
allowed_domains = ['example.com']
redis_key ='my_redis_spider:start_urls'
def parse(self, response):
# 解析数据的逻辑
pass
3.2 配置 Redis 相关信息
在修改了爬虫类的继承关系后,我们需要配置 Redis 相关的信息。首先,在爬虫类中设置redis_key,这个键用于指定在 Redis 中存储任务队列的键名。例如:
class MyRedisSpider(RedisSpider):
name ='my_redis_spider'
allowed_domains = ['example.com']
redis_key ='my_redis_spider:start_urls'
这里的my_redis_spider:start_urls就是存储任务队列的键名,在启动爬虫之前,需要将初始的 URL 任务添加到这个键对应的队列中。
除了在爬虫类中设置redis_key,还需要在settings.py文件中进一步配置 Redis 的相关参数,如 Redis 服务器的地址、端口等。如果 Redis 设置了密码,也需要在这里进行配置。在前面的安装与配置部分已经提及了相关配置,这里再次强调,例如:
# Redis服务器地址,默认为本地localhost
REDIS_HOST = 'localhost'
# Redis服务器端口,默认为6379
REDIS_PORT = 6379
# 如果Redis设置了密码
REDIS_PARAMS = {
'password': 'your_redis_password'
}
3.3 编写爬取逻辑
在完成了上述步骤后,就可以开始编写爬取逻辑了。在爬虫类的parse方法(如果是CrawlSpider及其子类,还包括rules中定义的回调函数)中,编写解析网页内容、提取数据的逻辑。与普通的 Scrapy 爬虫类似,通过response对象获取网页内容,并使用xpath、css选择器或其他方式提取所需的数据。
例如,从网页中提取标题和链接:
class MyRedisSpider(RedisSpider):
name ='my_redis_spider'
allowed_domains = ['example.com']
redis_key ='my_redis_spider:start_urls'
def parse(self, response):
title = response.xpath('//title/text()').get()
links = response.css('a::attr(href)').extract()
for link in links:
yield response.follow(link, self.parse)
item = {
'title': title,
'links': links
}
yield item
在上述代码中,首先从网页中提取标题和链接,然后对每个链接发起新的请求(通过response.follow方法),并继续使用parse方法进行处理。最后,将提取到的数据封装成item并返回。
在分布式爬虫中,还需要将爬取到的数据通过 Item Pipeline 存储到 Redis 中。如果在settings.py中配置了scrapy_redis.pipelines.RedisPipeline,那么爬取到的item会自动被存储到 Redis 中。如果需要自定义存储逻辑,可以编写自己的 Item Pipeline,并在其中实现将数据存储到 Redis 的功能。例如:
import redis
from itemadapter import ItemAdapter
class CustomRedisPipeline:
def __init__(self, redis_host, redis_port):
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_client = None
@classmethod
def from_crawler(cls, crawler):
return cls(
redis_host=crawler.settings.get('REDIS_HOST'),
redis_port=crawler.settings.get('REDIS_PORT')
)
def open_spider(self, spider):
self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
def close_spider(self, spider):
self.redis_client.close()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
data = dict(adapter)
self.redis_client.rpush('my_redis_spider:items', data)
return item
在上述代码中,定义了一个自定义的CustomRedisPipeline,在process_item方法中,将item转换为字典形式,并使用 Redis 的rpush方法将其存储到my_redis_spider:items键对应的列表中。在settings.py中启用这个 Pipeline:
ITEM_PIPELINES = {
'myproject.pipelines.CustomRedisPipeline': 300,
}
通过以上步骤,我们就完成了将 Scrapy 项目改造为分布式爬虫的过程,使其能够利用 Redis 作为任务队列和去重容器,实现高效的分布式数据爬取。
四、分布式爬虫中的数据合并与处理
在分布式爬虫中,多个节点并行爬取数据,如何有效地合并和处理这些分散的数据是一个关键问题。这不仅关系到最终数据的完整性和准确性,还影响到后续数据分析和应用的效果。下面我们将探讨如何将多个节点爬取的数据存储到 Redis,并从 Redis 中读取数据进行合并和处理。
4.1 数据存储到 Redis
在使用 Scrapy - Redis 实现分布式爬虫时,每个爬虫节点爬取到的数据通常会存储到 Redis 中。通过在settings.py文件中配置scrapy_redis.pipelines.RedisPipeline,可以将爬取到的item自动存储到 Redis 中。例如:
ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 300,
}
Redis 提供了丰富的数据结构,在存储爬虫数据时,常用的是哈希(Hash)和列表(List)结构。如果使用哈希结构,每个item可以作为一个哈希表存储,其中item的字段作为哈希表的字段名,字段值作为哈希表的字段值。例如,爬取到的新闻数据包含标题、作者、发布时间等字段:
import redis
from itemadapter import ItemAdapter
class CustomRedisPipeline:
def __init__(self, redis_host, redis_port):
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_client = None
@classmethod
def from_crawler(cls, crawler):
return cls(
redis_host=crawler.settings.get('REDIS_HOST'),
redis_port=crawler.settings.get('REDIS_PORT')
)
def open_spider(self, spider):
self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
def close_spider(self, spider):
self.redis_client.close()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
data = dict(adapter)
item_id = data.get('id') # 假设每个item有唯一的id字段
if item_id:
self.redis_client.hmset(f'news:{item_id}', data)
return item
上述代码中,CustomRedisPipeline将item以哈希的形式存储到 Redis 中,键名为news:{item_id},其中{item_id}是每个新闻数据的唯一标识。
如果使用列表结构,每个item可以被序列化为字符串后,通过rpush命令添加到列表中。例如:
import redis
import json
from itemadapter import ItemAdapter
class ListRedisPipeline:
def __init__(self, redis_host, redis_port):
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_client = None
@classmethod
def from_crawler(cls, crawler):
return cls(
redis_host=crawler.settings.get('REDIS_HOST'),
redis_port=crawler.settings.get('REDIS_PORT')
)
def open_spider(self, spider):
self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
def close_spider(self, spider):
self.redis_client.close()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
data = dict(adapter)
self.redis_client.rpush('news_list', json.dumps(data))
return item
这里将item转换为 JSON 字符串后,添加到名为news_list的列表中。
4.2 合并数据
当所有爬虫节点完成数据爬取并存储到 Redis 后,需要从 Redis 中读取所有节点的爬取结果并进行合并。如果数据存储在哈希结构中,可以通过遍历所有的哈希键,将每个哈希表中的数据提取出来进行合并。例如:
import redis
redis_client = redis.Redis(host='localhost', port=6379)
keys = redis_client.keys('news:*')
all_data = []
for key in keys:
data = redis_client.hgetall(key)
all_data.append(dict(data))
如果数据存储在列表结构中,可以使用llen获取列表长度,然后通过lrange按索引范围读取列表中的所有元素,将其反序列化为原始数据格式后进行合并。例如:
import redis
import json
redis_client = redis.Redis(host='localhost', port=6379)
length = redis_client.llen('news_list')
all_data = []
for i in range(length):
item_str = redis_client.lrange('news_list', i, i + 1)[0]
item = json.loads(item_str)
all_data.append(item)
除了手动编写代码进行数据合并,也可以使用一些工具或脚本进行数据汇总。例如,使用 Redis 的SUBSCRIBE和PUBLISH功能,将各个节点爬取的数据通过消息队列的方式发送到一个汇总节点,在汇总节点上进行数据合并。
4.3 数据处理
在合并数据之后,通常需要对数据进行一系列的处理操作,以满足后续数据分析和应用的需求。这些处理操作包括数据清洗、去重、格式转换等。
数据清洗是去除数据中的噪声、错误数据和不相关数据的过程。例如,去除网页中提取的数据中的 HTML 标签、特殊字符等。可以使用正则表达式或相关的库(如BeautifulSoup)进行处理。例如,使用re模块去除字符串中的 HTML 标签:
import re
def clean_html(html):
clean = re.compile('<.*?>')
return re.sub(clean, '', html)
数据去重是避免重复数据对分析结果产生影响的重要步骤。如果在爬取过程中没有完全避免重复数据的产生,可以在数据合并后再次进行去重。对于 Python 中的列表数据,可以使用set结合数据的唯一标识进行去重。例如:
unique_data = []
seen = set()
for item in all_data:
item_id = item.get('id')
if item_id and item_id not in seen:
unique_data.append(item)
seen.add(item_id)
数据格式转换是将数据转换为适合后续分析和存储的格式。例如,将日期字符串转换为datetime对象,将数字字符串转换为数值类型等。假设数据中有一个日期字段date,格式为YYYY - MM - DD,可以使用datetime模块进行转换:
import datetime
for item in unique_data:
date_str = item.get('date')
if date_str:
item['date'] = datetime.datetime.strptime(date_str, '%Y-%m-%d')
通过以上数据合并和处理步骤,可以将分布式爬虫中多个节点爬取的数据整合为高质量的数据集,为后续的数据分析、挖掘和应用提供有力支持。
五、总结与展望
使用 Scrapy - Redis 实现分布式爬虫,为大规模数据爬取提供了高效、可靠的解决方案。通过安装和配置 Scrapy - Redis 扩展,将 Scrapy 项目改造为分布式爬虫,利用 Redis 作为任务队列和去重容器,使得爬虫任务能够在多个节点上并行执行,大大提高了数据爬取的效率和系统的稳定性。同时,在分布式爬虫中,合理地进行数据合并与处理,能够确保获取到的数据完整、准确,为后续的数据分析和应用奠定坚实的基础。
随着互联网数据量的持续增长以及对数据处理实时性要求的不断提高,分布式爬虫在未来的数据获取领域将发挥更加重要的作用。在技术发展方面,分布式爬虫将与人工智能、大数据处理等技术深度融合,实现更加智能化的爬取策略和数据处理方式。例如,利用机器学习算法自动识别和应对目标网站的反爬机制,根据数据特征自动优化爬取任务的分配等。同时,随着云计算技术的普及,基于云平台的分布式爬虫部署将变得更加便捷和经济高效,能够轻松地根据任务需求弹性扩展计算资源。在应用场景上,分布式爬虫将广泛应用于金融市场监测、舆情分析、电商数据采集、科研数据获取等多个领域,为各行业的决策和发展提供有力的数据支持。总之,分布式爬虫技术有着广阔的发展前景和应用潜力,值得我们持续关注和深入研究。