从 GitHub 批量下载项目各版本的方法
一、脚本功能概述
这个 Python 脚本的主要功能是从 GitHub 上下载指定项目的各个发布版本的压缩包(.zip
和 .tar.gz
格式)。用户需要提供两个参数:一个是包含项目信息的 CSV 文件,另一个是用于保存下载版本信息的 CSV 文件。脚本会遍历项目列表,访问每个项目的 tags
页面,下载所有可用的版本压缩包,并记录相关信息到指定的 CSV 文件中。
二、脚本使用说明
在运行脚本前,请确保你已经安装了 requests
和 beautifulsoup4
库。如果未安装,可以使用以下命令进行安装:
bash
pip install requests beautifulsoup4
运行脚本时,在命令行中输入以下格式的命令:
bash
python script.py project_list.csv save_info.csv
其中,script.py
是脚本文件名,project_list.csv
是包含项目信息的 CSV 文件,save_info.csv
是用于保存下载版本信息的 CSV 文件。
下面是完整的脚本。
import requests
from bs4 import BeautifulSoup
import os
import csv
import sys
import time
import random
from urllib.error import HTTPError
import signal
# 设置GitHub API的个人访问令牌
# 从这里获取:https://github.com/settings/tokens
access_token = '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'
# 分页参数
page = 1
num = 0
savelastfilename =""
lastfilename = ""
url_with_page = ""
fieldnames = ['项目名称', 'tags', '版本号', '压缩包名','是否有发布']
file = None
writer = None
proxies = {
"https1": "https://182.204.177.61:4331",
"https2": "https://140.255.150.253:4361",
"https3": "https://113.231.18.51:4334",
"https4": "https://116.7.192.240:43581",
"https5": "https://121.61.160.170:43311",
"https6": "https://124.231.69.245:43311",
"https7": "https://183.128.97.139:43251",
"https8": "https://124.94.188.113:43341",
"https9": "https://1.82.107.78:4389",
"https10": "https://1.82.107.49:4379",
}
headers = {
'User-Agent': 'Mozilla/5.0',
'Authorization': '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',
'Content-Type': 'text/html',
'Accept': 'application/json'
}
# 定义信号处理函数
def signal_handler(sig, frame):
print("\n收到了中断信号,程序退出!!!")
sys.exit(0)
# 注册信号处理函数
signal.signal(signal.SIGINT, signal_handler)
def get_html_url_with_tags(file_name):
html_urls = []
Name = ''
with open(file_name, 'r', newline='') as file:
reader = csv.DictReader(file)
for row in reader:
Name = row.get('Name')
html_url = row.get('HTML URL')
if html_url:
new_html_url = html_url + "/tags"
html_urls.append(new_html_url)
#print("Name="+ Name +" url=" + new_html_url)
return Name,html_urls
def download_file(url, save_path, timeout=20, max_retries=3):
retries = 0
while retries < max_retries:
try:
# 发送GET请求获取文件内容,设置超时时间
#@retry(tries=3, delay=2) # 重试3次,每次间隔2秒
#response = requests.get(url, proxies=proxies, headers=headers, timeout=15)
response = requests.get(url, proxies=proxies, timeout=15)
# 检查响应状态码
if response.status_code == 200:
# 写入文件
print(f"正在下载文件{save_path}...",end='')
with open(save_path, 'wb') as f:
f.write(response.content)
print(f"...文件下载完成")
return True
else:
print(f"下载失败:状态码 {response.status_code}")
#print("下载失败:超过最大重试次数")
check_connection_error(response)
except requests.exceptions.Timeout:
print(f"请求超时,正在尝试重试...", end="")
except requests.exceptions.RequestException as e:
print(f"请求异常:{e}", end="")
retries += 1
print(f"重试次数:{retries}")
return False
def check_connection_error(response):
"""检查是否由于连接问题而无法访问 GitHub"""
if response.status_code == 200:
print("返回200!")
return True
if response.status_code == 403:
print("已达到 GitHub API 请求限制!")
return False
elif response.status_code == 400:
print("服务器无法理解请求!")
return False
elif response.status_code == 502:
print("远程服务器关闭了连接。")
return False
elif response.status_code == 404:
print("未找到请求的资源。")
return False
elif response.status_code == 407:
print("代理服务器需要身份验证!")
return False
elif response.status_code == 406:
print("客户端请求问题,可能是代理失效,建议更换代理IP列表")
return False
elif response.status_code == 429:
print("请求过多,请稍后重试。")
return False
elif response.status_code == 504:
print("网关超时,等等并重试或更换其它代理!")
return False
elif response.status_code >= 500:
print("远程服务器内部错误。")
return False
else:
try:
print("远程服务器返回未知错误,正在尝试获取更多详细信息...")
response.raise_for_status() # 如果请求失败,这将抛出一个 HTTPError 异常
print("获取详细信息失败,当前状态码为:", response.status_code)
return False # 如果无法获取详细信息,则返回 False,表示请求失败
except HTTPError as e:
print("发生了 HTTPError 异常:", e)
return False # 如果抛出 HTTPError 异常,则返回 False,表示请求失败
except ConnectionError as ce:
print("连接被远程服务器关闭,没有返回任何响应。")
return False # 如果捕获到 ConnectionError 异常,则返回 False,表示请求失败
except requests.exceptions.Timeout:
print("连接超时:可能是由于网络问题导致的连接失败")
return False
except requests.exceptions.ConnectionError:
print("连接错误:无法连接到服务器")
return False
except requests.exceptions.RequestException as e:
print("请求异常,最可能是代理问题:", e)
return False
def wait_and_retry(wait_time=30):
"""等待一段时间后重试请求"""
print(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
def get_github_rate_limit(headers):
url = "https://api.github.com/rate_limit"
headers = {
'User-Agent': 'Mozilla/5.0',
'Authorization': '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',
'Content-Type': 'text/html',
'Accept': 'application/json'
}
#response = requests.get(url, proxies=proxies, headers=headers)
response = requests.get(url, proxies=proxies)
data = response.json()
limit = data["rate"]["limit"]
remaining = data["rate"]["remaining"]
reset_time = data["rate"]["reset"]
print(f"限速检查完成...")
return limit, remaining, reset_time
def update_ifcheck_value(file_name, Name):
"""打开 CSV 文件,将指定 Name 对应的行的 ifcheck 字段值修改为 1"""
rows = []
with open(file_name, 'r', newline='') as file:
reader = csv.DictReader(file)
fieldnames = reader.fieldnames # 获取表头字段名
for row in reader:
if row.get('Name') == Name:
row['ifcheck'] = '1' # 将 ifcheck 字段值修改为 1
rows.append(row)
# 写回 CSV 文件
with open(file_name, 'w', newline='') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
def renamefile(name,filename):
current_path = os.getcwd()
old_name = filename
# 设置新的文件名
new_name = name + "_" + filename
# 构建新文件的完整路径
new_path = os.path.join(current_path, new_name)
# 构建旧文件的完整路径
old_path = os.path.join(current_path, old_name)
# 重命名文件
if os.path.isfile(new_path):
return False
else:
os.rename(old_path, new_path)
return True
def analyze_download_links(Name, url):
global lastfilename,num,headers
global writer,file
while True:
#判断是否为多页
if num == 20:
url_with_page = url + "?after=" + lastfilename
num = 0
else:
url_with_page = url
print("访问的url=" + url_with_page)
#判断是否被限速,被限速的话,等待10~20秒
limit, remaining, reset_time=get_github_rate_limit(headers)
if remaining == 0:
count = 1
time.sleep(random.randint(10, 20))
print("剩余请求次数为 0 了,现在更新header" )
headers = {
'User-Agent': 'User-Agent:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
'Authorization': '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',
'Content-Type': 'text/html',
'Accept': 'application/json'
}
#通过一个代理列表中中的一个代理获取当前url项目的tags页面
#response = requests.get(url_with_page,proxies=proxies, headers=headers)
#直接获取当前url项目的tags页面
response = requests.get(url_with_page,proxies=proxies)
#进行容错判断,如果链接错误,则等一会重新链接
print("进行服务器返回检查中..." )
if False == check_connection_error(response):
wait_and_retry()
print("服务器返回错误,请稍等一会..." )
#time.sleep(random.randint(10, 20))
headers = {
'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
'Authorization': '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',
'Content-Type': 'text/html',
'Accept': 'application/json'
}
continue
#如果没有响应,则10~20秒,更换header重新链接
if response is None:
# Exit loop if HTTP Error 422
print("GitHub 未响应")
time.sleep(random.randint(10, 20))
headers = {
'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
'Authorization': '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',
'Content-Type': 'text/html',
'Accept': 'application/json'
}
continue
#如果返回200,说明请求正确,处理返回数据
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
#如果项目tags中没有版本,则返回
if "There aren’t any releases here" in response.text:
with open(save_file_name, 'a', newline='') as file:
writer = csv.writer(file)
writer.writerow([Name, url, '', '','0'])
print("当前项目tags下无发布版本")
break
#H抓取tags页面中所有的下载链接
download_links = soup.find_all('a', href=lambda href: href and (href.endswith('.zip') or href.endswith('.tar.gz')))
#print("所有链接:" + download_links)
if len(download_links) == 0:
print("当前项目tags下无发布版本")
break
#分析每一个链接
for link in download_links:
#print("Found download link:", link['href'])
file_name = os.path.basename(link['href']) #file_name是压缩包名字
if os.path.isfile(file_name):
print("该项目版本已经下载,略过!")
continue
if download_file("https://github.com/" + link['href'], file_name): #拼接为完整下载链接后下载文件
#print(f"确认文件已下载完成")
renamefile(Name,file_name)
lastfilename, _ = os.path.splitext(file_name) #lastfilename 是去掉.zip或.gz后的文件名
if lastfilename.endswith('.tar'): #如果后面还有tar后缀
lastfilename, _ = os.path.splitext(lastfilename) #lastfilename 文件名称,实际上版本号
#项目名称,tags url、版本号和压缩包文件名,是否找到发布包写入文件。1表示有发布包
with open(save_file_name, 'a', newline='') as file:
writer = csv.writer(file)
writer.writerow([Name, url, lastfilename, file_name,'1'])
num = num + 1 #当前下载文件数加1
else:
headers = {
'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
'Authorization': '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',
'Content-Type': 'text/html',
'Accept': 'application/json'
}
print(f"更换header重新下载...")
download_file("https://github.com/" + link['href'], file_name) #拼接为完整下载链接后下载文件
if num == 20:
continue
#如果有多页,每页返回10个版本,一共20个压缩包,如果不到20表示版本页面不到一页,
if num < 20:
break
#user_input = input("请输入任意内容,按 Enter 键结束程序:")
#if user_input:
# print("用户输入了ctrl+C:", user_input)
#
#break
else:
print(f'页面返回不是200时,返回状态码是: {response.status_code}')
continue
# 测试程序
if __name__ == "__main__":
if len(sys.argv) != 3:
print("请提供两个参数:第一个参数是项目列表;第二个需要创新的文件用来保存该项目版本信息")
sys.exit(1)
file_name = sys.argv[1]
if not file_name.endswith('.csv'):
print("Please provide a CSV file.")
sys.exit(1)
if os.path.exists(file_name):
print("打开文件,开始分析...")
else:
print("输入文件不存在,请确认")
sys.exit(1)
save_file_name = sys.argv[2]
if not save_file_name.endswith('.csv'):
print("Please provide a CSV save file.")
sys.exit(1)
#如果保存文件不存在,则创建文件,添加表头
if os.path.exists(save_file_name):
print("保存文件已经存在,会在文件后面追加数据")
else:
with open(save_file_name, 'a', newline='') as file:
writer = csv.writer(file)
writer.writerow(fieldnames)
#Name,html_urls = get_html_url_with_tags(file_name)
#for url in html_urls:
Name = ''
with open(file_name, 'r', newline='') as file:
reader = csv.DictReader(file)
for row in reader:
ifcheck = row.get('ifcheck')
Name = row.get('Name')
html_url = row.get('HTML URL')
if ifcheck == '0' and html_url:
new_html_url = html_url + "/tags"
analyze_download_links(Name,new_html_url)
update_ifcheck_value(file_name, Name)
#user_input = input("您的输入中断了下载,按Ctrl+C键结束程序,其它键继续下载")
#if user_input:
# print("用户输入了:", user_input)
# break
print("检测完成!")
————————————————————————————————