爬虫下载B站视频简单程序(仅供学习)
运行程序后,在提示“请输入视频链接:”处输入视频地址,例如 https://www.bilibili.com/video/BV1owFSeREoh(示例地址)。程序显示结果如下:
下载进度: 100.00%
下载完成
视频已保存到: ./video.mp4
核心功能
1. 视频信息解析
- 自动识别B站视频格式(新版DASH/旧版FLV)
- 提取最高画质视频流地址
- 支持分段视频检测
2. 下载
- 流式分块下载(单线程,1MB/块)
- 自动重试机制(最多5次)
- 实时下载进度显示
- 通过自定义 User-Agent 和 Referer 请求头应对基础反爬(代码中未实现代理设置)
- 通过正则提取B站视频核心数据
- 包含分辨率、码率、分段地址等关键信息,对应代码:re.search(r'<script>window\.__playinfo__=(.*?)</script>', html)
注意:
- 仅下载已获得观看权限的内容
- 本代码仅供学习使用,请遵守B站用户协议和版权法规
import json
import os
import re
import subprocess
import time
from urllib.parse import urlparse

import requests
class BilibiliDownloader:
    """Download the video behind one Bilibili page URL to a local file.

    The page's inline ``window.__playinfo__`` JSON is parsed to find stream
    URLs.  Two layouts are handled:

    * ``data.dash``: separate video/audio streams.  The highest-bandwidth
      stream of each kind is downloaded and the two are muxed with ffmpeg.
      NOTE(review): the ``dash.audio`` list and its ``baseUrl``/``bandwidth``
      keys are assumed to mirror ``dash.video`` -- confirm against a live page.
    * ``data.durl``: legacy FLV.  A single entry is saved directly; several
      entries are downloaded as parts and concatenated with ffmpeg.

    Merging requires an ``ffmpeg`` executable on ``PATH``.
    """

    MAX_RETRIES = 5            # attempts per stream before giving up
    TIMEOUT = 15               # seconds, applied to every HTTP request
    CHUNK_SIZE = 1024 * 1024   # bytes read per iteration (1 MiB)

    # Inline script tag that carries the player JSON on the video page.
    _PLAYINFO_RE = re.compile(r'<script>window\.__playinfo__=(.*?)</script>')

    def __init__(self, url):
        """Store the page URL and the headers sent with every request."""
        self.url = url
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://www.bilibili.com/'
        }

    @classmethod
    def _parse_playinfo(cls, html):
        """Extract and decode the ``window.__playinfo__`` JSON from page HTML.

        Raises:
            Exception: if the script tag is not present in ``html``.
        """
        match = cls._PLAYINFO_RE.search(html)
        if match is None:
            raise Exception("无法提取视频信息")
        return json.loads(match.group(1))

    def get_video_info(self):
        """Fetch the video page and return its decoded play-info dict.

        Raises:
            requests.exceptions.RequestException: on network failure or a
                non-2xx response.
            Exception: if the page carries no play-info script tag.
        """
        # A timeout keeps a stalled connection from hanging the program, and
        # raise_for_status surfaces 4xx/5xx instead of parsing an error page.
        response = requests.get(self.url, headers=self.headers,
                                timeout=self.TIMEOUT)
        response.raise_for_status()
        return self._parse_playinfo(response.text)

    @staticmethod
    def _select_streams(video_info):
        """Pick what to download from a play-info dict.

        Returns:
            A ``(video_url, audio_url, segments)`` tuple.  For DASH,
            ``video_url`` is the highest-bandwidth video stream and
            ``audio_url`` the highest-bandwidth audio stream (``None`` when
            no audio list is present).  For single-part FLV only
            ``video_url`` is set.  For multi-part FLV both URLs are ``None``
            and ``segments`` holds the ``durl`` entries in page order.

        Raises:
            Exception: if the expected keys are missing or a list is empty.
        """
        try:
            data = video_info['data']
            dash = data.get('dash')
            if dash:
                best_video = max(dash['video'], key=lambda s: s['bandwidth'])
                audio_url = None
                audio_streams = dash.get('audio') or []
                if audio_streams:
                    best_audio = max(audio_streams,
                                     key=lambda s: s['bandwidth'])
                    audio_url = best_audio['baseUrl']
                return best_video['baseUrl'], audio_url, []

            segments = list(data['durl'])
            if len(segments) > 1:
                # Validate up front so a malformed entry is reported as an
                # unsupported format rather than failing mid-download.
                if any('url' not in seg for seg in segments):
                    raise KeyError('url')
                return None, None, segments
            return segments[0]['url'], None, []
        except (KeyError, IndexError, ValueError,
                TypeError, AttributeError) as e:
            raise Exception(f"不支持的视频格式: {str(e)}") from e

    @staticmethod
    def _format_progress(downloaded, total_size):
        """Return the progress text for ``downloaded`` of ``total_size`` bytes.

        Falls back to an absolute byte count when the total is unknown
        (``total_size`` <= 0), which avoids dividing by zero.
        """
        if total_size > 0:
            return f"下载进度: {downloaded / total_size:.2%}"
        return f"已下载: {downloaded / (1024 * 1024):.2f} MB"

    def _download_stream(self, url, dest_path):
        """Stream ``url`` into ``dest_path``, retrying transient failures.

        Each retry restarts the file from scratch.  Waits ``2 ** attempt``
        seconds between attempts.

        Raises:
            requests.exceptions.HTTPError: on a non-2xx response (not retried).
            Exception: after ``MAX_RETRIES`` failed attempts.
        """
        retryable = (requests.exceptions.Timeout,
                     requests.exceptions.ConnectionError,
                     requests.exceptions.ChunkedEncodingError)
        for attempt in range(self.MAX_RETRIES):
            try:
                with requests.get(url, headers=self.headers,
                                  stream=True, timeout=self.TIMEOUT) as r:
                    r.raise_for_status()
                    total_size = int(r.headers.get('content-length', 0))
                    downloaded = 0
                    with open(dest_path, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=self.CHUNK_SIZE):
                            if chunk:  # skip keep-alive chunks
                                f.write(chunk)
                                downloaded += len(chunk)
                                print(f"\r{self._format_progress(downloaded, total_size)}", end='')
                print("\n下载完成")
                return
            except retryable as e:
                print(f"\n第 {attempt+1} 次尝试失败: {str(e)}")
                if attempt == self.MAX_RETRIES - 1:
                    raise Exception("多次尝试下载失败,请检查网络连接") from e
                time.sleep(2 ** attempt)  # exponential backoff

    def download_video(self, save_path='./video.mp4'):
        """Download the video for ``self.url`` and write it to ``save_path``.

        Raises:
            Exception: if the page cannot be parsed, the format is
                unsupported, or downloading fails after all retries.
            RuntimeError: if ffmpeg is missing or fails while merging.
        """
        video_info = self.get_video_info()
        video_url, audio_url, segments = self._select_streams(video_info)

        if segments:
            print("检测到分段视频,需要合并多个文件...")
            self._download_flv_segments(segments, save_path)
        elif audio_url is None:
            # Nothing to mux: write the single stream straight to the target.
            self._download_stream(video_url, save_path)
        else:
            video_part = f"{save_path}.video.m4s"
            audio_part = f"{save_path}.audio.m4s"
            self._download_stream(video_url, video_part)
            self._download_stream(audio_url, audio_part)
            self._merge_audio_video(video_part, audio_part, save_path)
        print(f"视频已保存到: {save_path}")

    def _download_flv_segments(self, segments, save_path):
        """Download every FLV segment to ``<save_path>.partN`` and merge them."""
        temp_files = []
        for i, seg in enumerate(segments):
            part_file = f"{save_path}.part{i}"
            temp_files.append(part_file)
            self._download_stream(seg['url'], part_file)
        self._merge_files(temp_files, save_path)

    def _merge_audio_video(self, video_path, audio_path, save_path):
        """Mux separate video and audio files into ``save_path`` without re-encoding.

        The two inputs are deleted only after ffmpeg succeeds, so a failed
        merge leaves the downloaded parts on disk.
        """
        self._run_ffmpeg(['-i', video_path, '-i', audio_path,
                          '-c', 'copy', save_path])
        self._remove_files([video_path, audio_path])

    def _merge_files(self, temp_files, save_path):
        """Concatenate ``temp_files`` in order into ``save_path`` via ffmpeg.

        The part files are deleted only after ffmpeg succeeds.
        """
        list_path = f"{save_path}.concat.txt"
        with open(list_path, 'w', encoding='utf-8') as f:
            for part in temp_files:
                # Absolute, single-quoted paths: the concat list resolves
                # relative names against the list file's own directory.
                escaped = os.path.abspath(part).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        try:
            self._run_ffmpeg(['-f', 'concat', '-safe', '0', '-i', list_path,
                              '-c', 'copy', save_path])
        finally:
            self._remove_files([list_path])
        self._remove_files(temp_files)

    @staticmethod
    def _run_ffmpeg(args):
        """Run ``ffmpeg -y -loglevel error <args>``.

        Raises:
            RuntimeError: if ffmpeg is not installed or exits non-zero.
        """
        cmd = ['ffmpeg', '-y', '-loglevel', 'error', *args]
        try:
            subprocess.run(cmd, check=True,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except FileNotFoundError as e:
            raise RuntimeError("未找到 ffmpeg,请先安装并加入 PATH") from e
        except subprocess.CalledProcessError as e:
            detail = (e.stderr or b'').decode('utf-8', errors='replace').strip()
            raise RuntimeError(f"ffmpeg 合并失败: {detail}") from e

    @staticmethod
    def _remove_files(paths):
        """Delete ``paths``; cleanup is best-effort, so missing files are ignored."""
        for path in paths:
            try:
                os.remove(path)
            except OSError:
                pass
# Script entry point: prompt for a video page URL and download it to the
# default path used by BilibiliDownloader.download_video.
if __name__ == "__main__":
    # Pasted links often carry stray whitespace or a trailing newline.
    video_url = input("请输入视频链接:").strip()
    if not video_url:
        # Fail with a clear message instead of an obscure requests error.
        raise SystemExit("未输入视频链接")
    downloader = BilibiliDownloader(video_url)
    downloader.download_video()