虫洞数观系列一 | 豆瓣电影TOP250数据采集与MySQL存储实战
目录
系列文章
1. 引言
2 技术栈Request
2.1请求头headers
2.2查找定位信息
2.3处理网页html结构化数据
2.4每页url规律
2.5逐条查找所需信息
2.6完整代码
3 数据存储至mysql
3.1新建数据库
3.2编写数据库写入py文件
3.2.1构建1个执行sql语句的函数
3.2.2构造一个将dataframe转成sql语句的函数
3.2.3完整代码
4 总结
系列文章
虫洞数观系列总览 | 技术全景:豆瓣电影TOP250数据采集→分析→可视化完整指南
1. 引言
豆瓣电影TOP250以其清晰的页面结构、规律的数据排列(如电影名称、评分、导演等信息的固定标签和类名),成为爬虫练习的理想目标。虽然存在请求频率限制、IP检测等反爬机制,但相比淘宝、微博等大型网站,其反爬措施较为宽松。本文将基于Requests和Selenium两种技术栈,演示如何高效爬取该榜单数据。
网址链接豆瓣电影 Top 250
2 技术栈Request
2.1请求头headers
headers
是 HTTP 请求中的一部分,用于向服务器传递客户端(如浏览器)的相关信息。服务器会根据这些信息来判断请求的来源、客户端类型以及如何处理请求。主要作用是绕过反爬虫机制,通过模拟浏览器行为,避免被服务器识别为爬虫。
在网站中如何查找自己的请求头
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36
请求头解读:
-
Mozilla/5.0
:这是一个通用的标识符,表示客户端兼容 Mozilla 浏览器(现代浏览器通常都会包含这个字段)。 -
Windows NT 10.0; Win64; x64
:表示客户端的操作系统信息。-
Windows NT 10.0
:操作系统是 Windows 10。 -
Win64
和x64
:表示系统是 64 位架构。
-
-
AppleWebKit/537.36
:表示客户端使用的渲染引擎是 AppleWebKit(Chrome 和 Safari 都基于此引擎)。 -
KHTML, like Gecko
:表示客户端兼容 KHTML 和 Gecko 渲染引擎(Gecko 是 Firefox 的渲染引擎)。 -
Chrome/134.0.0.0
:表示客户端使用的是 Chrome 浏览器,版本号为 134.0.0.0。 -
Safari/537.36
:表示客户端兼容 Safari 浏览器,版本号为 537.36。
2.2查找定位信息
很多人可能一开始不理解xpath,尤其看到网页信息,如"/html/body/div[3]/div[1]/div/div[1]/ol/li[1]/div/div[2]/div[1]/a/span[1]"就感觉头大,不重要!!!,重在理解大致含义,看久啦就习惯啦,慢慢就理解啦,以下是告诉你怎么查看xpath
2.3处理网页html结构化数据
在使用 requests
请求网页后,返回的数据通常是 HTML 格式的结构化文本。HTML 文档由标签、属性和内容组成,具有清晰的层次结构。以下是一个简单的 HTML 示例,展示了其典型的结构:
<div class="info">
<div class="hd">
<a href="https://movie.douban.com/subject/1292052/">
<span class="title">肖申克的救赎</span>
</a>
</div>
</div>
在处理网页数据时,虽然可以直接对 requests.text
(即原始的 HTML 字符串)进行操作,但这种方式会面临极大的工作量。HTML 文档通常包含复杂的标签嵌套和属性结构,手动解析字符串不仅繁琐,还容易出错。为了高效、准确地提取目标数据,引入 etree
,将 HTML 字符串解析为树形结构(ElementTree)。这种结构化解析方式不仅简化了数据提取过程,还支持强大的 XPath 和 CSS 选择器,极大地提升了开发效率。
以下是一个简单的示例,展示如何使用 etree
解析 HTML 并提取数据:
from lxml import etree
tree = etree.HTML(html)
# 使用 XPath 提取电影名称
titles = tree.xpath('//div[@class="info"]/div[@class="hd"]/a/span[@class="title"]/text()')
print(titles) # 输出:['肖申克的救赎']
2.4每页url规律
# 第1页
'https://movie.douban.com/top250?start=0&filter='
# 第2页
'https://movie.douban.com/top250?start=25&filter='
# 第3页
'https://movie.douban.com/top250?start=50&filter='
# 第4页
'https://movie.douban.com/top250?start=75&filter='
# 第5页
'https://movie.douban.com/top250?start=100&filter='
# 尾页
'https://movie.douban.com/top250?start=225&filter='
可以发现一个规律
for page in range(1, 11):
# 目标url
url = f'https://movie.douban.com/top250?start={(page - 1) * 25}&filter='
2.5逐条查找所需信息
我们需要一层一层定位所需要的信息,如下图
2.6完整代码
# 导入模块
import pandas as pd
import requests
from lxml import etree
import csv
def crawler_douban():
# 请求头信息
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
}
moive_list = []
for page in range(1, 11):
# 目标url
url = f'https://movie.douban.com/top250?start={(page - 1) * 25}&filter='
# 发送请求, 获取响应
res = requests.get(url, headers=headers)
# 打印响应信息
# print(res.text)
# 网页源码
html = res.text
# 实例化etree对象
tree = etree.HTML(html)
divs = tree.xpath('//div[@class="info"]')
# print(divs)
i = 1
for div in divs:
dic_temp = {}
dic_temp['电影中文名'] = ''
dic_temp['电影英文名'] = ''
dic_temp['电影详情页链接'] = ''
dic_temp['导演'] = ''
dic_temp['主演'] = ''
dic_temp['上映年份'] = ''
dic_temp['国籍'] = ''
dic_temp['类型'] = ''
dic_temp['评分'] = ''
dic_temp['评分人数'] = ''
dic_temp['评语'] = ''
print('>>>>>>>>>>')
print(i)
print(div)
div_temp = div.xpath('./div[@class="hd"]/a')
urlx = div_temp[0].get('href')
texts = [a.xpath('string(.)').strip() for a in div_temp]
print(texts)
need_list1 = []
for each_text in texts:
each_text_list = each_text.split('\n')
for each_one in each_text_list:
need_list1.append(each_one.replace(' ', ''))
div_temp = div.xpath('./div[@class="bd"]')
texts = [a.xpath('string(.)').strip() for a in div_temp]
need_list2 = []
for each_text in texts:
each_text_list = each_text.split('\n')
for each_one in each_text_list:
need_list2.append(each_one.replace(' ', ''))
print(need_list1)
print(need_list2)
dic_temp['电影中文名'] = need_list1[0]
dic_temp['电影英文名'] = need_list1[1].replace('\xa0/\xa0', '')
dic_temp['电影详情页链接'] = urlx
dic_temp['导演'] = need_list2[0].split('xa0\xa0\xa0')[0]
print(need_list2[0])
try:
dic_temp['导演'] = dic_temp['导演'].replace('导演:', '').replace('\xa0\xa0\xa0', '')
dic_temp['导演'] = dic_temp['导演'].split('主演:')[0]
except:
pass
try:
dic_temp['主演'] = need_list2[0].split('主演:')[1]
except:
pass
dic_temp['上映年份'] = need_list2[1].split('\xa0/\xa0')[0]
dic_temp['国籍'] = need_list2[1].split('\xa0/\xa0')[1]
dic_temp['类型'] = need_list2[1].split('\xa0/\xa0')[2]
dic_temp['评分'] = need_list2[7]
dic_temp['评分人数'] = need_list2[9]
if need_list2[-1] != need_list2[9]:
dic_temp['评语'] = need_list2[-1]
if dic_temp != {}:
print(dic_temp)
moive_list.append(dic_temp)
i += 1
print(f'----------------------第{page}页爬取完成--------------------------------------')
print('-----------------------爬虫结束-------------------------------')
df = pd.DataFrame(moive_list)
df.to_excel('douban_TOP250_moive.xlsx', index=None)
return df
# 程序入口
if __name__ == "__main__":
df = crawler_douban()
print(df)
3 数据存储至mysql
3.1新建数据库
使用Navicat新建数据库时,既可通过可视化界面操作,也能直接编写SQL命令完成。
可视化界面操作
编写SQL命令
CREATE TABLE `top250movie` (
`update_date` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '更新日期',
`feature` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '特征值',
`movie_ch` varchar(255) DEFAULT NULL COMMENT '电影中文名',
`movie_en` varchar(255) DEFAULT NULL COMMENT '电影英文名',
`movie_url` varchar(255) DEFAULT NULL COMMENT '电影详情页链接',
`director` varchar(255) DEFAULT NULL COMMENT '导演',
`star` varchar(255) DEFAULT NULL COMMENT '主演',
`start_year` varchar(255) DEFAULT NULL COMMENT '上映年份',
`country` varchar(255) DEFAULT NULL COMMENT '国籍',
`type` varchar(255) DEFAULT NULL COMMENT '类型',
`rating` varchar(255) DEFAULT NULL COMMENT '评分',
`num_ratings` varchar(255) DEFAULT NULL COMMENT '评分人数',
`comment` varchar(1000) DEFAULT NULL COMMENT '评语',
PRIMARY KEY (`feature`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3.2编写数据库写入py文件
3.2.1构建1个执行sql语句的函数
定义了一个函数 cost_execute_sql_no_return_value(sql)
,用于执行 不返回结果 的 SQL 语句(如 INSERT
、UPDATE
、DELETE
),并处理数据库连接池、事务和错误。
数据库连接池配置
__config = {
"host": "localhost", # MySQL 服务器地址
"port": 3306, # 端口号
"user": "root", # 用户名
"password": "dandan1901", # 密码
"database": "douban" # 数据库名
}
-
定义了 MySQL 的连接参数,包括主机、端口、用户名、密码和数据库名。
初始化连接池
try:
pool = mysql.connector.pooling.MySQLConnectionPool(
**__config,
pool_size=10 # 连接池大小(最多 10 个连接)
)
except Exception as e:
print(e) # 如果连接池初始化失败,打印错误
-
MySQLConnectionPool
:创建一个 MySQL 连接池,pool_size=10
表示最多维护 10 个连接。 -
异常处理:如果连接池初始化失败(如密码错误、网络问题),捕获异常并打印。
执行 SQL 语句
try:
con = pool.get_connection() # 从连接池获取一个连接
cursor = con.cursor() # 创建游标
cursor.execute(sql) # 执行 SQL
con.commit() # 提交事务
print(('Successfully insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore'))
except Exception as e:
print(e) # 打印错误信息
if "con" in dir(): # 如果连接存在
con.rollback() # 回滚事务
print(('Failed insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore'))
finally:
if "con" in dir(): # 确保连接被关闭
con.close()
-
pool.get_connection()
:从连接池获取一个可用连接。 -
cursor.execute(sql)
:执行传入的 SQL 语句。 -
con.commit()
:如果执行成功,提交事务。 -
con.rollback()
:如果执行失败,回滚事务。 -
con.close()
:在finally
中确保连接被关闭(防止资源泄漏)。 -
编码处理:
('Successfully insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore')
这段代码是为了避免中文字符在打印时出现编码错误(gbk
是 Windows 终端常用编码)。
3.2.2构造一个将dataframe转成sql语句的函数
将 DataFrame 中的数据批量插入/更新到 MySQL 数据库的 top250movie
表中
def write_info_into_db(df):
for i in range(df.shape[0]):
df.loc[i, '特征值'] = 'TOP' + str(i + 1).zfill(4)
df['更新日期'] = str(datetime.datetime.now())
# 将数据写入数据库
insert_cols = {
'update_date': '更新日期',
'feature': '特征值',
'movie_ch': '电影中文名',
'movie_en': '电影英文名',
'movie_url': '电影详情页链接',
'director': '导演',
'star': '主演',
'start_year': '上映年份',
'country': '国籍',
'type': '类型',
'rating': '评分',
'num_ratings': '评分人数',
'comment': '评语',
}
# df_copy = pd.DataFrame(df)
db_list = list(insert_cols.keys())
print(db_list)
for i in range(df.shape[0]):
dbname = 'top250movie'
sql = "REPLACE INTO " + dbname + " ("
for ix in range(len(db_list)):
if ix < len(db_list) - 1:
sql = sql + str(db_list[ix]) + ', '
else:
sql = sql + str(db_list[ix])
sql = sql + ") VALUES ("
for ix in range(len(db_list)):
each_key = db_list[ix]
print(df.loc[i, insert_cols[each_key]])
if ix < len(db_list) - 1:
sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "'" + ","
else:
sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "'" + ");"
print('>>>>>>>>>>')
print(sql)
cost_execute_sql_no_return_value(sql)
数据处理
for i in range(df.shape[0]):
df.loc[i, '特征值'] = 'TOP' + str(i + 1).zfill(4) # 生成形如 "TOP0001" 的特征值
-
为 DataFrame 的每一行添加
特征值
列,格式为TOP0001
、TOP0002
...(zfill(4)
保证 4 位数字)。
df['更新日期'] = str(datetime.datetime.now()) # 添加当前时间戳
-
添加
更新日期
列,值为当前时间(字符串格式)。
数据库字段映射
insert_cols = {
'update_date': '更新日期', # 数据库列名: DataFrame 列名
'feature': '特征值',
'movie_ch': '电影中文名',
'movie_en': '电影英文名',
'movie_url': '电影详情页链接',
'director': '导演',
'star': '主演',
'start_year': '上映年份',
'country': '国籍',
'type': '类型',
'rating': '评分',
'num_ratings': '评分人数',
'comment': '评语',
}
-
定义数据库列名(
key
)和 DataFrame 列名(value
)的映射关系。
db_list = list(insert_cols.keys()) # 获取数据库列名列表
print(db_list) # 打印列名(调试用)
生成并执行 SQL
for i in range(df.shape[0]): # 遍历 DataFrame 每一行
dbname = 'top250movie'
sql = "REPLACE INTO " + dbname + " ("
# 拼接列名部分(如 REPLACE INTO top250movie (update_date, feature, ...))
for ix in range(len(db_list)):
if ix < len(db_list) - 1:
sql += str(db_list[ix]) + ', '
else:
sql += str(db_list[ix])
sql += ") VALUES ("
# 拼接值部分(如 VALUES ('2023-01-01', 'TOP0001', ...))
for ix in range(len(db_list)):
each_key = db_list[ix]
print(df.loc[i, insert_cols[each_key]]) # 打印当前值(调试用)
if ix < len(db_list) - 1:
sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "', "
else:
sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "');"
print('>>>>>>>>>>')
print(sql) # 打印完整 SQL(调试用)
cost_execute_sql_no_return_value(sql) # 执行 SQL
-
REPLACE INTO
:如果主键冲突,则删除旧记录并插入新记录(类似INSERT + DELETE
)。 -
动态生成 SQL:根据
db_list
和 DataFrame 的值拼接 SQL 语句。 -
执行 SQL:调用
cost_execute_sql_no_return_value(sql)
(前文定义的函数)执行。
3.2.3完整代码
# 导入模块
import datetime
import mysql.connector.pooling
import pandas as pd
import requests
from lxml import etree
def cost_execute_sql_no_return_value(sql):
__config = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "faw-vw.1901",
"database": "douban"
}
try:
pool = mysql.connector.pooling.MySQLConnectionPool(
**__config,
pool_size=10
)
except Exception as e:
print(e)
try:
con = pool.get_connection()
cursor = con.cursor()
cursor.execute(sql)
con.commit()
print(('Successfully insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore'))
except Exception as e:
print(e)
if "con" in dir():
con.rollback()
print(('Failed insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore'))
finally:
if "con" in dir():
con.close()
def crawler_douban():
# 请求头信息
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
}
moive_list = []
for page in range(1, 11):
# 目标url
url = f'https://movie.douban.com/top250?start={(page - 1) * 25}&filter='
# 发送请求, 获取响应
res = requests.get(url, headers=headers)
# 打印响应信息
# print(res.text)
# 网页源码
html = res.text
# 实例化etree对象
tree = etree.HTML(html)
divs = tree.xpath('//div[@class="info"]')
# print(divs)
i = 1
for div in divs:
dic_temp = {}
dic_temp['电影中文名'] = ''
dic_temp['电影英文名'] = ''
dic_temp['电影详情页链接'] = ''
dic_temp['导演'] = ''
dic_temp['主演'] = ''
dic_temp['上映年份'] = ''
dic_temp['国籍'] = ''
dic_temp['类型'] = ''
dic_temp['评分'] = ''
dic_temp['评分人数'] = ''
dic_temp['评语'] = ''
print('>>>>>>>>>>')
print(i)
print(div)
div_temp = div.xpath('./div[@class="hd"]/a')
urlx = div_temp[0].get('href')
texts = [a.xpath('string(.)').strip() for a in div_temp]
print(texts)
need_list1 = []
for each_text in texts:
each_text_list = each_text.split('\n')
for each_one in each_text_list:
need_list1.append(each_one.replace(' ', ''))
div_temp = div.xpath('./div[@class="bd"]')
texts = [a.xpath('string(.)').strip() for a in div_temp]
need_list2 = []
for each_text in texts:
each_text_list = each_text.split('\n')
for each_one in each_text_list:
need_list2.append(each_one.replace(' ', ''))
print(need_list1)
print(need_list2)
dic_temp['电影中文名'] = need_list1[0]
dic_temp['电影英文名'] = need_list1[1].replace('\xa0/\xa0', '')
dic_temp['电影详情页链接'] = urlx
dic_temp['导演'] = need_list2[0].split('xa0\xa0\xa0')[0]
print(need_list2[0])
try:
dic_temp['导演'] = dic_temp['导演'].replace('导演:', '').replace('\xa0\xa0\xa0', '')
dic_temp['导演'] = dic_temp['导演'].split('主演:')[0]
except:
pass
try:
dic_temp['主演'] = need_list2[0].split('主演:')[1]
except:
pass
dic_temp['上映年份'] = need_list2[1].split('\xa0/\xa0')[0]
dic_temp['国籍'] = need_list2[1].split('\xa0/\xa0')[1]
dic_temp['类型'] = need_list2[1].split('\xa0/\xa0')[2]
dic_temp['评分'] = need_list2[7]
dic_temp['评分人数'] = need_list2[9]
if need_list2[-1] != need_list2[9]:
dic_temp['评语'] = need_list2[-1]
if dic_temp != {}:
print(dic_temp)
moive_list.append(dic_temp)
i += 1
print(f'----------------------第{page}页爬取完成--------------------------------------')
print('-----------------------爬虫结束-------------------------------')
df = pd.DataFrame(moive_list)
df.to_excel('douban_TOP250_moive.xlsx', index=None)
return df
def write_info_into_db(df):
for i in range(df.shape[0]):
df.loc[i, '特征值'] = 'TOP' + str(i + 1).zfill(4)
df['更新日期'] = str(datetime.datetime.now())
# 将数据写入数据库
insert_cols = {
'update_date': '更新日期',
'feature': '特征值',
'movie_ch': '电影中文名',
'movie_en': '电影英文名',
'movie_url': '电影详情页链接',
'director': '导演',
'star': '主演',
'start_year': '上映年份',
'country': '国籍',
'type': '类型',
'rating': '评分',
'num_ratings': '评分人数',
'comment': '评语',
}
# df_copy = pd.DataFrame(df)
db_list = list(insert_cols.keys())
print(db_list)
for i in range(df.shape[0]):
dbname = 'top250movie'
sql = "REPLACE INTO " + dbname + " ("
for ix in range(len(db_list)):
if ix < len(db_list) - 1:
sql = sql + str(db_list[ix]) + ', '
else:
sql = sql + str(db_list[ix])
sql = sql + ") VALUES ("
for ix in range(len(db_list)):
each_key = db_list[ix]
print(df.loc[i, insert_cols[each_key]])
if ix < len(db_list) - 1:
sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "'" + ","
else:
sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "'" + ");"
print('>>>>>>>>>>')
print(sql)
cost_execute_sql_no_return_value(sql)
# 程序入口
if __name__ == "__main__":
# 爬取数据
df = crawler_douban()
print(df)
# 将数据写入数据库
write_info_into_db(df)
展示结果如下
4 总结
此处省略1万字,感谢各位看到最后,最后附上代码信息,数据库也在文件中,百度网盘链接如下
通过网盘分享的文件:项目工坊_豆瓣爬虫
链接: https://pan.baidu.com/s/1cY8plsXfVYl8NuhBixVrTQ?pwd=y131 提取码: y131