【python爬虫】设计自己的爬虫 2. 数据保存封装 mongodb,mysql和elasticsearch
mongodb, mysql和elasticsearch 功能较相似,所以打算用一套接口将它们封装起来
基类StorageBase
如下:
class StorageBase:
def __init__(self, host=None, port=None, database=None, table=None, location=None, account=None, password=None,
url=None):
self.host = host
self.port = port
self.database = database
self.table = table
self.location = location
self.account = account
self.password = password
self.url = url
def build_connect(self):
raise NotImplementedError
# 增
def add(self, table_collection_index, data, id=None):
raise NotImplementedError
# 删
def delete(self, table_collection_index, condition=None, data=None, id=None):
raise NotImplementedError
# 改
def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):
raise NotImplementedError
# 查
def search(self, table_collection_index, condition=None):
raise NotImplementedError
# 没有就新增,有就更新
def add_or_update(self, table_collection_index, data, condition=None):
raise NotImplementedError
封装mongodb
class MongodbStore(StorageBase):
def __init__(self, host=MONGODB_CONNECTION_HOST, port=MONGODB_CONNECTION_PORT, database=MONGODB_DATABASE,
account=MONGODB_DATABASE_USER, password=MONGODB_DATABASE_PASS):
self.client = None
self.db = None
StorageBase.__init__(self, host=host, port=port,
database=database, account=account, password=password)
def build_connect(self):
try:
connection = f"mongodb://{self.account}:{self.password}@{self.host}:{self.port}/{self.database}"
self.client = pymongo.MongoClient(connection)
self.db = self.client[self.database]
except Exception as e:
print("失败:{0}".format(e))
self.db = None
else:
print(f"连接成功")
return self.db
# 增
# data要是列表形式,否则报错
# 添加单个可以是 单个元素的数组
def add(self, table_collection_index, data, id=None):
try:
result = self.db[table_collection_index].insert_many(data)
print(f"新增成功")
except Exception as e:
print("失败:{0}".format(e))
result = None
else:
print(f"新增成功")
return result
def add_one(self, table_collection_index, data):
try:
result = self.db[table_collection_index].insert_one(data)
except Exception as e:
print("失败:{0}".format(e))
result = None
else:
print(f"新增成功")
return result
# 没有就新增,有就更新
def add_or_update(self, table_collection_index, data, condition=None):
existing_doc = self.search_one(table_collection_index, conditions=None)
if existing_doc:
# 如果文档存在,则更新
self.update_one(table_collection_index, condition, {
"key": "set",
"value": data
})
print('数据已更新。')
else:
# 如果文档不存在,则插入新文档
self.add_one(table_collection_index, data)
print('新数据已插入。')
# 删
def delete(self, table_collection_index, condition=None, data=None, id=None):
try:
result = self.db[table_collection_index].delete_many(parse_mongodb_condition(data))
except Exception as e:
print("失败:{0}".format(e))
result = None
else:
print(f"删除成功")
return result
def delete_one(self, table_collection_index, data):
try:
result = self.db[table_collection_index].delete_one(parse_mongodb_condition(data))
except Exception as e:
print("失败:{0}".format(e))
result = None
else:
print(f"删除成功")
return result
# 改
# search_condition 指定查询条件
# update_condition 指定更新条件
def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):
try:
result = self.db[table_collection_index].update_many(parse_mongodb_condition(condition),
parse_mongodb_condition(update_condition))
except Exception as e:
logging.error(f"失败:{e}")
result = None
else:
print(
f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条")
return result
# 改单个
def update_one(self, table_collection_index=None, search_condition=None, update_condition=None):
try:
result = self.db[table_collection_index].update_one(parse_mongodb_condition(search_condition),
parse_mongodb_condition(update_condition))
except Exception as e:
print("失败:{0}".format(e))
result = None
else:
print(
f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条")
return result
# 计数
def count(self, table_collection_index, conditions):
try:
result = self.db[table_collection_index].count_documents(parse_mongodb_condition(conditions))
except Exception as e:
print("失败:{0}".format(e))
result = None
else:
print(f"共有{result.matched_count if result else 0}条数据")
return result
# 查
def search(self, table_collection_index, conditions=None):
try:
results = self.db[table_collection_index].find(parse_mongodb_condition(conditions) if conditions else {})
except Exception as e:
print("失败:{0}".format(e))
results = None
return results
def search_one(self, table_collection_index, conditions=None):
try:
result = self.db[table_collection_index].find_one(parse_mongodb_condition(conditions) if conditions else {})
print(f"查询到的内容:{result}")
except Exception as e:
print("失败:{0}".format(e))
result = None
return result
# 排序
def sort(self, table_collection_index, sort_key, conditions=None, skip_num=None, limit_num=None,
sort_type=pymongo.ASCENDING):
try:
query = self.db[table_collection_index].find(
parse_mongodb_condition(conditions) if conditions else {}).sort(sort_key,
sort_type)
if skip_num and limit_num:
results = query.skip(skip_num).limit(limit_num)
elif skip_num:
results = query.skip(skip_num)
elif limit_num:
results = query.limit(limit_num)
else:
results = query
except Exception as e:
print("失败:{0}".format(e))
result = None
return results
# 测试代码
if __name__ == "__main__":
# 建立连接
mongodb_connect = MongodbStore()
mongodb_connect.build_connect()
# student1 = {
# "id": 1,
# "name": "Jordan3",
# "age": 23,
# "gender": "male"
# }
#
# student2 = {
# "name": "Mike",
# "age": 21,
# "gender": "male"
# }
# mongodb_connect.add("students", [student1])
# result = mongodb_connect.delete("students", {"name": "Mike"})
# result = mongodb_connect.update("students", {"age": {"#gt": 23}}, {"$inc": {"age": 1}})
# print(result.matched_count, result.modified_count)
# 自定义分隔符号
# condition = parse_mongodb_condition("age*>*20;name*regex*'^J.*'", ";", "*")
condition_list = [
{
"key": "age",
"condition": ">",
"value": "20"
},
{
"key": "name",
"condition": "regex",
"value": "'^J.*'"
},
]
# 不设置condition的话表示等于
# search_list = [
# {
# "key": "age",
# "value": "20"
# },
# ]
# 条件表达式解析
mongodb_connect.search("students", condition_list)
mongodb_connect.sort("students", 'age', condition_list, None, None, pymongo.DESCENDING)
这里要注意的是考虑到mongodb搜索条件的语法有些复杂,因此对其进行了一些简化,简化的思路是把条件转化成简单的键值对对象,下面看代码
# 优化该方法的思路
# 1. 使用列表推导式替代了外部循环。
# 2. 使用 str.join() 方法替代了内部循环中的字符串连接操作,这样可以减少不必要的字符串创建和连接操作,提高代码效率。
# 3. 使用 dict.values() 方法直接获取字典中的所有值,避免了在内部循环中通过键获取值的操作。
# 将条件对象数组转化为条件字符串
def turn_list_to_condition_str(list, split_cond=',', split_key=':'):
conditions_list = [
split_key.join([str(value) for value in search.values()])
for search in list
]
return split_cond.join(conditions_list)
# 该方法将条件字符串转换为条件对象
def parse_mongodb_condition(conditions, split_cond=',', split_key=':'):
# 传入字符串规则
# 例子:"age:>:20,name:regex:'^J.*'"
# ,分隔多个条件
# age:>:20 age是字段 >是条件 20是值 中间用:分隔
# 分隔符默认是,和: 也可以自定义
# 分割字符串
pairs = turn_list_to_condition_str(conditions).split(split_cond)
# 构建字典
condition_dict = {}
# 避免了在每次循环中都多次调用 pair.split(split_key)
for pair in pairs:
split_pair = pair.split(split_key)
key = replace_chars_by_dit(split_pair[0], MONGODB_CONDITION_DICTIONARY)
if len(split_pair) == 3:
cond = replace_chars_by_dit(split_pair[1], MONGODB_CONDITION_DICTIONARY)
condition_dict[key] = {cond: eval(split_pair[2])}
elif len(split_pair) == 2:
condition_dict[key] = eval(split_pair[1])
# print(f'解析后的条件表达式是{condition_dict}')
return condition_dict
# 测试代码
if __name__ == "__main__":
# condition_str = "age:20,name:regex:'^J.*'"
# print(parse_mongodb_condition(condition_str))
search_list = [
{
"key": "age",
"condition": ">",
"value": "20"
},
{
"key": "name",
"condition": "regex",
# 注意字符串的值要帶单引号''
"value": "'^J.*'"
},
]
search_list = {
"key": "set",
"value": "20"
},
print(parse_mongodb_condition(search_list))
封装 Elasticsearch
# 可以和关系数据库的概念对比理解
# Relational DB -> Databases -> Tables -> Rows -> Columns
# Elasticsearch -> Indices -> Types -> Documents -> Fields
class ElasticsearchStore(StorageBase):
def __init__(self, host=ELASTICSEARCH_HOST, port=ELASTICSEARCH_PORT,
account=ELASTICSEARCH_USERNAME, password=ELASTICSEARCH_PASSWORD,
verify_certs=ELASTICSEARCH_VERIFY_CERTS):
self.es = None
self.verify_certs = verify_certs
StorageBase.__init__(self, host=host, port=port, account=account, password=password)
def build_connect(self, default=True):
try:
if default:
self.es = Elasticsearch()
else:
url = f"https://[{self.account}:{self.password}@]{self.host}:{self.port}"
self.es = Elasticsearch(url, verify_certs=self.verify_certs)
except Exception as e:
print("失败:{0}".format(e))
self.es = None
else:
print(f"连接成功")
return self.es
# 增
def add(self, table_collection_index, data, id=None):
try:
result = self.es.index(index=table_collection_index, body=data, id=id)
print(f"新增成功")
except Exception as e:
print("失败:{0}".format(e))
result = None
else:
print(f"新增成功")
return result
# 删
def delete(self, table_collection_index, condition=None, data=None, id=None):
try:
result = self.es.delete(index=table_collection_index, id=id, ignore=[400, 404])
print(f"删除成功")
except Exception as e:
print("失败:{0}".format(e))
result = None
else:
print(f"删除成功")
return result
# 改
def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):
try:
result = self.es.update(index=table_collection_index, body=data, id=id, ignore=[400, 404])
except Exception as e:
logging.error(f"失败:{e}")
result = None
else:
print(f"更新成功")
return result
# 获取
def get(self, table_collection_index, id):
try:
result = self.es.get(index=table_collection_index, id=id, ignore=[400, 404])
except Exception as e:
logging.error(f"失败:{e}")
result = None
else:
print(f"获取成功{result}")
return result
# 查
# dsl = {
# 'query': {
# 'match': {
# 'title': '高考 圆梦'
# }
# }
# }
def search(self, table_collection_index, condition=None):
try:
results = self.es.search(index=table_collection_index, body=condition)
except Exception as e:
logging.error(f"失败:{e}")
results = None
else:
print(f"搜索成功{results}")
return results
# 测试代码
if __name__ == "__main__":
# 建立连接
elasticsearch_store = ElasticsearchStore()
elasticsearch_store.build_connect()
# doc = {
# 'author': 'kimchy',
# 'text': 'Elasticsearch: cool. bonsai cool.',
# 'timestamp': datetime.now(),
# }
# elasticsearch_connect.add('test-index', doc, 1)
# elasticsearch_connect.get('test-index',1)
doc1 = {
'author': 'kimchy',
'text': 'test',
'timestamp': datetime.now(),
}
elasticsearch_store.update('test-index', doc1, 1)
# elasticsearch_store.delete('test-index', 1)
elasticsearch_store.search('test-index', {"query": {"match_all": {}}})
封装Mysql
class MysqlStore(StorageBase):
def __init__(self, host=MYSQL_HOST, port=MYSQL_PORT,
account=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DATABASE):
self.db = None
self.cursor = None
StorageBase.__init__(self, host=host, port=port, account=account, password=password, database=database)
def build_connect(self) -> pymysql.cursors.Cursor:
"""
建立数据库连接。
Returns:
pymysql.cursors.Cursor: 数据库游标对象,用于执行 SQL 语句。
"""
try:
self.db = pymysql.connect(host=self.host, user=self.account, password=self.password, port=self.port,
database=self.database)
self.cursor = self.db.cursor()
except Exception as e:
print(f"连接失败:{e}")
self.db = None
self.cursor = None
else:
print("连接成功")
return self.cursor
# 增
# table 表
# data 新增数据 键值对对象
def add(self, table_collection_index, data, id=None):
# data.keys()返回的是键的数组
keys = ', '.join(data.keys())
# 下面这段是构造多个%s最为占位符,有几个字段就构造几个
values = ', '.join(['%s'] * len(data))
# 要执行的sql语句
sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table_collection_index, keys=keys, values=values)
try:
self.cursor.execute(sql, tuple(data.values()))
self.db.commit()
print(f"新增成功")
except Exception as e:
print("失败:{0}".format(e))
self.db.rollback()
finally:
print(f"{sql}")
# 没有新增,有就更新
# table 表
# data 新增数据 键值对对象
def add_or_update(self, table_collection_index, data, condition=None):
# data.keys()返回的是键的数组
keys = ', '.join(data.keys())
# 下面这段是构造多个%s最为占位符,有几个字段就构造几个
values = ', '.join(['%s'] * len(data))
# 要执行的sql语句
# 如果主键已经存在,就执行更新操作
sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE '.format(table=table_collection_index, keys=keys,
values=values)
update = ', '.join(["{key} = %s".format(key=key) for key in data])
sql += update
try:
self.cursor.execute(sql, tuple(data.values()) * 2)
self.db.commit()
print(f"新增成功")
except Exception as e:
print("失败:{0}".format(e))
self.db.rollback()
finally:
print(f"{sql}")
# 删
# table 表名
# condition 删除条件
def delete(self, table_collection_index, condition=None, data=None, id=None):
sql = 'DELETE FROM {table} WHERE {condition}'.format(table=table, condition=condition)
try:
self.cursor.execute(sql)
self.db.commit()
print(f"删除成功")
except Exception as e:
print("失败:{0}".format(e))
self.db.rollback()
finally:
print(f"{sql}")
# 改
# table 表名
# data 数据
# condition 条件
def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):
# data.keys()返回的是键的数组
keys = ', '.join(data.keys())
# 下面这段是构造多个%s最为占位符,有几个字段就构造几个
values = ', '.join(['%s'] * len(data))
# 要跟新的数据
update = ','.join(["{key} = %s".format(key=key) for key in data])
# 要执行的sql语句
sql = 'UPDATE {table} SET {update} WHERE {condition}'.format(table=table_collection_index, update=update, condition=condition)
try:
self.cursor.execute(sql, tuple(data.values()))
self.db.commit()
print(f"更新成功")
except Exception as e:
print("失败:{0}".format(e))
self.db.rollback()
finally:
print(f"{sql}")
# 查
def search(self, table_collection_index, condition=None):
# 要执行的sql语句
sql = 'SELECT * FROM {table} WHERE {condition}'.format(table=table_collection_index, condition=condition)
try:
self.cursor.execute(sql)
# fetchall获取结果的所有数据
results = self.cursor.fetchall()
# fetchall得到的是二重元祖,其中每个元素都是一条记录
# 要注意的是fetch的内部实现中有一个偏移指针,用来指向查询结果,偏移指针最开始指向第一条数据,取了一次数据后,指针偏移到下一条数据,
# fetchone被调用后,结果的偏移指针会指向下一条数据,fetchall方法返回的是从偏移指针指向的数据一直到结束的所有数据,所有fetchall获取的数据会少一条
for row in results:
print(row)
except Exception as e:
print("失败:{0}".format(e))
# 测试代码
if __name__ == "__main__":
# 建立连接
mysql_connect = MysqlStore()
mysql_connect.build_connect()
data = {
"name": "Bob",
"age": 22
}
table = "students"
mysql_connect.search(table, 'age>=20')
最后要注意的是
数据库连接的相关信息都放在统一的文件里并且设置为默认的值,也可以初始化的时候传入相应的值