爬虫——将数据保存到MongoDB中
目录
- 安装mongodb
- mongod.conf
- init-mongo.js
- docker-yml文件
- 编写mongodb类
- 初始化mongodb
- 增
- 删
- 改
- 查
- 将爬虫的数据保存到mongodb中
- save_to_mongo
- 补充
deepseek
推荐的,爬虫用mongodb
比mysql
效果要好一些,所以就尝试一下
安装mongodb
肯定是线程的mongodb
比较方便啊,所以就搞了个docker的mongodb,只要将数据库挂在出来就行,不过一些配置要搞一下。
mongod.conf
storage:
dbPath: /data/db
wiredTiger:
engineConfig:
cacheSizeGB: 1
journalCompressor: snappy
collectionConfig:
blockCompressor: snappy
operationProfiling:
mode: slowOp
slowOpThresholdMs: 100
replication:
oplogSizeMB: 100
【细节讲解】
dbPath
:/data/db
指定数据库文件的存储路径,这个也是需要挂载出来的地方wiredTiger
:配置WiredTiger
存储引擎的相关参数。engineConfig
:引擎配置。cacheSizeGB
:指定缓存大小为 1 GB,用于控制内存使用。.journalCompressor
:指定snappy
算法来压缩日志
collectionConfig
:集合配置。blockCompressor
指定snappy
算法来压缩集合数据块,以提高存储效率。
operationProfiling
:操作分析配置。mode
:slowOp
仅记录执行时间超过慢操作阈值的操作。slowOpThresholdMs
:100
执行时间超过 100 毫秒的操作将被记录。
replication
:oplogSizeMB
:100
设置操作日志(oplog)的大小为 100 MB。
init-mongo.js
db.createUser({
user: "admin1",
pwd: "admin1",
roles: [
{ role: "readWrite", db: "mydb" },
{ role: "clusterMonitor", db: "admin" }
]
});
这个就简单了,创建用户,以及赋予用户权限。
docker-yml文件
为了方便部署,直接使用docker-compose
一键部署就行了
version: '1.0'
services:
mongodb:
image: mongo # 使用官方特定版本
container_name: mongodb
restart: unless-stopped # 自动重启策略
ports:
- "27017:27017"
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin1# 更改为强密码
MONGO_INITDB_DATABASE: mydb # 初始化数据库
# 性能优化参数
MONGO_WIREDTIGER_CACHE_SIZE: 1 # 更准确的环境变量名
MONGO_OPLOG_SIZE: 100
volumes:
- /e/Temp/mongodb/mongo_data:/data/db # 数据持久化
- /e/Temp/mongodb/init-mongo.js:/docker-entrypoint-initdb.d/init-mongo.js # 初始化脚本
- /e/Temp/mongodb/mongod.conf:/etc/mongo/mongod.conf # 自定义配置文件
command: ["--config", "/etc/mongo/mongod.conf"] # 加载自定义配置
deploy:
resources:
limits:
cpus: '0.5'
memory: 512M # 根据需求适当增加
reservations:
memory: 256M
【细节讲解】
volumes
:这个挂载路径需要根据自己的意愿进行配置。restart
:unless-stopped
,自动重启策略,可以使容器随着docker重启而重启。
# 指定yml
docker-compose -f mongo_docker.yml up -d
编写mongodb类
有现成的库pymongo
,不过还是要我们编写一些内容,也就是自己定义增删改查
初始化mongodb
from pymongo import MongoClient, errors
from pymongo.errors import BulkWriteError
from pymongo.server_api import ServerApi
from pymongo import UpdateOne
class MongoDBManager:
def __init__(self,
mongo_uri="mongodb://admin:admin1@localhost:27017/amazon?authSource=mydb",
db_name="mydb",
collection_name="products",
retry_attempts=3,
retry_delay=2):
"""
:param mongo_uri: MongoDB 连接字符串
:param db_name: 数据库名称(不存在时自动创建)
:param collection_name: 集合名称(不存在时自动创建)
:param retry_attempts: 连接重试次数
:param retry_delay: 重试间隔(秒)
"""
self.mongo_uri = mongo_uri
self.db_name = db_name
self.collection_name = collection_name
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
# 初始化连接
self.client = None
self.db = None
self.collection = None
self._connect()
在一开始的初始化mongo
对象的时候,尝试连接一下数据库,如果数据库ok的情况下,那么就可以连接。
class MongoDBManager:
# ...
def _connect(self):
# 尝试连接就尝试3次吧,也就是self.retry_attempts
for attemp in range(self.retry_attemps):
try:
self.client = MongoClient(
self.mongo_uri,
serverSelectionTimeoutMS=5000,
server_api=ServerApi('1'),
)
# 初始化数据库和集合(不会实际创建,直到插入数据)
self.db = self.client[self.db_name]
self.collection = self.db[self.collection_name]
# 检查/创建索引(幂等操作)
self._ensure_indexes()
print(f"成功连接 MongoDB | 数据库: {self.db_name} | 集合: {self.collection_name}")
return
except errors.ServerSelectionTimeoutError as e:
print(f"连接尝试 {attempt + 1}/{self.retry_attempts} 失败: {str(e)}")
if attempt < self.retry_attempts - 1:
print(f"{self.retry_delay}秒后重试...")
time.sleep(self.retry_delay)
else:
raise RuntimeError(f"无法连接 MongoDB,请检查服务状态或连接字符串") from e
except errors.OperationFailure as e:
if "Authentication failed" in str(e):
raise RuntimeError("MongoDB 认证失败,请检查用户名密码") from e
else:
raise
def _ensure_indexes(self):
"""创建必要索引(幂等操作)"""
try:
# 创建唯一索引(如果已存在会自动跳过)
self.collection.create_index(
[("ASIN", 1)],
unique=True,
name="asin_unique_idx",
)
except errors.OperationFailure as e:
if "already exists" not in str(e):
print(f"创建索引失败: {str(e)}")
raise
【细节讲解】
ServerApi
:用于指定客户端与 MongoDB 服务器交互时所遵循的稳定 API 版本。表示使用服务器 API 的第一个版本。create_index
:在ASIN
字段上创建一个唯一索引,以确保该字段的值在集合中不重复,并为该索引指定名称,判断是否连接成功。当然也可以使用其他的验证连接的方式,比如心跳检测、验证用户权限、集合存在性验证、服务级健康检查等等。
增
mongodb
提供了多种增
的方法,比如insertOne()
、insertMany()
、bulkWrite()
、upsert()
。接下来就是用上面这些来写入。
class MongoDBManager:
# ...
def insert_data(self, data):
"""插入数据(自动处理初次创建集合)"""
try:
if isinstance(data, list):
result = self.collection.insert_many(data, ordered=False)
return len(result.inserted_ids)
else:
result = self.collection.insert_one(data)
return result.inserted_id
except errors.BulkWriteError as e:
handled_errors = [
err for err in e.details['writeErrors']
if err['code'] != 11000 # 忽略重复键错误(11000)
]
if handled_errors:
raise
return len(e.details['nInserted'])
except errors.DuplicateKeyError:
print("重复数据已跳过")
return 0
def upsert_batch(self, records, key_field="ASIN", batch_size=50000, max_retries=3):
"""
批量插入或更新数据
:param records: 数据列表
:param key_field: 用于更新的唯一键字段
:param batch_size: 每批处理的数据量
:param max_retries: 最大重试次数
:return: 插入或更新的文档数量
"""
if not records:
return 0, 0
total_success = 0
total_fail = 0
operations = []
for record in records:
try:
query = {key_field: record[key_field]}
operations.append(
UpdateOne(query, {"$set": record}, upsert=True)
)
except KeyError:
print(f"记录缺少关键字段 {key_field}: {record}")
continue
for i in range(0, len(operations), batch_size):
batch_ops = operations[i:i + batch_size]
retries = 0
while retries < max_retries:
try:
result = self.collection.bulk_write(batch_ops, ordered=False)
return result.upserted_count + result.modified_count, 0
except BulkWriteError as e:
success = e.details['nInserted'] + e.details['nModified']
total_success += success
total_fail += (len(batch_ops) - success)
print(f"部分操作失败: {len(batch_ops) - success} 条, 第 {retries + 1} 次重试")
except Exception as e:
total_fail += len(batch_ops)
print(f"批量操作失败: {str(e)}")
traceback.print_exc()
return total_success, total_fail
【细节讲解】
insert_data
:这里提供两种方法,当传递进来的是列表的时候,就调用insert_many
,否则就调用insert_one()
upsert_batch
:这里就使用了bulkWrite()
+upsert()
方法UpdateOne(query, {"$set": record}, upsert=True)
:对每个数据构建更新操作,设置upsert=True
,查询条件匹配到一个时执行更新操作,没有匹配到任何文档,则会根据提供的更新内容创建一个新文档。batch_size
:执行分批操作。
删
def delete_data(self, query):
"""
删除符合条件的数据
:param query: 删除条件(字典格式,例如 {"ASIN": "B0XXXXXXX"})
:return: 删除的文档数量
"""
try:
result = self.collection.delete_data(query)
return result.deleted_count
except errors.PyMongoError as e:
print(f"删除数据时发生错误: {str(e)}")
return 0
【细节讲解】
delete_data
:方法已支持根据任意条件删除数据,但是关键的是如何构造。等值条件删除
:当满足特定值时,就执行删除,比如query = {"ASIN": "B0001"} # 精确匹配
。范围条件删除
:符合某一个范围时,就执行删除,比如query = {"price": {"$gt": 100}} # $gt 表示大于
。多条件同时满足
:同时满足两个以上的,就执行删除,比如query = {"$and": [{"category": "Electronics"}, {"stock": 0}]}
,当然也能写成下面这种方式:
query = { "category": "Electronics", "stock": 0}
满足任一条件(OR)
:满足其中之一的条件即可,比如query = {"$or": [{"category": "Electronics"}, {"price": {"$lt": 10}]}
正则表达式匹配
:通过正则表达,获取对应的内容再删除,比如query = {"title": {"$regex": "Wireless", "$options": "i"}}
改
class MongoDBManager:
# ...
# ------------ 改 ------------
def update_data(self, query, update_data, upsert=False):
"""
更新符合条件的数据
:param query: 查询条件(字典格式)
:param update_data: 更新操作(需使用MongoDB更新操作符,例如 {"$set": {"Price": 99.99}})
:param upsert: 如果不存在是否插入新文档
:return: 修改的文档数量
"""
try:
result = self.collection.update_many(
query,
update_data,
upsert=upsert
)
return result.modified_count
except errors.PyMongoError as e:
print(f"更新数据时发生错误: {str(e)}")
return 0
【细节讲解】
批量更新(update_many)
:也就是更新所有匹配查询条件的文档,除了这个还有update_one()
:更新第一个匹配查询条件的文档、replace_one()
:替换整个文档。- 对于爬虫来说,一股脑更新!
查
class MongoDBManager:
# ...
# ------------ 查 ------------
def find_data(self, query=None, projection=None, limit=0):
"""
查询符合条件的文档
:param query: 查询条件(字典格式)
:param projection: 返回字段控制(例如 {"ASIN": 1, "Price": 1})
:param limit: 返回结果数量限制(0表示无限制)
:return: 匹配的文档列表
"""
try:
if query is None:
query = {}
cursor = self.collection.find(query, projection).limit(limit)
return list(cursor)
except errors.PyMongoError as e:
print(f"查询数据时发生错误: {str(e)}")
return []
将爬虫的数据保存到mongodb中
我在上一篇中,通过搜索
→数据获取
→翻页
等操作,搞到了对应的数据,然后仅仅是简单的保存成csv
文件,当然也可以通过自己编写对应的管理器来管理csv文件中的数据内容,不过还是没有现成的数据库好用,哈哈哈哈哈哈。
save_to_mongo
这里要在AmazonBrowser
类里面写一个保存的步骤,也就是save_to_mongo
class AmazonBrowser:
def __init__(self, mongo_manager):
# ...
self.mongo_manager = mongo_manager # 新增这一步
def save_to_mongodb(self, batch_size=50000):
"""将数据保存到MongoDB"""
if self.df.empty:
print("没有数据需要保存")
return
clean_df = self._preprocess_data()
total_records = len(clean_df)
total_batches = (total_records // batch_size) + 1
total_success = 0
total_fail = 0
print(f"开始写入 {total_records} 条数据,分 {total_batches} 批处理")
records = clean_df.to_dict(orient='records')
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
success, fail = self.mongo_manager.upsert_batch(batch)
total_success += success
total_fail += fail
print(f"进度: {min(i + batch_size, total_records)}/{total_records} | 成功: {success} | 失败: {fail}")
print(f"写入完成!总成功: {total_success} | 总失败: {total_fail}")
return total_success, total_fail
def _preprocess_data(self):
"""数据预处理"""
# TODO: 实现数据预处理逻辑
df = self.df.copy()
# 处理空值
df = df.replace({pd.NA: None, '': None})
# 去重(保留最后出现的记录)
df = df.drop_duplicates(subset=['ASIN'], keep='last')
return df
【细节讲解】
_preprocess_data
:在数据保存之前,先对爬取的数据进行数据清洗,确保没有问题。self.mongo_manager.upsert_batch(batch)
:这就是执行数据保存了。
补充
async def main():
target_url = "https://www.amazon.com/"
max_attempts = 2
scraper = None
with MongoDBManager() as mongo_manager:
for attempt in range(max_attempts):
try:
scraper = AmazonBrowser(target_url, mongo_manager)
await scraper.run()
if scraper.get_dataframe() is not None:
success, fail = scraper.save_to_mongodb()
print(f"数据保存结果: 成功 {success} 条,失败 {fail} 条")
break
except playwright._impl._errors.TimeoutError:
if attempt == max_attempts:
raise # 最后一次尝试仍失败则抛出异常
print(f"第{attempt}次尝试超时,20秒后重试...")
if scraper:
await scraper.close() # 关闭当前实例释放资源
await asyncio.sleep(20)
except Exception as e:
if scraper:
await scraper.close()
raise e # 其他异常直接抛出
if __name__ == '__main__':
asyncio.run(main())
就酱~