
Python crawler: find specific links on demand and save them
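The script below reads a list of domains from 1.txt (one per line), fetches each site's homepage asynchronously with aiohttp, scans the HTML with BeautifulSoup for links that look like customer-service pages, and appends the matches to 1-result.txt, skipping domains that were already handled on a previous run. As a minimal sketch of the expected input file (example.com / example.org are placeholder domains, not from the original article):

# Hypothetical sample input: one domain per line, with or without a scheme.
with open("1.txt", "w", encoding="utf-8") as f:
    f.write("example.com\n")
    f.write("https://example.org\n")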

import re
import os
from urllib.parse import urlparse, quote
import asyncio
import aiohttp
from bs4 import BeautifulSoup
# Input file listing all domains to crawl
file_name = "1.txt"

async def fetch_url(session, url):
    try:
        # Try HTTPS first
        async with session.get(url, timeout=5) as response:
            if response.status == 200:
                # Try several encodings
                try:
                    return await response.text()
                except UnicodeDecodeError:
                    # If UTF-8 fails, try other common encodings
                    content = await response.read()
                    for encoding in ['gbk', 'gb2312', 'gb18030', 'big5', 'utf-8-sig']:
                        try:
                            return content.decode(encoding)
                        except UnicodeDecodeError:
                            continue
                    # If every encoding fails, drop undecodable characters with errors='ignore'
                    return content.decode('utf-8', errors='ignore')

        # If HTTPS did not return 200, fall back to HTTP
        if url.startswith('https://'):
            http_url = 'http://' + url[8:]
            print(f"HTTPS failed, trying HTTP: {http_url}")
            async with session.get(http_url, timeout=5) as response:
                if response.status == 200:
                    try:
                        return await response.text()
                    except UnicodeDecodeError:
                        content = await response.read()
                        for encoding in ['gbk', 'gb2312', 'gb18030', 'big5', 'utf-8-sig']:
                            try:
                                return content.decode(encoding)
                            except UnicodeDecodeError:
                                continue
                        return content.decode('utf-8', errors='ignore')
        return None
    except Exception as e:
        # If HTTPS raised an error, fall back to HTTP
        if url.startswith('https://'):
            try:
                http_url = 'http://' + url[8:]
                print(f"HTTPS error, trying HTTP: {http_url}")
                async with session.get(http_url, timeout=5) as response:
                    if response.status == 200:
                        try:
                            return await response.text()
                        except UnicodeDecodeError:
                            content = await response.read()
                            for encoding in ['gbk', 'gb2312', 'gb18030', 'big5', 'utf-8-sig']:
                                try:
                                    return content.decode(encoding)
                                except UnicodeDecodeError:
                                    continue
                            return content.decode('utf-8', errors='ignore')
            except Exception as e2:
                print(f"HTTP also failed: {str(e2)}")
        else:
            print(f"Failed to fetch {url}: {str(e)}")
        return None

async def extract_customer_service_links(html):
    if not html:
        return []

    soup = BeautifulSoup(html, 'html.parser')
    service_links = []

    # Patterns that may indicate customer-service links (adjust to the target site's structure)
    patterns = [
        # Match by link text
        '在线客服', '联系客服', '人工客服', 'customer service', "客服", "kf", "kefu",
        # Match by class or id
        'service-link', 'customer-service', 'online-service'
    ]

    for pattern in patterns:
        # Links whose text contains the pattern
        links = soup.find_all(
            'a', string=lambda text: text and pattern.lower() in text.lower())
        service_links.extend(links)

        # Links whose class contains the pattern
        links = soup.find_all(
            'a', class_=lambda x: x and pattern.lower() in x.lower())
        service_links.extend(links)

    return list(set(link.get('href') for link in service_links if link.get('href')))

def is_valid_url(url):
    """Check whether the URL has a valid http(s) format."""
    try:
        result = urlparse(url)
        # Must have an http/https scheme and a network location
        return all([result.scheme in ('http', 'https'), result.netloc])
    except Exception:
        return False

async def process_domain(session, domain):
    try:
        if not domain.startswith(('http://', 'https://')):
            url = f'https://{domain}'
        else:
            url = domain

        print(f"\nProcessing URL: {url}")
        html_content = await fetch_url(session, url)
        if not html_content:
            return []

        customer_links = await extract_customer_service_links(html_content)
        if customer_links:
            results = []
            print("Customer-service links found:")
            for cs_link in customer_links:
                try:
                    # Resolve relative paths
                    if cs_link.startswith('/'):
                        parsed_url = urlparse(url)
                        cs_link = f"{parsed_url.scheme}://{parsed_url.netloc}{cs_link}"
                    elif not cs_link.startswith(('http://', 'https://')):
                        cs_link = f"https://{cs_link}"

                    # Validate the URL format
                    if not is_valid_url(cs_link):
                        print(f"Skipping invalid customer-service link: {cs_link}")
                        continue

                    print(f"- {cs_link}")
                    results.append(f"{domain} {cs_link}")
                except Exception as e:
                    print(f"Error while processing link: {str(e)}")
                    continue
            return results
        else:
            print("No customer-service links found")
            return []
    except Exception as e:
        print(f"Error while processing domain {domain}: {str(e)}")
        return []

async def process_file():
    if not os.path.exists(file_name):
        print(f"File {file_name} does not exist")
        return

    result_file = file_name.rsplit('.', 1)[0] + '-result.txt'

    # Load domains that have already been processed
    processed_domains = set()
    try:
        if os.path.exists(result_file):
            with open(result_file, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        domain = line.split()[0].strip()
                        processed_domains.add(domain)
            print(f"Already processed {len(processed_domains)} domains")
    except Exception as e:
        print(f"Error reading previous results: {str(e)}")
        processed_domains = set()

    try:
        with open(file_name, 'r', encoding='utf-8') as f:
            domains = f.read().splitlines()
    except Exception as e:
        print(f"Failed to read file: {str(e)}")
        return

    # Keep only domains that have not been processed yet
    domains_to_process = [d.strip() for d in domains if d.strip() and d.strip() not in processed_domains]
    if not domains_to_process:
        print("All domains have already been processed")
        return

    print(f"Domains to process: {len(domains_to_process)}")

    try:
        async with aiohttp.ClientSession() as session:
            # Split the domain list into batches of 10
            batch_size = 10
            for i in range(0, len(domains_to_process), batch_size):
                batch = domains_to_process[i:i + batch_size]
                print(f"\nProcessing batch {i//batch_size + 1} ({len(batch)} domains)")

                # Process the current batch concurrently
                tasks = [process_domain(session, domain) for domain in batch]
                batch_results = await asyncio.gather(*tasks, return_exceptions=True)

                # Handle and save the results
                for results in batch_results:
                    if isinstance(results, list):  # normal result
                        for result in results:
                            try:
                                with open(result_file, 'a', encoding='utf-8') as f:
                                    f.write(f"{result}\n")
                            except Exception as e:
                                print(f"Error saving result: {str(e)}")
                    else:  # the task raised an exception
                        print(f"Task failed: {results}")

                # Optional: short pause between batches to avoid hammering the servers
                await asyncio.sleep(1)

    except Exception as e:
        print(f"Program error: {str(e)}")
    finally:
        print(f"\nDone, results saved to: {result_file}")

if __name__ == "__main__":
    asyncio.run(process_file())
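Each discovered link is appended to the result file as "<domain> <customer-service link>" on its own line, and the same file doubles as the resume checkpoint for later runs. A minimal sketch for inspecting it afterwards (assuming the default 1-result.txt produced by the script):

# Print every saved pair; each line is "<domain> <customer-service link>".
with open("1-result.txt", encoding="utf-8") as f:
    for line in f:
        if line.strip():
            domain, link = line.split(maxsplit=1)
            print(domain, "->", link.strip())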

