使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(爬虫模块篇)
6.环境配置
python=3.9
langchain==0.1.13
langchain-community==0.0.29
torch==2.0.1
transformers>=4.38.2
timm>=0.9.16
accelerate
sentencepiece
attrdict
einops
# for gradio demo
gradio==3.48.0
gradio-client==0.6.1
mdtex2html==1.3.0
pypinyin==0.50.0
tiktoken==0.5.2
tqdm==4.64.0
colorama==0.4.5
Pygments==2.12.0
markdown==3.4.1
SentencePiece==0.1.96
requests
fake_useragent
hashlib
playwright
其中playwright安装后需要安装对应的浏览器驱动,使用
playwright install
进行安装。
7.模块实现
7.1. 代理池建立
-
目的:获取代理IP,用于爬取数据时使用,频繁使用本地的IP地址可能会导致IP被封禁。本文仅供展示,因此仅选取部分免费的代理IP爬取建立线程池,免费的代理IP大多数质量不高,因此需要定期更新代理池或使用付费代理。
-
实现思路: - 1.读取json文件中的代理IP获取网址,因为网址的每一页包含多条代理IP,并且网址的每一页的URL格式相同,因此可以采用多线程的方式快速获取所有代理IP。将获取到的代理IP存入txt文件中,每日定时更新。 - 2.模拟人为请求,在爬取过程中模拟访问浏览器,而非单纯使用request库发起请求,避免被网站封禁。
-
代码实现:
from playwright.sync_api import sync_playwright
import time
import random
from fake_useragent import UserAgent
import json
from concurrent.futures import ThreadPoolExecutor
def fetch_ip(url):
"""每个线程使用独立的浏览器实例"""
with sync_playwright() as p:
try:
# 初始化浏览器实例
browser = p.chromium.launch(
headless=True,
args=[
'--disable-blink-features=AutomationControlled',
'--no-sandbox',
'--disable-setuid-sandbox'
]
)
# 创建新的上下文
context = browser.new_context(
user_agent=get_stealth_headers()['User-Agent'],
viewport={'width': 1920, 'height': 1080},
extra_http_headers=get_stealth_headers()
)
# 反检测脚本
context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
""")
page = context.new_page()
# 随机鼠标移动
page.mouse.move(
random.randint(0, 1920),
random.randint(0, 1080)
)
# 访问页面
page.goto(url, timeout=60000)
# 随机滚动
for _ in range(random.randint(1, 3)):
page.mouse.wheel(0, random.randint(300, 800))
time.sleep(0.5)
# 等待表格加载
page.wait_for_selector('table tbody tr', timeout=15000)
# 解析数据
rows = page.query_selector_all('table tbody tr')
ip_list = []
for row in rows:
try:
ip = row.query_selector('td:first-child').inner_text().strip()
port = row.query_selector('td:nth-child(2)').inner_text().strip()
ip_list.append(f'{ip}:{port}')
except:
continue
return ip_list
except Exception as e:
print(f'请求 {url} 失败: {str(e)}')
return []
finally:
# 确保资源释放
if 'page' in locals():
page.close()
if 'context' in locals():
context.close()
if 'browser' in locals():
browser.close()
def get_stealth_headers():
ua = UserAgent()
config = json.load(open('docs/ip_crawl_config.json', 'r', encoding='utf-8'))
return {
'User-Agent': ua.random,
'Accept-Language': 'en-US,en;q=0.9',
'Referer': config['referer']
}
def main():
config = json.load(open('docs/ip_crawl_config.json', 'r', encoding='utf-8'))
base_url = config['base_url']
urls = [f'{base_url}/{page_num}/' for page_num in range(1, 20)]
with ThreadPoolExecutor(max_workers=4) as executor:
results = executor.map(fetch_ip, urls)
with open('proxies.txt', 'w', encoding='utf-8') as f:
total = 0
for idx, ip_list in enumerate(results, 1):
if ip_list:
f.write('\n'.join(ip_list) + '\n')
print(f'第 {idx} 页获取到 {len(ip_list)} 条代理')
total += len(ip_list)
print(f'共获取 {total} 条代理')
if __name__ == '__main__':
main()
7.2. 数据爬取
-
目的:获取所要生成文案的相关数据,本项目所需要的是商品市场分析的有关数据,因此需要爬取对应平台以及第三方网站。
-
实现思路: - 1.分析网站数据爬取思路,若网站数据为静态加载,固定格式,可以直接通过python中的网页分析库进行爬取,例如beautifulsoup4等。 - 2.由于本项目所需爬取的网站结构相对复杂,因此使用了抓包的方式进行爬取。爬虫过程涉及js逆向分析,需要分析js代码,找到数据来源,通过python的requests库进行数据爬取。(感兴趣的朋友可以查看我先前发的文章,有详细讲解js逆向分析的过程) - 3.爬取数据后,对数据进行清洗,去除无用数据,将数据存储到本地json文件,方便后续使用。
-
代码实现:
'''
Author: yeffky
Date: 2025-02-09 17:18:05
LastEditTime: 2025-02-15 15:33:48
'''
import requests
import random
import time
import json
from fake_useragent import UserAgent
import hashlib
import time
from datetime import datetime
# 定义md5加密函数
def md5_hash(s):
return hashlib.md5(s.encode('utf-8')).hexdigest()
# Le函数,类似于JS中的Le函数
def getAuth(e):
# 第一次MD5,将输入转换为字符串并计算MD5
first_md5 = hashlib.md5(str(e).encode()).hexdigest()
# 拼接字符串并第二次计算MD5
second_md5 = hashlib.md5(combined.encode()).hexdigest()
return second_md5
def get_random_proxy():
# 尝试打开名为 'proxies.txt' 的文件
try:
with open('proxies.txt', 'r') as f:
# 读取文件内容并按行分割成列表
proxies = f.read().splitlines()
# 如果列表不为空,则随机选择一个代理并返回
if proxies:
return random.choice(proxies)
# 如果文件不存在,则捕获 FileNotFoundError 异常并忽略
except FileNotFoundError:
pass
# 如果文件不存在或列表为空,则返回 None
return None
def remove_invalid_proxy(proxy):
# 尝试执行以下代码块,如果发生异常则跳转到except块
try:
# 打开名为'proxies.txt'的文件,以读取模式('r')
with open('proxies.txt', 'r') as f:
proxies = f.read().splitlines()
if proxy in proxies:
# 检查传入的proxy是否在proxies列表中
proxies.remove(proxy)
with open('proxies.txt', 'w') as f:
f.write('\n'.join(proxies) + '\n')
except FileNotFoundError:
pass
def make_request(url, payload, auth, timestamp):
# 创建一个UserAgent对象,用于生成随机的User-Agent字符串
ua = UserAgent()
headers = {'User-Agent': ua.random, 'Auth': str(auth), 'Timestamp': str(timestamp), "Content-Type":"application/json",}
proxy = get_random_proxy() # 获取一个随机的代理
proxies = {'http': proxy, 'https': proxy} if proxy else None
try:
response = requests.post(
url,
headers=headers,
timeout=10,
proxies=proxies,
json=payload,
# verify=False
)
response.raise_for_status()
print(response)
return response.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
# If the request failed, remove the invalid proxy
if proxy:
remove_invalid_proxy(proxy)
# Try a new proxy
return None
def crawl_data():
# 加载爬虫配置文件
crawal_config = json.load(open('./docs/crawl_config.json', 'r'))
base_url = crawal_config['base_url'] # 获取基础URL
payload = crawal_config['payload'] # 获取请求负载
response_text = None
while not response_text: # Retry until the request is successful
# 时间戳
t = int((time.time()) * 1000) # 获取当前时间戳(毫秒)
auth = getAuth(t)
response_text = make_request(base_url, payload, auth, t)
if not response_text:
print("Retrying with a new proxy...")
time.sleep(random.randint(3, 5)) # Add a small delay before retrying
try:
data = response_text
items_list = data.get('data', {}).get('list', [])
# 获取今天的日期
today_date = datetime.now().strftime('%Y-%m-%d')
# 在文件名后加上今天的日期
filename = f'goods_{today_date}.json'
cnt = 0
with open(f'data/{filename}', 'w', encoding='utf-8') as f:
f.write('{"items": [' + '\n')
for item in items_list:
item['商品涨幅'] = item.pop('Priceincrease')
item['商品成交量'] = item.pop('SaleCount')
item['商品名称'] = item.pop('GoodsName')
item['商品最低价'] = item.pop('price')
item['商品图标'] = item.pop('iconUrl')
cnt += 1
item_str = json.dumps(item, ensure_ascii=False, indent=2)
if cnt == len(items_list):
f.write(item_str + '\n')
else:
f.write(item_str + ',\n\n')
f.write(']}' + '\n')
print(f"Successfully wrote {len(items_list)} items to goods.txt")
except json.JSONDecodeError:
print("Failed to decode JSON response")
if __name__ == "__main__":
crawl_data()