当前位置: 首页 > article >正文

虫洞数观系列一 | 豆瓣电影TOP250数据采集与MySQL存储实战

目录

系列文章

1. 引言

2 技术栈Request

2.1请求头headers

2.2查找定位信息

2.3处理网页html结构化数据

2.4每页url规律

2.5逐条查找所需信息

2.6完整代码

3 数据存储至mysql

3.1新建数据库

3.2编写数据库写入py文件

3.2.1构建1个执行sql语句的函数

3.2.2构造一个将dataframe转成sql语句的函数

3.2.3完整代码

4 总结


系列文章

虫洞数观系列总览 | 技术全景:豆瓣电影TOP250数据采集→分析→可视化完整指南

1. 引言

豆瓣电影TOP250以其清晰的页面结构、规律的数据排列(如电影名称、评分、导演等信息的固定标签和类名),成为爬虫练习的理想目标。虽然存在请求频率限制、IP检测等反爬机制,但相比淘宝、微博等大型网站,其反爬措施较为宽松。本文将基于Requests和Selenium两种技术栈,演示如何高效爬取该榜单数据。

网址链接豆瓣电影 Top 250

2 技术栈Request

2.1请求头headers

headers 是 HTTP 请求中的一部分,用于向服务器传递客户端(如浏览器)的相关信息。服务器会根据这些信息来判断请求的来源、客户端类型以及如何处理请求。主要作用是绕过反爬虫机制,通过模拟浏览器行为,避免被服务器识别为爬虫。

在网站中如何查找自己的请求头


Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36

请求头解读:

  • Mozilla/5.0:这是一个通用的标识符,表示客户端兼容 Mozilla 浏览器(现代浏览器通常都会包含这个字段)。

  • Windows NT 10.0; Win64; x64:表示客户端的操作系统信息。

    • Windows NT 10.0:操作系统是 Windows 10。

    • Win64 和 x64:表示系统是 64 位架构。

  • AppleWebKit/537.36:表示客户端使用的渲染引擎是 AppleWebKit(Chrome 和 Safari 都基于此引擎)。

  • KHTML, like Gecko:表示客户端兼容 KHTML 和 Gecko 渲染引擎(Gecko 是 Firefox 的渲染引擎)。

  • Chrome/134.0.0.0:表示客户端使用的是 Chrome 浏览器,版本号为 134.0.0.0。

  • Safari/537.36:表示客户端兼容 Safari 浏览器,版本号为 537.36。

2.2查找定位信息

很多人可能一开始不理解xpath,尤其看到网页信息,如"/html/body/div[3]/div[1]/div/div[1]/ol/li[1]/div/div[2]/div[1]/a/span[1]"就感觉头大,不重要!!!,重在理解大致含义,看久啦就习惯啦,慢慢就理解啦,以下是告诉你怎么查看xpath

2.3处理网页html结构化数据

在使用 requests 请求网页后,返回的数据通常是 HTML 格式的结构化文本。HTML 文档由标签、属性和内容组成,具有清晰的层次结构。以下是一个简单的 HTML 示例,展示了其典型的结构:

<div class="info">
  <div class="hd">
    <a href="https://movie.douban.com/subject/1292052/">
      <span class="title">肖申克的救赎</span>
    </a>
  </div>
</div>

在处理网页数据时,虽然可以直接对 requests.text(即原始的 HTML 字符串)进行操作,但这种方式会面临极大的工作量。HTML 文档通常包含复杂的标签嵌套和属性结构,手动解析字符串不仅繁琐,还容易出错。为了高效、准确地提取目标数据,引入 etree,将 HTML 字符串解析为树形结构(ElementTree)。这种结构化解析方式不仅简化了数据提取过程,还支持强大的 XPath 和 CSS 选择器,极大地提升了开发效率。

以下是一个简单的示例,展示如何使用 etree 解析 HTML 并提取数据:

from lxml import etree

tree = etree.HTML(html)
# 使用 XPath 提取电影名称
titles = tree.xpath('//div[@class="info"]/div[@class="hd"]/a/span[@class="title"]/text()')
print(titles)  # 输出:['肖申克的救赎']

2.4每页url规律

# 第1页
'https://movie.douban.com/top250?start=0&filter='

# 第2页
'https://movie.douban.com/top250?start=25&filter='

# 第3页
'https://movie.douban.com/top250?start=50&filter='

# 第4页
'https://movie.douban.com/top250?start=75&filter='

# 第5页
'https://movie.douban.com/top250?start=100&filter='

# 尾页
'https://movie.douban.com/top250?start=225&filter='

可以发现一个规律

for page in range(1, 11):
    # 目标url
    url = f'https://movie.douban.com/top250?start={(page - 1) * 25}&filter='

2.5逐条查找所需信息

我们需要一层一层定位所需要的信息,如下图

2.6完整代码

# 导入模块
import pandas as pd
import requests
from lxml import etree
import csv


def crawler_douban():
    # 请求头信息
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
    }

    moive_list = []

    for page in range(1, 11):

        # 目标url
        url = f'https://movie.douban.com/top250?start={(page - 1) * 25}&filter='

        # 发送请求, 获取响应
        res = requests.get(url, headers=headers)
        # 打印响应信息
        # print(res.text)
        # 网页源码
        html = res.text
        # 实例化etree对象
        tree = etree.HTML(html)

        divs = tree.xpath('//div[@class="info"]')

        # print(divs)
        i = 1
        for div in divs:

            dic_temp = {}
            dic_temp['电影中文名'] = ''
            dic_temp['电影英文名'] = ''
            dic_temp['电影详情页链接'] = ''
            dic_temp['导演'] = ''
            dic_temp['主演'] = ''
            dic_temp['上映年份'] = ''
            dic_temp['国籍'] = ''
            dic_temp['类型'] = ''
            dic_temp['评分'] = ''
            dic_temp['评分人数'] = ''
            dic_temp['评语'] = ''
            print('>>>>>>>>>>')
            print(i)
            print(div)
            div_temp = div.xpath('./div[@class="hd"]/a')
            urlx = div_temp[0].get('href')
            texts = [a.xpath('string(.)').strip() for a in div_temp]
            print(texts)
            need_list1 = []
            for each_text in texts:
                each_text_list = each_text.split('\n')
                for each_one in each_text_list:
                    need_list1.append(each_one.replace(' ', ''))

            div_temp = div.xpath('./div[@class="bd"]')
            texts = [a.xpath('string(.)').strip() for a in div_temp]
            need_list2 = []
            for each_text in texts:
                each_text_list = each_text.split('\n')
                for each_one in each_text_list:
                    need_list2.append(each_one.replace(' ', ''))

            print(need_list1)
            print(need_list2)
            dic_temp['电影中文名'] = need_list1[0]
            dic_temp['电影英文名'] = need_list1[1].replace('\xa0/\xa0', '')
            dic_temp['电影详情页链接'] = urlx
            dic_temp['导演'] = need_list2[0].split('xa0\xa0\xa0')[0]
            print(need_list2[0])
            try:
                dic_temp['导演'] = dic_temp['导演'].replace('导演:', '').replace('\xa0\xa0\xa0', '')
                dic_temp['导演'] = dic_temp['导演'].split('主演:')[0]
            except:
                pass
            try:
                dic_temp['主演'] = need_list2[0].split('主演:')[1]
            except:
                pass
            dic_temp['上映年份'] = need_list2[1].split('\xa0/\xa0')[0]
            dic_temp['国籍'] = need_list2[1].split('\xa0/\xa0')[1]
            dic_temp['类型'] = need_list2[1].split('\xa0/\xa0')[2]
            dic_temp['评分'] = need_list2[7]
            dic_temp['评分人数'] = need_list2[9]
            if need_list2[-1] != need_list2[9]:
                dic_temp['评语'] = need_list2[-1]

            if dic_temp != {}:
                print(dic_temp)
                moive_list.append(dic_temp)

            i += 1

            print(f'----------------------第{page}页爬取完成--------------------------------------')
    print('-----------------------爬虫结束-------------------------------')

    df = pd.DataFrame(moive_list)

    df.to_excel('douban_TOP250_moive.xlsx', index=None)
    return df


# 程序入口
if __name__ == "__main__":
    df = crawler_douban()
    print(df)

3 数据存储至mysql

3.1新建数据库

使用Navicat新建数据库时,既可通过可视化界面操作,也能直接编写SQL命令完成。

可视化界面操作

编写SQL命令

CREATE TABLE `top250movie` (
  `update_date` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '更新日期',
  `feature` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '特征值',
  `movie_ch` varchar(255) DEFAULT NULL COMMENT '电影中文名',
  `movie_en` varchar(255) DEFAULT NULL COMMENT '电影英文名',
  `movie_url` varchar(255) DEFAULT NULL COMMENT '电影详情页链接',
  `director` varchar(255) DEFAULT NULL COMMENT '导演',
  `star` varchar(255) DEFAULT NULL COMMENT '主演',
  `start_year` varchar(255) DEFAULT NULL COMMENT '上映年份',
  `country` varchar(255) DEFAULT NULL COMMENT '国籍',
  `type` varchar(255) DEFAULT NULL COMMENT '类型',
  `rating` varchar(255) DEFAULT NULL COMMENT '评分',
  `num_ratings` varchar(255) DEFAULT NULL COMMENT '评分人数',
  `comment` varchar(1000) DEFAULT NULL COMMENT '评语',
  PRIMARY KEY (`feature`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.2编写数据库写入py文件

3.2.1构建1个执行sql语句的函数

定义了一个函数 cost_execute_sql_no_return_value(sql),用于执行 不返回结果 的 SQL 语句(如 INSERTUPDATEDELETE),并处理数据库连接池、事务和错误。

数据库连接池配置

__config = {
    "host": "localhost",      # MySQL 服务器地址
    "port": 3306,             # 端口号
    "user": "root",           # 用户名
    "password": "dandan1901", # 密码
    "database": "douban"      # 数据库名
}
  • 定义了 MySQL 的连接参数,包括主机、端口、用户名、密码和数据库名。

初始化连接池

try:
    pool = mysql.connector.pooling.MySQLConnectionPool(
        **__config,
        pool_size=10  # 连接池大小(最多 10 个连接)
    )
except Exception as e:
    print(e)  # 如果连接池初始化失败,打印错误
  • MySQLConnectionPool:创建一个 MySQL 连接池,pool_size=10 表示最多维护 10 个连接。

  • 异常处理:如果连接池初始化失败(如密码错误、网络问题),捕获异常并打印。

执行 SQL 语句

try:
    con = pool.get_connection()  # 从连接池获取一个连接
    cursor = con.cursor()        # 创建游标
    cursor.execute(sql)          # 执行 SQL
    con.commit()                 # 提交事务
    print(('Successfully insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore'))
except Exception as e:
    print(e)  # 打印错误信息
    if "con" in dir():           # 如果连接存在
        con.rollback()           # 回滚事务
    print(('Failed insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore'))
finally:
    if "con" in dir():           # 确保连接被关闭
        con.close()
  • pool.get_connection():从连接池获取一个可用连接。

  • cursor.execute(sql):执行传入的 SQL 语句。

  • con.commit():如果执行成功,提交事务。

  • con.rollback():如果执行失败,回滚事务。

  • con.close():在 finally 中确保连接被关闭(防止资源泄漏)。

  • 编码处理
    ('Successfully insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore')
    这段代码是为了避免中文字符在打印时出现编码错误(gbk 是 Windows 终端常用编码)。

3.2.2构造一个将dataframe转成sql语句的函数

将 DataFrame 中的数据批量插入/更新到 MySQL 数据库的 top250movie 表中

def write_info_into_db(df):
    for i in range(df.shape[0]):
        df.loc[i, '特征值'] = 'TOP' + str(i + 1).zfill(4)

    df['更新日期'] = str(datetime.datetime.now())

    # 将数据写入数据库
    insert_cols = {
        'update_date': '更新日期',
        'feature': '特征值',
        'movie_ch': '电影中文名',
        'movie_en': '电影英文名',
        'movie_url': '电影详情页链接',
        'director': '导演',
        'star': '主演',
        'start_year': '上映年份',
        'country': '国籍',
        'type': '类型',
        'rating': '评分',
        'num_ratings': '评分人数',
        'comment': '评语',
    }
    # df_copy = pd.DataFrame(df)
    db_list = list(insert_cols.keys())
    print(db_list)
    for i in range(df.shape[0]):
        dbname = 'top250movie'
        sql = "REPLACE INTO " + dbname + " ("
        for ix in range(len(db_list)):
            if ix < len(db_list) - 1:
                sql = sql + str(db_list[ix]) + ', '
            else:
                sql = sql + str(db_list[ix])
        sql = sql + ") VALUES ("
        for ix in range(len(db_list)):
            each_key = db_list[ix]
            print(df.loc[i, insert_cols[each_key]])

            if ix < len(db_list) - 1:
                sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "'" + ","
            else:
                sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "'" + ");"

        print('>>>>>>>>>>')
        print(sql)
        cost_execute_sql_no_return_value(sql)

数据处理

for i in range(df.shape[0]):
    df.loc[i, '特征值'] = 'TOP' + str(i + 1).zfill(4)  # 生成形如 "TOP0001" 的特征值
  • 为 DataFrame 的每一行添加 特征值 列,格式为 TOP0001TOP0002...(zfill(4) 保证 4 位数字)。

df['更新日期'] = str(datetime.datetime.now())  # 添加当前时间戳
  • 添加 更新日期 列,值为当前时间(字符串格式)。

数据库字段映射

insert_cols = {
    'update_date': '更新日期',    # 数据库列名: DataFrame 列名
    'feature': '特征值',
    'movie_ch': '电影中文名',
    'movie_en': '电影英文名',
    'movie_url': '电影详情页链接',
    'director': '导演',
    'star': '主演',
    'start_year': '上映年份',
    'country': '国籍',
    'type': '类型',
    'rating': '评分',
    'num_ratings': '评分人数',
    'comment': '评语',
}
  • 定义数据库列名(key)和 DataFrame 列名(value)的映射关系。

db_list = list(insert_cols.keys())  # 获取数据库列名列表
print(db_list)  # 打印列名(调试用)

生成并执行 SQL

for i in range(df.shape[0]):  # 遍历 DataFrame 每一行
    dbname = 'top250movie'
    sql = "REPLACE INTO " + dbname + " ("
    
    # 拼接列名部分(如 REPLACE INTO top250movie (update_date, feature, ...))
    for ix in range(len(db_list)):
        if ix < len(db_list) - 1:
            sql += str(db_list[ix]) + ', '
        else:
            sql += str(db_list[ix])
    
    sql += ") VALUES ("
    
    # 拼接值部分(如 VALUES ('2023-01-01', 'TOP0001', ...))
    for ix in range(len(db_list)):
        each_key = db_list[ix]
        print(df.loc[i, insert_cols[each_key]])  # 打印当前值(调试用)
        
        if ix < len(db_list) - 1:
            sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "', "
        else:
            sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "');"
    
    print('>>>>>>>>>>')
    print(sql)  # 打印完整 SQL(调试用)
    cost_execute_sql_no_return_value(sql)  # 执行 SQL
  • REPLACE INTO:如果主键冲突,则删除旧记录并插入新记录(类似 INSERT + DELETE)。

  • 动态生成 SQL:根据 db_list 和 DataFrame 的值拼接 SQL 语句。

  • 执行 SQL:调用 cost_execute_sql_no_return_value(sql)(前文定义的函数)执行。

3.2.3完整代码

# 导入模块
import datetime

import mysql.connector.pooling
import pandas as pd
import requests
from lxml import etree


def cost_execute_sql_no_return_value(sql):
    __config = {
        "host": "localhost",
        "port": 3306,
        "user": "root",
        "password": "faw-vw.1901",
        "database": "douban"
    }
    try:
        pool = mysql.connector.pooling.MySQLConnectionPool(
            **__config,
            pool_size=10
        )
    except Exception as e:
        print(e)

    try:
        con = pool.get_connection()
        cursor = con.cursor()
        cursor.execute(sql)
        con.commit()
        print(('Successfully insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore'))
    except Exception as e:
        print(e)
        if "con" in dir():
            con.rollback()
        print(('Failed insert ' + sql).encode('gbk', 'ignore').decode('gbk', 'ignore'))
    finally:
        if "con" in dir():
            con.close()


def crawler_douban():
    # 请求头信息
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
    }

    moive_list = []

    for page in range(1, 11):

        # 目标url
        url = f'https://movie.douban.com/top250?start={(page - 1) * 25}&filter='

        # 发送请求, 获取响应
        res = requests.get(url, headers=headers)
        # 打印响应信息
        # print(res.text)
        # 网页源码
        html = res.text
        # 实例化etree对象
        tree = etree.HTML(html)

        divs = tree.xpath('//div[@class="info"]')

        # print(divs)
        i = 1
        for div in divs:

            dic_temp = {}
            dic_temp['电影中文名'] = ''
            dic_temp['电影英文名'] = ''
            dic_temp['电影详情页链接'] = ''
            dic_temp['导演'] = ''
            dic_temp['主演'] = ''
            dic_temp['上映年份'] = ''
            dic_temp['国籍'] = ''
            dic_temp['类型'] = ''
            dic_temp['评分'] = ''
            dic_temp['评分人数'] = ''
            dic_temp['评语'] = ''
            print('>>>>>>>>>>')
            print(i)
            print(div)
            div_temp = div.xpath('./div[@class="hd"]/a')
            urlx = div_temp[0].get('href')
            texts = [a.xpath('string(.)').strip() for a in div_temp]
            print(texts)
            need_list1 = []
            for each_text in texts:
                each_text_list = each_text.split('\n')
                for each_one in each_text_list:
                    need_list1.append(each_one.replace(' ', ''))

            div_temp = div.xpath('./div[@class="bd"]')
            texts = [a.xpath('string(.)').strip() for a in div_temp]
            need_list2 = []
            for each_text in texts:
                each_text_list = each_text.split('\n')
                for each_one in each_text_list:
                    need_list2.append(each_one.replace(' ', ''))

            print(need_list1)
            print(need_list2)
            dic_temp['电影中文名'] = need_list1[0]
            dic_temp['电影英文名'] = need_list1[1].replace('\xa0/\xa0', '')
            dic_temp['电影详情页链接'] = urlx
            dic_temp['导演'] = need_list2[0].split('xa0\xa0\xa0')[0]
            print(need_list2[0])
            try:
                dic_temp['导演'] = dic_temp['导演'].replace('导演:', '').replace('\xa0\xa0\xa0', '')
                dic_temp['导演'] = dic_temp['导演'].split('主演:')[0]
            except:
                pass
            try:
                dic_temp['主演'] = need_list2[0].split('主演:')[1]
            except:
                pass
            dic_temp['上映年份'] = need_list2[1].split('\xa0/\xa0')[0]
            dic_temp['国籍'] = need_list2[1].split('\xa0/\xa0')[1]
            dic_temp['类型'] = need_list2[1].split('\xa0/\xa0')[2]
            dic_temp['评分'] = need_list2[7]
            dic_temp['评分人数'] = need_list2[9]
            if need_list2[-1] != need_list2[9]:
                dic_temp['评语'] = need_list2[-1]

            if dic_temp != {}:
                print(dic_temp)
                moive_list.append(dic_temp)

            i += 1

            print(f'----------------------第{page}页爬取完成--------------------------------------')
    print('-----------------------爬虫结束-------------------------------')

    df = pd.DataFrame(moive_list)

    df.to_excel('douban_TOP250_moive.xlsx', index=None)
    return df


def write_info_into_db(df):
    for i in range(df.shape[0]):
        df.loc[i, '特征值'] = 'TOP' + str(i + 1).zfill(4)

    df['更新日期'] = str(datetime.datetime.now())

    # 将数据写入数据库
    insert_cols = {
        'update_date': '更新日期',
        'feature': '特征值',
        'movie_ch': '电影中文名',
        'movie_en': '电影英文名',
        'movie_url': '电影详情页链接',
        'director': '导演',
        'star': '主演',
        'start_year': '上映年份',
        'country': '国籍',
        'type': '类型',
        'rating': '评分',
        'num_ratings': '评分人数',
        'comment': '评语',
    }
    # df_copy = pd.DataFrame(df)
    db_list = list(insert_cols.keys())
    print(db_list)
    for i in range(df.shape[0]):
        dbname = 'top250movie'
        sql = "REPLACE INTO " + dbname + " ("
        for ix in range(len(db_list)):
            if ix < len(db_list) - 1:
                sql = sql + str(db_list[ix]) + ', '
            else:
                sql = sql + str(db_list[ix])
        sql = sql + ") VALUES ("
        for ix in range(len(db_list)):
            each_key = db_list[ix]
            print(df.loc[i, insert_cols[each_key]])

            if ix < len(db_list) - 1:
                sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "'" + ","
            else:
                sql += "'" + str(df.loc[i, insert_cols[each_key]]) + "'" + ");"

        print('>>>>>>>>>>')
        print(sql)
        cost_execute_sql_no_return_value(sql)


# 程序入口
if __name__ == "__main__":
    # 爬取数据
    df = crawler_douban()
    print(df)
    # 将数据写入数据库
    write_info_into_db(df)

展示结果如下

4 总结

此处省略1万字,感谢各位看到最后,最后附上代码信息,数据库也在文件中,百度网盘链接如下

通过网盘分享的文件:项目工坊_豆瓣爬虫
链接: https://pan.baidu.com/s/1cY8plsXfVYl8NuhBixVrTQ?pwd=y131 提取码: y131


http://www.kler.cn/a/613852.html

相关文章:

  • Tomcat-Thales靶机攻略
  • 软件项目管理课程之第4讲:软件需求管理
  • Spring Boot 的启动流程
  • Java-servlet(九)前端会话,会话管理与Cookie和HttpSession全解析
  • 【2.项目管理】2.4 Gannt图【甘特图】
  • CLion下载安装(Windows11)
  • FPGA设计中IOB约束
  • selenium之element常见属性、方法
  • 【QT5 多线程示例】线程池
  • 网络体系架构
  • 基于ngnix配置本地代理到对应服务器
  • STM32 DMA直接存储器存取
  • 机器学习knnlearn4
  • 如何同步fork的更新
  • 利用python调接口获取物流标签,并转成PDF保存在指定的文件夹。
  • Scala课后总结(2)
  • 内核编程十一:进程的数据结构
  • springboot 实现base64格式wav转码并保存
  • 第 4 章 | Solidity安全 权限控制漏洞全解析
  • GelSight视触觉3D显微系统在透明材料检测中的应用