YOLOv11-ultralytics-8.3.67部分代码阅读笔记-downloads.py
downloads.py
ultralytics\utils\downloads.py
目录
downloads.py
1.所需的库和模块
2.def is_url(url, check=False):
3.def delete_dsstore(path, files_to_delete=(".DS_Store", "__MACOSX")):
4.def zip_directory(directory, compress=True, exclude=(".DS_Store", "__MACOSX"), progress=True):
5.def unzip_file(file, path=None, exclude=(".DS_Store", "__MACOSX"), exist_ok=False, progress=True):
6.def check_disk_space(url="https://ultralytics.com/assets/coco8.zip", path=Path.cwd(), sf=1.5, hard=True):
7.def get_google_drive_file_info(link):
8.def safe_download(url, file=None, dir=None, unzip=True, delete=False, curl=False, retry=3, min_bytes=1e0, exist_ok=False, progress=True,):
9.def get_github_assets(repo="ultralytics/assets", version="latest", retry=False):
10.def attempt_download_asset(file, repo="ultralytics/assets", release="v8.3.0", **kwargs):
11.def download(url, dir=Path.cwd(), unzip=True, delete=False, curl=False, threads=1, retry=3, exist_ok=False):
1.所需的库和模块
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
import re
import shutil
import subprocess
from itertools import repeat
from multiprocessing.pool import ThreadPool
from pathlib import Path
from urllib import parse, request
import requests
import torch
from ultralytics.utils import LOGGER, TQDM, checks, clean_url, emojis, is_online, url2file
# Define Ultralytics GitHub assets maintained at https://github.com/ultralytics/assets
# 这段代码定义了多个与GitHub仓库资产相关的变量,用于指定和管理不同版本的YOLO模型文件和其他相关资产。
# 定义GitHub仓库路径。
# 定义了一个变量 GITHUB_ASSETS_REPO ,表示GitHub仓库的路径。这个仓库用于存储YOLO模型文件和其他资产。
GITHUB_ASSETS_REPO = "ultralytics/assets"
# 定义GitHub资产文件名列表。
# 定义了一个变量 GITHUB_ASSETS_NAMES ,它是一个包含所有YOLO模型文件和其他资产文件名的列表。
# 使用列表推导式生成文件名列表,涵盖了不同版本的YOLO模型文件(如YOLOv3、YOLOv5、YOLOv8等)。
# 文件名包括不同的变体(如 -cls 、 -seg 、 -pose 等)和分辨率(如 6 )。
# 还包括一些特定的文件名,如 mobile_sam.pt 和 calibration_image_sample_data_20x128x128x3_float32.npy.zip 。
GITHUB_ASSETS_NAMES = (
[f"yolov8{k}{suffix}.pt" for k in "nsmlx" for suffix in ("", "-cls", "-seg", "-pose", "-obb", "-oiv7")]
+ [f"yolo11{k}{suffix}.pt" for k in "nsmlx" for suffix in ("", "-cls", "-seg", "-pose", "-obb")]
+ [f"yolov5{k}{resolution}u.pt" for k in "nsmlx" for resolution in ("", "6")]
+ [f"yolov3{k}u.pt" for k in ("", "-spp", "-tiny")]
+ [f"yolov8{k}-world.pt" for k in "smlx"]
+ [f"yolov8{k}-worldv2.pt" for k in "smlx"]
+ [f"yolov9{k}.pt" for k in "tsmce"]
+ [f"yolov10{k}.pt" for k in "nsmblx"]
+ [f"yolo_nas_{k}.pt" for k in "sml"]
+ [f"sam_{k}.pt" for k in "bl"]
+ [f"FastSAM-{k}.pt" for k in "sx"]
+ [f"rtdetr-{k}.pt" for k in "lx"]
+ ["mobile_sam.pt"]
+ ["calibration_image_sample_data_20x128x128x3_float32.npy.zip"]
)
# 定义GitHub资产文件名的茎(stem)。
# 定义了一个变量 GITHUB_ASSETS_STEMS ,它是一个包含所有YOLO模型文件和其他资产文件名的茎(stem)的列表。
# 使用 Path(k).stem 提取每个文件名的茎(即去除扩展名的部分)。
GITHUB_ASSETS_STEMS = [Path(k).stem for k in GITHUB_ASSETS_NAMES]
# 这段代码的主要功能是。定义GitHub仓库路径:指定存储YOLO模型文件和其他资产的GitHub仓库路径。定义GitHub资产文件名列表:生成一个包含所有YOLO模型文件和其他资产文件名的列表。定义GitHub资产文件名的茎:提取每个文件名的茎,方便后续处理。通过这些定义,可以方便地管理和引用GitHub仓库中的YOLO模型文件和其他资产,确保代码的灵活性和可维护性。
2.def is_url(url, check=False):
# 这段代码定义了一个函数 is_url ,用于检查给定的字符串是否是一个有效的 URL,并且可以选择性地检查该 URL 是否在线可访问。
# 定义了一个函数 is_url ,接收两个参数。
# 1.url :要检查的 URL,可以是字符串或任何可以转换为字符串的对象。
# 2.check :一个布尔值,默认为 False 。如果设置为 True ,函数会进一步检查 URL 是否在线可访问。
def is_url(url, check=False):
# 验证给定的字符串是否为 URL,并可选择检查 URL 是否在线存在。
"""
Validates if the given string is a URL and optionally checks if the URL exists online.
Args:
url (str): The string to be validated as a URL.
check (bool, optional): If True, performs an additional check to see if the URL exists online.
Defaults to False.
Returns:
(bool): Returns True for a valid URL. If 'check' is True, also returns True if the URL exists online.
Returns False otherwise.
Example:
```python
valid = is_url("https://www.example.com")
```
"""
# 开始一个 try 块,用于捕获可能出现的异常。
try:
# 将输入的 url 转换为字符串。这确保了即使输入是其他类型(如 Path 对象或字节序列),也能正确处理。
url = str(url)
# result = urlparse(urlstring, scheme='', allow_fragments=True)
# urlparse() 函数是 Python 标准库 urllib.parse 模块中的一个函数,用于解析 URL(统一资源定位符)并将其分解为组件。这个函数在处理网络地址时非常有用,因为它可以将复杂的 URL 分解成易于管理的部分。
# 参数 :
# urlstring : 要解析的 URL 字符串。
# scheme : (可选)如果提供,将用于覆盖 URL 中的方案部分。
# allow_fragments : (可选)一个布尔值,指示是否允许解析 URL 的片段部分(即 # 后面的部分)。默认为 True 。
# 返回值 :
# urlparse() 函数返回一个 ParseResult 对象,该对象包含以下属性 :
# scheme : URL 的方案部分(例如 http 、 https )。
# netloc : 网络位置部分(例如域名和端口)。
# path : URL 的路径部分。
# params : URL 的参数部分( ? 后面的部分)。
# query : URL 的查询部分( ? 后面的部分,不包括 # )。
# fragment : URL 的片段部分( # 后面的部分)。
# urlparse() 函数是处理 URL 的基础工具,常用于网络编程、Web 开发和任何需要解析或构造 URL 的场景。
# 使用 urllib.parse.urlparse 函数解析 URL。 urlparse 会将 URL 分解为几个部分,包括协议(scheme)、网络位置(netloc)、路径(path)等。
result = parse.urlparse(url)
# 使用 assert 语句检查解析结果中的协议( scheme )和网络位置( netloc )是否都存在。 如果 scheme 和 netloc 都存在,说明这是一个有效的 URL。 如果其中一个不存在, assert 会抛出 AssertionError 。
assert all([result.scheme, result.netloc]) # check if is url
# 如果 check 参数为 True ,则执行以下代码,进一步检查 URL 是否在线可访问。
if check:
# 使用 urllib.request.urlopen 打开 URL,并获取响应。 with 语句确保响应对象在使用后正确关闭。
with request.urlopen(url) as response:
# 检查响应的状态码是否为 200(HTTP 状态码 200 表示请求成功)。如果状态码为 200,说明 URL 在线可访问,返回 True ;否则返回 False 。
return response.getcode() == 200 # check if exists online
# 如果 check 参数为 False ,或者 check 参数为 True 且 URL 在线可访问,返回 True 。
return True
# 捕获 try 块中可能出现的任何异常。
except Exception:
# 如果捕获到异常,说明输入的 URL 不是有效的 URL,或者在检查在线状态时出现问题,返回 False 。
return False
# is_url 函数的主要功能是。检查 URL 格式:通过解析 URL,检查其是否包含协议(如 http 、 https )和网络位置(如域名或 IP 地址)。检查 URL 是否在线:如果 check 参数为 True ,进一步检查 URL 是否在线可访问(即 HTTP 状态码是否为 200)。
# 注意事项 :
# 异常处理 : except Exception 捕获所有异常,这可能会隐藏一些具体的错误信息。在实际使用中,可以根据需要捕获更具体的异常类型。
# 网络请求 :当 check=True 时,函数会发起网络请求。这可能会导致性能问题,特别是在频繁调用时。建议在需要时才启用 check 参数。
# 安全性 :在处理用户输入的 URL 时,需要注意防范潜在的安全问题,如注入攻击。
3.def delete_dsstore(path, files_to_delete=(".DS_Store", "__MACOSX")):
# 这段代码定义了一个名为 delete_dsstore 的函数,用于删除指定路径及其子目录中特定的文件(如 .DS_Store 和 __MACOSX 文件夹)。
# 定义了一个函数 delete_dsstore ,它接受两个参数。
# 1.path :表示要进行文件删除操作的目录路径。
# 2.files_to_delete :一个元组,用于指定需要删除的文件或文件夹名称,默认值为 (".DS_Store", "__MACOSX") 。这意味着如果没有指定其他文件类型,函数会默认删除 .DS_Store 文件和 __MACOSX 文件夹。
def delete_dsstore(path, files_to_delete=(".DS_Store", "__MACOSX")):
# 删除指定目录下的所有“.DS_store”文件。
# 注意:
# “.DS_store”文件由 Apple 操作系统创建,包含有关文件夹和文件的元数据。它们是隐藏的系统文件,在不同操作系统之间传输文件时可能会导致问题。
"""
Deletes all ".DS_store" files under a specified directory.
Args:
path (str, optional): The directory path where the ".DS_store" files should be deleted.
files_to_delete (tuple): The files to be deleted.
Example:
```python
from ultralytics.utils.downloads import delete_dsstore
delete_dsstore("path/to/dir")
```
Note:
".DS_store" files are created by the Apple operating system and contain metadata about folders and files. They
are hidden system files and can cause issues when transferring files between different operating systems.
"""
# 遍历 files_to_delete 元组中的每个文件或文件夹名称。例如,如果 files_to_delete 是默认值,那么会依次处理 .DS_Store 和 __MACOSX 。
for file in files_to_delete:
# 使用 Path 类(来自 pathlib 模块)来处理路径操作。
# Path(path).rglob(file) :在指定的 path 路径及其所有子目录中递归查找与 file 匹配的文件或文件夹路径。
# list() :将匹配到的路径对象转换为一个列表,存储在变量 matches 中。
matches = list(Path(path).rglob(file))
# 使用 LOGGER (是一个日志记录器对象)记录日志信息。 日志内容显示正在删除的文件类型( file )以及匹配到的文件路径列表( matches ),方便调试和跟踪。
LOGGER.info(f"Deleting {file} files: {matches}") # 删除 {file} 文件:{matches} 。
# 遍历 matches 列表中的每个路径对象 f 。
for f in matches:
# 调用 f.unlink() 方法删除当前路径对象 f 所指向的文件或文件夹。 unlink() 是 pathlib 模块中用于删除文件或文件夹的方法。
f.unlink()
# 这段代码通过递归搜索指定路径及其子目录,查找并删除指定的文件或文件夹(默认为 .DS_Store 和 __MACOSX )。它利用了 pathlib 模块的路径操作功能,并通过日志记录功能提供了操作的透明性。此函数在处理跨平台文件系统时非常有用,尤其是清理 macOS 系统在文件传输过程中生成的隐藏文件和文件夹。
4.def zip_directory(directory, compress=True, exclude=(".DS_Store", "__MACOSX"), progress=True):
# 这段代码定义了一个名为 zip_directory 的函数,用于将指定目录及其子目录中的文件打包成一个 ZIP 文件,同时支持压缩、排除特定文件以及显示进度条等功能。
# 定义了一个函数 zip_directory ,接受以下参数。
# 1.directory :要打包的目录路径。
# 2.compress :布尔值,表示是否对文件进行压缩,默认为 True 。
# 3.exclude :一个元组,用于指定需要排除的文件名,默认为 (".DS_Store", "__MACOSX") 。
# 4.progress :布尔值,表示是否显示进度条,默认为 True 。
def zip_directory(directory, compress=True, exclude=(".DS_Store", "__MACOSX"), progress=True):
# 压缩目录内容,排除包含排除列表中的字符串的文件。生成的 zip 文件以目录命名并放在旁边。
"""
Zips the contents of a directory, excluding files containing strings in the exclude list. The resulting zip file is
named after the directory and placed alongside it.
Args:
directory (str | Path): The path to the directory to be zipped.
compress (bool): Whether to compress the files while zipping. Default is True.
exclude (tuple, optional): A tuple of filename strings to be excluded. Defaults to ('.DS_Store', '__MACOSX').
progress (bool, optional): Whether to display a progress bar. Defaults to True.
Returns:
(Path): The path to the resulting zip file.
Example:
```python
from ultralytics.utils.downloads import zip_directory
file = zip_directory("path/to/dir")
```
"""
# 导入了 zipfile 模块中的常量和类。
# ZIP_DEFLATED :表示使用 DEFLATE 压缩算法。
# ZIP_STORED :表示不进行压缩,直接存储文件。
# ZipFile :用于操作 ZIP 文件的类。
from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile
# 调用了之前定义的 delete_dsstore 函数,删除指定目录及其子目录中的 .DS_Store 和 __MACOSX 文件夹(默认行为)。这一步确保在打包之前清理这些不必要的文件。
delete_dsstore(directory)
# 将传入的 directory 路径转换为 pathlib.Path 对象,方便后续的路径操作。
directory = Path(directory)
# 检查 directory 是否是一个存在的目录。
if not directory.is_dir():
# 如果不是,抛出 FileNotFoundError 异常,并提示目录不存在。
raise FileNotFoundError(f"Directory '{directory}' does not exist.") # 目录“{directory}”不存在。
# Unzip with progress bar
# 使用列表推导式 生成需要打包的文件列表 。
# directory.rglob("*") :递归查找目录及其子目录中的所有文件和文件夹。
# f.is_file() :确保只选择文件,排除文件夹。
# all(x not in f.name for x in exclude) :确保文件名中不包含 exclude 列表中的任何字符串(例如 .DS_Store 或 __MACOSX )。
files_to_zip = [f for f in directory.rglob("*") if f.is_file() and all(x not in f.name for x in exclude)]
# 使用 Path.with_suffix 方法,将目录路径的扩展名替换为 .zip ,生成 ZIP 文件的路径。
zip_file = directory.with_suffix(".zip")
# 根据 compress 参数的值选择压缩方式。
# 如果 compress=True ,使用 ZIP_DEFLATED (压缩)。
# 如果 compress=False ,使用 ZIP_STORED (不压缩)。
compression = ZIP_DEFLATED if compress else ZIP_STORED
# ZipFile(file, mode='r', compression=ZIP_STORED, allowZip64=True, compresslevel=None, *, strict_timestamps=True)
# ZipFile 类是Python标准库 zipfile 模块中的一个类,用于读取和写入ZIP文件。
# 参数解释 :
# file :可以是一个文件名(字符串),也可以是一个类文件对象(如 io.BytesIO )。如果是一个文件名, ZipFile 将打开这个文件进行读写操作;如果是一个类文件对象, ZipFile 将使用这个对象进行操作。
# mode :指定打开文件的模式,默认为 'r' ,表示只读模式。其他常用模式包括 : 'w' 写入模式,如果文件已存在,将被覆盖。 'a' 追加模式,如果文件已存在,将在文件末尾追加内容。 'r+' 读写模式,文件必须已存在。
# compression :指定压缩算法,默认为 ZIP_STORED ,表示不压缩。常用的压缩算法包括 : ZIP_STORED 不压缩,直接存储文件。 ZIP_DEFLATED 使用DEFLATE算法压缩文件,这是最常用的压缩算法。 ZIP_BZIP2 使用BZIP2算法压缩文件。 ZIP_LZMA 使用LZMA算法压缩文件。
# allowZip64 :布尔值,默认为 True ,表示是否允许使用ZIP64扩展。ZIP64扩展允许创建和读取大于4GB的ZIP文件。
# compresslevel :指定压缩级别,仅在使用 ZIP_DEFLATED 、 ZIP_BZIP2 或 ZIP_LZMA 压缩算法时有效。取值范围通常为0(无压缩)到9(最高压缩)。
# strict_timestamps :布尔值,默认为 True ,表示是否严格处理文件时间戳。如果为 True ,将确保文件时间戳在ZIP文件中准确表示。
# 常用方法 :
# write(filename, arcname=None, compress_type=None, compresslevel=None) :将文件 filename 写入ZIP文件中。 arcname 可以指定文件在ZIP文件中的名称, compress_type 可以指定压缩算法, compresslevel 可以指定压缩级别。
# writestr(zinfo_or_arcname, data, compress_type=None, compresslevel=None) :将字符串 data 写入ZIP文件中。 zinfo_or_arcname 可以是一个 ZipInfo 对象或文件名, compress_type 和 compresslevel 的含义与 write 方法相同。
# extract(member, path=None, pwd=None) :从ZIP文件中提取一个成员文件到指定路径 path 。 member 可以是一个文件名或 ZipInfo 对象, pwd 可以指定解压密码。
# extractall(path=None, members=None, pwd=None) :从ZIP文件中提取所有成员文件到指定路径 path 。 members 可以是一个文件名列表, pwd 可以指定解压密码。
# close() :关闭ZIP文件,释放资源。
# ZipFile 类提供了丰富的功能来处理ZIP文件,包括创建、写入、读取和提取文件。通过合理使用其参数和方法,可以方便地进行文件的压缩和解压操作。
# 使用 ZipFile 类创建一个 ZIP 文件,模式为 "w" (写入模式),并指定压缩方式。 使用 with 语句确保 ZIP 文件在操作完成后正确关闭。
with ZipFile(zip_file, "w", compression) as f:
# 遍历 files_to_zip 列表中的每个文件。 使用 TQDM (是一个进度条库)显示进度条。 desc 进度条的描述信息。 unit 进度条的单位(这里是文件)。 disable 根据 progress 参数的值决定是否启用进度条。
for file in TQDM(files_to_zip, desc=f"Zipping {directory} to {zip_file}...", unit="file", disable=not progress):
# 将当前文件 file 写入 ZIP 文件中。
# file.relative_to(directory) :将文件路径转换为相对于目录的相对路径,避免在 ZIP 文件中包含绝对路径。
f.write(file, file.relative_to(directory))
# 返回生成的 ZIP 文件的路径。
return zip_file # return path to zip file
# 这段代码实现了一个功能强大的目录打包功能,支持以下特性。文件清理:在打包前自动删除指定的文件(如 .DS_Store 和 __MACOSX )。压缩选项:可以选择是否对文件进行压缩。文件排除:支持排除指定的文件名。进度条显示:在打包过程中显示进度条,增强用户体验。路径处理:使用 pathlib 模块处理路径,代码更加简洁且跨平台兼容。该函数适用于需要将目录打包为 ZIP 文件的场景,尤其适合处理包含大量文件的目录,同时提供灵活的配置选项。
5.def unzip_file(file, path=None, exclude=(".DS_Store", "__MACOSX"), exist_ok=False, progress=True):
# 这段代码定义了一个名为 unzip_file 的函数,用于解压ZIP文件到指定目录。它提供了多种功能,包括排除特定文件、处理路径安全问题、检查目标目录是否已存在等。
# 定义了 unzip_file 函数,它接受以下参数 :
# 1.file :ZIP文件的路径。
# 2.path :解压目标目录,默认为ZIP文件的父目录。
# 3.exclude :一个元组,包含需要排除的文件名或目录名,默认为 (".DS_Store", "__MACOSX") 。
# 4.exist_ok :一个布尔值,指示如果目标目录已存在且不为空时是否跳过解压,默认为 False 。
# 5.progress :一个布尔值,指示是否显示解压进度条,默认为 True 。
def unzip_file(file, path=None, exclude=(".DS_Store", "__MACOSX"), exist_ok=False, progress=True):
# 将 *.zip 文件解压到指定路径,排除包含排除列表中的字符串的文件。
# 如果 zip 文件不包含单个顶级目录,则该函数将创建一个与 zip 文件同名(不带扩展名)的新目录来提取其内容。
# 如果未提供路径,则该函数将使用 zip 文件的父目录作为默认路径。
# 引发:
# BadZipFile:如果提供的文件不存在或不是有效的 zip 文件。
"""
Unzips a *.zip file to the specified path, excluding files containing strings in the exclude list.
If the zipfile does not contain a single top-level directory, the function will create a new
directory with the same name as the zipfile (without the extension) to extract its contents.
If a path is not provided, the function will use the parent directory of the zipfile as the default path.
Args:
file (str | Path): The path to the zipfile to be extracted.
path (str, optional): The path to extract the zipfile to. Defaults to None.
exclude (tuple, optional): A tuple of filename strings to be excluded. Defaults to ('.DS_Store', '__MACOSX').
exist_ok (bool, optional): Whether to overwrite existing contents if they exist. Defaults to False.
progress (bool, optional): Whether to display a progress bar. Defaults to True.
Raises:
BadZipFile: If the provided file does not exist or is not a valid zipfile.
Returns:
(Path): The path to the directory where the zipfile was extracted.
Example:
```python
from ultralytics.utils.downloads import unzip_file
dir = unzip_file("path/to/file.zip")
```
"""
# 这段代码是 unzip_file 函数的一部分,用于检查ZIP文件是否存在,并设置默认的解压路径。
# 导入必要的模块。
# 从 zipfile 模块中导入以下类。
# BadZipFile :表示ZIP文件损坏或无效的异常。
# ZipFile :用于操作ZIP文件的类。
# is_zipfile :一个函数,用于检查文件是否是一个有效的ZIP文件。
from zipfile import BadZipFile, ZipFile, is_zipfile
# 检查文件是否存在且是有效的ZIP文件。
# 使用 Path(file).exists() 检查文件是否存在。 使用 is_zipfile(file) 检查文件是否是一个有效的ZIP文件。
if not (Path(file).exists() and is_zipfile(file)):
# 如果文件不存在或不是一个有效的ZIP文件,抛出 BadZipFile 异常,并提供错误信息。
raise BadZipFile(f"File '{file}' does not exist or is a bad zip file.") # 文件“{file}”不存在或是一个坏的 zip 文件。
# 设置默认解压路径。
# 如果未指定解压路径( path 为 None ),则将默认解压路径设置为ZIP文件的父目录。
if path is None:
# 使用 Path(file).parent 获取ZIP文件的父目录路径。
path = Path(file).parent # default path
# 这段代码的主要功能是。检查文件是否存在且是有效的ZIP文件:确保文件存在且是一个有效的ZIP文件。如果文件不存在或不是一个有效的ZIP文件,抛出 BadZipFile 异常。设置默认解压路径:如果未指定解压路径,则默认为ZIP文件的父目录。通过这些检查,可以确保在解压文件之前,文件是有效的,并且解压路径是合理的。这有助于避免因文件损坏或路径错误导致的问题。
# 这段代码是 unzip_file 函数的一部分,用于解压ZIP文件的内容,并根据ZIP文件的结构决定解压的目标路径。
# Unzip the file contents
# 使用 ZipFile 类打开指定的ZIP文件,并将其存储在变量 zipObj 中。 with 语句确保文件在操作完成后正确关闭。
with ZipFile(file) as zipObj:
# 使用列表推导式从ZIP文件中获取所有文件名( zipObj.namelist() )。 过滤掉包含在 exclude 列表中的文件或目录名。 exclude 是一个包含需要排除的文件名或目录名的元组,默认为 (".DS_Store", "__MACOSX") 。
files = [f for f in zipObj.namelist() if all(x not in f for x in exclude)]
# 使用集合推导式提取所有文件的顶级目录名称。 Path(f).parts[0] 获取每个文件路径的第一部分,即顶级目录名。
top_level_dirs = {Path(f).parts[0] for f in files}
# Decide to unzip directly or unzip into a directory
# 判断ZIP文件中是否只有一个顶级目录。 如果是,则将 unzip_as_dir 设置为 True 。 如果ZIP文件中有多个顶级文件或目录,则将 unzip_as_dir 设置为 False 。
unzip_as_dir = len(top_level_dirs) == 1 # (len(files) > 1 and not files[0].endswith("/"))
# 如果ZIP文件中只有一个顶级目录( unzip_as_dir 为 True )。
if unzip_as_dir:
# Zip has 1 top-level directory
# 将 解压路径 设置为指定的 path 。
extract_path = path # i.e. ../datasets
# 将 目标路径 设置为 path 与顶级目录名的组合。
path = Path(path) / list(top_level_dirs)[0] # i.e. extract coco8/ dir to ../datasets/
# 如果ZIP文件中有多个顶级文件或目录( unzip_as_dir 为 False )。
else:
# Zip has multiple files at top level
# 将 解压路径 设置为 path 与ZIP文件名(去除扩展名)的组合。
path = extract_path = Path(path) / Path(file).stem # i.e. extract multiple files to ../datasets/coco8/
# 这段代码的主要功能是。过滤文件列表:排除指定的文件或目录。提取顶级目录:提取ZIP文件中的顶级目录名称。决定解压方式:根据ZIP文件的结构决定是直接解压到指定路径,还是创建一个以ZIP文件名命名的目录。设置解压路径:根据决定的解压方式设置最终的解压路径。通过这些逻辑,可以灵活地处理不同结构的ZIP文件,确保解压后的文件组织合理。
# 这段代码是 unzip_file 函数的一部分,用于检查目标目录是否已存在且包含文件,并根据需要解压ZIP文件。
# Check if destination directory already exists and contains files
# 检查目标目录是否已存在且包含文件。
# 检查目标目录 path 是否存在且包含文件。
if path.exists() and any(path.iterdir()) and not exist_ok:
# If it exists and is not empty, return the path without unzipping
# 如果目标目录已存在且不为空,并且 exist_ok 参数为 False ,则记录一条警告信息。
LOGGER.warning(f"WARNING ⚠️ Skipping {file} unzip as destination directory {path} is not empty.") # 警告 ⚠️ 跳过 {file} 解压缩,因为目标目录 {path} 不为空。
# 返回目标路径,跳过解压操作。
return path
# 解压文件。
# 使用 TQDM (一个进度条库)显示解压进度条。
# 遍历ZIP文件中的所有文件 files ,并为每个文件提供描述信息和进度单位。
# 如果 progress 参数为 False ,则禁用进度条。
for f in TQDM(files, desc=f"Unzipping {file} to {Path(path).resolve()}...", unit="file", disable=not progress):
# Ensure the file is within the extract_path to avoid path traversal security vulnerability
# 检查文件路径是否包含 .. ,这可能表示路径遍历攻击(path traversal)。
if ".." in Path(f).parts:
# 如果文件路径不安全,记录一条警告信息。
LOGGER.warning(f"Potentially insecure file path: {f}, skipping extraction.") # 可能不安全的文件路径:{f},跳过提取。
# 并跳过该文件的解压。
continue
# 使用 zipObj.extract 方法将文件 f 解压到目标路径 extract_path 。
zipObj.extract(f, extract_path)
# 返回解压后的目录路径。
return path # return unzip dir
# 这段代码的主要功能是。检查目标目录是否已存在且包含文件:如果目标目录已存在且不为空,并且 exist_ok 为 False ,则跳过解压操作。解压文件:使用 TQDM 显示解压进度条(如果启用)。检查文件路径是否安全,避免路径遍历攻击。解压文件到目标路径。返回解压目录:返回解压后的目录路径。通过这些逻辑,可以确保在解压文件时处理常见的问题,如目标目录已存在和路径安全问题,从而提高代码的健壮性和安全性。
# 这段代码的主要功能是。检查ZIP文件是否存在:确保文件存在且是一个有效的ZIP文件。设置默认解压路径:如果未指定解压路径,则默认为ZIP文件的父目录。解压文件:排除指定的文件或目录。根据ZIP文件的结构决定解压方式。检查目标目录是否已存在:如果目标目录已存在且不为空,并且 exist_ok 为 False ,则跳过解压。解压文件:确保文件路径安全,避免路径遍历漏洞。显示解压进度条(如果启用)。返回解压目录:返回解压后的目录路径。通过这个函数,可以方便地解压ZIP文件,同时处理多种常见问题,如路径安全和目标目录存在性检查。
6.def check_disk_space(url="https://ultralytics.com/assets/coco8.zip", path=Path.cwd(), sf=1.5, hard=True):
# 这段代码定义了一个名为 check_disk_space 的函数,用于检查是否有足够的磁盘空间来下载指定URL的文件。
# current_working_directory = Path.cwd()
# 在Python中, cwd() 函数是 pathlib 模块中的一个方法,用于获取当前工作目录。这个方法是 Path 类的一个实例方法, Path 类是 pathlib 模块中用于处理文件系统路径的类。
# 功能描述 :
# Path.cwd() 方法返回一个 Path 对象,该对象代表当前工作目录的路径。
# 返回值 :
# Path.cwd() 方法返回的是 Path 对象,这个对象提供了许多方法来操作路径,例如 .resolve() 可以获取路径的绝对路径, .as_posix() 可以将路径转换为跨平台的字符串形式等。
# 异常处理 :
# Path.cwd() 方法通常不会抛出异常,因为它只是返回当前工作目录的路径。但是,如果系统出现问题,导致无法确定当前工作目录,可能会抛出异常,这种情况下应该进行异常处理。
# 定义了 check_disk_space 函数,它接受以下参数 :
# 1.url :文件的URL。
# 2.path :检查磁盘空间的路径,默认为当前工作目录( Path.cwd() )。
# 3.sf :安全因子,默认为1.5,表示需要的空间是文件大小的1.5倍。
# 4.hard :是否严格检查磁盘空间,默认为 True 。
def check_disk_space(url="https://ultralytics.com/assets/coco8.zip", path=Path.cwd(), sf=1.5, hard=True):
# 检查是否有足够的磁盘空间来下载和存储文件。
"""
Check if there is sufficient disk space to download and store a file.
Args:
url (str, optional): The URL to the file. Defaults to 'https://ultralytics.com/assets/coco8.zip'.
path (str | Path, optional): The path or drive to check the available free space on.
sf (float, optional): Safety factor, the multiplier for the required free space. Defaults to 1.5.
hard (bool, optional): Whether to throw an error or not on insufficient disk space. Defaults to True.
Returns:
(bool): True if there is sufficient disk space, False otherwise.
"""
# 检查URL的有效性。
try:
# 使用 requests.head 发送一个HEAD请求到指定的URL,获取响应。
r = requests.head(url) # response
# 检查响应状态码是否小于400(表示请求成功)。
assert r.status_code < 400, f"URL error for {url}: {r.status_code} {r.reason}" # check response {url} 的 URL 错误:{r.status_code} {r.reason}。
# 如果请求失败(例如网络问题或URL无效)。
except Exception:
# 捕获异常并返回 True ,表示默认有足够的磁盘空间。
return True # requests issue, default to True
# Check file size
# 检查文件大小。
# 定义1 GiB的字节数( 1 << 30 )。
gib = 1 << 30 # bytes per GiB
# 从响应头中获取 Content-Length ,表示文件的大小(以字节为单位),并将其转换为GiB。
data = int(r.headers.get("Content-Length", 0)) / gib # file size (GB)
# shutil.disk_usage(path)
# shutil.disk_usage(path) 是 Python 标准库 shutil 模块中的一个函数,用于获取指定路径的磁盘使用情况统计信息。
# 参数 :
# path :一个表示文件系统路径的字符串或路径对象。在 Windows 上,这个路径必须代表一个目录;在 Unix 系统上,它可以是文件或目录。
# 返回值 :
# 该函数返回一个命名元组(namedtuple),包含以下属性 :
# total :表示文件系统的总空间量,单位为字节。
# used :表示已使用的磁盘空间量,单位为字节。
# free :表示可用的磁盘空间量,单位为字节。
# 注意事项 :
# 在 Unix 文件系统中, path 必须指向一个已挂载文件系统分区中的路径。在这些平台上,CPython 不会尝试从未挂载的文件系统中获取磁盘使用信息。
# 从 Python 3.8 开始,在 Windows 上, path 可以是一个文件或目录。 shutil.disk_usage() 提供的信息可以帮助开发者监控磁盘空间使用情况,或者在需要时优化磁盘使用。
# 使用 shutil.disk_usage 获取指定路径的磁盘使用情况(总空间、已用空间和剩余空间),并将其转换为GiB。
total, used, free = (x / gib for x in shutil.disk_usage(path)) # bytes
# 计算所需的空间(文件大小乘以安全因子 sf )。如果所需空间小于剩余空间。
if data * sf < free:
# 则返回 True ,表示有足够的磁盘空间。
return True # sufficient space
# Insufficient space
# 构造一条警告信息,说明当前的剩余空间不足,并提示用户需要释放的额外空间。
text = (
f"WARNING ⚠️ Insufficient free disk space {free:.1f} GB < {data * sf:.3f} GB required, " # 警告 ⚠️ 可用磁盘空间不足 {free:.1f} GB < {data * sf:.3f} GB required,
f"Please free {data * sf - free:.1f} GB additional disk space and try again." # 请释放 {data * sf - free:.1f} GB 额外磁盘空间并重试。
)
# 如果 hard=True 。
if hard:
# 抛出 MemoryError 异常,提示用户磁盘空间不足。
raise MemoryError(text)
# 如果 hard=False ,记录一条警告信息。
LOGGER.warning(text)
# 返回 False ,表示磁盘空间不足。
return False
# 这段代码的主要功能是。检查URL的有效性:发送HEAD请求,确保URL有效且可访问。检查文件大小:从响应头中获取文件的大小。检查磁盘空间:获取指定路径的磁盘使用情况。判断是否有足够的空间:如果所需空间小于剩余空间,返回 True 。处理不足的空间:如果空间不足,根据 hard 参数的值,抛出异常或记录警告信息。通过这个函数,可以确保在下载文件之前有足够的磁盘空间,避免因空间不足导致的下载失败。
7.def get_google_drive_file_info(link):
# 这段代码定义了一个名为 get_google_drive_file_info 的函数,用于从Google Drive链接中提取文件信息,包括文件ID和文件名。它还处理了Google Drive的下载警告和配额限制。
# 定义了 get_google_drive_file_info 函数,它接受一个参数。
# 1.link :Google Drive文件的共享链接。
def get_google_drive_file_info(link):
# 检索可共享 Google Drive 文件链接的直接下载链接和文件名。
"""
Retrieves the direct download link and filename for a shareable Google Drive file link.
Args:
link (str): The shareable link of the Google Drive file.
Returns:
(str): Direct download URL for the Google Drive file.
(str): Original filename of the Google Drive file. If filename extraction fails, returns None.
Example:
```python
from ultralytics.utils.downloads import get_google_drive_file_info
link = "https://drive.google.com/file/d/1cqT-cJgANNrhIHCrEufUYhQ4RqiWG_lJ/view?usp=drive_link"
url, filename = get_google_drive_file_info(link)
```
"""
# 从链接中提取文件ID。假设链接格式为 https://drive.google.com/file/d/<file_id>/view ,通过分割字符串提取 <file_id> 部分。
file_id = link.split("/d/")[1].split("/view")[0]
# 构造Google Drive的下载URL。
drive_url = f"https://drive.google.com/uc?export=download&id={file_id}"
# 初始化文件名为 None ,稍后从响应头中提取。
filename = None
# Start session
# 使用 requests.Session 启动一个会话。
with requests.Session() as session:
# 发送一个GET请求到 drive_url ,并启用流式传输( stream=True )。
response = session.get(drive_url, stream=True)
# 检查响应内容中是否包含“quota exceeded”(配额超出)。
if "quota exceeded" in str(response.content.lower()):
# 如果配额超出,抛出 ConnectionError 异常,并提示用户手动下载文件。
raise ConnectionError(
emojis(
f"❌ Google Drive file download quota exceeded. " # ❌ Google Drive 文件下载配额已超出。
f"Please try again later or download this file manually at {link}." # 请稍后重试或从 {link} 手动下载此文件。
)
)
# 遍历响应中的Cookie,查找以 download_warning 开头的键。
for k, v in response.cookies.items():
# 如果找到。
if k.startswith("download_warning"):
# 将 confirm 参数添加到下载URL中,以绕过Google Drive的下载警告。
drive_url += f"&confirm={v}" # v is token
# 从响应头中提取 Content-Disposition 字段。
if cd := response.headers.get("content-disposition"):
# 使用正则表达式 re.findall 提取文件名。
filename = re.findall('filename="(.+)"', cd)[0]
# 返回 更新后的下载URL 和 文件名 。
return drive_url, filename
# 这段代码的主要功能是。提取文件ID:从Google Drive共享链接中提取文件ID。构造下载URL:构造用于下载文件的URL。检查配额限制:检查是否达到Google Drive的下载配额,如果达到,提示用户手动下载。处理下载警告:如果存在下载警告,通过添加 confirm 参数绕过警告。提取文件名:从响应头中提取文件名。返回文件信息:返回更新后的下载URL和文件名。通过这个函数,可以方便地从Google Drive链接中提取文件信息,并处理常见的下载问题,如配额限制和下载警告。
8.def safe_download(url, file=None, dir=None, unzip=True, delete=False, curl=False, retry=3, min_bytes=1e0, exist_ok=False, progress=True,):
# 这段代码定义了一个名为 safe_download 的函数,用于安全地下载文件,并提供多种功能,如解压、删除下载文件、重试机制等。
# 定义了 safe_download 函数,它接受以下参数 :
# 1.url :文件的URL。
# 2.file :目标文件名,默认为 None 。
# 3.dir :目标目录,默认为 None 。
# 4.unzip :是否解压下载的文件,默认为 True 。
# 5.delete :下载后是否删除原始文件,默认为 False 。
# 6.curl :是否使用 curl 进行下载,默认为 False 。
# 7.retry :下载失败时的重试次数,默认为3。
# 8.min_bytes :文件的最小字节数,用于检查文件是否下载完整,默认为 1e0 (1字节)。
# 9.exist_ok :如果目标文件已存在且不为空,是否跳过下载,默认为 False 。
# 10.progress :是否显示下载进度条,默认为 True 。
def safe_download(
url,
file=None,
dir=None,
unzip=True,
delete=False,
curl=False,
retry=3,
min_bytes=1e0,
exist_ok=False,
progress=True,
):
# 从 URL 下载文件,带有重试、解压和删除已下载文件的选项。
"""
Downloads files from a URL, with options for retrying, unzipping, and deleting the downloaded file.
Args:
url (str): The URL of the file to be downloaded.
file (str, optional): The filename of the downloaded file.
If not provided, the file will be saved with the same name as the URL.
dir (str, optional): The directory to save the downloaded file.
If not provided, the file will be saved in the current working directory.
unzip (bool, optional): Whether to unzip the downloaded file. Default: True.
delete (bool, optional): Whether to delete the downloaded file after unzipping. Default: False.
curl (bool, optional): Whether to use curl command line tool for downloading. Default: False.
retry (int, optional): The number of times to retry the download in case of failure. Default: 3.
min_bytes (float, optional): The minimum number of bytes that the downloaded file should have, to be considered
a successful download. Default: 1E0.
exist_ok (bool, optional): Whether to overwrite existing contents during unzipping. Defaults to False.
progress (bool, optional): Whether to display a progress bar during the download. Default: True.
Example:
```python
from ultralytics.utils.downloads import safe_download
link = "https://ultralytics.com/assets/bus.jpg"
path = safe_download(link)
```
"""
# 检查是否为Google Drive链接。
# 检查URL是否以`https://drive.google.com/`开头。
gdrive = url.startswith("https://drive.google.com/") # check if the URL is a Google Drive link
# 如果是,则调用`get_google_drive_file_info`函数获取文件信息,包括更新后的URL和文件名。
if gdrive:
# def get_google_drive_file_info(link): -> 用于从Google Drive链接中提取文件信息,包括文件ID和文件名。它还处理了Google Drive的下载警告和配额限制。返回 更新后的下载URL 和 文件名 。 -> return drive_url, filename
url, file = get_google_drive_file_info(url)
# 如果未指定目标目录 dir ,则默认为当前目录( "." )。 如果未指定文件名 file ,则调用 url2file 函数从URL中提取文件名。 构造目标文件路径 f 。
# def url2file(url): -> 用于将URL转换为文件名。使用 clean_url(url) 清理URL,去除查询参数(如 ?auth )。 将清理后的URL字符串转换为 Path 对象。 使用 Path 对象的 .name 属性提取文件名部分。 -> return Path(clean_url(url)).name
f = Path(dir or ".") / (file or url2file(url)) # URL converted to filename
# 检查文件是否存在。如果URL中不包含 :// 且路径指向的文件存在。
if "://" not in str(url) and Path(url).is_file(): # URL exists ('://' check required in Windows Python<3.10)
# 则直接使用该路径作为目标文件路径。
f = Path(url) # filename
# 这段代码是 safe_download 函数的一部分,用于处理文件下载逻辑。如果目标文件不存在,它会准备下载文件,并确保目标目录存在且有足够的磁盘空间。
# 检查目标文件 f 是否存在。如果文件不存在,则继续执行下载逻辑。
elif not f.is_file(): # URL and file do not exist
# 如果 gdrive 为 True (即URL是Google Drive链接),直接使用 url 。 否则,调用 clean_url 函数清理URL,去除查询参数。 替换 https://ultralytics.com/assets/ 为 https://github.com/ultralytics/assets/releases/download/v0.0.0/ ,确保URL格式正确。
uri = (url if gdrive else clean_url(url)).replace( # cleaned and aliased url
"https://github.com/ultralytics/assets/releases/download/v0.0.0/",
"https://ultralytics.com/assets/", # assets alias
)
# 构造下载描述信息,说明正在从 uri 下载文件到目标路径 f 。
desc = f"Downloading {uri} to '{f}'" # 正在将 {uri} 下载至 '{f}'。
# 使用 LOGGER.info 记录下载信息。
LOGGER.info(f"{desc}...")
# 使用 mkdir 方法确保目标文件的父目录存在。如果父目录不存在,则创建所有必要的父目录。
# parents=True :创建所有必要的父目录。
# exist_ok=True :如果目录已存在,不会抛出异常。
f.parent.mkdir(parents=True, exist_ok=True) # make directory if missing
# 调用 check_disk_space 函数,检查目标路径 f.parent 是否有足够的磁盘空间来下载文件。 如果磁盘空间不足, check_disk_space 函数会抛出异常或记录警告信息。
# def check_disk_space(url="https://ultralytics.com/assets/coco8.zip", path=Path.cwd(), sf=1.5, hard=True):
# -> 用于检查是否有足够的磁盘空间来下载指定URL的文件。 捕获异常并返回 True ,表示默认有足够的磁盘空间。返回 True ,表示有足够的磁盘空间。返回 False ,表示磁盘空间不足。
# -> return True / return True / return False
check_disk_space(url, path=f.parent)
# 这段代码的主要功能是。检查文件是否存在:如果目标文件不存在,则继续执行下载逻辑。清理和替换URL:清理URL,去除查询参数,并确保URL格式正确。打印下载描述信息:记录下载信息,说明正在从哪个URL下载文件到哪个目标路径。确保目标目录存在:• 确保目标文件的父目录存在,如果不存在则创建。检查磁盘空间:检查目标路径是否有足够的磁盘空间来下载文件。通过这些步骤,可以确保在下载文件之前,目标路径存在且有足够的磁盘空间,从而避免因路径不存在或磁盘空间不足导致的下载失败。
# 这段代码是 safe_download 函数的一部分,用于实现文件下载的重试机制,并根据配置选择不同的下载方法。
# 使用 for 循环实现重试机制,最多尝试 retry + 1 次。
for i in range(retry + 1):
# 使用 try 块捕获下载过程中可能出现的异常。
try:
# 如果 curl 为 True 或重试次数大于0,则使用 curl 进行下载。
if curl or i > 0: # curl download with retry, continue
# 如果 progress 为 False ,则设置 curl 的静默模式( -sS )。
s = "sS" * (not progress) # silent
# 使用 subprocess.run 调用 curl 命令,参数包括 。
# "-#" :显示进度条(如果 progress 为 True )。
# f"-{s}L" :跟随重定向( -L )并设置静默模式( -sS )。
# url :下载的URL。
# "-o", f :指定输出文件路径。
# "--retry", "3" :设置重试次数为3。
# "-C", "-" :从上次失败的地方继续下载。
r = subprocess.run(["curl", "-#", f"-{s}L", url, "-o", f, "--retry", "3", "-C", "-"]).returncode
# 检查 curl 的返回码 r ,确保下载成功(返回码为0)
assert r == 0, f"Curl return value {r}" # Curl 返回值 {r} 。
# 如果 curl 为 False ,则选择使用 torch 或 urllib 进行下载。
else: # urllib download
method = "torch"
# 如果 method 为 "torch" ,则使用 torch.hub.download_url_to_file 下载文件。
if method == "torch":
torch.hub.download_url_to_file(url, f, progress=progress)
# 否则,使用 urllib.request.urlopen 下载文件,并使用 TQDM 显示进度条。
else:
# urllib.request.urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, *, cafile=None, capath=None, cadefault=False, context=None)
# urllib.request.urlopen() 是 Python 标准库 urllib 模块中的一个函数,它用于打开一个 URL 并返回一个类似文件对象的东西,可以用来读取从 URL 获取的数据。这个函数是 urllib.request 模块的一部分,该模块提供了用于处理 URL 的函数和类。
# 参数 :
# url : 要打开的 URL 字符串。
# data : 可选参数,用于发送 POST 请求的数据。如果是字节字符串,将作为请求体发送。如果是字典或序列的元组,将被编码为表单数据并发送。
# timeout : 可选参数,指定等待服务器响应的超时时间(以秒为单位)。默认值为 socket._GLOBAL_DEFAULT_TIMEOUT 。
# cafile : 可选参数,指定包含受信任的 CA 证书的文件路径,用于验证 SSL 证书。
# capath : 可选参数,指定包含多个 CA 证书的目录路径。
# cadefault : 布尔值,如果为 True ,则使用默认的 CA 证书。
# context : 可选参数,用于指定 SSL 上下文。
# 返回值 :
# 返回一个 urllib.response.addinfourl 对象,该对象继承自 io.IOBase ,提供了一个文件类的接口来读取服务器的响应。这个对象包含了响应的状态码、响应头等信息。
# 使用 urllib.request.urlopen 打开指定的URL,获取响应对象 response 。 使用 TQDM (一个进度条库)显示下载进度条。
with request.urlopen(url) as response, TQDM(
# response.getheader(name)
# response.getheader() 是 Python 标准库 http.client 模块中的一个方法,用于从 HTTP 响应中获取指定的头信息。
# 参数 :
# name :要获取的响应头的名称(如 "Content-Length" 、 "Content-Type" 等)。
# "Content-Length" :响应头表示响应体的长度,以字节为单位。它告诉客户端响应体的大小,以便客户端可以预先分配足够的缓冲区来接收数据。
# "Content-Type" :响应头表示响应体的媒体类型(MIME类型)。它告诉客户端响应体的内容格式,以便客户端可以正确解析和显示响应内容。
# 返回值 :
# 返回指定响应头的值。如果响应中不包含该头,则返回 None 。
# 功能 :
# 从 HTTP 响应中获取指定的头信息。如果响应中包含多个相同名称的头,则返回第一个头的值。
# 从响应头中获取 Content-Length 字段,表示文件的总大小(以字节为单位)。 如果 Content-Length 字段不存在,则默认值为0。
total=int(response.getheader("Content-Length", 0)),
# 进度条的描述信息,说明正在下载的文件。
desc=desc,
# 是否禁用进度条。如果 progress 为 False ,则禁用进度条。
disable=not progress,
# 进度条的单位,设置为 "B" 表示字节。
unit="B",
# 是否自动选择合适的单位(如KB、MB、GB)。
unit_scale=True,
# 单位转换的基数,设置为1024(表示1KB = 1024字节)。
unit_divisor=1024,
) as pbar:
# 使用 open 函数以二进制写模式打开目标文件 f 。
with open(f, "wb") as f_opened:
# 打开HTTP连接并获取响应 :使用 urllib.request.urlopen 或 requests.get 打开指定的URL,获取HTTP响应对象。
# 获取文件的总大小 :从响应头中获取 Content-Length 字段,表示文件的总大小(以字节为单位)。
# 逐块读取数据 :使用 for data in response: 逐块读取响应体中的数据。 每一块数据是一个字节字符串( bytes )。
# 写入文件并更新进度条 :将每一块数据写入目标文件。 使用 tqdm 更新进度条,显示下载进度。
# 遍历响应对象 response 中的数据块 data 。
# 这行代码通常用于从HTTP响应对象中逐块读取数据。这种用法在处理HTTP响应时非常常见,尤其是在下载文件或处理大量数据时。
# response 是一个HTTP响应对象,通常是通过 urllib.request.urlopen 或其他HTTP客户端库(如 requests )获取的。
# for data in response: 会逐块读取响应体中的数据。每一块数据通常是一个字节字符串( bytes )。
# 这种逐块读取的方式可以有效地处理大文件,避免一次性将整个文件内容加载到内存中,从而节省内存。
# for data in response: 这行代码的主要功能是。逐块读取HTTP响应数据:逐块读取响应体中的数据,每一块数据是一个字节字符串。处理大文件:逐块读取的方式可以有效处理大文件,避免一次性将整个文件内容加载到内存中,从而节省内存。写入文件并更新进度条:将每一块数据写入目标文件,并使用进度条库(如 tqdm )显示下载进度。通过这种方式,可以高效地下载文件,并提供用户友好的进度反馈。
for data in response:
# 使用 f_opened.write(data) 将数据块写入目标文件。
f_opened.write(data)
# 使用 pbar.update(len(data)) 更新进度条,表示已写入的数据量。
pbar.update(len(data))
# 这段代码的主要功能是。重试机制:最多尝试 retry + 1 次下载。选择下载方法:根据配置选择使用 curl 、 torch 或 urllib 进行下载。显示下载进度:如果 progress 为 True ,显示下载进度条。检查文件是否下载成功:检查文件是否存在且大小大于 min_bytes ,确保文件下载完整。如果文件下载不完整,则删除部分下载的文件。通过这些逻辑,可以确保文件下载过程的健壮性和可靠性,同时提供灵活的配置选项。
# 这段代码是 safe_download 函数的一部分,用于检查文件是否成功下载。如果文件存在且大小符合要求,则认为下载成功;否则,删除部分下载的文件。
# 使用 Path.exists() 方法检查目标文件 f 是否存在。
if f.exists():
# 使用 Path.stat().st_size 获取文件的大小(以字节为单位)。 检查文件大小是否大于 min_bytes (最小字节数,默认为1字节)。 如果文件大小大于 min_bytes ,则认为文件下载成功,退出循环。
if f.stat().st_size > min_bytes:
break # success
# 如果文件大小不符合要求,使用 Path.unlink() 方法删除部分下载的文件,以避免保留不完整的文件。
f.unlink() # remove partial downloads
# 这段代码的主要功能是。检查文件是否存在:确保目标文件 f 是否存在。检查文件大小是否符合要求:检查文件大小是否大于 min_bytes ,确保文件下载完整。删除部分下载的文件:如果文件大小不符合要求,删除部分下载的文件,避免保留不完整的文件。通过这些步骤,可以确保下载的文件是完整的,从而提高代码的健壮性和可靠性。
# 这段代码是 safe_download 函数的一部分,用于处理下载过程中可能出现的异常。它根据异常类型和重试次数决定如何处理下载失败的情况。
# 使用 try-except 块捕获下载过程中可能出现的异常,并将其存储在变量 e 中。
except Exception as e:
# 如果是第一次尝试( i == 0 )且当前环境不在线( is_online() 返回 False )。
if i == 0 and not is_online():
# 则抛出 ConnectionError 异常,提示用户环境不在线。使用 emojis 函数添加表情符号,使错误信息更直观。使用 from e 将原始异常 e 作为上下文信息传递,方便调试。
raise ConnectionError(emojis(f"❌ Download failure for {uri}. Environment is not online.")) from e # ❌ {uri} 下载失败。环境不在线。
# 如果重试次数已达到或超过最大重试次数( i >= retry )。
elif i >= retry:
# 则抛出 ConnectionError 异常,提示用户重试次数已达到上限。 同样使用 emojis 函数添加表情符号,并将原始异常 e 作为上下文信息传递。
raise ConnectionError(emojis(f"❌ Download failure for {uri}. Retry limit reached.")) from e # ❌ {uri} 下载失败。已达到重试限制。
# 如果上述条件都不满足,则记录一条警告信息,说明当前下载失败,并提示将进行下一次重试。使用 LOGGER.warning 记录警告信息,并使用 emojis 函数添加表情符号,使警告信息更直观。
LOGGER.warning(f"⚠️ Download failure, retrying {i + 1}/{retry} {uri}...") # ⚠️ 下载失败,重试 {i + 1}/{retry} {uri}...
# 这段代码的主要功能是。捕获下载过程中的异常:捕获所有可能的异常,并根据异常类型和重试次数决定如何处理。检查网络状态:如果第一次尝试失败且当前环境不在线,抛出 ConnectionError 异常,提示用户环境不在线。检查重试次数:如果重试次数已达到或超过最大重试次数,抛出 ConnectionError 异常,提示用户重试次数已达到上限。记录警告并重试:如果上述条件都不满足,记录一条警告信息,说明当前下载失败,并提示将进行下一次重试。通过这些逻辑,可以确保在下载过程中遇到问题时,能够提供清晰的错误信息,并根据情况决定是否重试,从而提高代码的健壮性和用户体验。
# 这段代码是 safe_download 函数的一部分,用于在文件下载完成后,根据需要解压文件,并处理解压后的文件。
# 检查是否需要解压文件( unzip 为 True )。
# 检查文件是否存在( f.exists() )。
# 检查文件的后缀是否为ZIP、TAR或GZ文件( f.suffix in {"", ".zip", ".tar", ".gz"} )。
if unzip and f.exists() and f.suffix in {"", ".zip", ".tar", ".gz"}:
# 导入 zipfile 模块中的 is_zipfile 函数,用于检查文件是否为ZIP文件。
from zipfile import is_zipfile
# 确定解压目录 unzip_dir 。如果提供了 dir 参数,则解压到指定目录。 如果未提供 dir 参数,则解压到文件的父目录。
unzip_dir = (dir or f.parent).resolve() # unzip to dir if provided else unzip in place
# 使用 is_zipfile 检查文件是否为ZIP文件。
if is_zipfile(f):
# 如果是ZIP文件,调用 unzip_file 函数解压文件,并返回解压后的目录路径。
# def unzip_file(file, path=None, exclude=(".DS_Store", "__MACOSX"), exist_ok=False, progress=True):
# -> 用于解压ZIP文件到指定目录。它提供了多种功能,包括排除特定文件、处理路径安全问题、检查目标目录是否已存在等。返回目标路径,跳过解压操作。返回解压后的目录路径。
# -> return path / return path # return unzip dir
unzip_dir = unzip_file(file=f, path=unzip_dir, exist_ok=exist_ok, progress=progress) # unzip
# 如果文件是TAR或GZ文件,使用 subprocess.run 调用 tar 命令解压文件。
elif f.suffix in {".tar", ".gz"}:
LOGGER.info(f"Unzipping {f} to {unzip_dir}...") # 正在将 {f} 解压缩至 {unzip_dir}...
# 这行代码使用 subprocess.run 调用了系统的 tar 命令来解压TAR或GZ文件。
# subprocess.run :这是Python标准库中的一个函数,用于在子进程中运行命令。
# ["tar", ...] :这是要运行的命令及其参数的列表。 "tar" 是命令名称,后面的元素是命令的参数。
# "xf" :这是 tar 命令的选项,表示解压文件。 x 表示解压, f 表示指定文件。
# "xfz" :这是 tar 命令的选项,表示解压GZ文件。 x 表示解压, f 表示指定文件, z 表示文件是gzip压缩的。
# f :这是要解压的文件路径。
# "--directory" :这是 tar 命令的选项,指定解压的目标目录。
# unzip_dir :这是解压的目标目录路径。
# check=True :这个参数指示 subprocess.run 在命令执行失败时抛出异常。如果命令执行成功,返回值的 returncode 属性为0;如果命令执行失败, returncode 属性为非0值。
subprocess.run(["tar", "xf" if f.suffix == ".tar" else "xfz", f, "--directory", unzip_dir], check=True)
# 如果 delete 为 True ,则删除原始文件(ZIP、TAR或GZ文件)。
if delete:
f.unlink() # remove zip
# 返回解压后的目录路径,方便后续操作。
return unzip_dir
# 这段代码的主要功能是。检查是否需要解压文件:确保文件存在且是ZIP、TAR或GZ文件。确定解压目录:如果提供了目标目录,则解压到指定目录;否则,解压到文件的父目录。解压ZIP文件:使用 unzip_file 函数解压ZIP文件。解压TAR或GZ文件:使用 tar 命令解压TAR或GZ文件。删除原始文件:如果 delete 为 True ,删除解压后的原始文件。返回解压后的目录路径:返回解压后的目录路径,方便后续操作。通过这些步骤,可以确保下载的文件在需要时被正确解压,并且可以选择性地删除原始文件,从而提高代码的灵活性和用户体验。
# 这段代码的主要功能是。检查是否为Google Drive链接:如果是,调用 get_google_drive_file_info 获取文件信息。确定目标文件路径:根据提供的目录和文件名或从URL中提取文件名。检查文件是否存在:如果文件已存在,跳过下载。下载文件:使用 curl 或 torch.hub.download_url_to_file 或 urllib.request.urlopen 下载文件。提供重试机制和进度条。检查文件是否下载成功:检查文件大小是否大于 min_bytes ,确保文件下载完整。解压文件:如果文件是ZIP、TAR或GZ文件,解压到指定目录。如果 delete 为 True ,删除解压后的原始文件。返回解压后的目录路径:返回解压后的目录路径,方便后续操作。通过这个函数,可以安全地下载文件,并处理多种常见问题,如文件存在性检查、解压和删除原始文件等。
# def safe_download(url, file=None, dir=None, unzip=True, delete=False, curl=False, retry=3, min_bytes=1e0, exist_ok=False, progress=True,):
# -> 用于安全地下载文件,并提供多种功能,如解压、删除下载文件、重试机制等。返回解压后的目录路径,方便后续操作。
# -> return unzip_dir
9.def get_github_assets(repo="ultralytics/assets", version="latest", retry=False):
# 这段代码定义了一个名为 get_github_assets 的函数,用于从GitHub仓库中获取指定版本的资产(如模型文件)。
# 定义了 get_github_assets 函数,它接受以下参数 :
# 1.repo :GitHub仓库的名称,默认为 "ultralytics/assets" 。
# 2.version :要获取资产的版本,默认为 "latest" 。
# 3.retry :是否在失败时重试,默认为 False 。
def get_github_assets(repo="ultralytics/assets", version="latest", retry=False):
# 从 GitHub 存储库中检索指定版本的标签和资产。如果未指定版本,该函数将获取最新版本的资产。
"""
Retrieve the specified version's tag and assets from a GitHub repository. If the version is not specified, the
function fetches the latest release assets.
Args:
repo (str, optional): The GitHub repository in the format 'owner/repo'. Defaults to 'ultralytics/assets'.
version (str, optional): The release version to fetch assets from. Defaults to 'latest'.
retry (bool, optional): Flag to retry the request in case of a failure. Defaults to False.
Returns:
(tuple): A tuple containing the release tag and a list of asset names.
Example:
```python
tag, assets = get_github_assets(repo="ultralytics/assets", version="latest")
```
"""
# 如果 version 不是 "latest" ,则将其格式化为 "tags/<version>" ,例如 "tags/v6.2" 。
if version != "latest":
version = f"tags/{version}" # i.e. tags/v6.2
# 构造GitHub API的URL。
url = f"https://api.github.com/repos/{repo}/releases/{version}"
# 使用 requests.get 发送GET请求到GitHub API,获取指定版本的资产信息。
r = requests.get(url) # github api
# 检查响应状态码是否为200(成功)。
# 如果状态码不是200且不是因为GitHub的速率限制( rate limit exceeded ),并且 retry 为 True 。
if r.status_code != 200 and r.reason != "rate limit exceeded" and retry: # failed and not 403 rate limit exceeded
# 则再次发送GET请求。
r = requests.get(url) # try again
# 如果响应状态码不是200,记录一条警告信息。
if r.status_code != 200:
LOGGER.warning(f"⚠️ GitHub assets check failure for {url}: {r.status_code} {r.reason}") # ⚠️ GitHub 资产检查 {url} 失败:{r.status_code} {r.reason}。
# 返回 空的版本号 和 资产列表 。
return "", []
# 将响应内容解析为JSON格式。
data = r.json()
# 从JSON数据中提取 tag_name (版本号)和 assets (资产列表)。 使用列表推导式提取 每个资产的名称 。
return data["tag_name"], [x["name"] for x in data["assets"]] # tag, assets i.e. ['yolo11n.pt', 'yolov8s.pt', ...]
# 这段代码的主要功能是。处理版本号:如果版本号不是 "latest" ,则格式化为 "tags/<version>" 。构造GitHub API URL:构造用于获取资产信息的GitHub API URL。发送GET请求:使用 requests.get 发送GET请求,获取指定版本的资产信息。处理请求失败:如果请求失败且不是因为速率限制,并且启用了重试机制,则再次发送请求。检查响应状态:如果响应状态码不是200,记录警告信息并返回空结果。解析响应数据:解析响应内容,提取版本号和资产列表。通过这个函数,可以方便地从GitHub仓库中获取指定版本的资产信息,适用于需要动态获取模型文件或其他资源的场景。
10.def attempt_download_asset(file, repo="ultralytics/assets", release="v8.3.0", **kwargs):
# 这段代码定义了一个名为 attempt_download_asset 的函数,用于尝试下载指定的资产文件(如模型文件)。它会检查文件是否已存在,如果不存在,则尝试从指定的URL或GitHub仓库下载。
# 定义了 attempt_download_asset 函数,它接受以下参数 :
# 1.file :要下载的文件名或URL。
# 2.repo :GitHub仓库的名称,默认为 "ultralytics/assets" 。
# 3.release :要下载的版本,默认为 "v8.3.0" 。
# 4.**kwargs :其他关键字参数,传递给 safe_download 函数。
def attempt_download_asset(file, repo="ultralytics/assets", release="v8.3.0", **kwargs):
# 如果在本地找不到文件,则尝试从 GitHub 发布资产中下载文件。该函数首先在本地检查文件,然后尝试从指定的 GitHub 存储库发布中下载。
"""
Attempt to download a file from GitHub release assets if it is not found locally. The function checks for the file
locally first, then tries to download it from the specified GitHub repository release.
Args:
file (str | Path): The filename or file path to be downloaded.
repo (str, optional): The GitHub repository in the format 'owner/repo'. Defaults to 'ultralytics/assets'.
release (str, optional): The specific release version to be downloaded. Defaults to 'v8.3.0'.
**kwargs (any): Additional keyword arguments for the download process.
Returns:
(str): The path to the downloaded file.
Example:
```python
file_path = attempt_download_asset("yolo11n.pt", repo="ultralytics/assets", release="latest")
```
"""
# 从 ultralytics.utils 模块中导入 SETTINGS ,这是一个包含全局设置的字典。
from ultralytics.utils import SETTINGS # scoped for circular import
# YOLOv3/5u updates
# 将 file 转换为字符串。
file = str(file)
# 调用 check_yolov5u_filename 函数,将旧版YOLOv5文件名替换为更新后的YOLOv5u文件名。
# def check_yolov5u_filename(file: str, verbose: bool = True): -> 用于将旧版YOLOv5文件名替换为更新后的YOLOv5u文件名。它还提供了一个提示信息,建议用户使用新的文件名,因为YOLOv5u模型在性能上有所改进。返回更新后的文件名。 -> return file
file = checks.check_yolov5u_filename(file)
# 去除文件名首尾的空白字符和单引号。
file = Path(file.strip().replace("'", ""))
# 检查文件是否已存在。如果存在,返回文件路径。
if file.exists():
return str(file)
# 如果文件不存在,检查 SETTINGS["weights_dir"] 目录下是否存在该文件。如果存在,返回文件路径。
elif (SETTINGS["weights_dir"] / file).exists():
return str(SETTINGS["weights_dir"] / file)
# 这段代码是 attempt_download_asset 函数的一部分,用于处理指定URL的情况。如果 file 是一个URL,它会尝试下载该URL指向的文件。
# 处理URL指定的情况。
else:
# URL specified
# parse.unquote(s, encoding='utf-8', errors='replace')
# parse.unquote() 是 Python 标准库 urllib.parse 模块中的一个函数,用于对 URL 编码的字符串进行解码,将百分号编码(%XX)转换回普通字符。
# 参数 :
# s :要解码的 URL 编码字符串。
# encoding :(可选)用于解码的字符编码,默认为 'utf-8' 。
# errors :(可选)指定如何处理解码错误,默认为 'replace' ,意味着将无法解码的字符替换为一个替代字符(通常是 ? )。
# 返回值 :
# 返回解码后的字符串。
# 函数逻辑 :
# 解码百分号编码 :将字符串中的 %XX 序列转换为对应的字符。
# 字符编码转换 :将原始的百分比编码字符串(通常为 ASCII)转换为指定的编码。
# 在例子中, unquote 函数将 URL 编码的字符串 "Hello%2C%20World%21" 解码为普通字符串 "Hello, World!" 。
# 注意事项 :
# 当处理来自用户的 URL 编码数据时,使用 unquote 函数可以确保正确地解释这些数据。
# 如果 URL 包含非 ASCII 字符,确保指定正确的 encoding 参数,否则解码可能会失败或产生意外结果。
# 如果遇到无法解码的百分号序列, errors 参数决定了如何处理这些错误。常见的选项包括 'strict' (抛出 UnicodeDecodeError )、 'replace' (用替代字符替换无法解码的字符)和 'ignore' (忽略无法解码的字符)。
# 将 file 转换为字符串,并使用 parse.unquote 解码URL中的特殊字符(如将 %2F 解码为 / )。 使用 Path 对象的 .name 属性提取文件名。
name = Path(parse.unquote(str(file))).name # decode '%2F' to '/' etc.
# 构造下载URL的基本路径。
download_url = f"https://github.com/{repo}/releases/download"
# 检查 file 是否以 http:/ 或 https:/ 开头,表示它是一个HTTP URL。
if str(file).startswith(("http:/", "https:/")): # download
# 修正URL格式,将 (":/") 替换为 ("://") 。这是因为 Pathlib 可能会错误地将 :// 转换为 :/ 。
url = str(file).replace(":/", "://") # Pathlib turns :// -> :/
# 调用 url2file 函数,从URL中提取文件名。这个函数通常会处理URL中的认证信息(如 ?auth... )。
# def url2file(url): -> 用于将URL转换为文件名。使用 clean_url(url) 清理URL,去除查询参数(如 ?auth )。 将清理后的URL字符串转换为 Path 对象。 使用 Path 对象的 .name 属性提取文件名部分。 -> return Path(clean_url(url)).name
file = url2file(name) # parse authentication https://url.com/file.txt?auth...
# 使用 Path.is_file() 检查文件是否已存在。
if Path(file).is_file():
# 如果文件已存在,记录一条信息,说明文件已本地存在。
LOGGER.info(f"Found {clean_url(url)} locally at {file}") # file already exists 在本地的 {file} 处找到了 {clean_url(url)}。
# 如果文件不存在。
else:
# 调用 safe_download 函数下载文件。
# min_bytes=1e5 :设置文件的最小字节数为100,000字节,确保下载的文件大小符合要求。
# **kwargs :将其他关键字参数传递给 safe_download 函数。
# def safe_download(url, file=None, dir=None, unzip=True, delete=False, curl=False, retry=3, min_bytes=1e0, exist_ok=False, progress=True,):
# -> 用于安全地下载文件,并提供多种功能,如解压、删除下载文件、重试机制等。返回解压后的目录路径,方便后续操作。
# -> return unzip_dir
safe_download(url=url, file=file, min_bytes=1e5, **kwargs)
# 这段代码的主要功能是。处理指定URL的情况:如果 file 是一个URL,解析URL并提取文件名。修正URL格式:修正URL格式,确保其正确性。解析文件名:从URL中提取文件名,处理认证信息。检查文件是否已存在:如果文件已存在,记录一条信息,说明文件已本地存在。下载文件:如果文件不存在,调用 safe_download 函数下载文件。通过这些步骤,可以确保在需要时下载指定的文件,同时避免重复下载已存在的文件,提高代码的效率和用户体验。
# 从GitHub仓库下载。
# 如果 repo 是 GITHUB_ASSETS_REPO 且文件名在 GITHUB_ASSETS_NAMES 中,直接从指定的版本下载文件。
elif repo == GITHUB_ASSETS_REPO and name in GITHUB_ASSETS_NAMES:
safe_download(url=f"{download_url}/{release}/{name}", file=file, min_bytes=1e5, **kwargs)
# 这段代码是 attempt_download_asset 函数的一部分,用于从GitHub仓库中下载指定的资产文件。如果指定的版本中没有找到文件,则尝试从最新版本中下载。
else:
# 调用 get_github_assets 函数,尝试从指定的GitHub仓库 repo 和版本 release 中获取资产列表。
# tag :版本标签(如 v8.3.0 )。
# assets :资产文件列表(如 ['yolo11n.pt', 'yolov8s.pt', ...] )。
# def get_github_assets(repo="ultralytics/assets", version="latest", retry=False):
# -> 用于从GitHub仓库中获取指定版本的资产(如模型文件)。返回 空的版本号 和 资产列表 。从JSON数据中提取 tag_name (版本号)和 assets (资产列表)。 使用列表推导式提取 每个资产的名称 。
# -> return "", [] / return data["tag_name"], [x["name"] for x in data["assets"]] # tag, assets i.e. ['yolo11n.pt', 'yolov8s.pt', ...]
tag, assets = get_github_assets(repo, release)
# 如果指定版本中没有资产,尝试从最新版本获取。
# 如果指定版本中没有找到资产( assets 为空)。
if not assets:
# 则调用 get_github_assets 函数,尝试从最新版本中获取资产列表。
tag, assets = get_github_assets(repo) # latest release
# 检查文件名 name 是否在资产列表 assets 中。
if name in assets:
# 如果文件名在资产列表中,调用 safe_download 函数下载文件。
# url :构造下载URL,格式为 <download_url>/<tag>/<name> 。
# file :目标文件路径。
# min_bytes=1e5 :设置文件的最小字节数为100,000字节,确保下载的文件大小符合要求。
# **kwargs :将其他关键字参数传递给 safe_download 函数。
safe_download(url=f"{download_url}/{tag}/{name}", file=file, min_bytes=1e5, **kwargs)
# 这段代码的主要功能是。从指定版本获取资产:调用 get_github_assets 函数,尝试从指定的版本中获取资产列表。如果指定版本中没有资产,尝试从最新版本获取:如果指定版本中没有找到资产,尝试从最新版本中获取资产列表。检查文件是否在资产列表中:检查文件名是否在资产列表中。下载文件:如果文件名在资产列表中,调用 safe_download 函数下载文件。通过这些步骤,可以确保在需要时从GitHub仓库中下载指定的文件,同时处理指定版本中没有找到文件的情况,提高代码的健壮性和灵活性。
# 返回文件的路径。
return str(file)
# 这段代码的主要功能是。检查文件是否存在:如果文件已存在,直接返回文件路径。处理URL指定的情况:如果 file 是一个URL,解析URL并下载文件。从GitHub仓库下载:如果 file 是一个文件名,从指定的GitHub仓库和版本下载文件。返回文件路径:返回文件的路径。通过这个函数,可以方便地尝试下载指定的资产文件,同时处理文件已存在的情况,确保代码的健壮性和灵活性。
# def attempt_download_asset(file, repo="ultralytics/assets", release="v8.3.0", **kwargs): -> 用于尝试下载指定的资产文件(如模型文件)。它会检查文件是否已存在,如果不存在,则尝试从指定的URL或GitHub仓库下载。返回文件的路径。 -> return str(file)
11.def download(url, dir=Path.cwd(), unzip=True, delete=False, curl=False, threads=1, retry=3, exist_ok=False):
# 这段代码定义了一个名为 download 的函数,用于从指定的 URL 下载文件,并支持多种功能,包括多线程下载、自动解压、删除源文件、使用 curl 下载等。
# 定义了一个函数 download ,接受以下参数 :
# 1.url :文件的下载链接,可以是一个字符串(单个 URL),也可以是一个列表(多个 URL)。
# 2.dir :下载文件的目标目录,默认为当前工作目录( Path.cwd() )。
# 3.unzip :布尔值,表示是否自动解压下载的 ZIP 文件,默认为 True 。
# 4.delete :布尔值,表示是否删除下载后的源文件,默认为 False 。
# 5.curl :布尔值,表示是否使用 curl 命令进行下载,默认为 False 。
# 6.threads :整数,表示下载时使用的线程数,默认为 1 。
# 7.retry :整数,表示下载失败时的重试次数,默认为 3 。
# 8.exist_ok :布尔值,表示在创建目标目录时,如果目录已存在是否抛出异常,默认为 False 。
def download(url, dir=Path.cwd(), unzip=True, delete=False, curl=False, threads=1, retry=3, exist_ok=False):
# 从指定的 URL 下载文件到给定目录。如果指定多个线程,则支持并发下载。
"""
Downloads files from specified URLs to a given directory. Supports concurrent downloads if multiple threads are
specified.
Args:
url (str | list): The URL or list of URLs of the files to be downloaded.
dir (Path, optional): The directory where the files will be saved. Defaults to the current working directory.
unzip (bool, optional): Flag to unzip the files after downloading. Defaults to True.
delete (bool, optional): Flag to delete the zip files after extraction. Defaults to False.
curl (bool, optional): Flag to use curl for downloading. Defaults to False.
threads (int, optional): Number of threads to use for concurrent downloads. Defaults to 1.
retry (int, optional): Number of retries in case of download failure. Defaults to 3.
exist_ok (bool, optional): Whether to overwrite existing contents during unzipping. Defaults to False.
Example:
```python
download("https://ultralytics.com/assets/example.zip", dir="path/to/dir", unzip=True)
```
"""
# 将 dir 参数转换为 pathlib.Path 对象,方便后续路径操作。
dir = Path(dir)
# 使用 Path.mkdir 方法创建目标目录。
# parents=True :如果目标目录的父目录不存在,会递归创建。
# exist_ok=True :如果目录已存在,不会抛出异常。
dir.mkdir(parents=True, exist_ok=True) # make directory
# 判断是否使用多线程下载。如果 threads 参数大于 1 ,则使用多线程。
if threads > 1:
# pool = ThreadPool(processes=5)
# 在Python中, multiprocessing.pool.ThreadPool 是 multiprocessing 模块中的 pool 子模块提供的类之一,它用于创建一个线程池,以便并行执行多个线程任务。这个类是 multiprocessing 库的一部分,该库提供了一种方式来并行化程序,利用多核处理器的能力。
# 参数 :
# processes :参数指定线程池中的线程数量。
# 返回 :
# pool :创建一个线程池实例。
# 方法 ThreadPool 提供了以下方法 :
# apply_async(func, args=(), kwds={}) :异步地将一个函数 func 应用到 args 和 kwds 参数上,并返回一个 AsyncResult 对象。
# map(func, iterable, chunksize=1) :将一个函数 func 映射到一个迭代器 iterable 的所有元素上,并返回一个迭代器。
# close() :关闭线程池,不再接受新的任务。
# join() :等待线程池中的所有任务完成。
# terminate() :立即终止线程池中的所有任务。
# ThreadPool 是 multiprocessing 库中用于并行执行 I/O 密集型任务的工具,它允许程序利用多核处理器的能力来提高性能。需要注意的是, multiprocessing 库中的进程是系统级的进程,与线程相比,它们有更大的开销,但也能提供更好的并行性能。
# 使用 concurrent.futures.ThreadPool 创建一个线程池,线程数由 threads 参数指定。
with ThreadPool(threads) as pool:
# 使用 pool.map 方法将下载任务分配给线程池中的线程。
pool.map(
# 定义了一个匿名函数,调用 safe_download 函数进行实际的下载操作。
# def safe_download(url, file=None, dir=None, unzip=True, delete=False, curl=False, retry=3, min_bytes=1e0, exist_ok=False, progress=True,):
# -> 用于安全地下载文件,并提供多种功能,如解压、删除下载文件、重试机制等。返回解压后的目录路径,方便后续操作。
# -> return unzip_dir
lambda x: safe_download(
url=x[0],
dir=x[1],
unzip=unzip,
delete=delete,
curl=curl,
retry=retry,
exist_ok=exist_ok,
# 如果线程数为 1 ,则显示进度条。
progress=threads <= 1,
),
# 将 url 和重复的目标目录路径 dir 组合成一个迭代器,为每个线程提供下载任务。
zip(url, repeat(dir)),
)
# 关闭线程池,不再接受新的任务。
pool.close()
# 等待所有线程完成任务。
pool.join()
# 如果 threads 参数小于或等于 1 ,则不使用多线程,逐个下载文件。
else:
# 如果 url 是一个字符串或 Path 对象,则将其转换为一个列表(包含单个 URL)。如果 url 已经是一个列表,则直接遍历。
for u in [url] if isinstance(url, (str, Path)) else url:
# 调用 safe_download 函数逐个下载文件。
safe_download(url=u, dir=dir, unzip=unzip, delete=delete, curl=curl, retry=retry, exist_ok=exist_ok)
# 这段代码实现了一个功能丰富的文件下载工具,支持以下特性。多线程下载:通过 ThreadPool 实现多线程下载,提高下载效率。自动解压:下载完成后自动解压 ZIP 文件。删除源文件:可以选择是否删除下载后的源文件。使用 curl 下载:支持使用 curl 命令进行下载,适用于某些特殊场景。重试机制:支持下载失败时的重试机制。进度条显示:在单线程下载时显示进度条。目录创建:自动创建目标目录,确保下载路径有效。该函数适用于需要从网络下载文件的场景,尤其是需要处理多个文件或需要高效下载的场景。通过灵活的参数配置,可以满足不同的下载需求。
# curl 是一个非常强大的命令行工具,用于在不同类型的网络协议之间传输数据。它支持多种协议,包括 HTTP、HTTPS、FTP、FTPS、SMTP、IMAP 等。 curl 的名字来源于“Client URL”,即客户端 URL 工具。
# 主要用途 :
# 文件下载 :从服务器下载文件。
# 文件上传 :将文件上传到服务器。
# API 调用 :发送 HTTP 请求,与 RESTful API 或其他 Web 服务进行交互。
# 测试网络连接 :检查服务器的响应情况。
# 模拟浏览器行为 :发送带有特定头信息或数据的请求。
# 基本语法 :
# curl [选项] [URL]
# 常用选项 :
# -o <file> :将输出保存到指定文件。 curl -o example.txt http://example.com/file.txt
# -O :将输出保存到与远程文件同名的本地文件。 curl -O http://example.com/file.txt
# -L :跟随 HTTP 重定向。 curl -L http://example.com/redirect
# -X <method> :指定 HTTP 请求方法(如 GET 、 POST 、 PUT 、 DELETE )。 curl -X POST http://example.com/api -d "key=value"
# -d <data> :发送 HTTP POST 请求的数据。 curl -X POST http://example.com/api -d "key=value"
# -H <header> :添加自定义 HTTP 头。 curl -H "Content-Type: application/json" -d '{"key":"value"}' http://example.com/api
# -u <user:password> :用于 HTTP 基本认证。 curl -u user:password http://example.com/protected
# -v :显示详细的请求和响应信息,用于调试。 curl -v http://example.com
# 优点 :
# 跨平台 : curl 可以在 Linux、macOS 和 Windows 上使用。
# 功能强大 :支持多种网络协议和复杂的请求。
# 轻量级 :无需安装庞大的图形界面工具。
# 易于集成 :可以在脚本中使用,方便自动化任务。
# 总结 : curl 是一个非常实用的网络工具,广泛用于开发、测试和自动化任务中。它简单易用,功能强大,是每个开发者和系统管理员的必备工具之一。