Downloading M3U8 files with PyQt
Download model
from pathlib import Path
from typing import Union

from pydantic import BaseModel, Field


class DownloadUrlModel(BaseModel):
    title: str = Field(..., description="file name")
    save_path: Union[Path, str] = Field(..., description="save path")
    url: str = Field(..., description="m3u8 url")
    isM3u8: bool = Field(True, description="whether the url points to an m3u8 file")
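A quick usage sketch of the model (the title, save_path, and url values below are placeholders):

model = DownloadUrlModel(
    title='demo_video',
    save_path=Path('D:/videos'),
    url='https://example.com/index.m3u8',
)
print(model.isM3u8)  # True by default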
M3U8 decryption
pip install pycryptodome
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad


class DecodeByte:
    # AES decryption helper
    @staticmethod
    def do_decode(key, iv, data, method="AES-128") -> bytes:
        if isinstance(key, str):
            key = key.encode('utf-8')
        if isinstance(iv, str):
            iv = iv.encode('utf-8')
        if method == "AES-128":
            aes = AES.new(key, AES.MODE_CBC, iv)
            # Pad the ciphertext to a whole number of 16-byte blocks before decrypting
            if data and (len(data) % 16) != 0:
                data = pad(data, 16)
            return aes.decrypt(data)
        else:
            return None
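A quick round-trip check of the helper, reusing the imports above (the key, IV, and plaintext are made-up sample values):

key = b'0123456789abcdef'   # sample 16-byte key
iv = b'fedcba9876543210'    # sample 16-byte IV
cipher = AES.new(key, AES.MODE_CBC, iv).encrypt(b'sixteen byte msg')  # encrypt a 16-byte sample
print(DecodeByte.do_decode(key, iv, cipher))  # b'sixteen byte msg'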
Download thread
FFMPEG_EXE_PATH is the absolute path to ffmpeg.exe; if you do not have it, download ffmpeg from the official website. Downloading the M3U8 with ffmpeg alone shows no progress and runs synchronously, which is slow; asynchronous downloading with httpx is faster.
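The thread below imports FFMPEG_EXE_PATH from the project's config module and HTTPRequest / DecodeByte from its utils package. Those modules are not shown in full here; a minimal sketch of what the thread expects from them follows (the header values and the ffmpeg path are assumptions, adjust them for your setup):

# config.py (sketch)
FFMPEG_EXE_PATH = r'D:\tools\ffmpeg\bin\ffmpeg.exe'  # absolute path to ffmpeg.exe

# utils.py (sketch): shared request settings used by both sync and async requests
class HTTPRequest:
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
    }
    timeout = 30  # seconds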
# coding: utf-8
import asyncio
import os
import re
import shutil
from pathlib import Path
from typing import Union
import httpx
from PySide6.QtCore import QThread, Signal
from ..models import DownloadUrlModel
from ..utils import HTTPRequest, DecodeByte
from ..config import FFMPEG_EXE_PATH
class DownloadM3U8Thread(QThread):
    loggerSignal = Signal(str)
    progressSignal = Signal(int)
    stopSignal = Signal(bool)

    def __init__(self, parent=None):
        super().__init__(parent)
        self.__is_stop = False
        self.__model: DownloadUrlModel = None
        self.finished_file: Path = None
        self.failed_file: Path = None
        self.temp_path: Path = None
        self.num = 0
        self.list_length = 0
        self.retry_max = 7
        # Decryption info extracted from the playlist
        self.cry = {
            "key": "",
            "iv": "",
            "method": "",
        }
        self.started.connect(self.startedSlot)

    def sendRequest(self, url: str, *, method='GET', **kwargs):
        response = httpx.request(method, url, verify=False, headers=HTTPRequest.headers,
                                 timeout=HTTPRequest.timeout, **kwargs)
        response.raise_for_status()
        return response

    async def aiohttp_send(self, client: httpx.AsyncClient, url: str, index: int, *, retry_count=0):
        ts_name = f'{str(index).zfill(6)}.ts'
        try:
            response = await client.get(url)
            self.save_data_file(response.content, ts_name)
            progress = round(self.num / self.list_length * 100, 3)
            self.loggerSignal.emit(f'Download progress: {progress} %')
            self.progressSignal.emit(int(progress))
        except httpx.ConnectError:
            await asyncio.sleep(4)
            if retry_count < self.retry_max:
                retry_count += 1
                await self.aiohttp_send(client, url, index, retry_count=retry_count)
            else:
                # Record segments that still fail after the maximum number of retries
                self.save_failed_file(f'{url}_{ts_name}')

    async def aiohttp_download(self, urls):
        async with httpx.AsyncClient(headers=HTTPRequest.headers, timeout=HTTPRequest.timeout) as client:
            tasks = []
            self.list_length = len(urls)
            for index, url in enumerate(urls, 1):
                if self.__is_stop:
                    # Stop scheduling new segments; tasks already created still finish below
                    break
                ts_name = f'{str(index).zfill(6)}.ts'
                # Skip segments that were already downloaded in a previous run
                if ts_name in self.open_finished_file():
                    continue
                task = asyncio.ensure_future(self.aiohttp_send(client, url, index))
                tasks.append(task)
            await asyncio.gather(*tasks)

    def get_full_ts_url(self, url: str, ts_name: str) -> str:
        """
        Build the full url of a ts segment.
        :param url: original m3u8 url
        :param ts_name: ts file name, possibly a relative path
        :return: str
        """
        if ts_name.startswith('http'):
            return ts_name
        tl = ts_name.split('/')
        new_url = []
        # Walk the m3u8 url and drop the parts that the ts name repeats
        for s in url.split('/')[:-1]:
            if s in tl:
                tl.remove(s)
            new_url.append(s)
        # Append the remaining ts name parts
        new_url.extend(tl)
        result = '/'.join(new_url)
        return result

    def setCryInfo(self, text, url):
        # Extract the encryption parameters from the #EXT-X-KEY tag
        x_key = re.findall(r'#EXT-X-KEY:(.*?)\n', text)
        cry_obj = dict()
        if len(x_key) > 0:
            # Parse the comma separated attributes
            for item in x_key[0].split(','):
                key, _, value = item.partition('=')
                cry_obj[key] = value.replace('"', '')
            # Normalize the key URI
            if cry_obj.get('URI') and not cry_obj['URI'].startswith('http'):
                cry_obj['URI'] = self.get_full_ts_url(url, cry_obj['URI'])
            elif not cry_obj.get('URI'):
                cry_obj['URI'] = ''
            # Fetch the key
            if cry_obj['URI']:
                res = self.sendRequest(cry_obj['URI'])
                self.cry['key'] = res.content
            # Encryption method
            self.cry['method'] = cry_obj.get('METHOD')
            # IV value: drop the leading "0x" and keep 16 characters
            if cry_obj.get('IV'):
                self.cry['iv'] = cry_obj['IV'][2:18]

    def save_data_file(self, data: bytes, ts_name: str):
        # If the stream is encrypted, decrypt the data before saving it
        if self.cry.get('key'):
            # Use the IV from the playlist if present, otherwise derive one from the file name
            iv = self.cry["iv"] if self.cry.get("iv") else ts_name.split('.')[0].zfill(16)
            data = DecodeByte.do_decode(self.cry["key"], iv, data, self.cry["method"])
            if not data:
                raise Exception('Decryption failed')
        # Save the segment
        with open(self.temp_path / ts_name, 'wb') as f:
            f.write(data)
        self.save_finished_file(ts_name)
        self.num += 1

    def open_failed_file(self) -> list:
        return self.failed_file.read_text(encoding='utf-8').split('\n')

    def save_failed_file(self, ts_name: str):
        with self.failed_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def open_finished_file(self) -> list:
        return self.finished_file.read_text(encoding='utf-8').split('\n')

    def save_finished_file(self, ts_name: str):
        with self.finished_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def save_ffmpeg_file(self, file_list: list):
        # Write the concat list file used by ffmpeg
        ffmpeg_file = self.temp_path / 'ffmpeg.txt'
        ffmpeg_file.touch(exist_ok=True)
        with ffmpeg_file.open('a+', encoding='utf-8') as f:
            for file in file_list:
                f.write(f"file '{file}'\n")
        return ffmpeg_file

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        # Collect all cached segment files
        file_list = [str(file.name) for file in source_path.glob('**/*.ts')]
        if not file_list:
            return
        # Sort by name so the segments are concatenated in order
        file_list.sort(key=lambda s: s.split('.')[0])
        ffmpeg_txt = self.save_ffmpeg_file(file_list)
        # Total number of segments
        length = len(file_list)
        # Merge the segments with ffmpeg
        cmd = f'"{FFMPEG_EXE_PATH}" -f concat -safe 0 -i "{ffmpeg_txt}" -c copy "{dest_file}"'
        os.system(cmd)
        # with open(dest_file, 'ab') as f:
        #     # Loop over the file list
        #     for i, file in enumerate(file_list, 1):
        #         # Read each segment
        #         with open(os.path.join(source_path, file), 'rb') as rf:
        #             # Append its bytes to the same output file
        #             data = rf.read()
        #             f.write(data)
        #         # Report progress
        #         self.loggerSignal.emit('Merging: {:3.2f}%'.format(i / length * 100))
        # Remove the cached temp folder
        self.sleep(2)
        try:
            shutil.rmtree(source_path.parent)
        except Exception as e:
            self.loggerSignal.emit(f'Error removing folder: {e}')

    def run(self):
        url = self.__model.url
        save_path = Path(self.__model.save_path)
        video_path = save_path / self.__model.title
        self.temp_path = video_path / 'temp'
        self.finished_file = self.temp_path / 'finished.txt'
        self.failed_file = self.temp_path / 'failed.txt'
        self.temp_path.mkdir(parents=True, exist_ok=True)
        self.finished_file.touch(exist_ok=True)
        self.failed_file.touch(exist_ok=True)
        try:
            self.loggerSignal.emit("Loading the m3u8 file")
            m3u8_text = self.sendRequest(url).text
            self.setCryInfo(m3u8_text, url)
            new_urls = []
            self.loggerSignal.emit("Parsing the m3u8 file")
            for line in m3u8_text.split('\n'):
                # Non-empty lines that do not start with '#' are segment urls
                if line and not line.startswith('#'):
                    new_urls.append(self.get_full_ts_url(url, line))
            self.loggerSignal.emit('Downloading video...')
            asyncio.run(self.aiohttp_download(new_urls))
            self.loggerSignal.emit('Merging files...')
            self.combine_ts(self.temp_path, str(save_path / f'{self.__model.title}.mp4'))
            self.loggerSignal.emit("Video download finished")
        except Exception as e:
            self.loggerSignal.emit(f"Video download failed: {e}")
            return

    def stop(self):
        self.__is_stop = True
        self.stopSignal.emit(True)
        self.loggerSignal.emit('Download stopped')

    def startedSlot(self):
        self.__is_stop = False
        self.stopSignal.emit(False)

    def setDownloadTask(self, model: DownloadUrlModel):
        self.__model = model
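A minimal sketch of driving the thread from a PySide6 window (the widget layout, names, and download values are illustrative; DownloadM3U8Thread and DownloadUrlModel are assumed to be importable from the modules above):

import sys
from PySide6.QtWidgets import QApplication, QProgressBar, QTextBrowser, QVBoxLayout, QWidget


class DemoWindow(QWidget):
    def __init__(self):
        super().__init__()
        self.progressBar = QProgressBar(self)
        self.logBrowser = QTextBrowser(self)
        layout = QVBoxLayout(self)
        layout.addWidget(self.progressBar)
        layout.addWidget(self.logBrowser)
        # Wire the thread's signals to the UI
        self.downloadThread = DownloadM3U8Thread(self)
        self.downloadThread.loggerSignal.connect(self.logBrowser.append)
        self.downloadThread.progressSignal.connect(self.progressBar.setValue)

    def startDownload(self):
        self.downloadThread.setDownloadTask(DownloadUrlModel(
            title='demo_video',
            save_path='D:/videos',
            url='https://example.com/index.m3u8',
        ))
        self.downloadThread.start()


if __name__ == '__main__':
    app = QApplication(sys.argv)
    win = DemoWindow()
    win.show()
    win.startDownload()
    sys.exit(app.exec())

The stop() slot can be hooked to a button in the same way; it only prevents new segment tasks from being scheduled, so segments already in flight finish before the thread returns.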
combine_ts
A merge function written with open() runs into format problems in the merged file; ffmpeg does not have this issue.
def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
    # Collect all cached files
    pass
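For reference, the open()-based body is the same logic that remains commented out inside combine_ts above; written out as its own method (renamed here to combine_ts_with_open so it does not clash with the ffmpeg version) it looks like this:

def combine_ts_with_open(self, source_path: Path, dest_file: Union[Path, str]):
    # Collect all cached segment files and sort them by name
    file_list = [str(file.name) for file in source_path.glob('**/*.ts')]
    if not file_list:
        return
    file_list.sort(key=lambda s: s.split('.')[0])
    length = len(file_list)
    with open(dest_file, 'ab') as f:
        # Append every segment's bytes to the same output file
        for i, file in enumerate(file_list, 1):
            with open(os.path.join(source_path, file), 'rb') as rf:
                f.write(rf.read())
            # Report progress
            self.loggerSignal.emit('Merging: {:3.2f}%'.format(i / length * 100))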