python并发编程实战
python并发编程有三种
- 多线程Thread
- 多进程Process
- 多协程Coroutine
cpu密集型计算
cpu密集型也叫计算密集型,是指I/O在很短的时间就可以完成,cpu需要大量的计算处理,特点是cpu占用率相当高
例如:压缩解压缩、加密解密、正则表达式搜索
IO密集型
IO密集型指的是系统运作大部分的状态是CPU在等I/O(硬盘/内存)的读/写操作,cpu占用率仍然较低
例如:文件处理程序、网络爬虫程序、读写数据库程序
多线程、多进程、多协程的对比
多进程
- 优点:可以利用多核CPU并行计算
- 缺点:占用资源最多、可启动数目比线程少
- 适用于:CPU密集型计算
多线程
- 优点:相比进程,更轻量级、占用资源少
- 缺点:
- 相比进程:多线程只能并发执行,不能利用多CPU(GIL)
- 相比协程:启动数目有限,占用内存资源,有线程切换开销
- 适用于:IO密集型计算、同时运行的任务数目要求不多
多协程
- 优点:内存开销最少、启动协程数量最多
- 缺点:支持的库有限制(aiohttp vs request)、代码实现复杂
- 适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景
python慢的头号嫌疑犯——全局解释器锁GIL
python速度慢的原因一:动态类型语言,边解释边执行
python速度慢的原因二:GIL无法利用多核CPU并发执行
GIL是什么
全局解释器(Global Interpreter Lock)
是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行
即便在多核心处理器上,使用GIL的解释器也只允许同一时间执行一个线程
怎么规避GIL带来的限制
1.多线程threading机制依然是有用的,用于IO密集型计算
因为在I/O期间,线程会释放GIL,实现CPU和IO的并行
因此多线程用于IO密集型计算依然可以大幅提升速度
但是多线程用于CPU密集型计算时,只会更加拖慢速度
2.使用multiprocessing的多进程机制实现并行计算、利用多核cpu优势
为了应对GIL的问题,python提供了multiprocessing
python利用多线程加速爬虫
blog_spider.py
import requests
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 51)]
def craw(url):
r = requests.get(url)
print(url, len(r.text))
if __name__ == '__main__':
craw(urls[0])
multi_thread_craw.py
import threading
import time
import blog_spider
# 单线程爬取
def single_thread():
print("单线程爬取开始")
for url in blog_spider.urls:
blog_spider.craw(url)
print("单线程爬取结束")
# 多线程爬取
def multi_thread():
print("多线程爬取开始")
threads = []
for url in blog_spider.urls:
threads.append(threading.Thread(target=blog_spider.craw, args=(url,)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("多线程爬取结束")
if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
single_time = end - start
print("单线程爬取时间:", single_time, "秒")
print("------------------分割线---------------------")
start = time.time()
multi_thread()
end = time.time()
multi_time = end - start
print("多线程爬取时间:", multi_time, "秒")
print("时间倍数:", single_time / multi_time)
单线程爬取时间:
多线程爬取时间:
python实现生产者消费者爬虫
多组件的Pipeline技术架构
复杂的事情一般都不会一下子做完,而是会分很多中间步骤一步一步完成
生产者消费者爬虫的架构
多线程数据通信的queue.Queue
queue.Queue可以用于多线程之间的、线程安全的数据通信
blog_spider.py
import requests
from bs4 import BeautifulSoup
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 51)]
def craw(url):
r = requests.get(url)
return r.text
def parse(html):
# class="post-item-title"
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]
if __name__ == '__main__':
for result in parse(craw(urls[3])):
print(result)
producer_customer_spider.py
import queue
import random
import threading
import time
import blog_spider
# 生产者
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
while True:
url = url_queue.get()
html = blog_spider.craw(url)
html_queue.put(html)
print(threading.current_thread().name, f"爬取:{url}", "url_queue队列大小:", url_queue.qsize())
time.sleep(random.randint(1, 2))
# 消费者
# 输出对象fout
def do_parse(html_queue: queue.Queue, fout):
while True:
html = html_queue.get()
results = blog_spider.parse(html)
for result in results:
fout.write(str(result) + "\n")
print(threading.current_thread().name, f"results大小:", len(results), "html_queue队列大小:", html_queue.qsize())
time.sleep(random.randint(1, 2))
if __name__ == '__main__':
url_queue = queue.Queue()
html_queue = queue.Queue()
for url in blog_spider.urls:
url_queue.put(url)
for idx in range(3):
t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"生产者{idx}")
t.start()
fout = open("data.txt", "w")
for idx in range(2):
t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"消费者{idx}")
t.start()
线程安全问题以及Lock解决方案
线程安全
线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
由于线程的执行随时会发生切换,就造就了不可预料的结果,出现线程不安全
Lock用于解决线程安全问题
用法一:try-finally模式
import threading
lock = threading.Lock()
lock.acquire()
try:
#do something
finally:
lock.release()
用法2:with模式
import threading
lock = threading.Lock()
with lock:
#do something
线程不安全案例代码
unlock_thread.py
import threading
import time
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account: Account, amount):
if account.balance>=amount:
# 这里加阻塞为了保证线程不安全现象发生
time.sleep(0.1)
print(threading.current_thread().name,"取钱成功")
account.balance -= amount
print(threading.current_thread().name,"余额:",account.balance)
else:
print(threading.current_thread().name,"取钱失败")
if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(name="线程a", target=draw, args=(account, 800))
tb = threading.Thread(name="线程b", target=draw, args=(account, 800))
ta.start()
tb.start()
线程安全案例代码
lock_thread.py
import threading
import time
# 获取lock对象
lock = threading.Lock()
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account: Account, amount):
with lock:
if account.balance >= amount:
time.sleep(0.1)
print(threading.current_thread().name, "取钱成功")
account.balance -= amount
print(threading.current_thread().name, "余额:", account.balance)
else:
print(threading.current_thread().name, "取钱失败")
if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(name="线程a", target=draw, args=(account, 800))
tb = threading.Thread(name="线程b", target=draw, args=(account, 800))
ta.start()
tb.start()
好用的线程池ThreadPoolExecutor
线程池的原理
新建线程系统需要分配资源、终止线程系统需要回收资源,如果可以重用线程,则可以减去新建/终止的开销
线程池的执行过程可以分为以下几个步骤:
- 核心线程数检查:当提交任务后,线程池首先会检查当前线程数。如果此时线程数小于核心线程数,则新建线程并执行任务。
- 任务队列处理:随着任务的不断增加,线程数会逐渐增加并达到核心线程数。此时,如果仍有任务被不断提交,这些任务会被放入阻塞队列中等待执行。
- 非核心线程创建:如果任务特别多,达到了任务队列的容量上限,线程池就会继续创建非核心线程来执行任务,直到达到最大线程数。
- 拒绝策略:当线程数达到最大线程数时,如果仍有任务提交,线程池会执行拒绝策略,如抛出异常、丢弃最旧的任务等。
使用线程池的好处
- 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源
- 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
- 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
- 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁
ThreadPoolExecutor的使用语法
from concurrent.futures import ThreadPoolExecutor, as_completed
用法1:map函数,简单,注意map的结果和入参时顺序对应的
with ThreadPoolExecutor() as pool:
results = pool.map(craw,urls)
for result in results:
print(result)
用法2:future模式,更强大。注意如果用as_completed顺序是不定的
with ThreadPoolExecutor() as pool:
futures = [pool.submit(craw,url) for url in urls]
for future in futures:
print(future.result())
for future in as_completed(futures):
print(future.result())
代码演示
import concurrent.futures
import blog_spider
#爬取
with concurrent.futures.ThreadPoolExecutor() as pool:
htmls = pool.map(blog_spider.craw,blog_spider.urls)
htmls = list(zip(blog_spider.urls,htmls))
for url, html in htmls:
print(url, len(html))
print("爬取结束")
# 解析
with concurrent.futures.ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
future = pool.submit(blog_spider.parse,html)
futures[future] = url
#有序打印
for future, url in futures.items():
print(url, future.result())
print("开始无序打印")
#无序打印
for future in concurrent.futures.as_completed(futures):
url = futures[future]
print(url, future.result())