当前位置: 首页 > article >正文

使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(爬虫模块篇)

6.环境配置

python=3.9
langchain==0.1.13
langchain-community==0.0.29


torch==2.0.1
transformers>=4.38.2
timm>=0.9.16
accelerate
sentencepiece
attrdict
einops


# for gradio demo
gradio==3.48.0
gradio-client==0.6.1
mdtex2html==1.3.0
pypinyin==0.50.0
tiktoken==0.5.2
tqdm==4.64.0
colorama==0.4.5
Pygments==2.12.0
markdown==3.4.1
SentencePiece==0.1.96
requests
fake_useragent
hashlib
playwright

其中playwright安装后需要安装对应的浏览器驱动,使用

playwright install

进行安装。

7.模块实现

7.1. 代理池建立
  • 目的:获取代理IP,用于爬取数据时使用,频繁使用本地的IP地址可能会导致IP被封禁。本文仅供展示,因此仅选取部分免费的代理IP爬取建立线程池,免费的代理IP大多数质量不高,因此需要定期更新代理池或使用付费代理。

  • 实现思路:   - 1.读取json文件中的代理IP获取网址,因为网址的每一页包含多条代理IP,并且网址的每一页的URL格式相同,因此可以采用多线程的方式快速获取所有代理IP。将获取到的代理IP存入txt文件中,每日定时更新。   - 2.模拟人为请求,在爬取过程中模拟访问浏览器,而非单纯使用request库发起请求,避免被网站封禁。

  • 代码实现:

from playwright.sync_api import sync_playwright
import time
import random
from fake_useragent import UserAgent
import json
from concurrent.futures import ThreadPoolExecutor


def fetch_ip(url):
    """每个线程使用独立的浏览器实例"""
    with sync_playwright() as p:
        try:
            # 初始化浏览器实例
            browser = p.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-setuid-sandbox'
                ]
            )
            
            # 创建新的上下文
            context = browser.new_context(
                user_agent=get_stealth_headers()['User-Agent'],
                viewport={'width': 1920, 'height': 1080},
                extra_http_headers=get_stealth_headers()
            )
            
            # 反检测脚本
            context.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                })
            """)


            page = context.new_page()
            
            # 随机鼠标移动
            page.mouse.move(
                random.randint(0, 1920),
                random.randint(0, 1080)
            )
            
            # 访问页面
            page.goto(url, timeout=60000)
            
            # 随机滚动
            for _ in range(random.randint(1, 3)):
                page.mouse.wheel(0, random.randint(300, 800))
                time.sleep(0.5)
            
            # 等待表格加载
            page.wait_for_selector('table tbody tr', timeout=15000)
            
            # 解析数据
            rows = page.query_selector_all('table tbody tr')
            ip_list = []
            for row in rows:
                try:
                    ip = row.query_selector('td:first-child').inner_text().strip()
                    port = row.query_selector('td:nth-child(2)').inner_text().strip()
                    ip_list.append(f'{ip}:{port}')
                except:
                    continue
                    
            return ip_list
            
        except Exception as e:
            print(f'请求 {url} 失败: {str(e)}')
            return []
        finally:
            # 确保资源释放
            if 'page' in locals():
                page.close()
            if 'context' in locals():
                context.close()
            if 'browser' in locals():
                browser.close()


def get_stealth_headers():
    ua = UserAgent()
    config = json.load(open('docs/ip_crawl_config.json', 'r', encoding='utf-8'))
    return {
        'User-Agent': ua.random,
        'Accept-Language': 'en-US,en;q=0.9',
        'Referer': config['referer']
    }


def main():
    config = json.load(open('docs/ip_crawl_config.json', 'r', encoding='utf-8'))
    base_url = config['base_url']
    urls = [f'{base_url}/{page_num}/' for page_num in range(1, 20)]
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = executor.map(fetch_ip, urls)
        
        with open('proxies.txt', 'w', encoding='utf-8') as f:
            total = 0
            for idx, ip_list in enumerate(results, 1):
                if ip_list:
                    f.write('\n'.join(ip_list) + '\n')
                    print(f'第 {idx} 页获取到 {len(ip_list)} 条代理')
                    total += len(ip_list)
            print(f'共获取 {total} 条代理')


if __name__ == '__main__':
    main()
7.2. 数据爬取
  • 目的:获取所要生成文案的相关数据,本项目所需要的是商品市场分析的有关数据,因此需要爬取对应平台以及第三方网站。

  • 实现思路:   - 1.分析网站数据爬取思路,若网站数据为静态加载,固定格式,可以直接通过python中的网页分析库进行爬取,例如beautifulsoup4等。   - 2.由于本项目所需爬取的网站结构相对复杂,因此使用了抓包的方式进行爬取。爬虫过程涉及js逆向分析,需要分析js代码,找到数据来源,通过python的requests库进行数据爬取。(感兴趣的朋友可以查看我先前发的文章,有详细讲解js逆向分析的过程)   - 3.爬取数据后,对数据进行清洗,去除无用数据,将数据存储到本地json文件,方便后续使用。

  • 代码实现:

'''
Author: yeffky
Date: 2025-02-09 17:18:05
LastEditTime: 2025-02-15 15:33:48
'''
import requests
import random
import time
import json
from fake_useragent import UserAgent
import hashlib
import time
from datetime import datetime



# 定义md5加密函数
def md5_hash(s):
    return hashlib.md5(s.encode('utf-8')).hexdigest()


# Le函数,类似于JS中的Le函数
def getAuth(e):
    # 第一次MD5,将输入转换为字符串并计算MD5
    first_md5 = hashlib.md5(str(e).encode()).hexdigest()
    # 拼接字符串并第二次计算MD5
    second_md5 = hashlib.md5(combined.encode()).hexdigest()
    return second_md5


def get_random_proxy():
    # 尝试打开名为 'proxies.txt' 的文件
    try:
        with open('proxies.txt', 'r') as f:
            # 读取文件内容并按行分割成列表
            proxies = f.read().splitlines()
            # 如果列表不为空,则随机选择一个代理并返回
            if proxies:
                return random.choice(proxies)
    # 如果文件不存在,则捕获 FileNotFoundError 异常并忽略
    except FileNotFoundError:
        pass
    # 如果文件不存在或列表为空,则返回 None
    return None


def remove_invalid_proxy(proxy):
    # 尝试执行以下代码块,如果发生异常则跳转到except块
    try:
        # 打开名为'proxies.txt'的文件,以读取模式('r')
        with open('proxies.txt', 'r') as f:
            proxies = f.read().splitlines()
        
        if proxy in proxies:
        # 检查传入的proxy是否在proxies列表中
            proxies.remove(proxy)
        
        with open('proxies.txt', 'w') as f:
            f.write('\n'.join(proxies) + '\n')
    except FileNotFoundError:
        pass


def make_request(url, payload, auth, timestamp):
    # 创建一个UserAgent对象,用于生成随机的User-Agent字符串
    ua = UserAgent()
    headers = {'User-Agent': ua.random, 'Auth': str(auth), 'Timestamp': str(timestamp), "Content-Type":"application/json",}
    proxy = get_random_proxy() # 获取一个随机的代理
    proxies = {'http': proxy, 'https': proxy} if proxy else None
    
    try:
        response = requests.post(
            url,
            headers=headers,
            timeout=10, 
            proxies=proxies,
            json=payload, 
            # verify=False
        )
        response.raise_for_status()
        print(response)
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        
        # If the request failed, remove the invalid proxy
        if proxy:
            remove_invalid_proxy(proxy)
        
        # Try a new proxy
        return None


def crawl_data():
    # 加载爬虫配置文件
    crawal_config = json.load(open('./docs/crawl_config.json', 'r'))
    base_url = crawal_config['base_url']  # 获取基础URL
    payload = crawal_config['payload']  # 获取请求负载
    
    response_text = None
    while not response_text:  # Retry until the request is successful
        # 时间戳
        t = int((time.time()) * 1000)  # 获取当前时间戳(毫秒)
        auth = getAuth(t)
        response_text = make_request(base_url, payload, auth, t)
        if not response_text:
            print("Retrying with a new proxy...")
            time.sleep(random.randint(3, 5))  # Add a small delay before retrying


    try:
        data = response_text
        items_list = data.get('data', {}).get('list', [])
        
        # 获取今天的日期
        today_date = datetime.now().strftime('%Y-%m-%d')


        # 在文件名后加上今天的日期
        filename = f'goods_{today_date}.json'
        cnt = 0
        with open(f'data/{filename}', 'w', encoding='utf-8') as f:
            f.write('{"items": [' + '\n')
            for item in items_list:
                item['商品涨幅'] = item.pop('Priceincrease')
                item['商品成交量'] = item.pop('SaleCount')
                item['商品名称'] = item.pop('GoodsName')
                item['商品最低价'] = item.pop('price')
                item['商品图标'] = item.pop('iconUrl')
                cnt += 1
                item_str = json.dumps(item, ensure_ascii=False, indent=2)
                if cnt == len(items_list):
                    f.write(item_str + '\n')
                else:
                    f.write(item_str + ',\n\n')
            f.write(']}' + '\n')
        print(f"Successfully wrote {len(items_list)} items to goods.txt")
    except json.JSONDecodeError:
        print("Failed to decode JSON response")


if __name__ == "__main__":
    crawl_data()

http://www.kler.cn/a/548205.html

相关文章:

  • Git子模块实战:大型后台管理系统模块拆分实践
  • EasyExcel 复杂填充
  • docker 运行 芋道微服务
  • Qt QSpinBox 总结
  • pytest测试专题 - 2.1 一种推荐的测试目录结构
  • floodfill算法系列一>太平洋大西洋水流问题
  • 西门子的通信负载是什么意思
  • 【Mysql】:如何恢复误删的数据?
  • 跳跃游戏 II - 贪心算法解法
  • RabbitMQ使用guest登录提示:User can only log in via localhost
  • 【WPSOffice】汇总
  • 运维-自动访问系统并截图
  • DeepSeek24小时写作机器人,持续创作高质量文案
  • 医院数智化转型下的大健康发展AI化多路径探析(上)
  • 【R语言】回归分析
  • 【C++内存管理】—— 策略、陷阱及应对之道
  • Qt工作总结03 <qSort按某一属性进行排序>
  • 解析 2025 工业边缘计算:三大技术风向的影响力
  • QEMU 搭建 Ubuntu x86 虚拟机
  • 力扣-二叉树-257 二叉树的所有路径