当前位置: 首页 > article >正文

从 GitHub 批量下载项目各版本的方法

一、脚本功能概述

这个 Python 脚本的主要功能是从 GitHub 上下载指定项目的各个发布版本的压缩包(.zip 和 .tar.gz 格式)。用户需要提供两个参数:一个是包含项目信息的 CSV 文件,另一个是用于保存下载版本信息的 CSV 文件。脚本会遍历项目列表,访问每个项目的 tags 页面,下载所有可用的版本压缩包,并记录相关信息到指定的 CSV 文件中。

二、脚本使用说明

在运行脚本前,请确保你已经安装了 requests 和 beautifulsoup4 库。如果未安装,可以使用以下命令进行安装:

bash

pip install requests beautifulsoup4

运行脚本时,在命令行中输入以下格式的命令:

bash

python script.py project_list.csv save_info.csv

其中,script.py 是脚本文件名,project_list.csv 是包含项目信息的 CSV 文件,save_info.csv 是用于保存下载版本信息的 CSV 文件。

下面是完整的脚本。

import requests
from bs4 import BeautifulSoup
import os
import csv
import sys
import time
import random
from urllib.error import HTTPError
import signal


# 设置GitHub API的个人访问令牌
# 从这里获取:https://github.com/settings/tokens
access_token = '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'

# 分页参数
page = 1
num = 0
savelastfilename =""
lastfilename = ""
url_with_page = ""
fieldnames = ['项目名称', 'tags', '版本号', '压缩包名','是否有发布']
file = None
writer = None

proxies = {
    "https1": "https://182.204.177.61:4331",
    "https2": "https://140.255.150.253:4361",
    "https3": "https://113.231.18.51:4334",
    "https4": "https://116.7.192.240:43581",
    "https5": "https://121.61.160.170:43311",
    "https6": "https://124.231.69.245:43311",
    "https7": "https://183.128.97.139:43251",
    "https8": "https://124.94.188.113:43341",
    "https9": "https://1.82.107.78:4389",
    "https10": "https://1.82.107.49:4379",
}

headers = {
    'User-Agent': 'Mozilla/5.0',
    'Authorization': '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',
    'Content-Type': 'text/html',
    'Accept': 'application/json'
}


# 定义信号处理函数
def signal_handler(sig, frame):
    print("\n收到了中断信号,程序退出!!!")
    sys.exit(0)

# 注册信号处理函数
signal.signal(signal.SIGINT, signal_handler)

def get_html_url_with_tags(file_name):
    html_urls = []
    Name = ''
    with open(file_name, 'r', newline='') as file:
        reader = csv.DictReader(file)
        for row in reader:
            Name = row.get('Name')
            html_url = row.get('HTML URL')
            if html_url:
                new_html_url = html_url + "/tags"
                html_urls.append(new_html_url)
                #print("Name="+ Name +"  url=" + new_html_url)
    return Name,html_urls

def download_file(url, save_path, timeout=20, max_retries=3):
    retries = 0
    while retries < max_retries:
        try:
            # 发送GET请求获取文件内容,设置超时时间
            #@retry(tries=3, delay=2)  # 重试3次,每次间隔2秒
            #response = requests.get(url, proxies=proxies, headers=headers, timeout=15)
            response = requests.get(url, proxies=proxies, timeout=15)
            # 检查响应状态码
            if response.status_code == 200:
                # 写入文件
                print(f"正在下载文件{save_path}...",end='')
                with open(save_path, 'wb') as f:
                    f.write(response.content)
                print(f"...文件下载完成")
                return True
            else:
                print(f"下载失败:状态码 {response.status_code}")
                #print("下载失败:超过最大重试次数")
                check_connection_error(response)
        except requests.exceptions.Timeout:
            print(f"请求超时,正在尝试重试...", end="")
        except requests.exceptions.RequestException as e:
            print(f"请求异常:{e}", end="")

        retries += 1
        print(f"重试次数:{retries}")
    
    return False

def check_connection_error(response):
    """检查是否由于连接问题而无法访问 GitHub"""
    if response.status_code == 200:
        print("返回200!")
        return True
    if response.status_code == 403:
        print("已达到 GitHub API 请求限制!")
        return False
    elif response.status_code == 400:
        print("服务器无法理解请求!")
        return False
    elif response.status_code == 502:
        print("远程服务器关闭了连接。")
        return False
    elif response.status_code == 404:
        print("未找到请求的资源。")
        return False
    elif response.status_code == 407:
        print("代理服务器需要身份验证!")
        return False
    elif response.status_code == 406:
        print("客户端请求问题,可能是代理失效,建议更换代理IP列表")
        return False
    elif response.status_code == 429:
        print("请求过多,请稍后重试。")
        return False
    elif response.status_code == 504:
        print("网关超时,等等并重试或更换其它代理!")
        return False
    elif response.status_code >= 500:
        print("远程服务器内部错误。")
        return False
    else:
        try:
            print("远程服务器返回未知错误,正在尝试获取更多详细信息...")
            response.raise_for_status()  # 如果请求失败,这将抛出一个 HTTPError 异常
            print("获取详细信息失败,当前状态码为:", response.status_code)
            return False  # 如果无法获取详细信息,则返回 False,表示请求失败
        except HTTPError as e:
            print("发生了 HTTPError 异常:", e)
            return False  # 如果抛出 HTTPError 异常,则返回 False,表示请求失败
        except ConnectionError as ce:
            print("连接被远程服务器关闭,没有返回任何响应。")
            return False  # 如果捕获到 ConnectionError 异常,则返回 False,表示请求失败
        except requests.exceptions.Timeout:
            print("连接超时:可能是由于网络问题导致的连接失败")
            return False
        except requests.exceptions.ConnectionError:
            print("连接错误:无法连接到服务器")
            return False
        except requests.exceptions.RequestException as e:
            print("请求异常,最可能是代理问题:", e)
            return False

def wait_and_retry(wait_time=30):
    """等待一段时间后重试请求"""
    print(f"等待 {wait_time} 秒后重试...")
    time.sleep(wait_time)

def get_github_rate_limit(headers):
    url = "https://api.github.com/rate_limit"
    headers = {
        'User-Agent': 'Mozilla/5.0',
        'Authorization': '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',
        'Content-Type': 'text/html',
        'Accept': 'application/json'
    }
    #response = requests.get(url, proxies=proxies, headers=headers)
    response = requests.get(url, proxies=proxies)
    data = response.json()
    limit = data["rate"]["limit"]
    remaining = data["rate"]["remaining"]
    reset_time = data["rate"]["reset"]
    print(f"限速检查完成...")
    return limit, remaining, reset_time 

def update_ifcheck_value(file_name, Name):
    """打开 CSV 文件,将指定 Name 对应的行的 ifcheck 字段值修改为 1"""
    rows = []
    with open(file_name, 'r', newline='') as file:
        reader = csv.DictReader(file)
        fieldnames = reader.fieldnames  # 获取表头字段名
        for row in reader:
            if row.get('Name') == Name:
                row['ifcheck'] = '1'  # 将 ifcheck 字段值修改为 1
            rows.append(row)
    # 写回 CSV 文件
    with open(file_name, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

def renamefile(name,filename):
    current_path = os.getcwd()
    old_name = filename
    # 设置新的文件名
    new_name = name + "_" + filename
    # 构建新文件的完整路径
    new_path = os.path.join(current_path, new_name)
    # 构建旧文件的完整路径
    old_path = os.path.join(current_path, old_name)
    # 重命名文件
    if os.path.isfile(new_path):
        return False
    else:
        os.rename(old_path, new_path)
        return True

def analyze_download_links(Name, url):
    global lastfilename,num,headers
    global writer,file
    while True:
        
        #判断是否为多页
        if num == 20:
            url_with_page = url + "?after=" + lastfilename
            num = 0
        else:
            url_with_page = url
        print("访问的url=" + url_with_page)
        
        #判断是否被限速,被限速的话,等待10~20秒
        limit, remaining, reset_time=get_github_rate_limit(headers)
        if remaining == 0:
            count = 1
            time.sleep(random.randint(10, 20))
            print("剩余请求次数为 0 了,现在更新header" )
            headers = {
                'User-Agent': 'User-Agent:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
                'Authorization': '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',
                'Content-Type': 'text/html',
                'Accept': 'application/json'
            }
        
        #通过一个代理列表中中的一个代理获取当前url项目的tags页面
        #response = requests.get(url_with_page,proxies=proxies, headers=headers)
        #直接获取当前url项目的tags页面
        response = requests.get(url_with_page,proxies=proxies)
        #进行容错判断,如果链接错误,则等一会重新链接
        print("进行服务器返回检查中..." )
        if False == check_connection_error(response):
            wait_and_retry()
            print("服务器返回错误,请稍等一会..." )
            #time.sleep(random.randint(10, 20))
            headers = {
                'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
                'Authorization': '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',
                'Content-Type': 'text/html',
                'Accept': 'application/json'
            }
            continue
        #如果没有响应,则10~20秒,更换header重新链接
        if response is None:
            # Exit loop if HTTP Error 422
            print("GitHub 未响应")
            time.sleep(random.randint(10, 20))
            headers = {
                'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
                'Authorization': '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',
                'Content-Type': 'text/html',
                'Accept': 'application/json'
            }
            continue
        #如果返回200,说明请求正确,处理返回数据
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            #如果项目tags中没有版本,则返回     
            if "There aren’t any releases here" in response.text:
                with open(save_file_name, 'a', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow([Name, url, '', '','0'])
                print("当前项目tags下无发布版本")
                break
            #H抓取tags页面中所有的下载链接
            download_links = soup.find_all('a', href=lambda href: href and (href.endswith('.zip') or href.endswith('.tar.gz')))
            #print("所有链接:" + download_links)
            if len(download_links) == 0:
                print("当前项目tags下无发布版本")
                break
            #分析每一个链接
            for link in download_links:
                #print("Found download link:", link['href'])
                file_name = os.path.basename(link['href'])  #file_name是压缩包名字

                if os.path.isfile(file_name):
                    print("该项目版本已经下载,略过!")
                    continue
               
                if download_file("https://github.com/" + link['href'], file_name):  #拼接为完整下载链接后下载文件
                    #print(f"确认文件已下载完成")
                    renamefile(Name,file_name)
                    
                    lastfilename, _ = os.path.splitext(file_name)  #lastfilename 是去掉.zip或.gz后的文件名
                    if lastfilename.endswith('.tar'):  #如果后面还有tar后缀
                        lastfilename, _ = os.path.splitext(lastfilename)  #lastfilename 文件名称,实际上版本号
                    #项目名称,tags url、版本号和压缩包文件名,是否找到发布包写入文件。1表示有发布包
                    with open(save_file_name, 'a', newline='') as file:
                        writer = csv.writer(file)
                        writer.writerow([Name, url, lastfilename, file_name,'1'])
                    num = num + 1  #当前下载文件数加1
                else:
                    headers = {
                        'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
                        'Authorization': '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',
                        'Content-Type': 'text/html',
                        'Accept': 'application/json'
                    }
                    print(f"更换header重新下载...")
                    download_file("https://github.com/" + link['href'], file_name)  #拼接为完整下载链接后下载文件
                if  num == 20:
                    continue
                #如果有多页,每页返回10个版本,一共20个压缩包,如果不到20表示版本页面不到一页,    
            if num < 20:
                break
        #user_input = input("请输入任意内容,按 Enter 键结束程序:")
        #if user_input:
        #    print("用户输入了ctrl+C:", user_input)
        #
        #break
        else:
            print(f'页面返回不是200时,返回状态码是: {response.status_code}')
            continue
 
# 测试程序
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("请提供两个参数:第一个参数是项目列表;第二个需要创新的文件用来保存该项目版本信息")
        sys.exit(1)
    
    file_name = sys.argv[1]
    if not file_name.endswith('.csv'):
        print("Please provide a CSV file.")
        sys.exit(1)
    if os.path.exists(file_name):
        print("打开文件,开始分析...") 
    else:
        print("输入文件不存在,请确认")
        sys.exit(1)
    
    save_file_name = sys.argv[2]
    if not save_file_name.endswith('.csv'):
        print("Please provide a CSV save file.")
        sys.exit(1)
    #如果保存文件不存在,则创建文件,添加表头
    if os.path.exists(save_file_name):
        print("保存文件已经存在,会在文件后面追加数据") 
    else:
        with open(save_file_name, 'a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(fieldnames)

    #Name,html_urls = get_html_url_with_tags(file_name)
    #for url in html_urls:
    Name = ''
    with open(file_name, 'r', newline='') as file:
        reader = csv.DictReader(file)
        for row in reader:
            ifcheck = row.get('ifcheck')
            Name = row.get('Name')
            html_url = row.get('HTML URL')
            if ifcheck == '0' and html_url:
                new_html_url = html_url + "/tags"
                analyze_download_links(Name,new_html_url)
                update_ifcheck_value(file_name, Name)
            #user_input = input("您的输入中断了下载,按Ctrl+C键结束程序,其它键继续下载")
            #if user_input:
             #   print("用户输入了:", user_input)
             #   break
    print("检测完成!") 

————————————————————————————————


http://www.kler.cn/a/579277.html

相关文章:

  • 复合机器人:重新定义生产流程的核心引擎
  • Oracle SQL优化实战要点解析(11)——索引、相关子查询及NL操作(1)
  • 基于Spring Boot的城市垃圾分类管理系统的设计与实现(LW+源码+讲解)
  • 深度学习驱动的智能化革命:从技术突破到行业实践
  • Redis篇:基础知识总结与基于长期主义的内容更新
  • 降级选型啊
  • [数据结构算法递归]另一棵树的子树
  • IMX6ULL驱动开发Linux篇02——移植Rootfs
  • 如何在unity中完整录制一段动画
  • Python数据可视化创意分享:探索数据背后的故事
  • 跟踪性能提高11%|端到端新架构DMAD:通过分离语义-运动学习解决负迁移难题
  • C++ 数据结构详解及学习规划
  • Unity RenderFeature Configure和OnCameraSetup之区别
  • Python 数据可视化
  • Windows11下玩转 Docker
  • 数据结构第八节:红黑树(初阶)
  • 使用数据库和缓存的时候,是如何解决数据不一致的问题的?
  • MyBatis 中常用的 SQL 语句
  • 运动控制卡--概述学习
  • open webui-二次开发-源码启动前后端工程-【超简洁步骤】