python之多线程和多进程以及threading和multiprocessing模块
在 Python 中,多线程和多进程是实现并发编程的两种主要方式。多线程适用于 I/O 密集型任务,而多进程适用于 CPU 密集型任务。Python 提供了 threading
模块用于多线程编程,提供了 multiprocessing
模块用于多进程编程。
多线程
基本用法
使用 threading
模块可以创建和管理线程。以下是一个简单的多线程示例:
import threading
import time
def worker(name):
print(f"Thread {name} starting")
time.sleep(2)
print(f"Thread {name} finishing")
# 创建线程
thread1 = threading.Thread(target=worker, args=("A",))
thread2 = threading.Thread(target=worker, args=("B",))
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join()
thread2.join()
print("All threads finished")
线程锁
线程锁用于防止多个线程同时访问共享资源,避免竞争条件。以下是一个使用线程锁的示例:
import threading
lock = threading.Lock()
counter = 0
def increment_counter():
global counter
for _ in range(100000):
with lock:
counter += 1
# 创建线程
threads = []
for i in range(10):
thread = threading.Thread(target=increment_counter)
threads.append(thread)
thread.start()
# 等待线程完成
for thread in threads:
thread.join()
print(f"Final counter value: {counter}")
多进程
基本用法
使用 multiprocessing
模块可以创建和管理进程。以下是一个简单的多进程示例:
import multiprocessing
import time
def worker(name):
print(f"Process {name} starting")
time.sleep(2)
print(f"Process {name} finishing")
# 创建进程
process1 = multiprocessing.Process(target=worker, args=("A",))
process2 = multiprocessing.Process(target=worker, args=("B",))
# 启动进程
process1.start()
process2.start()
# 等待进程完成
process1.join()
process2.join()
print("All processes finished")
进程锁
进程锁用于防止多个进程同时访问共享资源,避免竞争条件。以下是一个使用进程锁的示例:
import multiprocessing
lock = multiprocessing.Lock()
counter = multiprocessing.Value('i', 0)
def increment_counter():
for _ in range(100000):
with lock:
counter.value += 1
# 创建进程
processes = []
for i in range(10):
process = multiprocessing.Process(target=increment_counter)
processes.append(process)
process.start()
# 等待进程完成
for process in processes:
process.join()
print(f"Final counter value: {counter.value}")
threading
模块
threading
模块用于在单个进程中创建和管理多个线程。线程是轻量级的,并且共享相同的内存空间。
参数
-
target:
- 类型: 可调用对象
- 说明: 线程启动后要执行的函数或方法。
-
name:
- 类型: 字符串
- 说明: 线程的名称。默认情况下,Python 会自动生成一个唯一的名称。
-
args:
- 类型: 元组
- 说明: 传递给
target
函数的位置参数。
-
kwargs:
- 类型: 字典
- 说明: 传递给
target
函数的关键字参数。
-
daemon:
- 类型: 布尔值
- 说明: 如果设置为
True
,则表示该线程是守护线程。当所有非守护线程结束时,程序将退出,即使守护线程仍在运行。
示例
import threading
def worker(arg1, arg2, kwarg1=None):
print(f"Worker thread is running with arguments: {arg1}, {arg2}, {kwarg1}")
# 创建线程,传递位置参数和关键字参数
thread = threading.Thread(
target=worker,
args=(10, 20),
kwargs={'kwarg1': 'example'},
name='MyWorkerThread',
daemon=True
)
# 启动线程
thread.start()
# 等待线程完成
thread.join()
target=worker
: 指定线程要执行的函数是worker
。args=(10, 20)
: 传递给worker
函数的位置参数是10
和20
。kwargs={'kwarg1': 'example'}
: 传递给worker
函数的关键字参数是kwarg1='example'
。name='MyWorkerThread'
: 指定线程的名称为MyWorkerThread
。daemon=True
: 将线程设置为守护线程。
基本用法
- 创建线程
import threading
def worker():
print("Worker thread is running")
# 创建线程
thread = threading.Thread(target=worker)
# 启动线程
thread.start()
# 等待线程完成
thread.join()
- 使用子类创建线程
import threading
class MyThread(threading.Thread):
def run(self):
print("MyThread is running")
# 创建线程
thread = MyThread()
# 启动线程
thread.start()
# 等待线程完成
thread.join()
- 线程同步
使用 threading.Lock
来确保线程安全。
import threading
lock = threading.Lock()
def worker():
with lock:
# 线程安全的代码块
print("Worker thread is running")
# 创建多个线程
threads = [threading.Thread(target=worker) for _ in range(5)]
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
multiprocessing
模块
multiprocessing
模块用于创建和管理多个进程。每个进程都有自己独立的内存空间,因此适用于 CPU 密集型任务。
在 multiprocessing
模块中,multiprocessing.Process
类用于创建和管理进程。与 threading.Thread
类似,multiprocessing.Process
也接受一些参数来配置进程的行为。以下是 multiprocessing.Process
的常用参数及其解释:
multiprocessing.Process
参数
-
target:
- 类型: 可调用对象
- 说明: 进程启动后要执行的函数或方法。
-
name:
- 类型: 字符串
- 说明: 进程的名称。默认情况下,Python 会自动生成一个唯一的名称。
-
args:
- 类型: 元组
- 说明: 传递给
target
函数的位置参数。
-
kwargs:
- 类型: 字典
- 说明: 传递给
target
函数的关键字参数。
-
daemon:
- 类型: 布尔值
- 说明: 如果设置为
True
,则表示该进程是守护进程。当所有非守护进程结束时,程序将退出,即使守护进程仍在运行。
示例
import multiprocessing
def worker(arg1, arg2, kwarg1=None):
print(f"Worker process is running with arguments: {arg1}, {arg2}, {kwarg1}")
# 创建进程,传递位置参数和关键字参数
process = multiprocessing.Process(
target=worker,
args=(10, 20),
kwargs={'kwarg1': 'example'},
name='MyWorkerProcess',
daemon=True
)
# 启动进程
process.start()
# 等待进程完成
process.join()
target=worker
: 指定进程要执行的函数是worker
。args=(10, 20)
: 传递给worker
函数的位置参数是10
和20
。kwargs={'kwarg1': 'example'}
: 传递给worker
函数的关键字参数是kwarg1='example'
。name='MyWorkerProcess'
: 指定进程的名称为MyWorkerProcess
。daemon=True
: 将进程设置为守护进程。
进程的启动和等待
process.start()
: 启动进程,调用worker
函数。process.join()
: 等待进程完成。在process.join()
被调用之前,主进程会被阻塞。
进程间通信
multiprocessing
模块还提供了多种进程间通信的方式,如 Queue
、Pipe
、Value
和 Array
等。
使用 Queue
进行进程间通信
import multiprocessing
def worker(queue):
queue.put("Message from worker")
# 创建队列
queue = multiprocessing.Queue()
# 创建进程
process = multiprocessing.Process(target=worker, args=(queue,))
# 启动进程
process.start()
# 等待进程完成
process.join()
# 获取队列中的消息
message = queue.get()
print(message)
使用 Pipe
进行进程间通信
import multiprocessing
def worker(conn):
conn.send("Message from worker")
conn.close()
# 创建管道
parent_conn, child_conn = multiprocessing.Pipe()
# 创建进程
process = multiprocessing.Process(target=worker, args=(child_conn,))
# 启动进程
process.start()
# 等待进程完成
process.join()
# 获取管道中的消息
message = parent_conn.recv()
print(message)
通过这些参数和通信方式,你可以灵活地配置和管理进程的行为,并实现进程间的通信。
基本用法
- 创建进程
import multiprocessing
def worker():
print("Worker process is running")
# 创建进程
process = multiprocessing.Process(target=worker)
# 启动进程
process.start()
# 等待进程完成
process.join()
- 使用子类创建进程
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print("MyProcess is running")
# 创建进程
process = MyProcess()
# 启动进程
process.start()
# 等待进程完成
process.join()
- 进程间通信
使用 multiprocessing.Queue
或 multiprocessing.Pipe
进行进程间通信。
import multiprocessing
def worker(queue):
queue.put("Message from worker")
# 创建队列
queue = multiprocessing.Queue()
# 创建进程
process = multiprocessing.Process(target=worker, args=(queue,))
# 启动进程
process.start()
# 等待进程完成
process.join()
# 获取队列中的消息
message = queue.get()
print(message)
- 进程池
使用 multiprocessing.Pool
来管理进程池。
import multiprocessing
def worker(x):
return x * x
# 创建进程池
with multiprocessing.Pool(4) as pool:
results = pool.map(worker, [1, 2, 3, 4, 5])
print(results)