Python爬虫框架 - 实际项目(拿到可以直接用)
项目目标:
设计一个功能完善的爬虫框架,能够实现以下功能:
- 任务管理:支持多任务并发抓取
- 请求发送:支持HTTP请求发送和代理管理
- 页面解析:支持多种网页解析方式
- 数据存储:支持多种数据存储方式
- 结果展示:支持多种结果展示方式
框架设计思路:
- 模块化设计:将爬虫框架拆分为多个功能模块,每个模块负责不同的任务。
- 可扩展性:框架设计支持功能模块的扩展和替换。
- 异常处理:在每个模块中加入异常处理机制,确保爬虫稳定运行。
- 日志记录:记录爬虫运行过程中的关键信息,便于排查问题。
代码分析:
我们将分为以下几个模块进行讲解:
- 任务管理模块
- 请求发送模块
- 页面解析模块
- 数据存储模块
- 结果展示模块
案例项目实现
1. 任务管理模块 (task_manager.py
)
from queue import Queue
import threading
class TaskQueue:
def __init__(self):
self.queue = Queue()
self.lock = threading.Lock()
self.tasks = []
def add_task(self, task):
"""添加任务"""
with self.lock:
self.tasks.append(task)
self.queue.put(task)
def get_task(self):
"""获取任务"""
return self.queue.get()
def task_done(self):
"""标记任务完成"""
self.queue.task_done()
def get_remaining_tasks(self):
"""获取剩余任务数"""
return self.queue.qsize()
def get_total_tasks(self):
"""获取总任务数"""
return len(self.tasks)
def get_completed_tasks(self):
"""获取已完成任务数"""
return self.get_total_tasks() - self.get_remaining_tasks()
代码分析:
- 使用Python自带的
queue.Queue
实现任务队列。 - 使用
threading.Lock
实现线程安全。 - 提供了任务的添加、获取、完成以及统计功能。
2. 请求发送模块 (request_sender.py
)
import requests
from requests.exceptions import RequestException
import time
class RequestSender:
def __init__(self, headers=None, proxies=None):
self.session = requests.Session()
self.headers = headers
self.proxies = proxies
def send_request(self, url, method='GET', timeout=10):
"""发送HTTP请求"""
try:
response = self.session.request(
method=method,
url=url,
headers=self.headers,
proxies=self.proxies,
timeout=timeout
)
response.raise_for_status()
return response
except RequestException as e:
print(f"请求失败: {e}")
return None
def set_headers(self, headers):
"""设置请求头"""
self.headers = headers
def set_proxies(self, proxies):
"""设置代理"""
self.proxies = proxies
def random_delay(self, min_delay=1, max_delay=3):
"""随机延迟"""
delay = (max_delay - min_delay) * random.random() + min_delay
time.sleep(delay)
代码分析:
- 使用
requests.Session()
保持会话状态。 - 提供了请求头和代理设置功能。
- 集成了随机延迟功能,防止被目标网站封IP。
3. 页面解析模块 (page_parser.py
)
from bs4 import BeautifulSoup
import re
class PageParser:
def __init__(self, content, parser='html.parser'):
self.soup = BeautifulSoup(content, parser)
def extract_links(self, base_url=None):
"""提取页面中的所有链接"""
links = []
for link in self.soup.find_all('a', href=True):
url = link['href']
if base_url and not url.startswith('http'):
url = base_url + url
links.append(url)
return links
def extract_content(self, selector):
"""提取指定内容"""
elements = self.soup.select(selector)
return [element.get_text() for element in elements]
def extract_images(self):
"""提取页面中的所有图片链接"""
images = []
for img in self.soup.find_all('img', src=True):
images.append(img['src'])
return images
def find_all(self, name=None, attrs={}, recursive=True, text=None):
"""查找所有匹配的标签"""
return self.soup.find_all(name, attrs, recursive, text)
代码分析:
- 使用
BeautifulSoup
进行HTML解析。 - 提供了提取链接、内容、图片等功能。
- 支持自定义选择器进行内容提取。
4. 数据存储模块 (data_storage.py
)
import sqlite3
class DataStorage:
def __init__(self, db_name='spider.db'):
self.conn = sqlite3.connect(db_name)
self.cursor = self.conn.cursor()
def create_table(self, table_name, columns):
"""创建数据表"""
columns_str = ', '.join([f'{col} {typ}' for col, typ in columns.items()])
self.cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})')
self.conn.commit()
def insert_data(self, table_name, data):
"""插入数据"""
placeholders = ', '.join(['?'] * len(data))
columns = ', '.join(data.keys())
values = tuple(data.values())
self.cursor.execute(f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})', values)
self.conn.commit()
def query_data(self, table_name, columns='*', condition=None):
"""查询数据"""
query = f'SELECT {columns} FROM {table_name}'
if condition:
query += f' WHERE {condition}'
self.cursor.execute(query)
return self.cursor.fetchall()
def close(self):
"""关闭数据库连接"""
self.conn.close()
代码分析:
- 使用
sqlite3
进行数据存储。 - 提供了创建数据表、插入数据、查询数据功能。
- 支持自定义表结构和查询条件。
5. 结果展示模块 (result_viewer.py
)
from prettytable import PrettyTable
class ResultViewer:
def __init__(self):
self.table = PrettyTable()
def display_data(self, data, fields=None):
"""显示数据"""
if fields:
self.table.field_names = fields
for row in data:
self.table.add_row(row)
print(self.table)
def display_statistics(self, stats):
"""显示统计信息"""
print("\n爬虫统计信息:")
for key, value in stats.items():
print(f"{key}: {value}")
代码分析:
- 使用
PrettyTable
库以表格形式展示数据。 - 提供了展示数据和统计信息的功能。
案例项目主程序 (main.py
)
import random
from task_manager import TaskQueue
from request_sender import RequestSender
from page_parser import PageParser
from data_storage import DataStorage
from result_viewer import ResultViewer
class NewsSpider:
def __init__(self, base_url, max_threads=5):
self.base_url = base_url
self.task_queue = TaskQueue()
self.request_sender = RequestSender()
self.data_storage = DataStorage()
self.result_viewer = ResultViewer()
self.max_threads = max_threads
self.running_threads = 0
self.total_tasks = 0
self.completed_tasks = 0
def start(self):
"""启动爬虫"""
self.init_db()
self.seed_tasks()
self.start_threads()
def init_db(self):
"""初始化数据库"""
columns = {
'id': 'INTEGER PRIMARY KEY AUTOINCREMENT',
'title': 'TEXT',
'content': 'TEXT',
'publish_date': 'TEXT',
'url': 'TEXT'
}
self.data_storage.create_table('news', columns)
def seed_tasks(self):
"""初始化任务"""
self.task_queue.add_task(self.base_url)
self.total_tasks = self.task_queue.get_total_tasks()
def start_threads(self):
"""启动线程"""
for _ in range(self.max_threads):
thread = threading.Thread(target=self.run)
thread.start()
def run(self):
"""爬虫执行逻辑"""
while not self.task_queue.empty():
self.running_threads += 1
url = self.task_queue.get_task()
self.crawl(url)
self.task_queue.task_done()
self.running_threads -= 1
def crawl(self, url):
"""抓取逻辑"""
self.request_sender.random_delay()
response = self.request_sender.send_request(url)
if not response:
return
parser = PageParser(response.text)
news_list = parser.extract_content('.news-item')
for news in news_list:
data = {
'title': news['title'],
'content': news['content'],
'publish_date': news['publish_date'],
'url': url
}
self.data_storage.insert_data('news', data)
links = parser.extract_links(self.base_url)
for link in links:
self.task_queue.add_task(link)
def get_statistics(self):
"""获取爬虫统计信息"""
return {
'总任务数': self.task_queue.get_total_tasks(),
'剩余任务数': self.task_queue.get_remaining_tasks(),
'已完成任务数': self.task_queue.get_completed_tasks(),
'运行线程数': self.running_threads
}
def display_results(self):
"""展示结果"""
stats = self.get_statistics()
self.result_viewer.display_statistics(stats)
data = self.data_storage.query_data('news', '*', 'id <= 10')
self.result_viewer.display_data(data, ['标题', '内容', '发布时间', 'URL'])
self.data_storage.close()
if __name__ == '__main__':
base_url = 'http://example.com/news'
spider = NewsSpider(base_url, max_threads=5)
spider.start()
spider.display_results()
代码分析:
- 实现了一个完整的新闻爬虫逻辑。
- 支持多线程并发抓取。
- 数据存储在SQLite数据库中。
- 结果以表格形式展示。