当前位置: 首页 > article >正文

爬虫——将数据保存到MongoDB中

目录

  • 安装mongodb
    • mongod.conf
    • init-mongo.js
    • docker-yml文件
  • 编写mongodb类
    • 初始化mongodb
  • 将爬虫的数据保存到mongodb中
    • save_to_mongo
  • 补充

deepseek推荐的,爬虫用mongodbmysql效果要好一些,所以就尝试一下

安装mongodb

肯定是线程的mongodb比较方便啊,所以就搞了个docker的mongodb,只要将数据库挂在出来就行,不过一些配置要搞一下。

mongod.conf

storage:
  dbPath: /data/db
  wiredTiger:
    engineConfig:
      cacheSizeGB: 1
      journalCompressor: snappy
    collectionConfig:
      blockCompressor: snappy
operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 100
replication:
  oplogSizeMB: 100

【细节讲解】

  • dbPath/data/db指定数据库文件的存储路径,这个也是需要挂载出来的地方
  • wiredTiger :配置 WiredTiger 存储引擎的相关参数。
    • engineConfig:引擎配置。
      • cacheSizeGB:指定缓存大小为 1 GB,用于控制内存使用。.
      • journalCompressor:指定snappy算法来压缩日志
    • collectionConfig:集合配置。
      • blockCompressor指定snappy算法来压缩集合数据块,以提高存储效率。
  • operationProfiling:操作分析配置。
    • modeslowOp仅记录执行时间超过慢操作阈值的操作。
    • slowOpThresholdMs100执行时间超过 100 毫秒的操作将被记录。
  • replication
    • oplogSizeMB100设置操作日志(oplog)的大小为 100 MB。

init-mongo.js

db.createUser({
  user: "admin1",
  pwd: "admin1",
  roles: [
    { role: "readWrite", db: "mydb" },
    { role: "clusterMonitor", db: "admin" }
  ]
});

这个就简单了,创建用户,以及赋予用户权限。

docker-yml文件

为了方便部署,直接使用docker-compose一键部署就行了

version: '1.0'

services:
  mongodb:
    image: mongo  # 使用官方特定版本
    container_name: mongodb
    restart: unless-stopped  # 自动重启策略
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin1# 更改为强密码
      MONGO_INITDB_DATABASE: mydb  # 初始化数据库
      # 性能优化参数
      MONGO_WIREDTIGER_CACHE_SIZE: 1  # 更准确的环境变量名
      MONGO_OPLOG_SIZE: 100
    volumes:
      - /e/Temp/mongodb/mongo_data:/data/db  # 数据持久化
      - /e/Temp/mongodb/init-mongo.js:/docker-entrypoint-initdb.d/init-mongo.js  # 初始化脚本
      - /e/Temp/mongodb/mongod.conf:/etc/mongo/mongod.conf  # 自定义配置文件
    command: ["--config", "/etc/mongo/mongod.conf"]  # 加载自定义配置
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M  # 根据需求适当增加
        reservations:
          memory: 256M

【细节讲解】

  • volumes:这个挂载路径需要根据自己的意愿进行配置。
  • restart: unless-stopped,自动重启策略,可以使容器随着docker重启而重启。
# 指定yml
docker-compose -f mongo_docker.yml up -d

编写mongodb类

有现成的库pymongo,不过还是要我们编写一些内容,也就是自己定义增删改查

初始化mongodb

from pymongo import MongoClient, errors
from pymongo.errors import BulkWriteError
from pymongo.server_api import ServerApi
from pymongo import UpdateOne

class MongoDBManager:
    def __init__(self,
                 mongo_uri="mongodb://admin:admin1@localhost:27017/amazon?authSource=mydb",
                 db_name="mydb",
                 collection_name="products",
                 retry_attempts=3,
                 retry_delay=2):
        """
        :param mongo_uri: MongoDB 连接字符串
        :param db_name: 数据库名称(不存在时自动创建)
        :param collection_name: 集合名称(不存在时自动创建)
        :param retry_attempts: 连接重试次数
        :param retry_delay: 重试间隔(秒)
        """
        self.mongo_uri = mongo_uri
        self.db_name = db_name
        self.collection_name = collection_name
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay

        # 初始化连接
        self.client = None
        self.db = None
        self.collection = None
        self._connect()

在一开始的初始化mongo对象的时候,尝试连接一下数据库,如果数据库ok的情况下,那么就可以连接。

class MongoDBManager:
	# ...
	def _connect(self):
	# 尝试连接就尝试3次吧,也就是self.retry_attempts
		for attemp in range(self.retry_attemps):
			try:
				self.client = MongoClient(
                    self.mongo_uri,
                    serverSelectionTimeoutMS=5000,
                    server_api=ServerApi('1'),
                )
                                # 初始化数据库和集合(不会实际创建,直到插入数据)
                self.db = self.client[self.db_name]
                self.collection = self.db[self.collection_name]

                # 检查/创建索引(幂等操作)
                self._ensure_indexes()
                print(f"成功连接 MongoDB | 数据库: {self.db_name} | 集合: {self.collection_name}")
                return
            except errors.ServerSelectionTimeoutError as e:
                print(f"连接尝试 {attempt + 1}/{self.retry_attempts} 失败: {str(e)}")
                if attempt < self.retry_attempts - 1:
                    print(f"{self.retry_delay}秒后重试...")
                    time.sleep(self.retry_delay)
                else:
                    raise RuntimeError(f"无法连接 MongoDB,请检查服务状态或连接字符串") from e
            except errors.OperationFailure as e:
                if "Authentication failed" in str(e):
                    raise RuntimeError("MongoDB 认证失败,请检查用户名密码") from e
                else:
                    raise
    def _ensure_indexes(self):
        """创建必要索引(幂等操作)"""
        try:
            # 创建唯一索引(如果已存在会自动跳过)
            self.collection.create_index(
                [("ASIN", 1)],
                unique=True,
                name="asin_unique_idx",
            )
        except errors.OperationFailure as e:
            if "already exists" not in str(e):
                print(f"创建索引失败: {str(e)}")
                raise

【细节讲解】

  • ServerApi:用于指定客户端与 MongoDB 服务器交互时所遵循的稳定 API 版本。表示使用服务器 API 的第一个版本。
  • create_index:在 ASIN 字段上创建一个唯一索引,以确保该字段的值在集合中不重复,并为该索引指定名称,判断是否连接成功。当然也可以使用其他的验证连接的方式,比如心跳检测、验证用户权限、集合存在性验证、服务级健康检查等等。

mongodb提供了多种的方法,比如insertOne()insertMany()bulkWrite()upsert()。接下来就是用上面这些来写入。

class MongoDBManager:
	# ...
	def insert_data(self, data):
        """插入数据(自动处理初次创建集合)"""
        try:
            if isinstance(data, list):
                result = self.collection.insert_many(data, ordered=False)
                return len(result.inserted_ids)
            else:
                result = self.collection.insert_one(data)
                return result.inserted_id
        except errors.BulkWriteError as e:
            handled_errors = [
                err for err in e.details['writeErrors']
                if err['code'] != 11000  # 忽略重复键错误(11000)
            ]
            if handled_errors:
                raise
            return len(e.details['nInserted'])
        except errors.DuplicateKeyError:
            print("重复数据已跳过")
            return 0

    def upsert_batch(self, records, key_field="ASIN", batch_size=50000, max_retries=3):
        """
        批量插入或更新数据
        :param records: 数据列表
        :param key_field: 用于更新的唯一键字段
        :param batch_size: 每批处理的数据量
        :param max_retries: 最大重试次数
        :return: 插入或更新的文档数量
        """
        if not records:
            return 0, 0
        total_success = 0
        total_fail = 0
        operations = []
        for record in records:
            try:
                query = {key_field: record[key_field]}
                operations.append(
                    UpdateOne(query, {"$set": record}, upsert=True)
                )
            except KeyError:
                print(f"记录缺少关键字段 {key_field}: {record}")
                continue

        for i in range(0, len(operations), batch_size):
            batch_ops = operations[i:i + batch_size]
            retries = 0
            while retries < max_retries:
                try:
                    result = self.collection.bulk_write(batch_ops, ordered=False)
                    return result.upserted_count + result.modified_count, 0
                except BulkWriteError as e:
                    success = e.details['nInserted'] + e.details['nModified']
                    total_success += success
                    total_fail += (len(batch_ops) - success)
                    print(f"部分操作失败: {len(batch_ops) - success} 条, 第 {retries + 1} 次重试")
                except Exception as e:
                    total_fail += len(batch_ops)
                    print(f"批量操作失败: {str(e)}")
                    traceback.print_exc()
        return total_success, total_fail

【细节讲解】

  • insert_data:这里提供两种方法,当传递进来的是列表的时候,就调用insert_many,否则就调用insert_one()
  • upsert_batch:这里就使用了bulkWrite()+upsert()方法
    • UpdateOne(query, {"$set": record}, upsert=True):对每个数据构建更新操作,设置upsert=True,查询条件匹配到一个时执行更新操作,没有匹配到任何文档,则会根据提供的更新内容创建一个新文档。
    • batch_size:执行分批操作。

    def delete_data(self, query):
        """
        删除符合条件的数据
        :param query: 删除条件(字典格式,例如 {"ASIN": "B0XXXXXXX"})
        :return: 删除的文档数量
        """
        try:
            result = self.collection.delete_data(query)
            return result.deleted_count
        except errors.PyMongoError as e:
            print(f"删除数据时发生错误: {str(e)}")
            return 0

【细节讲解】

  • delete_data:方法已支持根据任意条件删除数据,但是关键的是如何构造。
    • 等值条件删除:当满足特定值时,就执行删除,比如query = {"ASIN": "B0001"} # 精确匹配
    • 范围条件删除:符合某一个范围时,就执行删除,比如query = {"price": {"$gt": 100}} # $gt 表示大于
    • 多条件同时满足:同时满足两个以上的,就执行删除,比如query = {"$and": [{"category": "Electronics"}, {"stock": 0}]},当然也能写成下面这种方式:
    query = {
    "category": "Electronics",
    "stock": 0}
    
    • 满足任一条件(OR):满足其中之一的条件即可,比如query = {"$or": [{"category": "Electronics"}, {"price": {"$lt": 10}]}
    • 正则表达式匹配:通过正则表达,获取对应的内容再删除,比如query = {"title": {"$regex": "Wireless", "$options": "i"}}

class MongoDBManager:
	# ...
    # ------------ 改 ------------
    def update_data(self, query, update_data, upsert=False):
        """
        更新符合条件的数据
        :param query: 查询条件(字典格式)
        :param update_data: 更新操作(需使用MongoDB更新操作符,例如 {"$set": {"Price": 99.99}})
        :param upsert: 如果不存在是否插入新文档
        :return: 修改的文档数量
        """
        try:
            result = self.collection.update_many(
                query,
                update_data,
                upsert=upsert
            )
            return result.modified_count
        except errors.PyMongoError as e:
            print(f"更新数据时发生错误: {str(e)}")
            return 0

【细节讲解】

  • 批量更新(update_many):也就是更新所有匹配查询条件的文档,除了这个还有update_one():更新第一个匹配查询条件的文档、replace_one():替换整个文档。
  • 对于爬虫来说,一股脑更新!

class MongoDBManager:
	# ...
	# ------------ 查 ------------
    def find_data(self, query=None, projection=None, limit=0):
        """
        查询符合条件的文档
        :param query: 查询条件(字典格式)
        :param projection: 返回字段控制(例如 {"ASIN": 1, "Price": 1})
        :param limit: 返回结果数量限制(0表示无限制)
        :return: 匹配的文档列表
        """
        try:
            if query is None:
                query = {}
            cursor = self.collection.find(query, projection).limit(limit)
            return list(cursor)
        except errors.PyMongoError as e:
            print(f"查询数据时发生错误: {str(e)}")
            return []

将爬虫的数据保存到mongodb中

我在上一篇中,通过搜索数据获取翻页等操作,搞到了对应的数据,然后仅仅是简单的保存成csv文件,当然也可以通过自己编写对应的管理器来管理csv文件中的数据内容,不过还是没有现成的数据库好用,哈哈哈哈哈哈。

save_to_mongo

这里要在AmazonBrowser类里面写一个保存的步骤,也就是save_to_mongo

class AmazonBrowser:
	def __init__(self, mongo_manager):
		# ...
		self.mongo_manager = mongo_manager # 新增这一步
	
    def save_to_mongodb(self, batch_size=50000):
        """将数据保存到MongoDB"""
        if self.df.empty:
            print("没有数据需要保存")
            return

        clean_df = self._preprocess_data()
        total_records = len(clean_df)
        total_batches = (total_records // batch_size) + 1
        total_success = 0
        total_fail = 0

        print(f"开始写入 {total_records} 条数据,分 {total_batches} 批处理")
        records = clean_df.to_dict(orient='records')
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            success, fail = self.mongo_manager.upsert_batch(batch)
            total_success += success
            total_fail += fail
            print(f"进度: {min(i + batch_size, total_records)}/{total_records} | 成功: {success} | 失败: {fail}")
        print(f"写入完成!总成功: {total_success} | 总失败: {total_fail}")
        return total_success, total_fail

    def _preprocess_data(self):
        """数据预处理"""
        # TODO: 实现数据预处理逻辑
        df = self.df.copy()
        # 处理空值
        df = df.replace({pd.NA: None, '': None})
        # 去重(保留最后出现的记录)
        df = df.drop_duplicates(subset=['ASIN'], keep='last')
        return df

【细节讲解】

  • _preprocess_data:在数据保存之前,先对爬取的数据进行数据清洗,确保没有问题。
  • self.mongo_manager.upsert_batch(batch):这就是执行数据保存了。

补充

async def main():
    target_url = "https://www.amazon.com/"
    max_attempts = 2
    scraper = None
    with MongoDBManager() as mongo_manager:
        for attempt in range(max_attempts):
            try:
                scraper = AmazonBrowser(target_url, mongo_manager)
                await scraper.run()
                if scraper.get_dataframe() is not None:
                    success, fail = scraper.save_to_mongodb()
                    print(f"数据保存结果: 成功 {success} 条,失败 {fail} 条")
                break
            except playwright._impl._errors.TimeoutError:
                if attempt == max_attempts:
                    raise  # 最后一次尝试仍失败则抛出异常
                print(f"第{attempt}次尝试超时,20秒后重试...")
                if scraper:
                    await scraper.close()  # 关闭当前实例释放资源
                await asyncio.sleep(20)
            except Exception as e:
                if scraper:
                    await scraper.close()
                raise e  # 其他异常直接抛出
if __name__ == '__main__':
    asyncio.run(main())

在这里插入图片描述

就酱~
在这里插入图片描述


http://www.kler.cn/a/597414.html

相关文章:

  • Vue 双向数据绑定是什么
  • STM32原理性知识
  • 《深入剖析鸿蒙生态原生应用:一次开发多端部署的技术革新》
  • 【数学建模】Lingo 18.0及其安装教程(保姆级)
  • 【Qt】自定义委托(Delegate)的核心方法
  • 基于Netty实现高性能HTTP服务的架构解析
  • Apache APISIX 架构浅析
  • 如何解决 PHP 运行时错误导致的服务中断
  • 网页性能优化中 有一条叫做“避免使用未合成的动画”,请问该如何理解?
  • vim在连续多行行首插入相同的字符
  • 同旺科技USB to I2C 适配器 ---- 指令之间延时功能
  • 【项目设计】网页版五子棋
  • STM32 HAL库函数原理解析
  • VSCode配置C语言保姆课
  • 数据结构——最短路径BFS算法
  • taosdump恢复数据库
  • Qt窗口控件之对话框QDialog
  • 31天Python入门——第10天:深入理解值传递·引用传递以及深浅拷贝问题
  • 银河麒麟V10-SP3-aarch64操作系统版本 docker run时报错permission denied
  • Springboot之RequestContextHolder 学习笔记