当前位置: 首页 > article >正文

PyQt下载M3U8文件

下载模型

from pathlib import Path
from typing import Any, Union
from pydantic import Field, BaseModel

class DownloadUrlModel(BaseModel):
    title: str = Field(..., description="文件名")
    save_path: Union[Path, str] = Field(..., description="文件路径")
    url: str = Field(..., description="m3u8文件路径")
    isM3u8: bool = Field(True, description="是否为m3u8文件")

M3U8解密

pip install pycryptodome

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad


class DecodeByte:
    # 解密
    @staticmethod
    def do_decode(key, iv, data, method="AES-128") -> bytes:
        if isinstance(key, str):
            key = key.encode('utf-8')
        if isinstance(iv, str):
            iv = iv.encode('utf-8')
        if "AES-128" == method:
            aes = AES.new(key, AES.MODE_CBC, iv)
            if data and (len(data) % 16) != 0:
                data = pad(data, 16)
            return aes.decrypt(data)
        else:
            return None

下载线程

FFMPEG_EXE_PATHffmpeg.exe的绝对路径,若没有在官网上下载
ffmpeg下载M3U8没有进度显示,并且它是同步下载,会比较慢,使用httpx异步比较快

# coding = utf-8
import asyncio
import os
import re
import shutil
from pathlib import Path
from typing import Union

import httpx
from PySide6.QtCore import QThread, Signal

from ..models import DownloadUrlModel
from ..utils import HTTPRequest, DecodeByte
from ..config import FFMPEG_EXE_PATH


class DownloadM3U8Thread(QThread):
    loggerSignal = Signal(str)
    progressSignal = Signal(int)
    stopSignal = Signal(bool)

    def __init__(self, parent=None):
        super().__init__(parent)
        self.__is_stop = False
        self.__model: DownloadUrlModel = None
        self.finished_file: Path = None
        self.failed_file: Path = None
        self.temp_path: Path = None

        self.num = 0
        self.list_length = 0
        self.retry_max = 7
        # 解密信息
        self.cry = {
            "key": "",
            "iv": "",
            "method": "",
        }

        self.started.connect(self.startedSlot)

    def sendRequest(self, url: str, *, method='GET', **kwargs):
        response = httpx.request(method, url, verify=False, headers=HTTPRequest.headers, timeout=HTTPRequest.timeout,
                                 **kwargs)
        response.raise_for_status()
        return response

    async def aiohttp_send(self, client: httpx.AsyncClient, url: str, index: int, *, retry_count=0):
        ts_name = f'{str(index).zfill(6)}.ts'
        try:
            response = await client.get(url)
            self.save_data_file(response.content, ts_name)
            progress = round(self.num / self.list_length * 100, 3)
            self.loggerSignal.emit(f'下载进度: {progress} %')
            self.progressSignal.emit(int(progress))
        except httpx.ConnectError as e:
            await asyncio.sleep(4)
            if retry_count < self.retry_max:
                retry_count += 1
                await self.aiohttp_send(client, url, index, retry_count=retry_count)
            else:
                self.save_failed_file(f'{url}_{ts_name}')

    async def aiohttp_download(self, urls):
        async with httpx.AsyncClient(headers=HTTPRequest.headers, timeout=HTTPRequest.timeout) as client:
            tasks = []
            self.list_length = len(urls)
            for index, url in enumerate(urls, 1):
                if self.__is_stop:
                    self.wait()
                    self.quit()
                    break
                ts_name = f'{str(index).zfill(6)}.ts'
                if ts_name in self.open_finished_file():
                    continue
                task = asyncio.ensure_future(self.aiohttp_send(client, url, index))
                tasks.append(task)
            await asyncio.gather(*tasks)

    def get_full_ts_url(self, url: str, ts_name: str) -> str:
        """
        获取完整的ts文件url
        :param url: 原始url
        :param ts_name: ts文件名
        :return: str
        """
        if ts_name.startswith('http'):
            return ts_name
        tl = ts_name.split('/')
        new_url = []
        # 循环url,去掉ts name中重复的部分
        for s in url.split('/')[:-1]:
            if s in tl:
                tl.remove(s)
            new_url.append(s)
        # 拼接ts name
        new_url.extend(tl)
        result = '/'.join(new_url)
        return result

    def setCryInfo(self, text, url):
        # 获取加密参数
        x_key = re.findall('#EXT-X-KEY:(.*?)\n', text)
        cry_obj = dict()
        if len(x_key) > 0:
            # 提取
            for item in x_key[0].split(','):
                key = item.split('=')[0]
                value = item.replace(key, '')[1:].replace('"', '')
                cry_obj[key] = value
            # format
            if cry_obj.get('URI') and not cry_obj['URI'].startswith('http'):
                cry_obj['URI'] = self.get_full_ts_url(url, cry_obj['URI'])
            elif not cry_obj.get('URI'):
                cry_obj['URI'] = ''
            # 获取key
            res = self.sendRequest(cry_obj['URI'])
            self.cry['key'] = res.content
            # 加密方式
            self.cry['method'] = cry_obj.get('METHOD')
            # iv值
            if cry_obj.get('IV'):
                self.cry['iv'] = cry_obj['IV'][2:18]
        else:
            pass

    def save_data_file(self, data: bytes, ts_name: str):
        # 如果有加密,需要data解密后再存储
        if self.cry.get('key'):
            # 如果源文件有iv就读取,如果没有就用文件名
            iv = self.cry["iv"] if self.cry.get("iv") else ts_name.split('.')[0].zfill(16)
            data = DecodeByte.do_decode(self.cry["key"], iv, data, self.cry["method"])
            if not data:
                raise Exception('解密失败')
        # 保存
        with open(self.temp_path / ts_name, 'wb') as f:
            f.write(data)
            self.save_finished_file(ts_name)
            self.num += 1

    def open_failed_file(self) -> list:
        return self.failed_file.read_text(encoding='utf-8').split('\n')

    def save_failed_file(self, ts_name: str):
        with self.failed_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def open_finished_file(self) -> list:
        return self.finished_file.read_text(encoding='utf-8').split('\n')

    def save_finished_file(self, ts_name: str):
        with self.finished_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def save_ffmpeg_file(self, file_list: list):
        # 保存ffmpeg合并文件
        ffmpeg_file = self.temp_path / 'ffmpeg.txt'
        ffmpeg_file.touch(exist_ok=True)
        with ffmpeg_file.open('a+', encoding='utf-8') as f:
            for file in file_list:
                f.write(f"file '{file}'\n")
        return ffmpeg_file

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        # 获取所有缓存文件
        file_list = [str(file.name) for file in source_path.glob('**/*.ts')]
        if not file_list:
            return
        # 名称排序
        file_list.sort(key=lambda s: s.split('.')[0])
        ffmpeg_txt = self.save_ffmpeg_file(file_list)
        # 文件总数
        length = len(file_list)
        # 开始合并文件
        cmd = f'{FFMPEG_EXE_PATH} -f concat -safe 0 -i  {ffmpeg_txt}  -c  copy {dest_file}'
        os.system(cmd)
        # with open(dest_file, 'ab') as f:
        #     # 循环文件列表
        #     for i, file in enumerate(file_list, 1):
        #         # 读取每个文件
        #         with open(os.path.join(source_path, file), 'rb') as rf:
        #             # 把每个文件的内容 追加到同一个文件
        #             data = rf.read()
        #             f.write(data)
        #         # 打印进度
        #         self.loggerSignal.emit('合并中: {:3.2f}%'.format(i / length * 100))
        # # 移除缓存文件夹
        self.sleep(2)
        try:
            shutil.rmtree(source_path.parent)
        except Exception as e:
            self.loggerSignal.emit(f'删除文件夹错误:{e}')

    def run(self):
        url = self.__model.url
        save_path = Path(self.__model.save_path)
        video_path = save_path / self.__model.title
        self.temp_path = video_path / 'temp'
        self.finished_file = self.temp_path / 'finished.txt'
        self.failed_file = self.temp_path / 'failed.txt'

        self.temp_path.mkdir(parents=True, exist_ok=True)
        self.finished_file.touch(exist_ok=True)
        self.failed_file.touch(exist_ok=True)

        try:
            self.loggerSignal.emit(f"开始加载m3u8文件")
            m3u8_text = self.sendRequest(url).text
            self.setCryInfo(m3u8_text, url)
            new_urls = []
            self.loggerSignal.emit("解析m3u8文件")
            for line in m3u8_text.split('\n'):
                if not line.startswith('#'):
                    new_urls.append(self.get_full_ts_url(url, line))
            self.loggerSignal.emit('开始下载视频...')
            asyncio.run(self.aiohttp_download(new_urls))
            self.loggerSignal.emit('开始合并文件...')
            self.combine_ts(self.temp_path, str(save_path / f'{self.__model.title}.mp4'))
            self.loggerSignal.emit(f"视频下载完成")
        except Exception as e:
            self.loggerSignal.emit(f"视频下载失败")
            return

    def stop(self):
        self.__is_stop = True
        self.stopSignal.emit(True)
        self.loggerSignal.emit('下载已停止')

    def startedSlot(self):
        self.__is_stop = False
        self.stopSignal.emit(False)

    def setDownloadTask(self, model: DownloadUrlModel):
        self.__model = model

combine_ts

使用open方法写的合并函数,会出现格式错误的问题,ffmpeg没有此问题

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        # 获取所有缓存文件
        pass

http://www.kler.cn/a/465695.html

相关文章:

  • 【JAVA】用于控制流程的关键字 break、continue、return 使用场景,注意事项和实例
  • 【Rust自学】10.6. 生命周期 Pt.2:生命周期的语法与例子
  • Jdk动态代理源码缓存优化比较(JDK17比JDK8)
  • k8s基础(3)—Kubernetes-Deployment
  • 【信息系统项目管理师】高分论文:论信息系统项目的风险管理(数字化联合审查管理系统)
  • 【MongoDB详解】
  • TK730 不允许更改资源库或跨客户端定制
  • JavaEE 初阶:线程(2)
  • RabbitMQ的常见面试题及其答案的总结
  • 美团商家端 字符验证码 分析
  • 使用npm 插件[mmdc]将.mmd时序图转换为图片
  • VuePress2配置unocss的闭坑指南
  • 适配器模式(类适配器,对象适配器)
  • 高频java面试题
  • 用语言模型 GLM-Zero-Preview 来驱动战场推演
  • 数据挖掘——支持向量机分类器
  • Centos源码安装MariaDB 基于GTID主从部署(一遍过)
  • Redis面试相关
  • vue2框架配置路由设计打印单
  • 【Axios使用手册】如何使用axios向后端发送请求并进行数据交互
  • 利用PHP爬虫获取1688按关键字搜索商品:技术解析与实践指南
  • 【C语言程序设计——循环程序设计】枚举法换硬币(头歌实践教学平台习题)【合集】
  • 【HTTP和gRPC的区别】协议类型/传输效率/性能/语义/跨语言支持/安全性/使用场景/易用性对比
  • Kafka详解 ③ | Kafka集群操作与API操作
  • 常用的聚合函数
  • TCPDump参数详解及示例