Python并发编程全面解析:解锁程序性能的新维度
import concurrent.futures
import functools
import logging
import os
import time
def task(n, slow=2, fast=1):
    """Simulate a time-consuming task.

    Sleeps `slow` seconds when n is even and `fast` seconds when n is odd
    (defaults keep the original 2s/1s behavior; parameterized so callers
    and tests can shorten the simulated work), then returns a completion
    message for task n.
    """
    print(f"开始执行任务 {n}")
    # Even-numbered tasks are the "slow" ones in this demo
    time.sleep(slow if n % 2 == 0 else fast)
    return f"任务 {n} 完成"
if __name__ == "__main__":
    # The guard is required: ProcessPoolExecutor re-imports this module in
    # child processes on spawn-start platforms (Windows/macOS); without it,
    # pool creation would recurse at import time.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        # as_completed yields futures in finish order, not submit order
        for fut in concurrent.futures.as_completed(futures):
            print(fut.result())

    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Unlike as_completed above, map preserves input order
        for res in executor.map(task, range(5)):
            print(res)
一、多线程编程实战
线程池基础用法
import threading
from queue import Queue
def worker(q):
    """Thread worker: consume items from queue `q` until a None sentinel.

    Calls q.task_done() for every real item so a q.join() elsewhere can
    unblock; the None sentinel just terminates the loop.
    """
    while True:
        item = q.get()
        if item is None:  # shutdown sentinel
            break
        print(f"处理 {item}")
        q.task_done()
def _run_queue_demo():
    """Start 3 consumer threads, feed them 5 items, then shut down cleanly."""
    q = Queue()
    threads = []
    for _ in range(3):
        t = threading.Thread(target=worker, args=(q,))
        t.start()
        threads.append(t)

    for item in ['A', 'B', 'C', 'D', 'E']:
        q.put(item)
    q.join()  # blocks until every put item has been task_done()'d

    # One sentinel per worker so each consumer loop exits exactly once
    for _ in range(3):
        q.put(None)
    for t in threads:
        t.join()


if __name__ == "__main__":
    _run_queue_demo()
线程安全与锁机制
class BankAccount:
    """Thread-safe account: a Lock makes the read-modify-write of the
    balance atomic across threads."""

    def __init__(self, balance=1000):
        # Initial balance parameterized (default keeps the original 1000)
        self.balance = balance
        self.lock = threading.Lock()

    def transfer(self, amount, delay=0.1):
        """Add `amount` to the balance under the lock.

        The artificial `delay` between the read and the write widens the
        race window on purpose (default keeps the original 0.1s); the lock
        is what keeps concurrent transfers correct despite it.
        """
        with self.lock:
            new_balance = self.balance + amount
            time.sleep(delay)  # without the lock, this pause would lose updates
            self.balance = new_balance
def test_transfer(account):
    """Perform 100 unit transfers on `account` (meant to run from several
    threads concurrently to exercise the lock)."""
    for _ in range(100):
        account.transfer(1)


def _run_race_demo():
    """10 threads x 100 transfers of 1: final balance must be 1000 + 1000."""
    account = BankAccount()
    threads = []
    for _ in range(10):
        t = threading.Thread(target=test_transfer, args=(account,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(f"最终余额: {account.balance}")


if __name__ == "__main__":
    # Guarded so importing this module does not launch the (slow) demo
    _run_race_demo()
二、多进程编程深入
进程间通信(IPC)
from multiprocessing import Process, Pipe
def worker(conn):
    """Child-process loop: echo each received message back upper-cased.

    Receives on `conn` until the literal string 'exit' arrives; every other
    message is printed and answered with msg.upper(). Closes the connection
    before returning.
    """
    while True:
        msg = conn.recv()
        if msg == 'exit':  # shutdown command — no reply is sent for it
            break
        print(f"处理消息: {msg}")
        conn.send(msg.upper())
    conn.close()
if __name__ == "__main__":
    # Guard is mandatory: on spawn-start platforms the child process
    # re-imports this module, and unguarded Process creation would recurse.
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()

    for msg in ['hello', 'world', 'python', 'exit']:
        parent_conn.send(msg)
        if msg != 'exit':  # 'exit' gets no reply, so don't block on recv()
            print(f"收到回复: {parent_conn.recv()}")
    p.join()
共享内存与性能对比
import multiprocessing
def cpu_bound(n):
    """CPU-bound workload: return the sum of i*i for i in range(n)."""
    total = 0
    for value in range(n):
        total += value * value
    return total
if __name__ == "__main__":
    # Guard required: multiprocessing.Pool at import time breaks on
    # spawn-start platforms (Windows/macOS) by re-importing this module.

    # Sequential baseline: four runs on a single core.
    start = time.time()
    [cpu_bound(10 ** 6) for _ in range(4)]
    print(f"顺序执行: {time.time() - start:.2f}s")

    # Process pool: same four runs spread across cores (no GIL contention).
    start = time.time()
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, [10 ** 6] * 4)
    print(f"多进程执行: {time.time() - start:.2f}s")
三、协程与异步编程
asyncio基础架构
import asyncio
async def fetch_data(url, delay=2):
    """Simulate an async request: await `delay` seconds, then return a
    response string for `url`.

    `delay` is parameterized (default keeps the original 2s) so callers
    and tests can shorten the simulated latency.
    """
    print(f"开始请求 {url}")
    await asyncio.sleep(delay)  # stand-in for real network I/O
    print(f"完成请求 {url}")
    return f"{url} 响应"
async def main():
    """Launch five fetches concurrently and report results in submit order."""
    tasks = [
        asyncio.create_task(fetch_data(f"https://api/data/{i}"))
        for i in range(5)
    ]
    # gather preserves the order of `tasks`, regardless of finish order
    results = await asyncio.gather(*tasks)
    print("所有请求完成:", results)


if __name__ == "__main__":
    # Guarded so importing this module does not start an event loop
    asyncio.run(main())
生产级异步HTTP客户端
import aiohttp
import async_timeout
async def async_fetch(session, url):
    """GET `url` through the aiohttp `session` with a 5-second timeout.

    Returns the response body text, or None if the request timed out
    (the timeout is logged to stdout rather than raised).
    """
    try:
        async with async_timeout.timeout(5):
            async with session.get(url) as response:
                return await response.text()
    except asyncio.TimeoutError:
        print(f"请求超时: {url}")
        return None
async def batch_fetch(urls):
    """Fetch every URL in `urls` concurrently, sharing one client session."""
    async with aiohttp.ClientSession() as session:
        pending = [async_fetch(session, u) for u in urls]
        results = await asyncio.gather(*pending)
        return results
if __name__ == "__main__":
    # Guarded so importing this module does not fire real HTTP requests
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/3',
        'https://httpbin.org/delay/2'
    ]
    results = asyncio.run(batch_fetch(urls))
    # Timed-out entries come back as None; count only real bodies
    print("获取到", len([r for r in results if r]), "个有效响应")
四、并发模型对比与选择
并发方式对比矩阵
| 特性 | 多线程 | 多进程 | 协程 |
| --- | --- | --- | --- |
| 执行模型 | 抢占式切换 | 独立内存空间 | 协作式切换 |
| 最佳适用场景 | I/O密集型任务 | CPU密集型任务 | 高并发I/O操作 |
| 内存占用 | 共享内存,较低 | 独立内存,较高 | 极低 |
| 启动开销 | 较小 | 较大 | 最小 |
| 数据共享 | 通过共享内存 | 需要IPC机制 | 通过事件循环 |
| Python实现限制 | 受GIL限制 | 无GIL限制 | 需要Python 3.7+ |
性能测试对比 (处理1000个HTTP请求)
| 方式 | 耗时 | CPU使用率 | 内存占用 |
| --- | --- | --- | --- |
| 同步顺序执行 | 82.3s | 12% | 45MB |
| 多线程(50线程) | 4.2s | 85% | 210MB |
| 多进程(4进程) | 6.8s | 95% | 580MB |
| 协程(500并发) | 2.1s | 78% | 65MB |
并发方案选择流程:
- 问题类型是CPU密集型? → 是 → 选择多进程
- 否,是I/O密集型? → 是 → 是否高并发? → 是 → 选择协程;否 → 选择多线程
- 否(混合型任务) → 选择进程 + 线程/协程组合
最佳实践指南:
- 优先使用 concurrent.futures 高层 API
- 避免在多线程中执行CPU密集型任务
- 使用队列进行线程/进程间通信
- 协程中避免阻塞操作
- 设置合理的并发数量(线程/进程数)
- 使用 threading.local 管理线程局部存储
- 多进程编程时使用 Manager 共享状态
- 异步编程注意异常处理
- 使用性能分析工具(cProfile、line_profiler)
- 考虑使用第三方库(celery、ray)
# Worker cap: CPU count plus headroom for I/O waits, bounded at 32
# (same formula as ThreadPoolExecutor's default max_workers in CPython 3.8+).
MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4)


def safe_execute(func):
    """Decorator: log any exception raised by `func`, then re-raise it.

    functools.wraps preserves the wrapped function's name and docstring.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Lazy %-args: the message is only formatted if actually logged
            logging.error("执行失败: %s", e)
            raise
    return wrapper
@safe_execute
def critical_task ( ) :
"""关键任务函数"""