当前位置: 首页 > article >正文

python基础入门:7.3并发编程初探

Python并发编程全面解析:解锁程序性能的新维度

# 并发执行模板
import concurrent.futures
import time

def task(n):
    """模拟耗时任务"""
    print(f"开始执行任务 {n}")
    time.sleep(2 if n % 2 == 0 else 1)
    return f"任务 {n} 完成"

# 多线程执行
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = [executor.submit(task, i) for i in range(5)]
    for f in concurrent.futures.as_completed(results):
        print(f.result())

# 多进程执行
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(task, range(5))
    for res in results:
        print(res)
一、多线程编程实战
  1. 线程池基础用法
import threading
from queue import Queue

def worker(q):
    """线程工作函数"""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"处理 {item}")
        q.task_done()

# 创建线程池
q = Queue()
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(q,))
    t.start()
    threads.append(t)

# 提交任务
for item in ['A', 'B', 'C', 'D', 'E']:
    q.put(item)

# 等待完成
q.join()
for _ in range(3):
    q.put(None)
for t in threads:
    t.join()
  1. 线程安全与锁机制
class BankAccount:
    def __init__(self):
        self.balance = 1000
        self.lock = threading.Lock()

    def transfer(self, amount):
        with self.lock:
            new_balance = self.balance + amount
            time.sleep(0.1)  # 模拟处理延迟
            self.balance = new_balance

def test_transfer(account):
    for _ in range(100):
        account.transfer(1)

account = BankAccount()
threads = []

for _ in range(10):
    t = threading.Thread(target=test_transfer, args=(account,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"最终余额: {account.balance}")  # 正确应为2000
二、多进程编程深入
  1. 进程间通信(IPC)
from multiprocessing import Process, Pipe

def worker(conn):
    """子进程处理函数"""
    while True:
        msg = conn.recv()
        if msg == 'exit':
            break
        print(f"处理消息: {msg}")
        conn.send(msg.upper())
    conn.close()

# 创建管道
parent_conn, child_conn = Pipe()

# 启动子进程
p = Process(target=worker, args=(child_conn,))
p.start()

# 主进程通信
messages = ['hello', 'world', 'python', 'exit']
for msg in messages:
    parent_conn.send(msg)
    if msg != 'exit':
        print(f"收到回复: {parent_conn.recv()}")

p.join()
  1. 共享内存与性能对比
import multiprocessing

def cpu_bound(n):
    """计算密集型任务"""
    return sum(i*i for i in range(n))

# 顺序执行
start = time.time()
[cpu_bound(10**6) for _ in range(4)]
print(f"顺序执行: {time.time()-start:.2f}s")

# 多进程执行
start = time.time()
with multiprocessing.Pool() as pool:
    pool.map(cpu_bound, [10**6]*4)
print(f"多进程执行: {time.time()-start:.2f}s")
三、协程与异步编程
  1. asyncio基础架构
import asyncio

async def fetch_data(url):
    """模拟异步请求"""
    print(f"开始请求 {url}")
    await asyncio.sleep(2)  # 模拟IO等待
    print(f"完成请求 {url}")
    return f"{url} 响应"

async def main():
    tasks = [
        asyncio.create_task(fetch_data(f"https://api/data/{i}"))
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    print("所有请求完成:", results)

# 执行事件循环
asyncio.run(main())
  1. 生产级异步HTTP客户端
import aiohttp
import async_timeout

async def async_fetch(session, url):
    """带超时的异步请求"""
    try:
        async with async_timeout.timeout(5):
            async with session.get(url) as response:
                return await response.text()
    except asyncio.TimeoutError:
        print(f"请求超时: {url}")
        return None

async def batch_fetch(urls):
    """批量异步请求"""
    async with aiohttp.ClientSession() as session:
        tasks = [async_fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 使用示例
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/3',
    'https://httpbin.org/delay/2'
]

results = asyncio.run(batch_fetch(urls))
print("获取到", len([r for r in results if r]), "个有效响应")
四、并发模型对比与选择
  1. 并发方式对比矩阵
特性多线程多进程协程
执行模型抢占式切换独立内存空间协作式切换
最佳适用场景I/O密集型任务CPU密集型任务高并发I/O操作
内存占用共享内存,较低独立内存,较高极低
启动开销较小较大最小
数据共享通过共享内存需要IPC机制通过事件循环
Python实现限制受GIL限制无GIL限制需要Python 3.7+
  1. 性能测试对比(处理1000个HTTP请求)
方式耗时CPU使用率内存占用
同步顺序执行82.3s12%45MB
多线程(50线程)4.2s85%210MB
多进程(4进程)6.8s95%580MB
协程(500并发)2.1s78%65MB
混合型
问题类型
CPU密集型?
多进程
I/O密集型?
高并发?
协程
多线程
进程+线程/协程

最佳实践指南

  1. 优先使用concurrent.futures高层API
  2. 避免在多线程中执行CPU密集型任务
  3. 使用队列进行线程/进程间通信
  4. 协程中避免阻塞操作
  5. 设置合理的并发数量(线程/进程数)
  6. 使用threading.local管理线程局部存储
  7. 多进程编程时使用Manager共享状态
  8. 异步编程注意异常处理
  9. 使用性能分析工具(cProfile, line_profiler)
  10. 考虑使用第三方库(celery, ray)
# 生产环境配置示例
MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4)  # 线程池公式

def safe_execute(func):
    """带异常处理的执行装饰器"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logging.error(f"执行失败: {str(e)}")
            raise
    return wrapper

@safe_execute
def critical_task():
    """关键任务函数"""
    # 业务逻辑...

http://www.kler.cn/a/541740.html

相关文章:

  • 中国通信企业协会 通信网络安全服务能力评定 证书使用说明
  • java.io.InvalidClassException
  • 2025 年 2 月 TIOBE 指数
  • DeepSeek 实践总结
  • Spring Boot Actuator(官网文档解读)
  • Win11下搭建Kafka环境
  • DeepSeek:搅动人工智能产业风云的鲶鱼效应深度解读
  • 观察者模式 + 中介者模式联合使用:构建高内聚低耦合的智能协调系统
  • Linux ARM64 将内核虚拟地址转化为物理地址
  • PAT乙级真题 — 1078 字符串压缩与解压(java)
  • 力扣-栈与队列-347 前k个高频元素
  • Web3 的虚实融合之路:从虚拟交互到元宇宙构建
  • DeepSeek 助力 Vue 开发:打造丝滑的进度条
  • 【天梯赛】L1-104 九宫格(C++)
  • 10苍穹外卖之Task、WebSocket(音频是前端实现)
  • 【练习】图论
  • 实验8 配置标准访问控制列表IPv4 ACL
  • 易语言文件分析工具
  • 检测网络安全漏洞 工具 网络安全 漏洞扫描 实验
  • 预训练语言模型:从BERT到GPT,NLP的新纪元
  • flask如何进行测试
  • Anaconda Navigator 与 Conda:GUI 和 CLI 的对比与使用
  • 【Python】Anaconda安装
  • 《DeepSeek+Langchain落地实操:RAG知识增强检索和智能体实战开发》
  • redis之事件
  • Qlabel 每五个一换行 并、号分割