当前位置: 首页 > article >正文

爬虫学习记录

 1.概念

通过编写程序,模拟浏览器上网,然后让其去互联网上抓取数据的过程

  • 通用爬虫:抓取的是一整张页面数据
  • 聚焦爬虫:抓取的是页面中的特定局部内容
  • 增量式爬虫:监测网站中数据更新的情况,只会抓取网站中最新更新出来的数据

robots.txt协议:

君子协议,网站后面添加robotx.txt可进行查看

https://www.baidu.com/robots.txt

1.1 http协议

服务器和客户端进行数据交互的一种形式

常用的请求头信息:

  • User-Agent: 请求载体的身份标识,如: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36
  • Connection: 请求完毕完毕之后,是断开连接还是保持连接(close和keep-alive)

常用响应头信息:

  • Content-Type: 服务器响应客户端的数据类型 如:(text/html; charset=utf-8)

1.2 https协议

安全的超文本传输协议,在客户端和服务端进行数据传输和数据交互的过程中,数据是进行加密的.

数据加密的方式

  • 对称秘钥加密: 客户端制定加密方式,加加密的密文和生成的秘钥都传输给服务端.服务端根据秘钥对于密文进行解密.(但是密文和秘钥可能会被同时拦截)
  • 非对称秘钥加密:服务端生成秘钥对,将生成的公钥传递给客户端,然后将生成的密文传递给服务端,服务端再使用私钥进行解密(客户端拿到的秘钥,不一定是从服务端传递过来的)
  • 证书秘钥加密:服务器端制定加密方式,服务端将公钥传递给证书认证机构,认证机构将公钥通过认证之后,进行数字签名.将携带数字签名的公钥封装到证书当中,将证书一并发送给客户端

2. requests模块

作用:模拟浏览器发送请求

requests模块的编码流程:

  • 指定url
  • 发起请求(get/post)
  • 获取相应数据
  • 持久化存储

2.1 简易网页采集器

2.1.1 UA 伪装

将python脚本发送的请求,伪装成为浏览器发送的请求

2.1.2 Get请求携带参数

将url的参数封装成为字典,传递给params

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests

# 参考url
# https://www.baidu.com/s?wd=%E6%88%90%E5%8A%9F
url = "https://www.baidu.com/s?"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}

word = input("enter your word:").strip()

# get请求携带的参数
param = {
    "wd":word
}

# 获取响应对象
response = requests.get(url, headers=header, params=param)

# 获取响应内容,可以通过response.text获取字符串形式的响应数据
page_text = response.content

# 持久化储存
file_name = word + ".html"
with open(file_name, "wb") as f:
    f.write(page_text)

print(f"{file_name} has been save successfully")

2.2 破解百度翻译

2.2.1 POST请求携带参数

将传递的参数封装成为字典,并且传递给data

180e7e948fa9407cb8ffb152aa6caa93.png

2.2.2 Ajax请求

单词输入后,局部页面就会刷新

Ajax(Asynchronous JavaScript and XML)是一种在Web应用程序中进行异步通信的技术,它使用JavaScript和XML(现在通常使用JSON)来实现在不刷新整个页面的情况下与服务器进行数据交换

97db308ad6cc45dfa31e7f16dcee5cc8.png

2.2.3 JSON模块的使用

  • 反序列化 loads:将json字符串转化为python对象字典
  • 序列化 dump: 将python对象字典转化为json字符串,并写入文件

4c96e3dfa22742a1a2ce45a179a8f6cd.png

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests
import json

# 参考url
url = "https://fanyi.baidu.com/sug"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
    "Referer":"https://fanyi.baidu.com/mtpe-individual/multimodal"
}

word = input("enter your word:").strip()

# get请求携带的参数
param = {
    "kw":word
}

# 获取响应对象
response = requests.post(url, headers=header,data=param)

# 获取响应内容
word_text = json.loads(response.text)

# 持久化储存
file_name = word + ".json"
with open(file_name, "w",encoding="utf-8") as f:
    json.dump(word_text["data"][0], f,ensure_ascii=False)

print(f"{file_name} has been save successfully")

f35d983fa23d484db43827f19e219270.png

2.3 电影 

2.4 公司url

  • 动态加载数据分析
  • 获取每家公司的url,但是发现每家公司的详情页面也是动态加载出来的

3.数据解析

  • 正则
  • bs4
  • xpath

数据解析原理:

解析局部的文本内容都会在标签之间或是标签的属性中进行存储

  • 进行指定标签的定位
  • 标签或是标签对应属性存储值的获取

3.1 正则解析

3.2 bs4解析

只可以被应用于python中

3.2.1 数据解析原理

  • 实例化BeautifulSoup对象,并且将页面源码数据加载到该对象里面
  • 通过调用BeautifulSoup对象中的相关的属性或是方法,进行标签定位和数据提取

3.2.2 环境安装

pip3 install beautifulsoup4  -i  https://mirrors.aliyun.com/pypi/simple
pip3 install lxml  -i  https://mirrors.aliyun.com/pypi/simple

3.2.3 bs4的具体使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/26 22:07
# @Author : George
from bs4 import BeautifulSoup
import re

fp = open("./result.html", "r", encoding="utf-8")
soup = BeautifulSoup(fp, "lxml")
# soup.tagName:返回的是html中第一次出现的tag标签
print(soup.div)
# -------------------------------------------
# 等同于soup.tagName
print(soup.find("a"))
# 属性定位
print(soup.find("a", href=re.compile(".*ip138.com/$")).text)
# # 加载源码中所有的tag标签组成的列表
print(soup.find_all("a"))
# -------------------------------------------
# 可以使用选择器,id/类/标签/选择器,返回的是一个列表
print(soup.select('.center'))
# 层级选择器,
# “ ”空格就是表示多个层级
# > 表示一个层级
for i in soup.select('.center > ul > li > a'):
    print(i.text)
print(soup.select('.center > ul a')[0])
# -------------------------------------------
# 获取标签之间的文本数据 soup.a.text/string/get_text()
# --text/get_text可以获取标签里面啊所有的文本内容
# --string只可以获取该标签下的直系文本内容
print(soup.select('.center > ul a')[0].string)
# -------------------------------------------
# 获取标签里面的属性值
print(soup.select('.center > ul a')[0]["href"])

3.2.4 bs4爬取三国演义

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/26 22:43
# @Author : George
# ==================================================
# <a href="/guwen/bookv_6dacadad4420.aspx">第一回</a>
# 第一回网址
# https://www.gushiwen.cn/guwen/bookv_6dacadad4420.aspx
# ==================================================

from bs4 import BeautifulSoup
import requests

url = "https://www.gushiwen.cn/guwen/book_46653FD803893E4F7F702BCF1F7CCE17.aspx"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}
# 获取响应对象
response = requests.get(url, headers=header)
# 实例化bs对象
soup = BeautifulSoup(response.text, "lxml")
a_liat = soup.select(".bookcont >ul > span >a")
fp = open("三国.txt","w",encoding="utf-8")
# 解析章节标题和详情页面的url
for tag in a_liat:
    title = tag.text
    detail_url = "https://www.gushiwen.cn/"+tag["href"]
    # 对详情页面发起请求
    detail_page = requests.get(detail_url, headers=header)
    # 解析出详情页面中的内容
    detail_soup = BeautifulSoup(detail_page.text,"lxml")
    # 使用此种方法出现一个问题,就是文章都是在p标签里面,所以文章不会换行
    # content = detail_soup.find("div",class_="contson").text
    fp.write(title + ":")
    for line in detail_soup.select('.contson > p'):
        fp.write(line.text+"\n")
    fp.write("\n\n")
    print(f"{title}爬取成功")
fp.close()

效果:

2fed6d19354c427799d07f88da4f0cb3.png

3.3 xpath解析

最常用且是最便捷高效的一种解析方式

3.3.1 xpath解析原理

  • 实例化一个etree的对象,且将被解析的页面远吗数据加载到该对象中
  • 调用etree对象中的xpath方法结合xpath表达式实现标签的定位和内容的捕获

3.3.2 环境的安装

pip3 install lxml  -i  https://mirrors.aliyun.com/pypi/simple

3.3.3 具体使用

xpath只能够根据层级关系定位标签页面.

1d80a29970ae42d885f98586b60dd8f9.png

ba9263465f7742479051cd9574f5c8fd.png

<!DOCTYPE html>
<html lang="zh-CN">
<head>
<!--    <meta charset="UTF-8">-->
<!--    <meta name="viewport" content="width=device-width, initial-scale=1.0">-->
    <title>小型HTML页面示例</title>
</head>
<body>
    <div class="container">
        <div class="div1">
            <h2>第一个Div</h2>
            <p>这是第一个div的内容。它使用了类标签.div1进行样式定位。</p>
            <p>这是第一个div的内容。它使用了类标签.div2进行样式定位。</p>
            <p><title>这是第一个div的内容。它使用了类标签.div3进行样式定位。</title></p>
        </div>
        <div class="div2">
            <h2>第二个Div</h2>
            <p>这是第二个div的内容。它使用了类标签.div2进行样式定位。</p>
        </div>
        <div class="div3">
            <h2 class="div3_h2">第三个Div</h2>
            <p>这是第三个div的内容。它使用了类标签.div3进行样式定位。</p>
        </div>
    </div>
</body>
</html>
from lxml import etree

tree_ = etree.parse("./baidu.html")
# r = tree_.xpath("/html/head/title")  # => [<Element title at 0x10a79ee60>],返回的是定位出来的对象
# r = tree_.xpath("/html//title")  # => [<Element title at 0x10a510dc0>],定位出来的效果是一样的
# r = tree_.xpath("//title") # => [<Element title at 0x106b75dc0>],找到源码里面所有的title标签

# 属性定位
# r = tree_.xpath('//div[@class="div1"]') # => [<Element div at 0x101807e60>]

# 索引定位,索引位置从1开始
# r = tree_.xpath('//div[@class="div1"]/p[3]')

# 取文本 /text()获取的标签里面直系的文本内容, //text() 获取标签下面所有的文本内容
# text = tree_.xpath('//div[@class="div1"]/p[3]/text()')
# print(text)  # => ['这是第一个div的内容。它使用了类标签.div3进行样式定位。']
# text = tree_.xpath('//div[@class="div1"]//text()')
# print(text)

# 获取属性
# attr = tree_.xpath('//div[@class="div3"]/h2/@class')  # ['div3_h2']
# print(attr)

3.3.4 爬取ppt模板

已经成功,结果如下,但是是大学时多亏了它的免费模板,就不贴代码给它带来麻烦了

99221c29b8094f67b7b8f70744dd24fa.png

3.3.5 爬取美女图片

https://pic.netbian.com/4kmeinv/

爬取美女图片失败,开始进入网站总是有人机验证,进去什么都爬取不了,后面再试一下

db1321caf5304010a2a3d806cc43f60c.png

4.模拟登录

4.1 验证码识别 

要收费,自己写个图片文字识别

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/28 13:44
# @Author : George
from PIL import Image
import pytesseract

# 如果你没有将tesseract.exe添加到系统的PATH中,
# 你需要在这里指定tesseract可执行文件的完整路径
pytesseract.pytesseract.tesseract_cmd = r'D:\Tesseract-OCR\tesseract.exe'

# 打开一个图像文件
image_path = 'abc_1.png'  # 替换为你的图像路径
image = Image.open(image_path)

# 使用pytesseract进行OCR
text = pytesseract.image_to_string(image, lang='chi_sim')  # lang参数指定语言,例如'chi_sim'表示简体中文

# 打印识别出的文字
print(text)

4.2 模拟登录逻辑

5.cookie

5.1含义及作用

网页的Cookie是一种在Web开发中广泛使用的技术,用于在用户的计算机上存储小块的数据。这些数据通常由Web服务器发送给用户的浏览器(所以主要是服务器创建的),并在用户后续的访问中被浏览器返回给服务器。Cookie的主要功能和用途包括:

  1. 会话管理:Cookie可以用于保持用户的会话状态,例如在用户登录到网站后,服务器可以发送一个包含会话ID的Cookie到用户的浏览器。在用户后续的请求中,浏览器会自动包含这个会话ID,使得服务器能够识别并持续管理用户的会话。

  2. 个性化设置:Cookie可以用来存储用户的偏好设置,例如网站的语言、主题颜色、字体大小等。这样,当用户再次访问网站时,这些设置可以自动恢复,提高用户体验。

  3. 跟踪用户行为:通过Cookie,网站可以跟踪和分析用户的行为,例如用户访问了哪些页面、停留了多长时间、点击了哪些链接等。这些信息对于网站优化、广告定位等非常有用。

  4. 安全性:在某些情况下,Cookie还可以用于增强网站的安全性,例如通过存储加密的令牌来验证用户的身份。

Cookie具有以下几个特点:

  • 存储在客户端:Cookie数据存储在用户的浏览器上,而不是服务器上。这意味着即使用户关闭了浏览器或计算机,只要没有删除Cookie,数据仍然存在。
  • 自动发送:在用户访问与Cookie相关的网站时,浏览器会自动将相关的Cookie数据发送到服务器。
  • 有限的大小和数量:每个Cookie的大小和数量都有限制,这取决于浏览器和Web服务器的配置。
  • 过期时间:Cookie可以设置过期时间,在过期时间之前,Cookie一直有效。如果没有设置过期时间,Cookie就是一个会话Cookie,在用户关闭浏览器时自动删除。

5.2 session的含义及特点

  • 一、Session的基本概念

Session是服务器用于跟踪用户会话的一种机制。它允许服务器在多个请求之间识别同一个用户,并维护该用户的状态信息。这些状态信息可以包括用户的登录状态、购物车内容、偏好设置等。

  • 二、Session的工作原理
  1. 创建Session:当用户首次访问网站时,服务器会创建一个新的Session对象,并为其分配一个唯一的Session ID。
  2. 发送Session ID:服务器通常会将这个Session ID以Cookie的形式发送给客户端浏览器。客户端浏览器会在后续的请求中自动将这个Session ID附加在请求头中。
  3. 识别用户:服务器通过检查请求头中的Session ID来识别用户,并获取相应的Session数据。这样,服务器就可以在多个请求之间保持用户的状态信息。
  • 三、Session的作用
  1. 保持用户状态:Session允许服务器在多个请求之间跟踪用户的状态信息,如登录状态、购物车内容等。这使得用户可以在不同页面之间无缝切换,而无需重新认证或输入信息。
  2. 个性化服务:根据用户的喜好和历史行为,服务器可以为用户提供个性化的内容和服务。这有助于提高用户体验和满意度。
  3. 安全性:通过验证Session ID来确认用户的请求,服务器可以防止未授权访问和非法操作。这有助于保护用户的隐私和数据安全。
  • 四、Session与Cookie的关系

Session和Cookie是密切相关的两种技术。Cookie是服务器发送到客户端浏览器并保存在本地的一小块数据,而Session则是服务器用于跟踪用户会话的一种机制。Cookie通常用于存储Session ID,以便服务器在后续的请求中识别用户。因此,可以说Cookie是Session的一种实现方式。

  • 五、Session的管理

在网络请求中,管理Session是非常重要的。开发人员需要确保Session的安全性、有效性和可维护性。这包括:

  1. 设置合理的Session过期时间:为了避免用户长时间未操作而导致的会话过期问题,开发人员需要设置合理的Session过期时间。
  2. 保护Session ID:Session ID是用户身份的重要标识,开发人员需要采取措施来保护它免受攻击和泄露。例如,可以使用HTTPS协议来加密传输Session ID,或者使用更复杂的Session ID生成算法来提高安全性。
  3. 清理无效的Session:为了节省服务器资源和提高性能,开发人员需要定期清理无效的Session对象。这可以通过设置Session的失效时间、使用数据库存储Session数据并定期清理过期数据等方式来实现。

5.3 http和https协议的特点

无状态。即是说,即使你的第一次请求已经实现了自动登录。但是你第二次发送请求时,服务器端并不知道你的请求是基于第一次登录状态的。

cookie可以让服务器端记录客户端的相关状态

5.3 cookie值的处理

  • 手动抓包cookie值,将其封装到headers里面,但是这种方法面对cookie是动态变化的时候就很难处理了
  • session会话对象
    • 可以进行请求的发送
    • 如果请求过程中产生了cookie,则该cookie会被自动存储/携带在该session会话对象中
  • b3b252aa2fe644b3aef552a0ef159520.png

6.代理

需要绕过IP封锁、限制或进行大规模数据抓取时。代理服务器充当客户端和目标服务器之间的中介,可以隐藏你的真实IP地址,提供额外的安全性,有时还能加速请求

测试网址:

https://httpbin.org/get

现在基本上没有免费正常的代理可以被使用,我这个也是失败的。看到一个博主写建立代理ip池的python3之爬虫代理IP的使用+建立代理IP池_python爬虫代理池-CSDN博客,代码写的不错,爬取出来的ip也没什么能用的了。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/28 19:14
# @Author : George
import requests

url = "https://www.baidu.com/s?"

# 将爬虫程序伪装成为浏览器来发送请求
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
}

# 设置代理
proxies = {
    "http": "154.203.132.49:8080",
    "https":"49.73.4.158:8089"
}
param = {
    "wd":"ip"
}

response = requests.get(url,headers=headers,proxies=proxies,verify=False,params=param)
with open("proxy.html","wt",encoding="utf-8") as f:
    f.write(response.text)

公司的联网也是需要代理的

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests

user = ""
passwd = "

url = "https://www.baidu.com/"

# 模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}
# 指定代理,不指定代理,无法上网
proxies = {
    "http": f"http://{user}:{passwd}@ip:port",
    "https": f"https://{user}:{passwd}@ip:port"
}

# 获取响应对象
response = requests.get(url, headers=header, proxies=proxies)

# 获取响应内容,可以通过response.text获取字符串形式的响应数据
page_text = response.content

# 持久化储存
with open("baidu.html", "wb") as f:
    f.write(page_text)

7.异步爬虫(进程池)

本来是针对视频进行爬取的,但是ajax请求时的请求地址,看不懂mrd这个怎么来的,暂时跳过,我灰太狼一定会回来的!!!!!!!!!!!!

295b11c4eb6346fba989e6cdf2874352.png

d684c278cec246cf8b9eb62f1497660d.png

7.1 视频爬取

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/29 13:16
# @Author : George
import requests
import chardet
import os
from lxml import etree
"""
"https://www.pearvideo.com/category_1"
['video_1797596', 'video_1797399', 'video_1797404', 'video_1797260']

https://www.pearvideo.com/video_1797596

ajax请求
https://www.pearvideo.com/videoStatus.jsp?contId=1797596&mrd=0.4308675768914054
https://www.pearvideo.com/videoStatus.jsp?contId=1797399&mrd=0.6241695396585363
"""


class LiVideo(object):
    def __init__(self):
        #  定义输出ppt的文件夹
        self.extract_to_dir = f"./video"
        os.makedirs("./video", exist_ok=True)
        # 添加Referer防止反爬虫
        self.header = {
            "Referer": "https://www.pearvideo.com/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
        }
        # 编码
        self.encoding = None
        self.home_url = "https://www.pearvideo.com/category_1"

    def get_HTML(self, url_param):
        response = requests.get(url_param, headers=self.header)
        if response.status_code == 200:
            # 使用 chardet 自动检测编码
            self.encoding = chardet.detect(response.content)['encoding']
            response.encoding = self.encoding
            # 创建etree对象
            tree = etree.HTML(response.text)
            return tree

    def deal(self):
        home_tree = self.get_HTML(self.home_url)
        home_url_list = home_tree.xpath("//*[@id='listvideoListUl']/li/div/a[1]/@href")
        name_list = home_tree.xpath("//*[@id='listvideoListUl']/li/div/a[1]/div[2]/text()")
        for name,url in zip(name_list,home_url_list):
            url = "https://www.pearvideo.com/"+url
            detail_tree = self.get_HTML(url)


if __name__ == '__main__':
    vi = LiVideo()
    vi.deal()

 7.2 诗文异步爬取

只要将任务提交给线程池,线程池就会自动安排一个线程来执行这个任务.同过线程池提交的任务是异步提交.异步提交的结果就是不等待任务的执行结果,继续往下执行 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/29 14:26
# @Author : George
"""
https://www.gushiwen.cn/guwen/Default.aspx?p=1&type=%e5%b0%8f%e8%af%b4%e5%ae%b6%e7%b1%bb

第二层
https://www.gushiwen.cn/guwen/book_4e6b88d8a0bc.aspx
https://www.gushiwen.cn/guwen/book_a09880163008.aspx

第三層
https://www.gushiwen.cn/guwen/bookv_b630af160f65.aspx
"""
import os
import requests
import chardet
from lxml import etree
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import time
from typing import Dict, List, Tuple

class NovelDownloader:
    def __init__(self):
        self.output_dir = "./novels"
        os.makedirs(self.output_dir, exist_ok=True)
        
        self.headers = {
            "Referer": "https://www.gushiwen.cn/guwen/Default.aspx?p=1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
        }
        self.base_url = "https://www.gushiwen.cn"
        self.home_url = f"{self.base_url}/guwen/Default.aspx?p=1&type=%e5%b0%8f%e8%af%b4%e5%ae%b6%e7%b1%bb"

    def get_html_tree(self, url: str) -> etree._Element:
        """获取页面HTML并返回etree对象"""
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to get {url}, status code: {response.status_code}")
            
        encoding = chardet.detect(response.content)['encoding']
        response.encoding = encoding
        return etree.HTML(response.text)

    def get_chapter_details(self) -> Dict[str, Dict[str, str]]:
        """获取所有章节详情"""
        home_tree = self.get_html_tree(self.home_url)
        
        # 获取书籍链接和标题
        urls = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/@href")[1:3]
        titles = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/b/text()")[1:3]
        
        book_details = {}
        for title, url in zip(titles, urls):
            detail_tree = self.get_html_tree(f"{self.base_url}{url}")
            
            # 获取章节链接和标题
            chapter_urls = [f"{self.base_url}{url}" for url in 
                          detail_tree.xpath("//*[@class='bookcont']/ul/span/a/@href")]
            chapter_titles = detail_tree.xpath("//*[@class='bookcont']/ul/span/a/text()")
            
            book_details[title] = dict(zip(chapter_titles, chapter_urls))
            
        return book_details

    def download_novel(self, title: str, chapters: Dict[str, str]):
        """下载单本小说"""
        output_path = os.path.join(self.output_dir, f"{title}.txt")
        print(f"开始下载 {title}".center(100, "="))
        
        with open(output_path, "w", encoding="utf-8") as f:
            for chapter_title, chapter_url in chapters.items():
                response = requests.get(chapter_url, headers=self.headers)
                soup = BeautifulSoup(response.text, "lxml")
                
                f.write(f"{chapter_title}:\n")
                for paragraph in soup.select('.contson > p'):
                    f.write(f"{paragraph.text}\n")
                f.write("\n\n")
                
                print(f"{chapter_title} 下载完成".center(20, "-"))
                
        print(f"{title} 下载完成".center(100, "="))

def main():
    start_time = time.time()
    
    downloader = NovelDownloader()
    books = downloader.get_chapter_details()
    
    # 使用线程池并行下载
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(downloader.download_novel, title, chapters) 
                  for title, chapters in books.items()]
        
    print(f"总耗时: {time.time() - start_time:.2f}秒")

if __name__ == '__main__':
    main()

8.异步爬虫(协程)

  协程内容不赘述:CSDN

c62484262bf6460187337af4d2ab3996.png

基于单线程和协程,实现异步爬虫。 

8.1. 基于flask搭建服务器

简单的学习了一下,感觉不是很复杂,后面等着详细学习

261a05e6ee984879838d6b8015f8b299.png

pip3 install Flask -i  https://mirrors.aliyun.com/pypi/simple
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/31 21:47
# @Author : George
"""
# @app.route('/'): 这是一个装饰器,用于告诉 Flask 哪个 URL 应该触发下面的函数。
在这个例子中,它指定了根 URL(即网站的主页)。

# return 'Hello, World!': 这行代码是 hello_world 函数的返回值。
当用户访问根 URL 时,这个字符串将被发送回用户的浏览器。

"""
from flask import Flask, request, jsonify
import time


def get_client_ip(request):
    # 如果使用了反向代理,优先从 X-Forwarded-For 头部获取 IP 地址
    x_forwarded_for = request.headers.get('X-Forwarded-For', '').split(',')
    if x_forwarded_for:
        client_ip = x_forwarded_for[0]  # 通常第一个 IP 地址是客户端的真实 IP 地址
    else:
        client_ip = request.remote_addr
    return client_ip


app = Flask(__name__)


@app.route('/')
def index():
    return 'Hello, World!'


@app.route('/bobo')
def index_bobo():
    # 获取client的user_agent和referer
    user_agent = request.headers.get('User-Agent')
    user_referer = request.headers.get('Referer')
    print(user_agent, user_referer)
    # client ip地址没有获取到
    client_ip = get_client_ip(request)
    print({'client_ip': client_ip})
    time.sleep(3)
    return 'bobo'


@app.route('/jar')
def index_jar():
    time.sleep(3)
    return 'jar'


@app.route('/test')
def index_test():
    time.sleep(3)
    return 'test'


if __name__ == '__main__':
    app.run(threaded=True)

8.2 基于aiohttp实现异步爬虫

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/31 22:08
# @Author : George
import asyncio
import requests
import time
import aiohttp
from threading import currentThread

"""
async def request(url): # 耗时: 9.054526805877686,requests是同步阻塞,必须替换为异步库提供的函数aiohttp
"""

urls = [
    "http://127.0.0.1:5000/jar",
    "http://127.0.0.1:5000/bobo",
    "http://127.0.0.1:5000/test"
]

start = time.time()


# 耗时: 9.054526805877686,requests是同步操作
# async def request(url):
#     print("开始下载", url)
#     response = requests.get(url=url)
#     print(response.text)
#     print("下载结束", url)
#     print("------------")

async def request_2(url):
    print("开始下载", url, currentThread())
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Referer": "https://movie.douban.com/explore"
    }
    # 设置代理
    proxy = "http://8.220.204.215:8008"

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            text = await response.text()
            print(text)
    print("下载结束", url)


async def main():
    tasks = []
    for url in urls:
        tasks.append(asyncio.create_task(request_2(url)))
    await asyncio.wait(tasks)


asyncio.run(main())

end = time.time()
print("耗时:", end - start)

9.selenuim使用

  • 便捷的获取网页中间动态加载的数据
  • 边界实现模拟登录

selenuim:基于浏览器自动化的一个模块

入门指南 | Selenium

b2505f3011124e5bb0606b9d5922d52f.png

基于selenium实现浏览器自动化,自动输入并播放动漫核心代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2025/1/1 16:04
# @Author : George
"""
在 Selenium 4 中,不再直接在 webself.driver.Chrome 中设置驱动程序路径,而是通过引入 Service 对象来设置。
这样可以避免弃用警告,并确保驱动程序的正确加载
"""
import os.path

from selenium import webdriver
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
import time
from lxml import etree
import requests
import chardet
import json
from log_test import logger

class VideoAutoPlay(object):
    def __init__(self):
        self.driver = {}
        self.count_num = 1
        self.movies_dict =None
        self.home_url = "https://www.agedm.org/"
        self.headers = {
            "Referer": "https://www.agedm.org/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0"
        }

    def get_html_tree(self, url: str) -> etree._Element:
        """获取页面HTML并返回etree对象"""
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to get {url}, status code: {response.status_code}")

        encoding = chardet.detect(response.content)['encoding']
        response.encoding = encoding
        return etree.HTML(response.text)

    def movies_input(self, movie_name, n):
        try:
            if not self.driver:
                self.driver = self.setup_driver()

            self.driver.get(self.home_url)
            # 查找搜索框元素
            search_box = self.driver.find_element(By.ID, "query")

            # 输入搜索内容
            search_box.send_keys(movie_name)

            # 提交搜索表单
            search_box.send_keys(Keys.RETURN)

            # 等待搜索结果加载
            # WebDriverWait(self.driver, 10).until(
            #     EC.presence_of_element_located((By.CLASS_NAME, "content_left"))
            # )
            # 二级请求
            self.driver.get(f"https://www.agedm.org/search?query={movie_name}")

            # 查找在线播放btn
            # wait = WebDriverWait(self.driver, 10)  # 10秒超时
            # # titles = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/b/text()")
            button = self.driver.find_element(By.XPATH, "//*[@class='video_btns']/a[1]")
            # print(button)
            button.click()
            WebDriverWait(self.driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "tab-content"))
            )
            # # 等待搜索结果加载
            detail_url = self.driver.current_url
            tree = self.get_html_tree(detail_url)
            url_dict = {}
            titles = tree.xpath("//*[@class='video_detail_episode']/li/a/text()")[n - 1:]
            urls = tree.xpath("//*[@class='video_detail_episode']/li/a/@href")[n - 1:]
            for title, url in zip(titles, urls):
                url_dict[title] = url
            # print(url_dict)
            return url_dict
        except Exception as e:
            logger.error(f"搜索过程发生错误: {str(e)}")
            if self.driver:
                self.driver.quit()
                self.driver = None
            return {}

    def setup_driver(self):
        # 优化视频播放性能的设置
        options = webdriver.EdgeOptions()
        options.add_argument('--disable-gpu')  # 禁用GPU加速
        options.add_argument('--no-sandbox')  # 禁用沙箱模式
        options.add_argument('--disable-dev-shm-usage')  # 禁用/dev/shm使用
        options.add_argument('--disable-software-rasterizer')  # 禁用软件光栅化
        options.add_argument('--disable-extensions')  # 禁用扩展
        options.add_argument('--disable-infobars')  # 禁用信息栏
        options.add_argument('--autoplay-policy=no-user-gesture-required')  # 允许自动播放
        options.add_experimental_option('excludeSwitches', ['enable-automation'])  # 禁用自动化提示
        options.add_experimental_option("useAutomationExtension", False)  # 禁用自动化扩展

        # 设置正确的驱动路径
        service = EdgeService(executable_path='./msedgedriver.exe')
        return webdriver.Edge(options=options, service=service)

    def video_player(self, movie_name, n):
        try:
            url_dict = self.movies_input(movie_name, n)
            if not url_dict:
                logger.error("未找到可播放的视频链接")
                return

            if not self.driver:
                self.driver = self.setup_driver()

            for title, url in url_dict.items():
                try:
                    # 打开网站
                    self.driver.get(url)
                    # 切换到视频iframe
                    self.driver.switch_to.frame("iframeForVideo")
                    logger.info(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")

                    # 等待视频元素加载
                    wait = WebDriverWait(self.driver, 20)
                    video_element = wait.until(
                        EC.presence_of_element_located((By.CLASS_NAME, "art-video"))
                    )
                    logger.info(f"找到视频元素: {video_element}")
                    logger.info(f"{movie_name}:{title}集开始播放".center(10, "="))
                    # 等待视频加载完成后执行全屏
                    page_stage = True
                    while page_stage:
                        paused_state = self.driver.execute_script("return arguments[0].paused;", video_element)
                        print("paused_state", paused_state)
                        if not paused_state:
                            break
                    # 双击使得视频全屏显示
                    ActionChains(self.driver).double_click(video_element).perform()
                    time.sleep(3)
                    # 视频单击播放
                    paused_state = self.driver.execute_script("return arguments[0].paused;", video_element)
                    if paused_state:
                        logger.info(f"{movie_name}:{title}触发双击全屏")
                        ActionChains(self.driver).click(video_element).perform()
                    # # 点击视频开始播放
                    # action = ActionChains(self.driver)
                    # action.move_to_element_with_offset(video_element, 100, 100).click().perform()
                    print(f"点击播放时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")

                    # 等待视频播放完成
                    time.sleep(60*21)
                    n = n + 1
                    self.update_movie_count(movie_name,n)
                    logger.info(f"第{title}集播放完毕".center(10, "="))

                except Exception as e:
                    logger.error(f"播放第{title}集时发生错误: {str(e)}")
                    continue

        except Exception as e:
            logger.error(f"视频播放总体发生错误: {str(e)}")
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None

    def __del__(self):
        """确保在对象销毁时关闭driver"""
        if self.driver:
            try:
                self.driver.quit()
            except:
                pass

    def count_read(self, movies, n=1):
        # 将此文件作为播放内容的缓存
        if not os.path.exists("./count.json"):
            with open("./count.json", "w", encoding="utf-8") as f:
                self.movies_dict = {movies:n}
                json.dump({movies: n}, f, ensure_ascii=False, indent=4)
            return n
        else:
            with open("./count.json", "r", encoding="utf-8") as f:
                self.movies_dict = json.load(f)
            if not self.movies_dict.get(str(movies)):  # 读取不到movies
                self.movies_dict.update({str(movies):n})
                with open("./count.json", "w", encoding="utf-8") as f:
                    json.dump(self.movies_dict, f, ensure_ascii=False, indent=4)
                return n
            else:  # 读取到了movie
                self.count_num = self.movies_dict[str(movies)]
                return self.count_num

    # update movies count
    def update_movie_count(self, movies, n):
        with open("./count.json", "w", encoding="utf-8") as f:
            self.movies_dict[str(movies)] = n
            json.dump(self.movies_dict, f, ensure_ascii=False, indent=4)


if __name__ == "__main__":
    video = VideoAutoPlay()
    movie = "火影忍者 疾风传"
    # movie = "神之塔 第二季"
    count_num = video.count_read(movie, 1)
    # video.update_movie_count(movie, 4)
    video.video_player(movie, count_num)

10.scrapy框架

破电脑太老,安装不了!!

d684c278cec246cf8b9eb62f1497660d.png


http://www.kler.cn/a/472654.html

相关文章:

  • 爬虫学习记录
  • 机器学习基础-机器学习的常用学习方法
  • 数据结构:LinkedList与链表—面试题(三)
  • 流媒体内网穿透/组网/网络映射EasyNTS上云网关启动失败如何解决?
  • QT 端口扫描附加功能实现 端口扫描5
  • 30天开发操作系统 第 12 天 -- 定时器
  • STM32-笔记38-I2C-oled实验
  • Improving Language Understanding by Generative Pre-Training GPT-1详细讲解
  • 10. C语言 函数详解
  • WPS-JS宏快速上手
  • Docker 容器自动化管理之脚本(Script for Docker Container Automation Management)
  • 【linux系统之redis6】redis的安装与初始化
  • 如何隐藏 Nginx 版本号 并自定义服务器信息,提升安全性
  • 联邦学习LoRA:推理合并权重公式:以及变体
  • 如何让用户在网页中填写PDF表格?
  • 【HTML+CSS+JS+VUE】web前端教程-1-VScode开发者工具快捷键
  • uniapp使用sm4加密
  • 创建管理表
  • Linux vi/vim 编辑器:功能强大的文本处理工具
  • 力扣刷题:数组OJ篇(上)
  • C++编程等级认证学习计划day1-1
  • [python3]Excel解析库-xlwt
  • 创建型模式-工厂模式
  • 继承(6)
  • C++ 中的 const 和 constexpr: 深入对比与最佳实践
  • HTML基础入门——简单网页页面