
Python Concurrent Crawlers

Spawning crawler threads with a function

from threading import Thread


def func(name):
    for i in range(100):
        print(f"{name}完成了{i}项任务")


if __name__ == '__main__':
    t1 = Thread(target=func, args=('老杨',))
    t2 = Thread(target=func, args=('老李',))
    t3 = Thread(target=func, args=('老孙',))

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()

    print("主线程结束")

Spawning crawler threads with a Thread subclass

from threading import Thread
from time import sleep


class MyThread(Thread):
    def __init__(self, name):
        super(MyThread, self).__init__()  # initialize the parent Thread class
        self.name = name  # overrides Thread's built-in name attribute

    def run(self):
        for i in range(100):
            print(f"{self.name}完成了{i}项工作")
            sleep(0.5)


if __name__ == '__main__':
    t1 = MyThread('老杨')
    t2 = MyThread('老孙')
    t3 = MyThread('老李')
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print("主线程结束")

Thread pools

from concurrent.futures import ThreadPoolExecutor


def func(name):
    for i in range(10):
        print(name, i)


if __name__ == '__main__':
    with ThreadPoolExecutor(10) as t:  # create a pool of 10 worker threads
        for i in range(100):
            t.submit(func, f"周杰伦{i}")

This code uses ThreadPoolExecutor to create a thread pool that executes multiple tasks in parallel. Specifically:

  1. Create the pool: ThreadPoolExecutor(10) creates a pool that manages 10 threads at once.

  2. Submit tasks: t.submit(func, f"周杰伦{i}") submits 100 tasks in the loop (i from 0 to 99). Each task calls func with a string argument of the form "周杰伦{i}" (e.g. "周杰伦0", "周杰伦1", ..., "周杰伦99").

  3. Concurrent execution: ThreadPoolExecutor runs these tasks concurrently on its 10 threads. Since there are 100 tasks in total, the pool reuses its threads, so at most 10 tasks are running at any one time.

However, tasks can still contend for shared resources; the producer-consumer pattern, sketched below, is one way to make sure each piece of work is handled exactly once.
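A minimal sketch of that pattern using the standard-library queue.Queue (the names task_q, producer, and consumer are illustrative):

import queue
import threading

task_q = queue.Queue()  # thread-safe FIFO shared by the producer and the consumers


def producer(n_consumers):
    for i in range(20):
        task_q.put(f"task-{i}")  # hand work to the consumers
    for _ in range(n_consumers):
        task_q.put(None)  # one sentinel per consumer: no more work


def consumer(name):
    while True:
        item = task_q.get()
        if item is None:  # sentinel received, shut down
            break
        print(f"{name} handled {item}")


if __name__ == '__main__':
    workers = [threading.Thread(target=consumer, args=(f"worker-{i}",)) for i in range(3)]
    for w in workers:
        w.start()
    producer(3)
    for w in workers:
        w.join()

Because Queue.get() hands each item to exactly one thread, no task is ever processed twice.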

How do we get the return values?

Thread pool return values, approach 1

import time
from concurrent.futures import ThreadPoolExecutor

def func(name, t):
    time.sleep(t)
    return name


def fn(res):
    print(res.result())


if __name__ == '__main__':
    with ThreadPoolExecutor(10) as t:
        t.submit(func, '周杰伦', 3).add_done_callback(fn)
        t.submit(func, '周一', 2).add_done_callback(fn)
        t.submit(func, '周二', 1).add_done_callback(fn)

In this code, t.submit(func, '周二', 1) returns a Future object, which represents the pending result of running func asynchronously. The add_done_callback(fn) method registers a callback fn that is invoked once the Future completes.

When fn is called, it receives a single argument: the completed Future object, which carries func's return value, any exception raised, and so on.

So add_done_callback(fn) passes that Future into fn, and inside fn you can access whatever you need, for example:

  • If func succeeded, future.result() returns its return value.
  • If func raised an exception, future.exception() returns it.

Note that each callback fires as soon as its own task finishes, so the order in which the callbacks run, and therefore the order of the printed results, is not deterministic.
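A small sketch showing both branches (risky and on_done are illustrative names):

import time
from concurrent.futures import ThreadPoolExecutor


def risky(name):
    if name == 'bad':
        raise ValueError("boom")
    time.sleep(0.1)
    return name


def on_done(future):
    exc = future.exception()  # non-None if the task raised
    if exc is not None:
        print(f"task failed: {exc}")
    else:
        print(f"task result: {future.result()}")


if __name__ == '__main__':
    with ThreadPoolExecutor(2) as pool:
        pool.submit(risky, 'ok').add_done_callback(on_done)
        pool.submit(risky, 'bad').add_done_callback(on_done)

Checking future.exception() first matters: calling result() on a failed Future re-raises the task's exception inside the callback.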

Thread pool return values, approach 2

import time
from concurrent.futures import ThreadPoolExecutor

def func(name, t):
    time.sleep(t)
    print(f"我是", name)
    return name


if __name__ == '__main__':
    with ThreadPoolExecutor(10) as t:
        result = t.map(func, ['周杰伦', '老李', '小王'], [2, 1, 3])
        for i in result:
            print(i)

map returns a generator, and it yields results in the same order the tasks were submitted.
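If you want results in completion order instead, concurrent.futures.as_completed is the standard tool; a quick sketch contrasting the two:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(name, t):
    time.sleep(t)
    return name


if __name__ == '__main__':
    with ThreadPoolExecutor(3) as pool:
        # map yields results in submission order: a, b, c
        print(list(pool.map(work, ['a', 'b', 'c'], [2, 1, 3])))

        # as_completed yields futures in completion order: b, a, c
        futures = [pool.submit(work, n, t) for n, t in (('a', 2), ('b', 1), ('c', 3))]
        for f in as_completed(futures):
            print(f.result())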

Learning to read function signatures

Below is map's signature; learn to read these yourself. fn is the function to run, and *iterables means iterable objects: lists are iterable, and so are tuples, so you are not limited to passing lists. Then look at "Returns an iterator equivalent to map(fn, iter)": it returns an iterable object, which is why we can use a for loop to collect the return values.
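For reference, the relevant part of Executor.map from the concurrent.futures source (abridged from memory, so check your own Python's copy):

def map(self, fn, *iterables, timeout=None, chunksize=1):
    """Returns an iterator equivalent to map(fn, iter)."""
    ...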

Thread pool case study

The target site is the Beijing Xinfadi wholesale market's price page: 新发地-价格行情.

import json
import requests
from concurrent.futures import ThreadPoolExecutor
import threading

headers = {
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    "Origin": "http://www.xinfadi.com.cn",
    "Referer": "http://www.xinfadi.com.cn/priceDetail.html",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0",
    "X-Requested-With": "XMLHttpRequest"
}
url = "http://www.xinfadi.com.cn/getPriceData.html"

# Lock used to serialize file writes
file_lock = threading.Lock()


def get_ip():
    """Plug in your own proxy pool here.

    Should return a proxies dict for requests, e.g. {"http": "http://ip:port"};
    returning None simply disables the proxy.
    """
    return None


def get_data(current):
    data = {
        "limit": "20",
        "current": str(current),
        "pubDateStartTime": "2023/01/01",
        "pubDateEndTime": "2023/12/31",  # end of the date range
        "prodPcatid": "",
        "prodCatid": "",
        "prodName": ""
    }
    err_number = 0
    while True:
        try:
            proxy = get_ip()  # insert your own proxy IP here
            response = requests.post(url, headers=headers, data=data, proxies=proxy)
            response.raise_for_status()  # raise if the request failed
            data_list = json.loads(response.text)['list']
            break  # success: leave the retry loop
        except requests.exceptions.RequestException as e:
            print(f'{current}号请求失败: {e}')
        except json.JSONDecodeError as e:
            print(f'{current}号JSON解析失败: {e}')
        except KeyError as e:
            print(f'{current}号数据格式错误: {e}')
        err_number += 1
        if err_number == 10:  # give up after 10 failed attempts
            print(f"{current}请求次数超过10次")
            return
    for item in data_list:
        prodName = item.get("prodName", "")
        highPrice = item.get("highPrice", "")
        lowPrice = item.get("lowPrice", "")
        avgPrice = item.get("avgPrice", "")

        # Hold the lock so only one thread writes to the file at a time
        with file_lock:
            with open('data.csv', mode='a', encoding='utf-8') as f:
                f.write(f'{current}, {prodName}, {lowPrice}, {avgPrice}, {highPrice}\n')

    print(f"{current}号的数据爬取完成")


if __name__ == '__main__':
    # Initialize (or reset) data.csv with a header row
    with open('data.csv', mode='w', encoding='utf-8') as f:
        f.write("日期, 产品名称, 最低价, 平均价, 最高价\n")

    with ThreadPoolExecutor(max_workers=10) as t:  # adjust the pool size as needed
        for day in range(1, 40):
            t.submit(get_data, day)

A few words about the line with ThreadPoolExecutor(max_workers=10) as t:

The for loop below it submits one task per value of day, and each task fetches a different page of data, so the requests themselves never conflict. Writing the results is another matter: if several threads appended to data.csv at once, their lines could interleave and corrupt the file. That is why the code takes file_lock around the write, guaranteeing that only one thread writes to the file at any moment.
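As an aside, the lock can be avoided altogether by having each task return its rows and doing all the writing in the main thread; a sketch (get_rows is a hypothetical variant of get_data that returns rows instead of writing them):

from concurrent.futures import ThreadPoolExecutor


def get_rows(current):
    # Hypothetical variant of get_data: fetch and parse as before,
    # but return the CSV rows instead of writing them to the file.
    return [f"{current}, 示例产品, 1.0, 2.0, 3.0\n"]  # placeholder row


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=10) as pool:
        results = pool.map(get_rows, range(1, 40))
    # The with-block waits for every task, so it is safe to write here
    # from a single thread, with no lock required.
    with open('data.csv', mode='a', encoding='utf-8') as f:
        for rows in results:
            f.writelines(rows)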

Multiprocessing

Multiprocessing works much the same way as multithreading; for details, see the concurrency article in my Python collection.
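The API mirrors threading almost one-for-one; a minimal sketch of the first example in this article with Process swapped in for Thread:

from multiprocessing import Process


def func(name):
    for i in range(100):
        print(f"{name}完成了{i}项任务")


if __name__ == '__main__':
    p1 = Process(target=func, args=('老杨',))
    p2 = Process(target=func, args=('老李',))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("主进程结束")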

Using multithreading and multiprocessing together

[Example] Scraping images from Duitang (堆糖)

import json
import os
import time
import requests
from multiprocessing import Process, Queue
from concurrent.futures import ThreadPoolExecutor


class getUrl(object):
    def __init__(self, queue):
        self.url = "https://www.duitang.com/napi/blogv2/list/by_search/"
        self.headers = {
            "Accept": "text/plain, */*; q=0.01",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "Connection": "keep-alive",
            "Referer": "https://www.duitang.com/search/?kw=%E6%90%9E%E7%AC%91%E8%A1%A8%E6%83%85%E5%8C%85&type=feed",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0",
            "X-Requested-With": "XMLHttpRequest",
            "sec-ch-ua": "\"Chromium\";v=\"134\", \"Not:A-Brand\";v=\"24\", \"Microsoft Edge\";v=\"134\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\""
        }
        self.cookies = {}  # cookies redacted; supply your own if needed
        self.queue = queue

    def get_ip(self):
        """Fetch a proxy IP; plug in your own proxy pool here (None disables the proxy)."""
        pass

    def get_url(self, after_id):
        print(f"正在爬取第{after_id // 24}页的数据")
        params = {
            "kw": "搞笑表情包",
            "after_id": str(sum),
            "type": "feed",
            "include_fields": "top_comments,is_root,source_link,item,buyable,root_id,status,like_count,like_id,sender,album,reply_count,favorite_blog_id",
            "_type": "",
            "_": str(int(time.time() * 1000))
        }
        err_number = 0
        while True:
            try:
                proxy = self.get_ip()
                response = requests.get(self.url, headers=self.headers, cookies=self.cookies, params=params,
                                        proxies=proxy, timeout=60)
                response.encoding = 'utf-8'
                data_list = json.loads(response.text)['data']['object_list']
                for data in data_list:
                    img_url = data['photo']['path']  # extract the image URL
                    self.queue.put(img_url)  # push the URL onto the shared queue
                break
            except Exception as e:
                print(f"getUrl出现问题{e}")
                if err_number == 5:
                    print(f"{sum / 24}页的url请求次数过多跳过")
                    break
                err_number += 1

    def run(self):
        """Producer: fetch image URLs and feed them into the queue."""
        with ThreadPoolExecutor(5) as T:  # thread pool with 5 workers
            for page in range(0, 10):
                T.submit(self.get_url, page * 24)
        self.queue.put('OK')  # sentinel: tells the consumer no more URLs are coming


class savePhoto(object):
    def __init__(self, queue):
        self.queue = queue  # the queue shared with the producer process
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "cache-control": "max-age=0",
            "if-modified-since": "Fri, 26 Jul 2024 12:38:40 GMT",
            "if-none-match": "\"2cb739af98e219ca4681ca8316ac7265\"",
            "priority": "u=0, i",
            "sec-ch-ua": "\"Chromium\";v=\"134\", \"Not:A-Brand\";v=\"24\", \"Microsoft Edge\";v=\"134\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "none",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
        }
        self.cookies = {}  # cookies redacted; supply your own if needed

    def get_ip(self):
        """Fetch a proxy IP; plug in your own proxy pool here (None disables the proxy)."""
        pass

    def save_photo(self, url):
        """Download one image and save it to disk."""
        title = url.split('/')[-1]  # use the last URL segment as the file name
        err_number = 0
        while True:
            try:
                proxy = self.get_ip()
                response = requests.get(url=url, headers=self.headers, cookies=self.cookies, proxies=proxy, timeout=60)
                with open('./img/' + title, mode='wb') as f:
                    f.write(response.content)  # write the raw image bytes
                print(f"{url}下载完毕")
                break
            except Exception as e:
                print(f"savePhoto出现问题{e}")
                if err_number == 5:
                    print(f"{url}请求次数过多")
                    return 0
                err_number += 1


    def run(self):
        """Consumer: pull URLs off the queue and download them."""
        with ThreadPoolExecutor(5) as T:
            while True:
                url = self.queue.get()
                if url == 'OK':  # sentinel from the producer: all URLs sent
                    break
                T.submit(self.save_photo, url)


if __name__ == '__main__':
    os.makedirs('./img', exist_ok=True)  # make sure the output directory exists
    start = time.perf_counter()
    Q = Queue()  # the queue shared between the two processes
    gu = getUrl(Q)
    sp = savePhoto(Q)
    # Start one process for the producer and one for the consumer
    print("准备启动进程")
    p1 = Process(target=gu.run)
    p2 = Process(target=sp.run)
    p1.start()
    print("进程p1启动")
    p2.start()
    print("进程p2启动")
    p1.join()
    p2.join()
    end = time.perf_counter()
    print(f"耗时: {end - start} 秒")

