【Python 图片下载器】一款专门为爬虫制作的图片下载器,多线程下载,速度快,支持续传/图片缩放/图片压缩/图片转换
文章日期:2024.12.23
使用工具:Python
本章知识:制作一款图片下载器_DOS窗口(爬虫专用)
文章难度:低等(没难度)
文章全程已做去敏处理!!! 【需要做的可联系我】
AES解密处理(直接解密即可)(crypto-js.js 标准算法):在线AES加解密工具
注意:网络的带宽会对下载速度有所影响,包括图片链接的服务器所在地区,如果图片链接的服务器所在地区是海外的,那么下载图片的时候尽量使用代理流量包下载会更快。如有其他问题可以留言,勿喷!!!
23年7月左右测速最高达到了800M/s,实际速度请按照你们自己测出的来
讲的不是很详细,有问题请留言,欢迎大家指导
程序的界面是DOS窗口,并非GUI窗口。下载器不依赖于任何数据库,方便使用和转移
本章内的源代码是由23年4月份制作,经过多次修改,现已稳定使用,如有不同场景使用不了,可以告知作者,作者将会根据实际情况进行整改。
此项目不是针对于单个固定的网站下载图片,所以我采用的是读取本地文件和实时保存本地图片路径数据的方法进行读取和存储(excel格式),此方法比较灵活,不依赖数据库,也不依赖GUI窗口。如有介意可以自行修改源代码添加自定义功能
源代码大部分是在23年制作和修改,代码的命名可能不是很友好,但也进行了部分优化,凑合着看吧。
⬇️⬇️⬇️为了照顾部分使用人群,我将程序打包成了exe文件,供大家使用。源代码在文章最底部
名称 | 下载地址(百度盘) |
极速图片下载器 EXE(支持win10/win11) | 下载 (提取码 bfri ) 22M |
演示 / 测试文件(部分图片链接可能会失效导致报错) | 下载 (提取码 k3tr ) 3M |
给大家看一下程序运行时的图片,gif动图太大放不了,凑合看吧
出现错误连接的是不会被保存的,一整条都不会被保存
下面讲解整个运转流程步骤,文章结尾附上源码
【简单的流程:设置配置 -> 读取文件 -> 线程分配 -> 数据存储】
1、【配置设置】线程量、下载速度、超时时间、图片的处理、路径的存放、请求的参数
2、【读取文件】读取文件并将文件内的数据转为程序可识别的统一模板
3、【线程分配】按照设置好的线程量进行并发下载
4、【数据存储】下载完成的图片路径和信息会存储到队列,然后又独立的线程把队列里的数据进行保存到本地
1、【配置设置】
[headers][UA]:自定义可以直接输入你自己的UA。如果用固定默认的UA就输入1。如果想要用默认且不固定的就输入2。有效应对大部分网站的限制
[headers][Cookie]:自定义Cookie, 直接粘贴你要的cookie即可。这个功能基本不用,除非是特殊网站
[proxies][ip]:自定义海外流量包,专门用于下载海外图片,只要输入流量包的ip和端口即可,采用Socks4协议,可以通过源代码自行修改
[根目录][目录设置]:这个是设置所有图片的根目录,比如我设置了img,那么他会自动在img里创建多个字文件夹,每个文件夹对应每条不同的数据图片。
[每张图片下载的间隔时间][速度限制]:设置图片的下载间隔时间,一般情况设置0.001,如果网站卡,或网络卡,就不要设置太低,设置0.09也可以
[下载超时时间][请求超时设置]:这个是每个图片请求的总时间,超过这个时间就会直接返回错误,当然,软件内默认会重试3次。还是适当调整即可
[启用多少线程][并发每行数据]:根据自身情况设置并发线程量,如果不着急下载,就不要设置太高,因为对你自身的网卡占用也高,如果把人家网站搞崩了,本人是不负责的
[图片功能][是否启用]:内置了一些图片处理功能,可以边下载边处理图片
[图片压缩][压缩质量]:压缩图片专用,一般情况不设置,或设置90-100即可
[图片放大缩小][像素大小]:等比例缩放图片,程序会根据你设置的大小,把图片等比例缩放到你设置的大小范围,也是根据情况设置,不要超过4000就行,因为会模糊
[读取配置]:这个是链接的分割方法,如果你的链接是用列表保存的,则输入1,如果是用一写符号作为分割,那你就要输入对应的分割符,让程序可以读取识别出每一条链接
2、【读取文件】
我们要创建一个固定的文件,名称也是固定的【000.txt】,然后把表格里要下的链接粘贴到这个文件内,然后再去执行程序下载图片。
【000.txt】只能读取的两列,第一列为【唯一值】第二列为链接,这个链接可以是列表形式,也可以是连在一起的,但要有分隔符
下面这个图片是按照分隔符保存的
3、【线程分配】
程序就是进程,他会在程序刚开始的时候先创建一个线程用于监测数据保存队列(下面图片内没有展示),只有所有图片都下载完成后,他才会停止。
下载图片的线程分配逻辑如下图,程序会根据你设置的线程量进行创建父线程,每个父线程是对应一个条数据,父线程会创建多个子线程,子线程数量是和当前数据的连接数量匹配,然后由子线程下载图片,父线程监测子线程,并完成最后的数据整理,存放到数据保存队列内,由队列线程进行保存,这是为了防止资源争夺。如果保存数据的时候,采用的是数据库,则无需使用队列线程
4、【数据存储】
这是一条独立的线程执行的函数,当程序下载图片开始的时候,此线程就会启动,并一直处于监测状态,监测队列有没有数据,有则保存无则等待。如果不想要这个累赘的东西,可以采用数据库保存的方法执行,就可以去除这个烦人的队列。
下面给大家看一下源码,并略微讲一下每个模块的作用
1、给大家先看一下类,程序内只有这三个类,也是为了方法修改和整理,有些代码还是比较分散的,大家可以自行整理,我都用习惯了。
2、先来看一下 【配置】 的类
这是一些默认值,我所规定的默认值比较严格,就是太快要求有点高,大家还是要根据自己情况进行设置
这是dos窗口所需要的配置输入功能,这没什么可说的,比较简单。代码丑不要介意啊
分别是获取局域网的ip,方便大家使用(支持获取无线网 / 支持获取有线网)。还有函数一个是用于输出控制台不同颜色字体的功能
第一个是检测文件是否存在,另一个是用于图片的处理,比如图片压缩/等比例缩放/png背景填充/格式转换。注意:想使用图片处理的功能,那么线程就要启动少一点,否则线程过多运算不过来,CPU会瞬间沾满,会卡顿的,为了避免这种请求,使用图片处理功能,就不要开太多线程
3、来看一下 【下载模块】 的类
这个是下载图片的核心部位,是采用 requests 模块下载,我只想将他包装了一下,也没用什么可稀奇的
4、来看一下 【线程资源分配】 的类
直接看图吧,图更容易理解,结合源代码去看去理解
【附上源码】 切记,只能读取【000.txt】文件。不喜勿喷
"""
File Name: 超级下载器_DOS版本
Author: 小木_.
Date Created: 2022-11-2
Last Modified: 2024-12-23
Version: 2.0
Python version: 3.12.7
Description: 一款图片超级下载装置,拉满你的带宽下载图片,可以边下载边处理图片,支持jpg/png格式图的下载与处理。支持压缩/有损放大缩小/图片检测功能。支持多级路径保存图片
"""
import json
import subprocess
import filetype
import time
import random
import os
import re
import copy
import requests
from concurrent.futures import ThreadPoolExecutor
import colorama
from typing import Union, Dict, Literal, List
from io import BytesIO
colorama.init(autoreset=True)
from PIL import Image
'''
安装模块
filetype==1.2.0
requests==2.32.3
colorama==0.4.6
Pillow==11.0.0
'''
'''
模板:
——-——-——-——-——-——-——-——
| | |
| 唯一值 | 图片链接 |
| | |
——-——-——-——-——-——-——-——
注意:如果遇到图片有问题的,比如图片的后缀不是jpg或png的,可以尝试修改链接的后缀。或者在下载图片的时候不要添加UA头,已确保图片的内容是正常的jpg/png
'''
'''[配置信息] - 只实列化一次'''
class config:
def __init__(self,
ua_str: str = '',
cookie: str = '',
proxies_ip: str = '',
root_name: str = 'root_directory',
rewrite_sku: bool = False,
time_sleep: float = 0.09,
timeout: int = 10,
thread_num: int = 1,
img_function: bool = False,
compress_img: Union[int, None] = None,
conversion_img: bool = False,
enlarge_img: Union[int, None] = None,
img_segmentation: str = '|'):
'''[头部设置]'''
self.headers_ua = ua_str
self.headers_cookie = cookie
'''[代理设置]:127.0.0.1:7890'''
self.proxies_ip = proxies_ip
'''[根目录名称/文件夹]'''
self.root_name = root_name
'''[是否启用重写SKU唯一值]'''
self.rewrite_sku = rewrite_sku
'''[每张图片下载的间隔时间]:0.001-0.09-值越小越稳定越慢'''
self.time_sleep = time_sleep
'''[下载图片时超时时间]'''
self.timeout = timeout
'''[线程量/同时下载多少个商品]'''
self.thread_num = thread_num
'''[图片功能是否启用]'''
self.img_function = img_function
'''[图片压缩]'''
self.compress_img = compress_img
'''[图片转换]'''
self.conversion_img = conversion_img
'''[图片放大]'''
self.enlarge_img = enlarge_img
'''[读取文件]'''
self.FILE_data = '000.txt'
'''[完成数据存储]'''
self.FILE_data_done = '[下载]完成-完成-完成.txt'
# '''[查重数据存储]'''
# self.FILE_data_check = '[下载]查重-查重-查重.txt'
'''[图片错误数据存储]'''
self.FILE_data_error = '[下载]图片错误-图片错误-图片错误.txt'
# '''[全部错误数据存储]'''
# self.FILE_data_all_error = '[下载]全部错误-全部错误-全部错误.txt'
'''[读取配置]'''
self.img_segmentation = img_segmentation
'''[整个程序核心开关]'''
self.run_switch: bool = True
'''[启动] 如果不用DOS窗口,可以直接移除此手动选项'''
self.run()
'''配置自动设置'''
def run(self):
'''run仅用于DOS输入选择,如果该配置模块预装到程序软件内,则无需run模块执行'''
print(self.colors('Red', r'''
___
|_ _|_ __ ___ __ _ __ _ ___
| || '_ ` _ \ / _` |/ _` |/ _ \
| || | | | | | (_| | (_| | __/
|___|_| |_| |_|\__,_|\__, |\___| _
| _ \ _____ __|___/| | ___ __ _ __| |
| | | |/ _ \ \ /\ / / '_ \| |/ _ \ / _` |/ _` |
| |_| | (_) \ V V /| | | | | (_) | (_| | (_| |
|____/ \___/ \_/\_/ |_| |_|_|\___/ \__,_|\__,_| 图片下载器...'''))
# 检测 000.txt文件是否存在,不存在则执行停止
if not self.check_file(self.FILE_data):
self.run_switch = False
print(self.colors('Red',
'\n\t[配置设置]:要读取的文件不存在,请检查文件是否和当前程序在同一路径!\n\t[error]:缺少000.txt文件!!!读取不到'))
input('\n按回车退出程序....')
quit()
print(self.colors('Red',
'\n\n\t[配置设置快捷操作]:请按照要求进行配置\n\t1、不需要设置的内容请直接按回车,程序会自动使用默认值,没有默认值的则不会启动\n\t2、凡是布尔类型的选项直接回车默认都是False'))
__ua = input(
'\n\n\t[headers][UA]设置User-Agent / 直接输入UA / 1为内置固定UA / 2为防指纹UA(默认为空)\n\t请输入:')
if __ua == '1': self.headers_ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36 Edg/91.0.864.48'
if __ua and __ua != '1': self.headers_ua = __ua
__cookie = input('\n\t[headers][Cookie]设置Cookie(默认为空)\n\t请输入:')
if __cookie: self.headers_cookie = __cookie
__ip = input(
f'\n\t[proxies][ip]设置代理IP 格式为:ip:port | 当前局域网ip为 {self.get_local_ip()}:8080(默认为空)\n\t请输入:')
if __ip: self.proxies_ip = __ip
__root_name = input(
f'\n\t[根目录][目录设置]设置图片根文件夹名称(自动创建/纯英文+数字) 可设置多级 img/nike(默认为 root_directory文件夹)\n\t请输入:')
if __root_name: self.root_name = __root_name
# __rewrite_sku = input(f'\n\t[图片文件夹][SKU设置]【研发中..】设置唯一值是否要重置刷 - 新请输入选项 / 1-False / 2-True:')
# if str(__rewrite_sku) == '2':self.rewrite_sku = True
__time_sleep = input(
f'\n\t[每张图片下载的间隔时间][速度限制]请输入小数-值大则慢则稳定-值小则快则不稳定 / 0.001~0.09(默认为0.09)\n\t请输入:')
if __time_sleep: self.time_sleep = float(__time_sleep)
__timeout = input(
f'\n\t[下载超时时间][请求超时设置]请输入整数-值小则快则不稳定-值大则慢则稳定 / 1~50(默认为10)\n\t请输入:')
if __timeout: self.timeout = int(__timeout)
__thread_num = input(
f'\n\t[启用多少线程][并发每行数据]请输入整数-值小则线程少则慢则稳定-值大则线程越多则快则不稳定 / 1~100(默认为1)\n\t请输入:')
if __thread_num: self.thread_num = int(__thread_num)
__img_function = input(f'\n\t[图片功能][是否启用]请输入数字选项 / 1-False / 2-True(默认为1)\n\t请输入:')
if __img_function == '2': self.img_function = True
if self.img_function:
__compress_img = input(f'\n\t[图片压缩][压缩质量]请输入整数-值越小越模糊 / 1~100(默认为空)\n\t请输入:')
if __compress_img: self.compress_img = int(__compress_img)
# __conversion_img = input(f'\n\t[图片转换][是否启用]【默认为启动无法关闭】转换为jpg 透明底会被填充为白色 请输入选项 / 1-False / 2-True:')
# if str(__conversion_img) == '2': self.conversion_img = True
__enlarge_img = input(
f'\n\t[图片放大缩小][像素大小]请设置图片最大宽高的限制(程序会自动等比例缩放) / 500~4000(默认为空)\n\t请输入:')
if __enlarge_img: self.enlarge_img = int(__enlarge_img)
if not self.compress_img and not self.enlarge_img:
self.img_function = False
print(
self.colors('Yellow', '\t[图片功能][是否启用]图片压缩未设置/图片放大缩小未设置,已自动关闭图片功能'))
__img_segmentation = input(
f'\n\t[读取配置]请输入图片的分割方式 / 通常是【|#$@】分割,如果是列表请输入1(默认为|符号)\n\t请输入:')
if __img_segmentation == '1': self.img_segmentation = '列表'
# 头部
if self.headers_ua or self.headers_cookie:
self.headers = {}
(lambda x: self.headers.update({'User-Agent': x}) if bool(x) else None)(self.headers_ua)
(lambda x: self.headers.update({'Cookie': x}) if bool(x) else None)(self.headers_cookie)
else:
self.headers = None
# 代理
if self.proxies_ip:
self.proxies = {'https': f'Socks4://{self.proxies_ip}', 'http': f'Socks4://{self.proxies_ip}'}
else:
self.proxies = None
# 获取本地局域网ip地址
def get_local_ip(self) -> str:
try:
# 使用Windows命令 ipconfig 获取局域网IP
result = subprocess.run(['ipconfig'], capture_output=True, text=True)
# 处理一下dos输出内容,方便提取 为了防止出现问题,结尾补上两个空的
output = result.stdout.replace('\t', '').replace(' ', '').split('\n\n') + [''] * 2
for i in range(len(output) - 1):
network1 = output[i].replace('\n', '')
network2 = output[i + 1].replace('\n', '')
if '以太网适配器以太网' in network1 and '媒体状态............:媒体已断开连接' not in network2:
_ip = re.findall('IPv4地址.*?:(.*?)子', network2)
ip_address = _ip and _ip[0] or '获取失败'
return ip_address
if '无线局域网适配器WLAN' in network1 and '媒体状态............:媒体已断开连接' not in network2:
_ip = re.findall('IPv4地址.*?:(.*?)子', network2)
ip_address = _ip and _ip[0] or '获取失败'
return ip_address
return '未联网'
except:
return '报错'
# 控制台颜色
def colors(self,
value: Literal["Red", "Green", "Yellow", "Blue", "Cyan", "White", "darkWhite", "darkGreen", "darkRed"],
text: str) -> str:
return {
'Red': "\033[91m{}\033[00m".format(text),
'Green': "\033[92m{}\033[00m".format(text),
'Yellow': "\033[93m{}\033[00m".format(text),
'Blue': "\033[94m{}\033[00m".format(text),
'Cyan': "\033[96m{}\033[00m".format(text),
'White': "\033[97m{}\033[00m".format(text),
'darkWhite': "\033[2;97m{}\033[00m".format(text),
'darkGreen': "\033[32;11m{}\033[00m".format(text),
'darkRed': "\033[31;12m{}\033[00m".format(text),
}[value]
# 检测文件是否存在
def check_file(self, FILE_data: str) -> bool:
if not os.path.exists(FILE_data):
return False
else:
return True
# 图片转换压缩调整
def image_convert(self, img_file: str, background_color: str = '#ffffff', format: Literal["JPEG", "PNG"] = 'JPEG',
max_size: Union[int, None] = None, quality_size: Union[int, None] = None):
'''
:param img_file: 图片路径名称
:param background_color: 背景填充颜色 rgb(255,255,255) 背景填充颜色 hxe #ffffff 默认填充 #ffffff
:param format: 图片保存格式 PNG JPEG 默认JPEG
:param max_size: 图片大小等比例调整,会将图片等比例缩放到当前设置的大小范围 默认无
:param quality_size: 图片压缩 100-0 None为不压缩,0为极致压缩 50比较好 默认无
该模块的作用: 注意,该模块会对图片里的属性内容进行清除,清除后图片的占用大小会自然降低,也有可能会变高一点,对质量的影响可以忽略不计。如果不想清除原有图片的内容信息,可以不使用该模块或将该模块进行二次开发
1、支持压缩图片质量
2、支持将png透明背景进行填充
3、支持将图片转换为jpg/png
4、支持等比例缩放图片大小
'''
dict_data = {'msg': '', 'code': ''}
# 打开图片并转换为RGBA模式
image = Image.open(img_file).convert('RGBA')
if background_color: # (255,255,255) #ffffff
# 创建白色背景图片
background = Image.new('RGB', image.size, background_color)
dict_data['REG'] = str(background_color)
else:
return {'code': 'False', 'msg': '[background_color]背景颜色必须设置', 'file': img_file}
# 将源图片粘贴到背景图片上
background.paste(image, (0, 0), image)
image = background
# 确定调整后的尺寸
width, height = image.size
dict_data['size'] = f'{width}x{height}'
# 图片大小设置
if max_size:
if width > height:
new_width = max_size
new_height = int((height / width) * max_size)
else:
new_width = int((width / height) * max_size)
new_height = max_size
# 调整图像大小
image = image.resize((new_width, new_height))
dict_data['size'] = f'{new_width}x{new_height}'
dict_data['max_size'] = max_size
# 确定输出文件名和路径
if format == 'PNG':
output_filename = os.path.splitext(os.path.basename(img_file))[0] + '.png'
output_filename = os.path.split(img_file)[0] + '/' + output_filename
dict_data['format'] = 'PNG'
else:
if format == 'JPEG':
output_filename = os.path.splitext(os.path.basename(img_file))[0] + '.jpg'
output_filename = os.path.split(img_file)[0] + '/' + output_filename
dict_data['format'] = 'JPEG'
else:
return {'code': 'False', 'msg': '设置图片格式错误', 'file': img_file}
dict_data['file'] = img_file
# 保存转换后的图像
if quality_size:
image.save(output_filename, format=format, quality=quality_size)
dict_data['file_save'] = output_filename
dict_data['quality_size'] = quality_size
dict_data['code'] = 'True'
dict_data['msg'] = '数据保存完成 - 已成功转换'
return dict_data
else:
image.save(output_filename, format=format)
dict_data['file_save'] = output_filename
dict_data['code'] = 'True'
dict_data['msg'] = '数据保存完成 - 已成功转换'
return dict_data
'''[新版图片下载器]'''
class Image_Downloader:
# 获取图片像素大小
def get_image_size(self, image_path: Union[str, None] = None, image_content: Union[bytes, None] = None) -> Union[
list, str]:
if not image_path and not image_content:
return '图片像素大小获取失败,未接收到文件或数据'
try:
if image_content:
image_path = BytesIO(image_content)
with Image.open(image_path) as img:
width, height = img.size
return [width, height]
except Exception as e:
return f'图片有问题,未读取出像素大小 - {e}'
# 正常下载图片模块 单
def XZTP(self, URL: str, FILE_data1: str = 'img', FILE_data2: str = '1.jpg', headers: Union[Dict, None] = None,
proxies: Union[Dict, None] = None, timeout: Union[int, None] = 1, verify: bool = True) -> Dict[
str, Union[str, int, list, float, requests.Response]]:
'''
:param URL: 图片链接
:param FILE_data1: 图片路径 不含文件名称 D:/图片
:param FILE_data2: 要保存的图片名称+后缀 11.jpg
:param headers: 伪装头
:param proxies: 代理 请用字典格式 默认None 未开启
:param timeout: 超时时间 默认1秒内
:param verify: 是否验证SSL证书 【False=禁用 SSL 证书验证。 True=开启】默认开启 提示:禁用可以有效提高速度
:return: 返回数据 字典格式 code为固定值
'''
try:
# 如果文件夹不存在,则直接创建,exist_ok忽略已存在的文件夹
try:
os.makedirs(FILE_data1, exist_ok=True)
except Exception as e:
return {'msg': f'地址路径错误 - {e}', 'code': -1, 'URL': URL}
# 进行检测
if not os.path.exists(FILE_data1) and (FILE_data1 != ''):
return {'msg': '地址路径错误', 'code': -1, 'URL': URL}
try: # 开始请求
Conin = requests.get(URL, timeout=timeout, headers=headers, proxies=proxies, verify=verify)
except Exception as e:
return {'msg': f'[Get]请求失败,请检查 网络/代理/URL链接 - {e}', 'code': -1, 'Response': '...',
'URL': URL}
# 字节
data_length = int(Conin.headers.get("Content-Length", len(Conin.content)))
# 计算 MB/KB
ContentLength = ['MB', round(data_length / 1024 / 1024, 3)] if data_length > 2048 else ['KB', round(
data_length / 1024, 3)]
if Conin.status_code != 200:
return {'msg': '[Get]请求成功,状态码非200', 'code': 0, 'Response': Conin, 'URL': URL,
'TimeElapsed': (Conin.elapsed.total_seconds() if type(
Conin.elapsed) != float else Conin.elapsed),
f"Content-Length({ContentLength[0]})": ContentLength[1]}
# 请求成功 wb+ 强制顶替
with open(FILE_data1 + '/' + FILE_data2, 'wb+') as file:
file.write(Conin.content)
except Exception as e:
return {'msg': f'[Get]错误语句 - {e}', 'code': -1, 'Response': '...', 'URL': URL}
# 检查图片是否存在
if os.path.exists(FILE_data1 + '/' + FILE_data2):
# 检查文件是否是正常图片
if filetype.guess(FILE_data1 + '/' + FILE_data2):
return {'msg': '[Get]保存成功', 'code': 1, 'Response': Conin,
# 'X-Y': self.get_image_size(image_path=str(FILE_data1 + '/' + FILE_data2)),
'X-Y': self.get_image_size(image_content=Conin.content),
'file': str(FILE_data1 + '/' + FILE_data2), 'URL': URL,
f"Content-Length({ContentLength[0]})": ContentLength[1]}
return {'msg': '[Get]保存成功 - 非正常图片', 'code': -1, 'Response': Conin,
'file': str(FILE_data1 + '/' + FILE_data2), 'URL': URL,
f"Content-Length({ContentLength[0]})": ContentLength[1]}
return {'msg': '[Get]保存失败 - 图片不存在', 'code': -1, 'Response': Conin,
'file': str(FILE_data1 + '/' + FILE_data2), 'URL': URL,
f"Content-Length({ContentLength[0]})": ContentLength[1]}
# 正常下载图片模块 多
def XZTP_D(self, URL: str, FILE_data1: str = 'img', FILE_data2: str = '1.jpg', headers: Union[Dict, None] = None,
proxies: Union[Dict, None] = None, timeout: Union[int, None] = 1, verify: bool = True,
time_s: Union[int, float] = 1,
SetRequestsFrequency: Literal["A", "B"] = 'A') -> Dict[
str, Union[str, int, list, float, requests.Response]]:
'''
:param URL: ...
:param FILE_data1: 图片路径 不含文件名称 D:/图片
:param FILE_data2: 要保存的图片名称+后缀 11.jpg
:param headers: ...
:param proxies: ...
:param timeout: 超时时间
:param time_s: 重试的间隔时间
:param SetRequestsFrequency: A:请求一次 B:请求三次
:return:
'''
# 计算重试次数
Frequency_limit: int = 1
while True:
# 请求1次 无论是好是坏都返回
if SetRequestsFrequency == 'A':
Conin = self.XZTP(URL, FILE_data1=FILE_data1, FILE_data2=FILE_data2, headers=headers, proxies=proxies,
timeout=timeout, verify=verify)
Conin['msg'] = f'[{Frequency_limit}]' + Conin['msg']
return Conin
# 请求3次 三次过后无论是好是坏都返回,如果三次内有200则直接返回
if SetRequestsFrequency == 'B':
Conin = self.XZTP(URL, FILE_data1=FILE_data1, FILE_data2=FILE_data2, headers=headers, proxies=proxies,
timeout=timeout, verify=verify)
if Conin['code'] == 1:
Conin['msg'] = f'[{Frequency_limit}]' + Conin['msg']
return Conin
if Frequency_limit == 3:
Conin['msg'] = f'[{Frequency_limit}]' + Conin['msg']
return Conin
Frequency_limit += 1
# 重试的间隔时间
time.sleep(time_s)
'''线程调用 主程序'''
class start_up:
def __init__(self):
'''[当前线程数量]'''
self.thread_num: int = 0
'''[数据存储]'''
self.data_root: List[Dict] = []
'''[总数据行数-固定]'''
self.data_rows_num: int = 0
'''[总图片数量-计算]'''
self.data_img_num: int = 0
'''[成功下载数据行数/成功下载图片总数/失败下载数据行数/失败下载图片总数]'''
self.data_num_assemble: List[int] = [0, 0, 0, 0]
'''[采集的数据行数-实时刷新]'''
self.data_gather_num: int = 0
'''[采集的开始时间 秒单位(时:分:秒:毫 = --:--:--:---)]'''
self.time_consuming: int = int(time.time() * 1000)
'''[所有的采集图片总大小计算 未开发]'''
# self.img_occupancy: int = 0
# 检测是否可以执行程序 - 配置有没有完成
if configs.run_switch:
self.run()
def Identify_content(self, data: str) -> list:
'''内容识别 模板内的链接转为列表返回'''
if configs.img_segmentation == '列表':
try:
# 将列表格式化成字典格式
# data: list = json.loads(data)
data: list = json.loads(json.dumps(eval(data)))
# 检测是否是列表
if type(data) != list:
print(configs.colors('Red', f'你的图片并非列表格式,程序已退出'))
return []
return data
except Exception as e:
# 关闭核心全部线程
configs.run_switch = False
print(configs.colors('Red', f'抱歉,你的图片并非列表格式,或者json格式有误,程序已退出 - {e}'))
return []
return data.split(configs.img_segmentation)
def run(self):
# 创建 ThreadPoolExecutor 一个线程,用于队列存储数据
# 创建后直接运行线程,该线程会伴随着程序一直运行,直到程序停止
ThreadPoolExecutor(max_workers=1).submit(self.thread_coroutine_transfer)
# 读取000文件数据
with open(configs.FILE_data, 'r', encoding='utf-8') as f:
data_read = f.readlines()
# 总数据量
self.data_rows_num = len(data_read)
# 循环每一行数据
for data in data_read:
# 去掉开头结尾的空格和回车字符,并转为列表方便提取数据
data_list = data.strip().split('\t')
'''[唯一值/SKU]'''
SKU = data_list[0]
'''[url链接/列表形式] 无论图片链接是怎么存储的,都要转换为列表才能进行分配线程进行采集'''
url_list = self.Identify_content(data_list[1])
# 用while循环是为了方便监控线程
while True:
# 监测线程的数量,有缺位立马补上
if self.thread_num != configs.thread_num:
# 线程量+1
self.thread_num += 1
# 创建 ThreadPoolExecutor,创建一个父线程,然后在父线程里创建多个子线程
# 子线程下载图片,父线程用于监控子线程的执行,子线程都支持完成后,父线程将把数据进行整理然后传递给存储队列
# 创建后直接运行线程即可,while循环将会一直监测线程数量的创建,不用担心
ThreadPoolExecutor(max_workers=1).submit(self.download_main, SKU, url_list)
# 略微给一个休息时间,不给也可以,自己看着办
time.sleep(0.01)
break
# 等待某一个线程结束,然后开启新线程
time.sleep(0.4)
# 等待线程全部结束
while self.thread_num != 0:
time.sleep(0.3)
# 运行完成了,可以关闭核心进程了,提醒其他线程,我们已经完事了,都收工吧
configs.run_switch = False
# 父-下载模块-用于线程监控 链接处理 线程资源分配
def download_main(self, SKU: str, url_list: list):
"""
:param SKU: 就是一个唯一值,这个sku是用于对图片的命名和文件夹的创建
:param url_list: 图片列表
:return:
"""
'''[线程量]'''
thread_count = len(url_list)
# 数据传输,每次调用都要实列化一个新的容器壳子,防止数据修改或数据混乱
resource = {
"z_img": [""] * thread_count,
"b_img": [""] * thread_count
}
# 实时采集数量+1
self.data_gather_num += 1
# 临时获取数量
num = self.data_gather_num
# 创建 ThreadPoolExecutor,最大线程数为10
with ThreadPoolExecutor(max_workers=thread_count) as executor:
# 循环开启线程,一个一个启动
futures = []
'''[图片的命名/图片的存储位置]'''
fixed = 0
# 将列表内的链接循环出来 单个执行
for url in url_list:
# 总图片进行叠加计算
self.data_img_num += 1
# 将链接进行简单处理 去掉头部尾部的空格和换行符 防止出现请求错误
url_handle = url.strip()
# 每张图片下载的间隔时间 每次循环都会+1 所以我们在刚开始的时候可以设置1~100(越小越慢) 用来控制此处的间隔速度 防止高并发
time.sleep(configs.time_sleep)
# 创建图片存放的文件夹
os.makedirs(f'{configs.root_name}/{SKU}', exist_ok=True)
FILE_data1 = f'{configs.root_name}/{SKU}'
FILE_data2 = f'{SKU}_{fixed}.jpg'
# 启动线程
futures += [executor.submit(self.download, url_handle, FILE_data1, FILE_data2, fixed, resource, num)]
fixed += 1
# 等待所有任务完成 进行阻塞等待
for future in futures:
future.result()
resource['SKU'] = SKU
resource['code'] = 1
# 检测下载结果有没有空的,出现空的图片,说明下载失败
if '' in resource['b_img']:
resource['code'] = -1
self.data_root += [resource]
# 【外部线程】告诉线程我执行完成了 可以开启下一个线程了
self.thread_num -= 1
# 子下载模块-用于下载图片 返回结果
def download(self, url: str, FILE_data1: str, FILE_data2: str, fixed: int, resource: dict, num: int):
"""
:param url: 图片链接
:param FILE_data1: 图片路径 不含文件名称 D:/图片
:param FILE_data2: 要保存的图片名称+后缀 11.jpg
:param fixed: fixed 关联着图片的位置(链接对应本地图片) 和 图片的命名(防重复)
:param resource: resource 图片的存储遍历,此线程会把指定的图片按照 fixed 的位置存储到字典里,返回给父线程
:param num: 当前采集的数据编号(排序号)
:return:
"""
# 如果是给空的,则直接跳过
if url == "":
resource['z_img'][fixed] = ""
resource['b_img'][fixed] = ""
return
print(configs.colors('Cyan', f'开始下载[{num}/{self.data_rows_num}]:{url}'))
file_img_filetype = None
if os.path.exists(FILE_data1 + '/' + FILE_data2):
file_img_filetype = filetype.guess(FILE_data1 + '/' + FILE_data2)
print(configs.colors('White',
str(file_img_filetype) + ' -- ' + str(file_img_filetype.MIME) + ' -- ' + str(
file_img_filetype.EXTENSION)))
# 如果图片不存在或图片有问题,则重新下载
if not file_img_filetype:
# 一定要在线程内实列化,防止发生数据交换,每次使用都要重新实列化
Image_Downloaders = Image_Downloader()
headers = copy.deepcopy(configs.headers)
# UA防指纹
if configs.headers and configs.headers.get('User-Agent') == '2':
headers[
"User-Agent"] = f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/{random.randint(512, 538)}.{random.randint(9, 37)} (KHTML, like Gecko) Chrome/{random.randint(100, 121)}.0.0.0 Safari/{random.randint(512, 538)}.{random.randint(9, 37)}"
headers[
"Sec-Ch-Ua"] = f"\"Not A(Brand\";v=\"{random.randint(70, 99)}\", \"Brave\";v=\"{random.randint(70, 120)}\", \"Chromium\";v=\"{random.randint(70, 120)}\""
headers[
f"F{random.randint(10000000, 99999999)}"] = f"D{random.randint(100, 999)}{random.randint(100, 999)}{random.randint(100, 999)}"
data_dict = Image_Downloaders.XZTP_D(URL=url, FILE_data1=FILE_data1, FILE_data2=FILE_data2,
headers=headers,
proxies=configs.proxies, timeout=configs.timeout, time_s=1,
SetRequestsFrequency='B')
if data_dict['code'] == 1:
# 成功下载图片总数
self.data_num_assemble[1] += 1
print(configs.colors('Green', str(data_dict)))
resource['z_img'][fixed] = url
resource['b_img'][fixed] = data_dict['file']
# 如果启动了图片功能则执行
if configs.img_function:
time.sleep(0.001)
__data = configs.image_convert(data_dict['file'], format='JPEG', max_size=(
lambda x: configs.enlarge_img if configs.enlarge_img else None)(configs.enlarge_img),
quality_size=(lambda
x: configs.compress_img if configs.compress_img else None)(
configs.compress_img))
print(configs.colors('White', str(__data)))
else:
# 失败下载图片总数
self.data_num_assemble[3] += 1
print(configs.colors('Red', str(data_dict)))
else:
# 成功下载图片总数
self.data_num_assemble[1] += 1
print(configs.colors('Green', str({'msg': '[Get]保存已存在 - 跳过下载 - 直接保存', 'code': 'True',
'file': str(FILE_data1 + '/' + FILE_data2)})))
resource['z_img'][fixed] = url
resource['b_img'][fixed] = FILE_data1 + '/' + FILE_data2
# 存储队列
def thread_coroutine_transfer(self):
# Temporary 的作用就是方便while检测,可延迟两次,防止数据有遗漏
Temporary = ['', '']
# 这个循环主要是检测线程有没有全部关闭,如果全部关闭了,则要经过两次的 Temporary 删除才能关闭整个程序的运行,都是为了防止数据遗漏未保存
while configs.run_switch or Temporary:
# 当线程都说已经结束了,我们需要等待几秒,再运行2次,防止数据有遗漏
if not configs.run_switch:
# 删除
Temporary.pop(0)
time.sleep(1)
else:
time.sleep(2)
for _ in range(len(self.data_root)):
data = self.data_root.pop(0)
z_img = (
configs.img_segmentation.join(data['z_img']) if configs.img_segmentation != '列表' else json.dumps(
data['z_img']))
b_img = (
configs.img_segmentation.join(data['b_img']) if configs.img_segmentation != '列表' else json.dumps(
data['b_img']))
if data['code'] == -1:
'''[图片错误数据存储]'''
# 失败下载数据行数
self.data_num_assemble[2] += 1
with open(configs.FILE_data_error, 'a', encoding='utf-8') as f:
f.write(f'{data["SKU"]}\t' + z_img + '\t' + b_img + '\n')
elif data['code'] == 1:
'''[完好数据存储]'''
# 成功下载数据行数
self.data_num_assemble[0] += 1
with open(configs.FILE_data_done, 'a', encoding='utf-8') as f:
f.write(f'{data["SKU"]}\t' + z_img + '\t' + b_img + '\n')
if __name__ == "__main__":
# 公用配置信息,实列化
configs = config()
# 启动
start = start_up()
# 延迟个一秒
time.sleep(1)
# 计算出时间戳相差值
time_diff = int(time.time() * 1000) - start.time_consuming
# 计算出所消耗的时间
# 计算小时、分钟、秒、毫秒
hours = time_diff // (1000 * 3600) # 计算小时
minutes = (time_diff % (1000 * 3600)) // (1000 * 60) # 计算分钟
seconds = (time_diff % (1000 * 60)) // 1000 # 计算秒
milliseconds = time_diff % 1000 # 计算毫秒
print(f'\n\t总用时(时:分:秒:毫):{hours:02}:{minutes:02}:{seconds:02}:{milliseconds:03}')
print(f'\t数据行数为:{start.data_rows_num} 总图片数量为:{start.data_img_num}')
print(f'\t成功下载数据行数为:{start.data_num_assemble[0]} 成功下载图片总数为:{start.data_num_assemble[1]}')
print(f'\t失败下载数据行数为:{start.data_num_assemble[2]} 失败下载图片总数为:{start.data_num_assemble[3]}')
print('\n\t数据已经爬取完成!!!')
input('\t请回车进行关闭页面!!!')
input('\t请再次回车进行关闭页面!!!\n\n')