当前位置: 首页 > article >正文

【python爬虫】设计自己的爬虫 2. 数据保存封装 mongodb,mysql和elasticsearch

mongodb, mysql和elasticsearch 功能较相似,所以打算用一套接口将它们封装起来

基类StorageBase

如下:

class StorageBase:
    def __init__(self, host=None, port=None, database=None, table=None, location=None, account=None, password=None,
                 url=None):
        self.host = host
        self.port = port
        self.database = database
        self.table = table
        self.location = location
        self.account = account
        self.password = password
        self.url = url

    def build_connect(self):
        raise NotImplementedError

    # 增
    def add(self, table_collection_index, data, id=None):
        raise NotImplementedError

    # 删
    def delete(self, table_collection_index, condition=None, data=None, id=None):
        raise NotImplementedError

    # 改
    def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):
        raise NotImplementedError

    # 查
    def search(self, table_collection_index, condition=None):
        raise NotImplementedError

    # 没有就新增,有就更新
    def add_or_update(self, table_collection_index, data, condition=None):
        raise NotImplementedError

封装mongodb

class MongodbStore(StorageBase):
    def __init__(self, host=MONGODB_CONNECTION_HOST, port=MONGODB_CONNECTION_PORT, database=MONGODB_DATABASE,
                 account=MONGODB_DATABASE_USER, password=MONGODB_DATABASE_PASS):
        self.client = None
        self.db = None

        StorageBase.__init__(self, host=host, port=port,
                             database=database, account=account, password=password)

    def build_connect(self):
        try:
            connection = f"mongodb://{self.account}:{self.password}@{self.host}:{self.port}/{self.database}"
            self.client = pymongo.MongoClient(connection)
            self.db = self.client[self.database]

        except Exception as e:
            print("失败:{0}".format(e))
            self.db = None
        else:
            print(f"连接成功")

        return self.db

    # 增
    # data要是列表形式,否则报错
    # 添加单个可以是 单个元素的数组
    def add(self, table_collection_index, data, id=None):
        try:
            result = self.db[table_collection_index].insert_many(data)
            print(f"新增成功")
        except Exception as e:
            print("失败:{0}".format(e))
            result = None
        else:
            print(f"新增成功")

        return result

    def add_one(self, table_collection_index, data):
        try:
            result = self.db[table_collection_index].insert_one(data)

        except Exception as e:
            print("失败:{0}".format(e))
            result = None
        else:
            print(f"新增成功")
        return result

    # 没有就新增,有就更新
    def add_or_update(self, table_collection_index, data, condition=None):
        existing_doc = self.search_one(table_collection_index, conditions=None)

        if existing_doc:
            # 如果文档存在,则更新

            self.update_one(table_collection_index, condition, {
                "key": "set",
                "value": data
            })
            print('数据已更新。')
        else:
            # 如果文档不存在,则插入新文档
            self.add_one(table_collection_index, data)
            print('新数据已插入。')

    # 删
    def delete(self, table_collection_index, condition=None, data=None, id=None):
        try:
            result = self.db[table_collection_index].delete_many(parse_mongodb_condition(data))

        except Exception as e:
            print("失败:{0}".format(e))
            result = None
        else:
            print(f"删除成功")
        return result

    def delete_one(self, table_collection_index, data):
        try:
            result = self.db[table_collection_index].delete_one(parse_mongodb_condition(data))

        except Exception as e:
            print("失败:{0}".format(e))
            result = None
        else:
            print(f"删除成功")
        return result

    # 改
    # search_condition 指定查询条件
    # update_condition 指定更新条件

    def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):
        try:
            result = self.db[table_collection_index].update_many(parse_mongodb_condition(condition),
                                                                 parse_mongodb_condition(update_condition))
        except Exception as e:
            logging.error(f"失败:{e}")
            result = None
        else:
            print(
                f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条")

        return result

    # 改单个
    def update_one(self, table_collection_index=None, search_condition=None, update_condition=None):

        try:
            result = self.db[table_collection_index].update_one(parse_mongodb_condition(search_condition),
                                                                parse_mongodb_condition(update_condition))

        except Exception as e:
            print("失败:{0}".format(e))
            result = None
        else:
            print(
                f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条")

        return result

    # 计数
    def count(self, table_collection_index, conditions):

        try:
            result = self.db[table_collection_index].count_documents(parse_mongodb_condition(conditions))

        except Exception as e:
            print("失败:{0}".format(e))
            result = None
        else:
            print(f"共有{result.matched_count if result else 0}条数据")

        return result

    # 查
    def search(self, table_collection_index, conditions=None):

        try:
            results = self.db[table_collection_index].find(parse_mongodb_condition(conditions) if conditions else {})

        except Exception as e:
            print("失败:{0}".format(e))
            results = None

        return results

    def search_one(self, table_collection_index, conditions=None):

        try:
            result = self.db[table_collection_index].find_one(parse_mongodb_condition(conditions) if conditions else {})
            print(f"查询到的内容:{result}")
        except Exception as e:
            print("失败:{0}".format(e))
            result = None

        return result

    # 排序
    def sort(self, table_collection_index, sort_key, conditions=None, skip_num=None, limit_num=None,
             sort_type=pymongo.ASCENDING):

        try:
            query = self.db[table_collection_index].find(
                parse_mongodb_condition(conditions) if conditions else {}).sort(sort_key,
                                                                                sort_type)
            if skip_num and limit_num:
                results = query.skip(skip_num).limit(limit_num)
            elif skip_num:
                results = query.skip(skip_num)
            elif limit_num:
                results = query.limit(limit_num)
            else:
                results = query

        except Exception as e:
            print("失败:{0}".format(e))
            result = None

        return results

# 测试代码
if __name__ == "__main__":
    # 建立连接
    mongodb_connect = MongodbStore()
    mongodb_connect.build_connect()

    # student1 = {
    #     "id": 1,
    #     "name": "Jordan3",
    #     "age": 23,
    #     "gender": "male"
    # }
    #
    # student2 = {
    #     "name": "Mike",
    #     "age": 21,
    #     "gender": "male"
    # }
    # mongodb_connect.add("students", [student1])

    # result = mongodb_connect.delete("students", {"name": "Mike"})
    # result = mongodb_connect.update("students", {"age": {"#gt": 23}}, {"$inc": {"age": 1}})
    # print(result.matched_count, result.modified_count)

    # 自定义分隔符号
    # condition = parse_mongodb_condition("age*>*20;name*regex*'^J.*'", ";", "*")

    condition_list = [
        {
            "key": "age",
            "condition": ">",
            "value": "20"
        },
        {
            "key": "name",
            "condition": "regex",
            "value": "'^J.*'"
        },
    ]

    # 不设置condition的话表示等于
    # search_list = [
    #     {
    #         "key": "age",
    #         "value": "20"
    #     },
    # ]

    # 条件表达式解析
    mongodb_connect.search("students", condition_list)
    mongodb_connect.sort("students", 'age', condition_list, None, None, pymongo.DESCENDING)


这里要注意的是考虑到mongodb搜索条件的语法有些复杂,因此对其进行了一些简化,简化的思路是把条件转化成简单的键值对对象,下面看代码

# 优化该方法的思路
# 1. 使用列表推导式替代了外部循环。
# 2. 使用 str.join() 方法替代了内部循环中的字符串连接操作,这样可以减少不必要的字符串创建和连接操作,提高代码效率。
# 3. 使用 dict.values() 方法直接获取字典中的所有值,避免了在内部循环中通过键获取值的操作。
# 将条件对象数组转化为条件字符串
def turn_list_to_condition_str(list, split_cond=',', split_key=':'):
    conditions_list = [
        split_key.join([str(value) for value in search.values()])
        for search in list
    ]
    return split_cond.join(conditions_list)



# 该方法将条件字符串转换为条件对象
def parse_mongodb_condition(conditions, split_cond=',', split_key=':'):
    # 传入字符串规则
    # 例子:"age:>:20,name:regex:'^J.*'"
    # ,分隔多个条件
    # age:>:20  age是字段 >是条件 20是值 中间用:分隔
    # 分隔符默认是,和: 也可以自定义
    # 分割字符串

    pairs = turn_list_to_condition_str(conditions).split(split_cond)

    # 构建字典
    condition_dict = {}

    # 避免了在每次循环中都多次调用 pair.split(split_key)
    for pair in pairs:
        split_pair = pair.split(split_key)
        key = replace_chars_by_dit(split_pair[0], MONGODB_CONDITION_DICTIONARY)

        if len(split_pair) == 3:
            cond = replace_chars_by_dit(split_pair[1], MONGODB_CONDITION_DICTIONARY)
            condition_dict[key] = {cond: eval(split_pair[2])}

        elif len(split_pair) == 2:
            condition_dict[key] = eval(split_pair[1])

    # print(f'解析后的条件表达式是{condition_dict}')
    return condition_dict

# 测试代码
if __name__ == "__main__":
    # condition_str = "age:20,name:regex:'^J.*'"
    # print(parse_mongodb_condition(condition_str))

    search_list = [
        {
            "key": "age",
            "condition": ">",
            "value": "20"
        },
        {
            "key": "name",
            "condition": "regex",
            # 注意字符串的值要帶单引号''
            "value": "'^J.*'"
        },
    ]

    search_list = {
        "key": "set",
        "value": "20"
    },
    print(parse_mongodb_condition(search_list))


封装 Elasticsearch

# 可以和关系数据库的概念对比理解
# Relational DB -> Databases -> Tables -> Rows -> Columns
# Elasticsearch -> Indices -> Types -> Documents -> Fields
class ElasticsearchStore(StorageBase):
    def __init__(self, host=ELASTICSEARCH_HOST, port=ELASTICSEARCH_PORT,
                 account=ELASTICSEARCH_USERNAME, password=ELASTICSEARCH_PASSWORD,
                 verify_certs=ELASTICSEARCH_VERIFY_CERTS):

        self.es = None
        self.verify_certs = verify_certs

        StorageBase.__init__(self, host=host, port=port, account=account, password=password)

    def build_connect(self, default=True):
        try:
            if default:
                self.es = Elasticsearch()
            else:
                url = f"https://[{self.account}:{self.password}@]{self.host}:{self.port}"
                self.es = Elasticsearch(url, verify_certs=self.verify_certs)

        except Exception as e:
            print("失败:{0}".format(e))
            self.es = None
        else:
            print(f"连接成功")

        return self.es

    # 增
    def add(self, table_collection_index, data, id=None):
        try:
            result = self.es.index(index=table_collection_index, body=data, id=id)
            print(f"新增成功")
        except Exception as e:
            print("失败:{0}".format(e))
            result = None
        else:
            print(f"新增成功")

        return result

    # 删
    def delete(self, table_collection_index, condition=None, data=None, id=None):
        try:
            result = self.es.delete(index=table_collection_index, id=id, ignore=[400, 404])
            print(f"删除成功")
        except Exception as e:
            print("失败:{0}".format(e))
            result = None
        else:
            print(f"删除成功")

        return result

    # 改
    def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):
        try:
            result = self.es.update(index=table_collection_index, body=data, id=id, ignore=[400, 404])
        except Exception as e:
            logging.error(f"失败:{e}")
            result = None
        else:
            print(f"更新成功")

        return result

    # 获取
    def get(self, table_collection_index, id):
        try:
            result = self.es.get(index=table_collection_index, id=id, ignore=[400, 404])
        except Exception as e:
            logging.error(f"失败:{e}")
            result = None
        else:
            print(f"获取成功{result}")

        return result

    # 查
    # dsl = {
    #     'query': {
    #         'match': {
    #             'title': '高考 圆梦'
    #         }
    #     }
    # }
    def search(self, table_collection_index, condition=None):
        try:
            results = self.es.search(index=table_collection_index, body=condition)
        except Exception as e:
            logging.error(f"失败:{e}")
            results = None
        else:
            print(f"搜索成功{results}")

        return results


# 测试代码
if __name__ == "__main__":
    # 建立连接
    elasticsearch_store = ElasticsearchStore()
    elasticsearch_store.build_connect()

    # doc = {
    #     'author': 'kimchy',
    #     'text': 'Elasticsearch: cool. bonsai cool.',
    #     'timestamp': datetime.now(),
    # }
    # elasticsearch_connect.add('test-index', doc, 1)
    # elasticsearch_connect.get('test-index',1)
    doc1 = {
        'author': 'kimchy',
        'text': 'test',
        'timestamp': datetime.now(),
    }
    elasticsearch_store.update('test-index', doc1, 1)
    # elasticsearch_store.delete('test-index', 1)
    elasticsearch_store.search('test-index', {"query": {"match_all": {}}})

封装Mysql

class MysqlStore(StorageBase):
    def __init__(self, host=MYSQL_HOST, port=MYSQL_PORT,
                 account=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DATABASE):
        self.db = None
        self.cursor = None
        StorageBase.__init__(self, host=host, port=port, account=account, password=password, database=database)

    def build_connect(self) -> pymysql.cursors.Cursor:
        """
        建立数据库连接。

        Returns:
            pymysql.cursors.Cursor: 数据库游标对象,用于执行 SQL 语句。
        """
        try:
            self.db = pymysql.connect(host=self.host, user=self.account, password=self.password, port=self.port,
                                      database=self.database)
            self.cursor = self.db.cursor()

        except Exception as e:
            print(f"连接失败:{e}")
            self.db = None
            self.cursor = None
        else:
            print("连接成功")

        return self.cursor

    # 增
    # table 表
    # data 新增数据 键值对对象
    def add(self, table_collection_index, data, id=None):
        # data.keys()返回的是键的数组
        keys = ', '.join(data.keys())
        # 下面这段是构造多个%s最为占位符,有几个字段就构造几个
        values = ', '.join(['%s'] * len(data))
        # 要执行的sql语句
        sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table_collection_index, keys=keys, values=values)
        try:
            self.cursor.execute(sql, tuple(data.values()))
            self.db.commit()
            print(f"新增成功")
        except Exception as e:
            print("失败:{0}".format(e))
            self.db.rollback()
        finally:
            print(f"{sql}")

    # 没有新增,有就更新
    # table 表
    # data 新增数据 键值对对象
    def add_or_update(self, table_collection_index, data, condition=None):
        # data.keys()返回的是键的数组
        keys = ', '.join(data.keys())
        # 下面这段是构造多个%s最为占位符,有几个字段就构造几个
        values = ', '.join(['%s'] * len(data))
        # 要执行的sql语句
        # 如果主键已经存在,就执行更新操作
        sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE '.format(table=table_collection_index, keys=keys,
                                                                                              values=values)
        update = ', '.join(["{key} = %s".format(key=key) for key in data])
        sql += update

        try:
            self.cursor.execute(sql, tuple(data.values()) * 2)
            self.db.commit()
            print(f"新增成功")
        except Exception as e:
            print("失败:{0}".format(e))
            self.db.rollback()
        finally:
            print(f"{sql}")

    # 删
    # table 表名
    # condition 删除条件
    def delete(self, table_collection_index, condition=None, data=None, id=None):
        sql = 'DELETE FROM  {table} WHERE {condition}'.format(table=table, condition=condition)
        try:
            self.cursor.execute(sql)
            self.db.commit()
            print(f"删除成功")
        except Exception as e:
            print("失败:{0}".format(e))
            self.db.rollback()
        finally:
            print(f"{sql}")

    # 改
    # table 表名
    # data 数据
    # condition 条件
    def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):
        # data.keys()返回的是键的数组
        keys = ', '.join(data.keys())
        # 下面这段是构造多个%s最为占位符,有几个字段就构造几个
        values = ', '.join(['%s'] * len(data))
        # 要跟新的数据
        update = ','.join(["{key} = %s".format(key=key) for key in data])
        # 要执行的sql语句
        sql = 'UPDATE {table} SET {update} WHERE {condition}'.format(table=table_collection_index, update=update, condition=condition)

        try:
            self.cursor.execute(sql, tuple(data.values()))
            self.db.commit()
            print(f"更新成功")
        except Exception as e:
            print("失败:{0}".format(e))
            self.db.rollback()
        finally:
            print(f"{sql}")

    # 查
    def search(self, table_collection_index, condition=None):
        # 要执行的sql语句
        sql = 'SELECT * FROM {table} WHERE {condition}'.format(table=table_collection_index, condition=condition)

        try:
            self.cursor.execute(sql)
            # fetchall获取结果的所有数据
            results = self.cursor.fetchall()
            # fetchall得到的是二重元祖,其中每个元素都是一条记录
            # 要注意的是fetch的内部实现中有一个偏移指针,用来指向查询结果,偏移指针最开始指向第一条数据,取了一次数据后,指针偏移到下一条数据,
            # fetchone被调用后,结果的偏移指针会指向下一条数据,fetchall方法返回的是从偏移指针指向的数据一直到结束的所有数据,所有fetchall获取的数据会少一条

            for row in results:
                print(row)

        except Exception as e:
            print("失败:{0}".format(e))

# 测试代码
if __name__ == "__main__":
    # 建立连接
    mysql_connect = MysqlStore()
    mysql_connect.build_connect()
    data = {
        "name": "Bob",
        "age": 22
    }
    table = "students"
    mysql_connect.search(table, 'age>=20')

最后要注意的是
数据库连接的相关信息都放在统一的文件里并且设置为默认的值,也可以初始化的时候传入相应的值


http://www.kler.cn/news/156269.html

相关文章:

  • SQL -高阶3
  • Linux系统centos7防火墙firewall开放IP及端口命令
  • 2023.12.3 hive-sql日期函数小练习
  • k8s中批量处理Pod应用的Job和CronJob控制器、处理守护型pod的DaemonSet控制器介绍
  • 深入理解Go语言GC机制
  • SAP_ABAP_RZ11解决SAP运行超时问题 TIME_OUT / rdisp/scheduler/prio_high/max_runtime
  • 最强Node js 后端框架学习看这一篇文章就够
  • 内衣洗衣机哪个牌子好用?小型洗衣机五大排名
  • 在项目根目录未找到 app.json
  • leetcode:232. 用栈实现队列
  • Mybatis-Plus实现逻辑删除
  • C++ 实现微信退款和对账示例代码
  • ASP.NET 网上选课系统的设计与实现
  • 二叉查找树和红黑树
  • 卖家低价侵权了怎么处理
  • 一款自动帮你生成UI界面和代码的AI神器
  • MySQL练习题,学生成绩查询练习题,附带答案
  • JIRA部分数据库结构
  • Spring AOP解析
  • 基于Java SSM框架实现美好生活九宫格日志网站系统项目【项目源码+论文说明】
  • Docker push 命令
  • 在CentOS7下安装Docker与Docker Compose
  • 举例说明自然语言处理(NLP)技术。
  • 二分查找算法:搜索有序数组中目标元素的利器
  • 松下、书客、明基护眼台灯值不值得买?热门护眼台灯真实测评!
  • 客户销售目标拆解:数据驱动的方法和策略
  • 【LeeCode】142.环形链表II
  • 开启gitlab中远程连接pgsql
  • 燃料电池汽车市场分析:预计2028年将达到118亿美元
  • 前端需要掌握的技术有哪些方面