Python crawler: find specific links (here, customer-service links) as needed and save them
import re
import os
from urllib.parse import urlparse, quote
import asyncio
import aiohttp
from bs4 import BeautifulSoup
# Input file: the list of domains to crawl
file_name = "1.txt"
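# The input is read one domain per line (e.g. example.com or https://example.com);
# matches are appended to "<input name>-result.txt" (here: 1-result.txt),
# one "domain customer-service-link" pair per line.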
async def fetch_url(session, url):
    try:
        # Try HTTPS first
        async with session.get(url, timeout=5) as response:
            if response.status == 200:
                # Try several encodings
                try:
                    return await response.text()
                except UnicodeDecodeError:
                    # If UTF-8 fails, try other common encodings
                    content = await response.read()
                    for encoding in ['gbk', 'gb2312', 'gb18030', 'big5', 'utf-8-sig']:
                        try:
                            return content.decode(encoding)
                        except UnicodeDecodeError:
                            continue
                    # If every encoding fails, drop the undecodable characters
                    return content.decode('utf-8', errors='ignore')
        # If HTTPS did not return 200, fall back to HTTP
        if url.startswith('https://'):
            http_url = 'http://' + url[8:]
            print(f"HTTPS failed, trying HTTP: {http_url}")
            async with session.get(http_url, timeout=5) as response:
                if response.status == 200:
                    try:
                        return await response.text()
                    except UnicodeDecodeError:
                        content = await response.read()
                        for encoding in ['gbk', 'gb2312', 'gb18030', 'big5', 'utf-8-sig']:
                            try:
                                return content.decode(encoding)
                            except UnicodeDecodeError:
                                continue
                        return content.decode('utf-8', errors='ignore')
        return None
    except Exception as e:
        # If the HTTPS request raised, fall back to HTTP
        if url.startswith('https://'):
            try:
                http_url = 'http://' + url[8:]
                print(f"HTTPS error, trying HTTP: {http_url}")
                async with session.get(http_url, timeout=5) as response:
                    if response.status == 200:
                        try:
                            return await response.text()
                        except UnicodeDecodeError:
                            content = await response.read()
                            for encoding in ['gbk', 'gb2312', 'gb18030', 'big5', 'utf-8-sig']:
                                try:
                                    return content.decode(encoding)
                                except UnicodeDecodeError:
                                    continue
                            return content.decode('utf-8', errors='ignore')
            except Exception as e2:
                print(f"HTTP also failed: {str(e2)}")
        else:
            print(f"Failed to fetch {url}: {str(e)}")
        return None
async def extract_customer_service_links(html):
    if not html:
        return []
    soup = BeautifulSoup(html, 'html.parser')
    service_links = []
    # Likely customer-service patterns (adjust these for the actual site structure)
    patterns = [
        # Matched against link text (Chinese phrases for "online customer service",
        # "contact customer service", "live agent", plus pinyin abbreviations)
        '在线客服', '联系客服', '人工客服', 'customer service', '客服', 'kf', 'kefu',
        # Matched against class names
        'service-link', 'customer-service', 'online-service'
    ]
    for pattern in patterns:
        # Links whose text contains the pattern
        links = soup.find_all(
            'a', string=lambda text: text and pattern.lower() in text.lower())
        service_links.extend(links)
        # Links whose class contains the pattern
        links = soup.find_all(
            'a', class_=lambda x: x and pattern.lower() in x.lower())
        service_links.extend(links)
    return list(set(link.get('href') for link in service_links if link.get('href')))
def is_valid_url(url):
    """Check that the URL is a well-formed http(s) address."""
    try:
        result = urlparse(url)
        # Require an http/https scheme and a network location
        return all([result.scheme in ('http', 'https'), result.netloc])
    except Exception:
        return False
async def process_domain(session, domain):
    try:
        if not domain.startswith(('http://', 'https://')):
            url = f'https://{domain}'
        else:
            url = domain
        print(f"\nProcessing: {url}")
        html_content = await fetch_url(session, url)
        if html_content:
            customer_links = await extract_customer_service_links(html_content)
            if customer_links:
                results = []
                print("Customer-service links found:")
                for cs_link in customer_links:
                    try:
                        # Resolve relative paths against the page's origin
                        if cs_link.startswith('/'):
                            parsed_url = urlparse(url)
                            cs_link = f"{parsed_url.scheme}://{parsed_url.netloc}{cs_link}"
                        elif not cs_link.startswith(('http://', 'https://')):
                            cs_link = f"https://{cs_link}"
                        # Validate the URL format
                        if not is_valid_url(cs_link):
                            print(f"Skipping invalid customer-service link: {cs_link}")
                            continue
                        print(f"- {cs_link}")
                        results.append(f"{domain} {cs_link}")
                    except Exception as e:
                        print(f"Error while handling link: {str(e)}")
                        continue
                return results
            else:
                print("No customer-service links found")
        return []
    except Exception as e:
        print(f"Error while processing domain {domain}: {str(e)}")
        return []
async def process_file():
    if not os.path.exists(file_name):
        print(f"File {file_name} does not exist")
        return
    result_file = file_name.rsplit('.', 1)[0] + '-result.txt'
    # Load the domains that have already been processed
    processed_domains = set()
    try:
        if os.path.exists(result_file):
            with open(result_file, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        domain = line.split()[0].strip()
                        processed_domains.add(domain)
            print(f"{len(processed_domains)} domains already processed")
    except Exception as e:
        print(f"Error reading previous results: {str(e)}")
        processed_domains = set()
    try:
        with open(file_name, 'r', encoding='utf-8') as f:
            domains = f.read().splitlines()
    except Exception as e:
        print(f"Failed to read file: {str(e)}")
        return
    # Keep only the domains that have not been processed yet
    domains_to_process = [d.strip() for d in domains
                          if d.strip() and d.strip() not in processed_domains]
    if not domains_to_process:
        print("All domains have already been processed")
        return
    print(f"Domains to process: {len(domains_to_process)}")
    try:
        async with aiohttp.ClientSession() as session:
            # Split the domain list into batches of 10
            batch_size = 10
            for i in range(0, len(domains_to_process), batch_size):
                batch = domains_to_process[i:i + batch_size]
                print(f"\nBatch {i // batch_size + 1}: {len(batch)} domains")
                # Process the current batch concurrently
                tasks = [process_domain(session, domain) for domain in batch]
                batch_results = await asyncio.gather(*tasks, return_exceptions=True)
                # Save the results
                for results in batch_results:
                    if isinstance(results, list):  # normal result
                        for result in results:
                            try:
                                with open(result_file, 'a', encoding='utf-8') as f:
                                    f.write(f"{result}\n")
                            except Exception as e:
                                print(f"Error saving result: {str(e)}")
                    else:  # the task raised an exception
                        print(f"Task failed: {results}")
                # Optional: pause briefly between batches to avoid hammering servers
                await asyncio.sleep(1)
    except Exception as e:
        print(f"Error while running: {str(e)}")
    finally:
        print(f"\nDone. Results saved to: {result_file}")
if __name__ == "__main__":
    asyncio.run(process_file())
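Usage, as a hedged sketch (the domains and the result line below are placeholders, not output from a real run): put one domain per line in 1.txt, run the script, then collect the matches from 1-result.txt, which receives one "domain customer-service-link" pair per line.

1.txt:
example.com
https://example.org

1-result.txt:
example.com https://example.com/kefu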