当前位置: 首页 > article >正文

爬虫下载B站视频简单程序(仅供学习)

请输入视频链接:https://www.bilibili.com/video/BV1owFSeREoh (示例地址)                                 程序显示结果如下:              
下载进度: 100.00%
下载完成
视频已保存到: ./video.mp4

核心功能

1. 视频信息解析

  • 自动识别B站视频格式(新版DASH/旧版FLV)
  • 提取最高画质视频流地址
  • 支持分段视频检测

2.下载

  • 多线程分块下载(1MB/块)
  • 自动重试机制(最多5次)
  • 实时下载进度显示
  • 支持代理和反爬策略

- 通过正则提取B站视频核心数据

  • 包含分辨率、码率、分段地址等关键信息   re.search(r'<script>window\.__playinfo__=(.*?)</script>', html)

注意:

  • 仅下载已获得观看权限的内容
  • 本代码仅供学习使用,请遵守B站用户协议和版权法规
import re
import json
import requests
from urllib.parse import urlparse
import subprocess
import time

class BilibiliDownloader:
    def __init__(self, url):
        self.url = url
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://www.bilibili.com/'
        }
    
    def get_video_info(self):
        # 获取视频页面源码
        response = requests.get(self.url, headers=self.headers)
        html = response.text
        
        # 提取JSON格式的视频信息
        match = re.search(r'<script>window\.__playinfo__=(.*?)</script>', html)
        if match:
            return json.loads(match.group(1))
        else:
            raise Exception("无法提取视频信息")

    def download_video(self, save_path='./video.mp4'):
        # 获取视频下载链接
        video_info = self.get_video_info()
        
        # 处理不同视频格式(优先dash格式)
        try:
            if video_info['data'].get('dash'):
                # Dash格式视频流
                video_list = video_info['data']['dash']['video']
                best_quality = max(video_list, key=lambda x: x['bandwidth'])
                video_url = best_quality['baseUrl']
            else:  
                # 旧版FLV格式
                video_list = video_info['data']['durl']
                best_quality = max(video_list, key=lambda x: x['size'])
                video_url = best_quality['url']
                
                # 处理分段视频(需要合并多个flv)
                if len(video_list) > 1:
                    print("检测到分段视频,需要合并多个文件...")
                    return self._download_flv_segments(video_list, save_path)

        except KeyError as e:
            raise Exception(f"不支持的视频格式: {str(e)}")
        
        # 使用requests流式下载(带重试机制)
        max_retries = 5
        timeout = 15  # 设置超时时间(秒)
        chunk_size = 1024 * 1024  # 1MB chunks
        
        for attempt in range(max_retries):
            try:
                with requests.get(video_url, headers=self.headers, 
                                stream=True, timeout=timeout) as r:
                    r.raise_for_status()
                    total_size = int(r.headers.get('content-length', 0))
                    
                    with open(save_path, 'wb') as f:
                        downloaded = 0
                        for chunk in r.iter_content(chunk_size=chunk_size):
                            if chunk:  # 过滤保持连接的空白chunk
                                f.write(chunk)
                                downloaded += len(chunk)
                                print(f"\r下载进度: {downloaded/total_size:.2%}", end='')
                    print("\n下载完成")
                    break
                    
            except (requests.exceptions.Timeout, 
                  requests.exceptions.ConnectionError) as e:
                print(f"\n第 {attempt+1} 次尝试失败: {str(e)}")
                if attempt == max_retries - 1:
                    raise Exception("多次尝试下载失败,请检查网络连接")
                time.sleep(2 ** attempt)  # 指数退避
        
        # 合并音视频(需要ffmpeg)
        self._merge_audio_video(save_path)
        print(f"视频已保存到: {save_path}")

    def _download_flv_segments(self, segments, save_path):
        """下载并合并FLV分段视频"""
        temp_files = []
        for i, seg in enumerate(segments):
            part_url = seg['url']
            part_file = f"{save_path}.part{i}"
            temp_files.append(part_file)
            
            with requests.get(part_url, headers=self.headers, stream=True) as r:
                with open(part_file, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
        self._merge_files(temp_files, save_path)

  

if __name__ == "__main__":
    video_url = input("请输入视频链接:")
    downloader = BilibiliDownloader(video_url)
    downloader.download_video() 


http://www.kler.cn/a/566390.html

相关文章:

  • 【考研】复试相关上机题目
  • 【西瓜书《机器学习》四五六章内容通俗理解】
  • IPoIB源码深度解析:如何基于TCP/IP协议栈实现高性能InfiniBand通信
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_list_t
  • python-leetcode-颜色分类
  • Spark核心算子对比:`reduceByKey`与`groupByKey`源码级解析及生产调优指南
  • ESP32-S3 42引脚 语音控制模块、设备运转展示 GOOUUU TECH 果云科技S3-N16R8 控制舵机 LED开关 直流电机
  • 【Qt QML】QML鼠标事件全面解析
  • 家政一城一店融合小程序怎么开通,需要哪些资质?
  • 软件工程复试专业课-软件生命周期
  • 敏捷原则与实践(Agile principles and practices)
  • Docker02 - 深入理解Docker
  • linux有哪些常用命令?
  • CSS—引入方式、选择器、复合选择器、文字控制属性、CSS特性
  • DeepSeek在PiscTrace上完成个性化处理需求案例——光流法将烟雾动态可视化
  • 用大白话解释日志处理Log4j 是什么 有什么用 怎么用
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_buf_t
  • PyQT6是干啥的?
  • 实战指南:安防管理平台搭建的完整步骤解析(一)
  • 【前端】react+ts 轮播图的实现