当前位置: 首页 > article >正文

【实用技巧】如何优雅的批量保存网页快照?

正在研究LLM,师妹跑过来问我:"师兄,能不能写个程序,帮我批量保存一下网页快照。"我心想,保存网页快照,这不简单?但我研究一番发现,似乎没那么简单。

1. 初级版本

起初我认为,保存网页快照这种事,应该已经被研究透了,于是让DeepSeek生成一个Python代码。

import os
import requests
from datetime import datetime

# 存储快照的目录
output_directory = "save_snapshots"

# 如果快照目录不存在,创建它
if not os.path.exists(output_directory):
    os.makedirs(output_directory)


def save_snapshot(url):
    # 获取当前时间格式化为字符串
    current_time = datetime.now().strftime("%Y_%m_%d %H-%M-%S")

    try:
        # 请求网页内容
        response = requests.get(url)
        response.raise_for_status()  # 如果请求失败,会抛出异常

        # 生成保存的HTML文件路径
        filename = f"snapshot_{current_time}.html"  # 使用当前时间作为文件名
        file_path = os.path.join(output_directory, filename)

        # 保存HTML文件
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(response.text)

        print(f"保存快照:{filename}")
    except requests.exceptions.RequestException as e:
        print(f"请求失败:{url} - 错误: {e}")


if __name__ == "__main__":
    # 待保存的URL
    url = "https://zstar.blog.csdn.net/"
    # 保存快照
    save_snapshot(url)
    print("处理完成!")

这个版本非常简单,直接将requests.get访问到内容保存成html,这样做显然是有问题的,因为大部分网页并不是把所有内容写在主体文件,而是根据主体文件的内容去索引渲染,合成最终的html。这样操作就会导致大量内容缺失。

2. 中级版本

如果直接访问会造成内容缺失,那么用浏览器模拟访问就可以了。于是让DeepSeek用
selenium再实现一下:

import os
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# 存储快照的目录
output_directory = "save_snapshots"

# 如果快照目录不存在,创建它
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

def save_snapshot(url):
    # 获取当前时间格式化为字符串
    current_time = datetime.now().strftime("%Y_%m_%d %H-%M-%S")

    # 设置 Selenium WebDriver(以 Chrome 为例)
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")  # 无头模式,不显示浏览器窗口
    options.add_argument("--disable-gpu")  # 禁用 GPU 加速
    options.add_argument("--no-sandbox")  # 禁用沙盒模式

    # 初始化 WebDriver
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    try:
        # 打开网页
        driver.get(url)

        # 获取网页内容(包括动态加载的内容)
        page_source = driver.page_source

        # 生成保存的HTML文件路径
        filename = f"snapshot_{current_time}.html"  # 使用当前时间作为文件名
        file_path = os.path.join(output_directory, filename)

        # 保存HTML文件
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(page_source)

        print(f"保存快照:{filename}")
    except Exception as e:
        print(f"请求失败:{url} - 错误: {e}")
    finally:
        # 关闭浏览器
        driver.quit()


if __name__ == "__main__":
    # 待保存的URL
    url = "https://zstar.blog.csdn.net/"
    # 保存快照
    save_snapshot(url)
    print("处理完成!")

这样做已经比初级版本好了不少,但仍存在问题,很多网站即使已经加载了完整信息,很多图片/样式等信息,都是通过链接动态引入的,这样做会导致这些网站会出现样式混乱的情况。

3. 高级版本

既然中级版本有这个缺点,那么,如果在保存时,预先下载所有需要联网加载的依赖文件,嵌入html网页中,似乎就能解决这个问题。

import base64
import os
import re
import requests
import concurrent.futures
from datetime import datetime
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup


class WebPageSnapshot:
    def __init__(self):
        self.session = requests.Session()
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # 用于存储已下载的资源
        self.resource_cache = {}

    def download_resource(self, url, base_url):
        """下载并转换资源为base64,带缓存功能"""
        if not url or url.startswith('data:'):
            return url

        try:
            # 检查缓存
            cache_key = urljoin(base_url, url)
            if cache_key in self.resource_cache:
                return self.resource_cache[cache_key]

            # 下载资源
            abs_url = urljoin(base_url, url)
            response = self.session.get(abs_url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                content_type = response.headers.get('content-type', '').split(';')[0]
                if not content_type:
                    # 根据文件扩展名推测内容类型
                    ext = os.path.splitext(urlparse(url).path)[1].lower()
                    content_type = {
                        '.jpg': 'image/jpeg',
                        '.jpeg': 'image/jpeg',
                        '.png': 'image/png',
                        '.gif': 'image/gif',
                        '.css': 'text/css',
                        '.js': 'application/javascript'
                    }.get(ext, 'application/octet-stream')

                data_uri = f"data:{content_type};base64,{base64.b64encode(response.content).decode('utf-8')}"
                self.resource_cache[cache_key] = data_uri
                return data_uri
        except Exception as e:
            pass
        return url

    def process_css_simple(self, css_text, base_url):
        """使用正则表达式处理CSS"""

        def replace_url(match):
            url = match.group(1).strip('\'"')
            if url.startswith('data:'):
                return f'url({url})'
            return f'url("{self.download_resource(url, base_url)}")'

        # 处理 url() 函数
        css_text = re.sub(r'url\([\'"]?(.*?)[\'"]?\)', replace_url, css_text)
        return css_text

    def process_inline_css(self, style_text, base_url):
        """处理内联CSS样式"""
        if not style_text:
            return style_text
        return self.process_css_simple(style_text, base_url)

    def download_resources_parallel(self, resources):
        """并行下载资源"""
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for url, base_url in resources:
                if url and not url.startswith('data:'):
                    futures.append(executor.submit(self.download_resource, url, base_url))
            concurrent.futures.wait(futures)

    def save_page(self, url, output_path):
        """保存网页为单文件"""
        try:
            print(f"Downloading page: {url}")
            response = self.session.get(url, headers=self.headers, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # 收集所有需要下载的资源
            resources = []

            # 收集图片资源
            for img in soup.find_all('img'):
                src = img.get('src')
                if src:
                    resources.append((src, url))

            # 收集CSS资源
            for link in soup.find_all('link', rel='stylesheet'):
                href = link.get('href')
                if href:
                    resources.append((href, url))

            # 收集JavaScript资源
            for script in soup.find_all('script', src=True):
                src = script.get('src')
                if src:
                    resources.append((src, url))

            # 并行下载所有资源
            self.download_resources_parallel(resources)

            # 处理图片
            for img in soup.find_all('img'):
                src = img.get('src')
                if src:
                    new_src = self.download_resource(src, url)
                    img['src'] = new_src

            # 处理内联样式
            for tag in soup.find_all(style=True):
                style = tag.get('style')
                if style:
                    tag['style'] = self.process_inline_css(style, url)

            # 处理样式标签
            for style in soup.find_all('style'):
                if style.string:
                    style.string = self.process_css_simple(style.string, url)

            # 处理外部样式表
            for link in soup.find_all('link', rel='stylesheet'):
                href = link.get('href')
                if href:
                    css_url = urljoin(url, href)
                    try:
                        css_content = self.download_resource(href, url)
                        if css_content.startswith('data:'):
                            # 创建新的style标签
                            new_style = soup.new_tag('style')
                            # 提取base64编码的CSS内容
                            css_data = css_content.split('base64,')[1]
                            css_text = base64.b64decode(css_data).decode('utf-8')
                            new_style.string = self.process_css_simple(css_text, url)
                            link.replace_with(new_style)
                    except Exception as e:
                        print(f"Error processing stylesheet {href}: {str(e)}")

            # 处理JavaScript
            for script in soup.find_all('script', src=True):
                src = script.get('src')
                if src:
                    new_src = self.download_resource(src, url)
                    script['src'] = new_src

            # 添加元信息
            meta = soup.new_tag('meta')
            meta['name'] = 'snapshot-source'
            meta['content'] = url
            soup.head.append(meta)

            meta = soup.new_tag('meta')
            meta['name'] = 'snapshot-date'
            meta['content'] = datetime.now().isoformat()
            soup.head.append(meta)

            # 保存文件
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(str(soup.prettify()))

        except Exception as e:
            print(f"Error saving page: {str(e)}")
            raise


if __name__ == '__main__':
    # 创建实例
    snapshot = WebPageSnapshot()
    
    # 保存网页
    snapshot.save_page("https://zstar.blog.csdn.net/", "output.html")

但是这个版本仍存在以下三点问题:

  1. 对于视频类资源并没有良好适配
  2. 对图像是原始尺寸进行保存,导致快照文件体积较大
  3. 没有将JavaScript代码删减干净,导致打开文件时会出现加载的过程

4. 最终版本

仔细一分析,发现保存网页快照这细节还挺多,于是想看一下其它工具的处理方式。
找到了一个开源的Chrome插件,叫SingleFile

考虑到其是用Javascript写的,直接在python中调用不方便,于是顺着这个思路,在github上找到了一个python的SingleFile的实现方式,叫pySingleFile。

下载下来发现,它的实现思路和我上面的高级版本大差不差,仍然未能有效解决好高级版本提到的三个问题。

再进一步扩大搜索范围,发现有人实现了一个SingleFile的命令行调用版本,叫single-file-cli,地址:https://github.com/gildas-lormeau/single-file-cli

于是,想到了可以直接通过python外调cli的方式,实现single-file的功能。

主体代码如下:

import os
import logging
import argparse
import subprocess
import tempfile
import time
import glob

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class SingleFileCLI:
    def __init__(self, url, output_path=None, singlefile_cmd="single-file"):
        """
        初始化SingleFileCLI类

        Args:
            url: 要保存的网页URL
            output_path: 输出文件路径
            singlefile_cmd: SingleFile CLI命令
        """
        self.url = url
        self.output_path = output_path or f"output_{int(time.time())}.html"
        self.singlefile_cmd = singlefile_cmd

        # 确保输出目录存在
        output_dir = os.path.dirname(os.path.abspath(self.output_path))
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

    def save_page(self):
        """使用SingleFile CLI保存页面"""
        try:
            logger.info(f"正在保存页面: {self.url}")

            # 构建命令
            cmd = [
                self.singlefile_cmd,
                self.url,
                self.output_path,
                "--browser-width", "1920",
                "--browser-height", "1080",
                "--block-audios",
                "--block-scripts",
                "--block-videos"
            ]

            # 执行命令
            logger.info(f"执行命令: {' '.join(cmd)}")

            # 使用subprocess.Popen启动进程
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                errors='replace'
            )

            # 定义最大等待时间和检查间隔
            max_wait_time = 60  # 最大等待60秒
            check_interval = 2  # 每2秒检查一次
            elapsed_time = 0

            # 循环检查文件是否已生成
            while elapsed_time < max_wait_time:
                # 检查进程是否已结束
                if process.poll() is not None:
                    stdout, stderr = process.communicate()
                    logger.info(f"进程已结束,返回码: {process.returncode}")
                    if stdout:
                        logger.info(f"输出: {stdout}")
                    if stderr:
                        logger.warning(f"错误: {stderr}")
                    break

                # 检查文件是否已生成
                if os.path.exists(self.output_path) and os.path.getsize(self.output_path) > 0:
                    file_size = os.path.getsize(self.output_path)
                    logger.info(f"文件已生成: {self.output_path} (大小: {file_size} 字节)")

                    # 再等待几秒确保文件写入完成
                    time.sleep(3)

                    # 终止进程
                    logger.info("文件已生成,终止进程...")
                    process.terminate()
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        logger.warning("进程未能正常终止,强制结束")
                        process.kill()

                    return True

                # 等待检查间隔
                time.sleep(check_interval)
                elapsed_time += check_interval

            # 如果超时且进程仍在运行,强制终止
            if process.poll() is None:
                logger.warning(f"等待超时 ({max_wait_time}秒),强制终止进程")
                process.kill()
                process.wait()

            # 最后检查文件是否存在
            if os.path.exists(self.output_path) and os.path.getsize(self.output_path) > 0:
                file_size = os.path.getsize(self.output_path)
                logger.info(f"文件已保存: {self.output_path} (大小: {file_size} 字节)")
                return True
            else:
                logger.error(f"未能找到保存的文件: {self.output_path}")
                return False

        except Exception as e:
            logger.error(f"保存页面失败: {e}")
            return False


def main():
    """命令行入口"""
    parser = argparse.ArgumentParser(description="使用SingleFile CLI保存网页")
    parser.add_argument("--url", default="https://zstar.blog.csdn.net/", help="要保存的网页URL")
    parser.add_argument("--output", default=r"outputs.html", help="输出文件路径")
    parser.add_argument("--singlefile-cmd", default=r"D:\Code\cil_singfile\single-file-cli\single-file.bat",
                        help="SingleFile CLI命令")

    args = parser.parse_args()

    try:
        saver = SingleFileCLI(
            url=args.url,
            output_path=args.output,
            singlefile_cmd=args.singlefile_cmd
        )
        success = saver.save_page()

        if success:
            logger.info("页面保存成功")
        else:
            logger.error("页面保存失败")

    except Exception as e:
        logger.error(f"运行失败: {e}")


if __name__ == "__main__":
    main()

除了运行用时较长一些之外,保存效果非常不错。

最终版本需要配合denosingle-file-cli,single-file-cli的维护有些问题,最新版本的反而会出问题,因此我将代码和相关依赖都放到了自己的仓库下。

web-snapshots:https://github.com/zstar1003/web-snapshots


http://www.kler.cn/a/586664.html

相关文章:

  • 每日复盘20250314
  • 零基础上手Python数据分析 (5):Python文件操作 - 轻松读写,数据导入导出不再是难题
  • 零基础上手Python数据分析 (3):Python核心语法快速入门 (下) - 程序流程控制、函数与模块
  • 嵌入式八股,为什么单片机中不使用malloc函数
  • 基于Asp.net的物流配送管理系统
  • CockroachDB MCP -cursor适用
  • 基于NXP+FPGA轨道交通3U机箱结构逻辑控制单元(LCU)
  • 虚拟机docker连接mysql的ip地址在哪里查看?
  • C 语言实战:打造字符串加密器及实验要点解析
  • 2018年全国职业院校技能大赛高职组-计算机网络应用竞赛竞赛样题C卷
  • 使用 Redis 实现接口缓存:提升性能的完整指南
  • 第5章 构造、析构、拷贝语义学3:对象复制语意学
  • 【每日学点HarmonyOS Next知识】抽屉效果、树状组件、离屏渲染、上下文获取、Tab声明周期
  • python 操作 mongodb 输出执行命令的日志
  • 2025-03-15 学习记录--C/C++-PTA 习题3-3 出租车计价
  • 历年华中科技大学计算机考研复试上机真题
  • 推荐系统基础
  • fastapi +angular​迷宫求解可跨域
  • 【现代深度学习技术】卷积神经网络06:卷积神经网络(LeNet)
  • Protobuf 学习与实践