当前位置: 首页 > article >正文

Python----Python高级(并发编程:线程Thread,多线程,线程间通信,线程同步,线程池)

一、线程Thread

1.1、线程

  1. 线程(Thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位
  2. 线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;
  3. 一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;
  4. 拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度;
  5. 调度和切换:线程上下文切换比进程上下文切换要快得多。

1.2、线程的创建 

        Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只使用threading这个高级模块。

from threading import Thread
Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

from threading import Thread
Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

各个参数的说明: 

        group: 应该始终为None,保留供未来扩展使用。
        target: 是一个可调用的对象(函数),该线程启动时,这个对象将被调用。如果不提供,则不会运行任何东西。
        name: 线程名称。默认情况下,将分配一个唯一的名称。
        args: 传递给
        target: 函数的位置参数,默认是一个元组。
        kwargs: 传递给target函数的关键字参数,默认是一个字典。
        daemon: 指定线程是否为守护线程。如果设置为True,则该线程会在主线程结束时自动退出。如果是None,则继承自创建它的线程。

Thread类提供的主要方法和属性: 

        start():启动线程活动。使用start去启动线程,会调用run()方法,它会创建一个新的线程来执行run()方法中的代码。
        run():表示线程活动的方法。可以在子类中重写此方法,重写之后执行重写的代码。通常不需要直接调用run方法,应该调用start方法去启动线程,如果直接调用run方法(没有通过start方法去启动线程)那么run方法中的代码将在当前线程中同步执行,而不是在新的线程中执行。
        join(timeout=None):等待线程终止。timeout参数是可选的,表示等待的最长时间(以秒为单位)。如果没有指定timeout参数,则该方法将无限期等待。
        is_alive():返回线程是否还活着。
        getName():返回线程名。
        setName(name):设置线程名。
        isDaemon():返回线程的守护状态。
        setDaemon(daemonic):设置线程的守护状态。必须在start开始前设置。
        name:线程名称。
        ident:线程的标识符。如果线程尚未启动,则为None。
        daemon:线程的守护状态。

1.3、方法包装

1.3.1、不传参的线程

import time
from threading import Thread
def func1():
    print("Function 1 is running.")
    time.sleep(2)
    print("Function 1 has finished.")
def func2():
    print("Function 2 is running.")
    time.sleep(2)
    print("Function 2 has finished.")
if __name__ == '__main__':
    start_time = time.time()  # 记录程序开始时间
    # 创建一个线程来运行func2
    process = Thread(target=func2)
    process.start()
    # 主程序运行func1
    func1()
    # 等待线程结束
    process.join()
    # 计算并打印整个程序的运行时间
    total_time = time.time() - start_time
    print(f"Total execution time: {total_time:.2f} seconds")
'''

Function 2 is running.Function 1 is running.

thread:t2 :1
thread:t1 :1
thread:t2 :2thread:t1 :2

Function 2 has finished.Function 1 has finished.

Total execution time: 2.00 seconds

'''

1.3.2、 传参的线程

1.3.1.1、使用args传参
from threading import Thread
import time
def say_hello(name):
    time.sleep(2)
    print(f'Hello {name}, Nice to meet you!')
if __name__ == '__main__':
    start = time.time()
    thread1 = Thread(target=say_hello, args=('Alice', ))
    thread1.start()
    say_hello('Bob')
    thread1.join()
    exe_time = time.time() - start
    print(exe_time)
'''
Hello Alice, Nice to meet you!
thread:t1 :2Hello Bob, Nice to meet you!

2.0016591548919678
thread:t2 :2
'''
1.3.1.2、使用kwargs传参
from threading import Thread
import time
def say_hello(name):
    time.sleep(2)
    print(f'Hello {name}, Nice to meet you!')
if __name__ == '__main__':
    start = time.time()
    thread1 = Thread(target=say_hello, kwargs={'name': 'Alice'})
    thread1.start()
    say_hello('Bob')
    thread1.join()
    exe_time = time.time() - start
    print(exe_time)
    
'''
Hello Bob, Nice to meet you!
Hello Alice, Nice to meet you!
2.00122332572937
'''

1.4、类包装

#encoding=utf-8
#类的方式创建线程
from threading import Thread
from time import sleep
class MyThread(Thread):
    def __init__(self,name):
        Thread.__init__(self)
        self.name =name
    def run(self):
        for i in range(3):
            print(f"thread:{self.name} :{i}")
            sleep(1)
if __name__ == '__main__':
    print("主线程,start")
    #创建线程(类的方式)
    t1 = MyThread('t1')
    t2 = MyThread('t2')
    #启动线程
    t1.start()
    t2.start()
    print("主线程,end")

'''
主线程,start
thread:t1 :0
thread:t2 :0
主线程,end
thread:t2 :1thread:t1 :1

thread:t1 :2thread:t2 :2
'''

1.5、 Timer

        Timer,也被称为定时器,它允许你在一定时间后执行一个函数或者可调用的对象。 Timer类是 Thread类的一个子类,因此它具有线程的所有特性,并且可以用来在后 台执行定时任务。

参数说明:

        interval: 一个浮点数或整数,表示在执行function之前需要等待的时间(以秒为单位)。
        function: 一个可调用的对象,当定时器到期时将被执行。
        args: 传递给function的位置参数元组。
        kwargs: 传递给function的关键字参数字典。

实例方法:

        start(): 启动定时器。Timer将在指定的时间间隔后开始执行目标函数。
        cancel(): 取消定时器。如果定时器尚未启动,则此方法无效。如果定时器正在运行,调用cancel()将停止定时器,并且目标函数不会被调用。

import threading
import time
def get_time():
    # 获取当前时间的时间戳
    current_time = time.time()
    # 将时间戳格式化为年月日时分秒的格式
    formatted_time = time.strftime('%Y-%m-%d %H:%M:%S',
    time.localtime(current_time))
    print(formatted_time)
if __name__ == '__main__':
    now_time = time.time()
    # 将时间戳格式化为年月日时分秒的格式
    formatted_time = time.strftime('%Y-%m-%d %H:%M:%S',
    time.localtime(now_time))
    print(formatted_time)
    # 创建一个Timer,5秒后执行hello函数
    timer = threading.Timer(5, get_time)
    timer.start()
    timer.cancel()
    # 在主线程中做一些其他事情
    print("Do some other things...")
    # 等待定时器完成(如果需要的话)
    timer.join()
'''
2025-xx-xx xx:xx:xx
Do some other things...
'''

1.6、守护线程 

        在行为上还有一种叫守护线程,主要的特征是它的生命周期。主线程死亡,它也就随之死亡。在python中,线程通过setDaemon(True|False)来设置是否为守护线程。

守护线程作用是为其他线程提供便利服务,守护线程最典型的应用就是 GC (垃圾收集器)

from threading import Thread
from time import sleep
class MyThread(Thread):
    def __init__(self,name):
        Thread.__init__(self)
        self.name =name
    def run(self):
        for i in range(3):
            print(f"thread:{self.name} :{i}")
            sleep(1)
if __name__ == '__main__':
    print("主线程,start")
    #创建线程(类的方式)
    t1 = MyThread('t1')
    #t1设置为守护线程
    t1.setDaemon(True)#3.10后被废弃,可以直接:t1.daemon=True
    #启动线程
    t1.start()
    print("主线程,end")

二、多线程与多进程的区别 

2.1、 调度

        多线程(Threading):线程是操作系统能够进行运算调度的最小单位,被包含 在进程之中,是进程中的实际运作单位。由于线程共享进程的内存空间,操作系 统可以在同一进程空间中快速切换不同线程,实现并发执行。

        多进程(Multiprocessing):进程是操作系统分配资源的最小单位。每个进程 都有自己的内存空间,因此进程间的通信比线程要复杂,但在python中,多进程 可以利用多核处理器的多个核心,实现真正的并行计算。

2.2、GIL(全局解释器锁)

        多线程:CPython解释器(Python的主要实现)有一个全局解释器锁(GIL), 它确保同一时刻只有一个线程执行Python字节码。因此,即使在多核CPU上,使 用多线程的Python程序也无法实现真正的并行计算,GIL限制了线程在执行CPU 密集型任务时的效率。

        多进程:每个Python进程都有自己的Python解释器和内存空间,因此GIL不会限 制多进程。多进程可以绕过GIL,充分利用多核处理器进行并行计算。

GIL锁的优点:

1. 简化内存管理:由于GIL确保同一时刻只有一个线程在执行,因此CPython的内存 管理可以设计得更加简单。它不需要考虑多个线程同时修改对象的情况,从而避 免了复杂的多线程内存回收问题。

2. 易于实现:GIL简化了CPython解释器的实现,因为它不需要考虑多线程并发执行 时的数据竞争和同步问题。

3. 单线程性能:在没有多线程竞争的情况下,单个线程的性能可以保持得很好,因 为GIL避免了不必要的上下文切换。 

GIL锁的缺点: 

1. 限制了并发性能:在多核处理器上,GIL使得即使在多线程程序中,也无法实现 真正的并行计算。因为GIL确保在任何给定时间只有一个线程在执行,所以在 CPU密集型任务中,多线程并不会带来性能上的提升。

2. 线程间竞争:在多线程环境中,当线程试图获取GIL时,可能会发生频繁的上下 文切换,这会导致性能下降,尤其是在线程数量较多时。

3. 不利于多核处理器:随着多核处理器的普及,GIL成为了Python程序利用多核优 势的一个障碍。尽管可以通过多进程来绕过GIL的限制,但这增加了程序的复杂 性和资源消耗。

4. 需要额外的同步机制:尽管GIL简化了单线程的内存管理,但在多线程程序中, 为了防止数据竞争,开发者仍然需要使用锁或其他同步机制来保护共享资源,这 增加了编程的复杂性。 

2.3、内存共享

        多线程:由于线程共享内存,它们可以直接读写同一内存地址的数据,这使得数 据共享变得简单,但同时也引入了线程安全问题。

        多进程:进程间内存是隔离的,数据共享需要通过进程间通信(IPC)机制,如 管道、消息队列、共享内存等。

2.4、创建和销毁开销

        多线程:线程通常比进程轻量,创建和销毁线程的开销相对较小。

        多进程:进程比线程重,创建和销毁进程的开销相对较大。

2.5、适用场景

        多线程:适合于I/O密集型任务,如网络请求、文件读写等,因为这些任务常常需 要等待,而GIL在等待期间会释放,使得其他线程可以执行。

        多进程:适合于CPU密集型任务,如科学计算、图像处理等,因为它们可以并行 运行而不会受到GIL的限制。

2.6、易用性

        多线程:由于共享内存和数据结构,多线程编程通常更简单,但也更容易出错。

        多进程:需要更多的代码来处理进程间通信和数据同步,但相对来说更安全。

三、线程间通信

3.1、Queue

先进先出的原则

queue.Queue(maxsize=0)

maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。 

类方法 :

●        Queue.qsize():返回队列中当前有几条消息。

●        Queue.empty():如果队列为空,返回True,否则返回 False。

●        Queue.full():如果队列已满(达到最大尺寸),返回 True,否则返回 False。

●        Queue.put(item, block=True, timeout=None):将 item 放入队列。如果 block 是True是 None(默认),则在必要时阻塞至有空闲的插槽, 如果timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有可 用的插槽,将引发 queue.Full 异常。 

●        Queue.put_nowait(item):相当于 Queue.put(item, block=False)。如果队列已满,立即引发 queue.Full 异常。

●        Queue.get(b1ock=True,timeout=None):从队列中移除并返回一个元素。如果 block是 True 且 timeout是 None(默认),则在必要时阻塞至队列中有项目可用。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有项目可用,将引发 queue.Empty 异常。

●        Queue.get_nowait():相当于 Queue.get(block=False)。如果队列为空立即引发 queue.Empty 异常。

 ●       Queue.task_done():指示之前入队的一个任务已经完成。由队列的消费者线程使用。每个Queue. get()调用之后,需要调用 Queue.task_done()告诉队列该任务处理完成。
●        Queue.join():阻塞调用线程,直到队列中的所有项目都被处理完(即队列中每个项目都有一个对应的 Queue.task_done()调用)。

from threading import Thread
from queue import Queue
import time
# 生产者函数
def producer(queue):
    for i in range(5):
        queue.put(f"Product {i}")
        print(f"Produced {i}")
        time.sleep(1)
    # 消费者函数
def consumer(queue):
    while True:
        product = queue.get()
        if product is None:
            break  # 接收到结束信号,退出循环
        print(f"Consumed {product}")
        time.sleep(1)
if __name__ == '__main__':
    queue = Queue(5)
    p = Thread(target=producer, args=(queue,))
    c = Thread(target=consumer, args=(queue,))
    p.start()
    c.start()
    p.join()  # 等待生产者线程结束


    queue.put(None)  # 发送结束信号给消费者
    c.join()  # 等待消费者线程结束
    
'''
Produced 0
Consumed Product 0
Produced 1Consumed Product 1

Produced 2Consumed Product 2

Produced 3Consumed Product 3

Produced 4
Consumed Product 4
'''

3.2、LifoQueue

后进先出

queue.LifoQueue(maxsize=0)

maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。 

常用方法: 

●        LifoQueue.put(item, block=True, timeout=None):将 如果 block 是 True 且 timeout 是 item 放入队列。 None(默认),则在必要时阻塞至有空闲 的插槽。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没 有可用的插槽,将引发完全异常。 
●        LifoQueue.put_nowait(item):相当于LifoQueue.put(item,b1ock=False)。如果队列已满,立即引发完全异常。
●        LifoQueue.get(block=True,timeout=None):从队列中移除并返回一个元素。如果 block是True且timeout是 None(默认),则在必要时阻塞至队列中有项目可用。如果 timeout 是正数,将最多阻塞timeout秒,如果在这段时间内没有项目可用,将引发完全异常。 
●        LifoQueue.get_nowait():相当于 LifoQueue.get(block=False)。如果队列为空,立即引发完全异常。
●        LifoQueue.qsize():返回队列中的项目数量
●        LifoQueue.empty():如果队列为空,返回 True,否则返回 False。
●        LifoQueue.full():如果队列已满(达到最大尺寸),返回 True,否则返回False 。

import queue
# 创建一个 LifoQueue
lifo_queue = queue.LifoQueue(5)
# 向 LifoQueue 中放入元素
lifo_queue.put('First')
lifo_queue.put('Second')
lifo_queue.put('Third')
# 从 LifoQueue 中取出元素
print(lifo_queue.get())
print(lifo_queue.get())
print(lifo_queue.get())
'''
Third
Second
First
'''

3.3、PriorityQueue

实现优先级队列

queue.PriorityQueue(maxsize=0) 

maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。

常用方法: 

●        PriorityQueue.put((priority, item), block=True, timeout=None):将 item 放入队列,并为其指定一个优先级 timeout 是 priority。如果 block 是 True 且 None(默认),则在必要时阻塞至有空闲的插槽。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有可用的插槽,将引发 完全异常。
●        PriorityQueue.put_nowait((item, priority):相当于 PriorityQueue.put((item, priority), block=False)。如果队列已满,立 即引发完全异常。
●        PriorityQueue.get(block=True, timeout=None):从队列中移除并返回一 个元素。如果 block 是 True 且 timeout 是 None(默认),则在必要时阻塞 至队列中有项目可用。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在 这段时间内没有项目可用,将引发完全异常。
●        PriorityQueue.get_nowait():相当于 PriorityQueue.get(block=False)。如果队列为空,立即引发完全异常。
●        PriorityQueue.qsize():返回队列中的项目数量。
●        PriorityQueue.empty():如果队列为空,返回True,否则返回False。
●        PriorityQueue.full():如果队列已满(达到最大尺寸),返回True,否则返回 False。

import queue
# 创建一个 PriorityQueue
priority_queue = queue.PriorityQueue(5)
# 向 PriorityQueue 中放入元素
priority_queue.put((2, 'Task1'))
priority_queue.put((0, 'Task2'))
priority_queue.put((1, 'Task3'))
# 从 PriorityQueue 中取出元素
print(priority_queue.get())
print(priority_queue.get())
print(priority_queue.get())
'''
(0, 'Task2')
(1, 'Task3')
(2, 'Task1')
'''

四、线程同步

        在Python中,线程同步指的是一系列用于控制多个线程访问共享资源的方法和规 则,以避免数据不一致或竞争条件(Race Condition)的问题。由于线程是操作系统 调度的基本单元,它们可能会同时操作同一份数据,这可能会导致数据错误或难以预 测的结果。以下是几种常见的线程同步机制:

1. 锁(Locks):锁是最基本的同步机制。在Python中,可以通过 threading模块 的Lock类来实现。锁可以确保同一时间只有一个线程能够访问共享资源。线程 在访问资源前必须获取锁,访问结束后释放锁。

2. 信号量(Semaphores):信号量是一个更高级的同步机制,它维护了一个计数 器,线程可以增加或减少这个计数器。如果计数器为零,则线程会阻塞,直到其 他线程增加计数器。

3. 事件(Events):事件是一种线程之间的通信机制。一个线程可以设置事件,而 其他线程可以等待该事件的发生。这可以用来通知一个或多个线程某个条件已经 满足。

4. 条件变量(Condition Variables):条件变量通常与互斥锁一起使用,它允许 线程在某个条件不满足时挂起(等待),直到另一个线程通知条件已经满足。

5. 屏障(Barriers):屏障是一种同步机制,允许多个线程在某个点上同步,直到 所有线程都到达屏障点后,才能继续执行。

4.1、锁

4.1.1、Lock

import threading

lock = threading.Lock()

获取锁:使用 lock.acquire() 方法可以获取锁。如果锁已经被其他线程持有, 则调用此方法的线程将被阻塞,直到锁被释放。

释放锁:使用 lock.release() 方法可以释放锁。释放锁后,其他线程可以获取 该锁。 

from threading import Thread, Lock
from queue import Queue
import time
import sys
def producer(queue):
    while True:
        queue.put('hello')
        time.sleep(0.3)
def consumer(queue, lock, consumer_id):
    while True:
        with lock:
            if not queue.empty():
                res = queue.get()
            print(f'consumer{consumer_id}: {res}')
            sys.stdout.flush()
if __name__ == '__main__':
    queue = Queue(10)  # 设置队列的大小
    lock = Lock()
    producer_thread = Thread(target=producer, args=(queue,))
    producer_thread.start()
    consumers = []
    for i in range(1, 6):  # 创建5个消费者线程
        consumer_thread = Thread(target=consumer, args=(queue, lock, i))
        consumer_thread.start()
        consumers.append(consumer_thread)
    # 等待生产者线程结束
    producer_thread.join()
    # 等待所有消费者线程结束
    for consumer_thread in consumers:
        consumer_thread.join()

4.1.2、RLock

import threading

rlock = threading.RLock()

获取锁:使用 rlock.acquire() 方法可以获取锁。如果锁已经被其他线程持 有,则调用此方法的线程将被阻塞,直到锁被释放。如果锁已经被当前线程持 有,则持有计数增加,方法立即返回。

释放锁:使用 rlock.release() 方法可以释放锁。这会导致持有计数减少。如 果计数降到零,锁被释放,其他线程可以获取。 

from threading import Thread, RLock
from queue import Queue
import time
import sys
def producer(queue):
    while True:
        queue.put('hello')
        time.sleep(0.5)
def consumer(queue, lock, consumer_id):
    while True:
        with lock:
            if not queue.empty():
                res = queue.get()
            print(f'consumer{consumer_id}: {res}')
            sys.stdout.flush()
if __name__ == '__main__':
    queue = Queue(10)  # 设置队列的大小
    lock = RLock()
    producer_thread = Thread(target=producer, args=(queue,))
    producer_thread.start()
    consumers = []
    for i in range(1, 6):  # 创建5个消费者线程
        consumer_thread = Thread(target=consumer, args=(queue, lock, i))
        consumer_thread.start()
        consumers.append(consumer_thread)

4.1.3、死锁 

在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的。举例:

有两个人都要做饭,都需要“锅”和“菜刀”才能炒菜。

from threading import Thread, Lock
from time import sleep
def fun1():
    lock1.acquire()
    print('fun1拿到菜刀')
    sleep(2)
    lock2.acquire()
    print('fun1拿到锅')
    lock2.release()
    print('fun1释放锅')
    lock1.release()
    print('fun1释放菜刀')
def fun2():
    lock2.acquire()
    print('fun2拿到锅')
    lock1.acquire()
    print('fun2拿到菜刀')
    lock1.release()
    print('fun2释放菜刀')
    lock2.release()
    print('fun2释放锅')
if __name__ == '__main__':
    lock1 = Lock()
    lock2 = Lock()
    t1 = Thread(target=fun1)
    t2 = Thread(target=fun2)
    t1.start()
    t2.start()

死锁的解决方法

        死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题,思路很简单,就是:同一个代码块,不要同时持有两个对象锁。

4.2、信号量

        Semaphore(信号量)是一个同步原语,用于控制 对共享资源的访问数量。信号量维护一个内部计数器,这个计数器可以通过两个基本 方法来控制: acquire()(获取)和 release()(释放)。

计数信号量:信号量是一个整数计数器,用于表示可用的资源数量。每当线程想 要访问共享资源时,它必须先获取信号量。如果信号量的计数大于零,则线程可 以减少计数(获取信号量),并继续执行。如果计数为零,则线程将被阻塞,直 到其他线程释放信号量。

资源限制:信号量通常用于限制对共享资源的并发访问数量。 

import threading

semaphore = threading.Semaphore(value) 

value:表示最大的并发访问数量。

获取信号量:调用 semaphore.acquire()方法来获取信号量。如果计数器大于 零,则计数器减一,方法立即返回。如果计数器为零,调用线程将被阻塞,直到 其他线程释放信号量。

释放信号量:调用 semaphore.release()方法来释放信号量。这会增加计数器 的值,并唤醒等待获取信号量的线程(如果有)。 

from threading import Thread, Lock
from time import sleep
from multiprocessing import Semaphore
"""
一个房间一次只允许两个人通过
若不使用信号量,会造成所有人都进入这个房子
若只允许一人通过可以用锁-Lock()
"""
def home(name, se):
    se.acquire() # 拿到一把钥匙
    print(f'{name}进入了房间')
    sleep(3)
    print(f'******************{name}走出来房间')
    se.release() # 还回一把钥匙
if __name__ == '__main__':
    se = Semaphore(2)    # 创建信号量的对象,有两把钥匙
    for i in range(7):
        p = Thread(target=home, args=(f'tom{i}', se))
        p.start()
'''
执行结果:
tom0进入了房间
tom1进入了房间
******************tom1走出来房间
tom2进入了房间
******************tom0走出来房间
tom3进入了房间
******************tom2走出来房间******************tom3走出来房间
tom4进入了房间
tom5进入了房间
******************tom5走出来房间******************tom4走出来房间
tom6进入了房间
******************tom6走出来房间
Process finished with exit code 0
'''

4.3、事件

        Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行

方法名说明
event.wait(timeout=None)调用该方法的线程会被阻塞,如果设置了timeout参数,超时后,线程会停止阻塞继续执行;
event.set()将event的标志设置为True,调用wait方法的所有线程将被唤醒
event.clear()将event的标志设置为False,调用wait方法的所有线程将被阻塞
event.is_set()判断event的标志是否为True
#小伙伴们,围着吃火锅,当菜上齐了,请客的主人说:开吃!
#于是小伙伴一起动筷子,这种场景如何实现
import threading
import time
def chihuoguo(name):
    #等待事件,进入等待阻塞状态
    print(f'{name}已经启动')
    print(f'小伙伴{name}已经进入就餐状态!')
    time.sleep(1)
    event.wait()
    # 收到事件后进入运行状态
    print(f'{name}收到通知了.' )
    print(f'小伙伴{name}开始吃咯!')
if __name__ == '__main__':
    event = threading.Event()
    # 创建新线程
    thread1 = threading.Thread(target=chihuoguo, args=("tom", ))
    thread2 = threading.Thread(target=chihuoguo, args=("cherry", ))
    # 开启线程
    thread1.start()
    thread2.start()
    time.sleep(10)
    # 发送事件通知
    print('---->>>主线程通知小伙伴开吃咯!')
    event.set()
'''
执行结果:
tom已经启动
小伙伴tom已经进入就餐状态!
cherry已经启动
小伙伴cherry已经进入就餐状态!
---->>>主线程通知小伙伴开吃咯!
tom收到通知了.
小伙伴tom开始吃咯!
cherry收到通知了.
小伙伴cherry开始吃咯!
'''

4.4、条件变量

        在Python的 threading模块中, Condition(条件变量)是一个高级同步机制,它 允许线程在某些条件满足时进行等待,并在这些条件不满足时继续执行。与多进程中 的Condition一样,多线程中的Condition内部也提供了一个RLock,这意味着它提供 了锁机制,并在此基础上提供了条件等待和通知的功能。

from threading import Condition
threading.Condition()

●        acquire():获取条件变量的锁。如果锁已经被其他线程持有,则调用线程将被 阻塞,直到锁被释放。

●        release():释放条件变量的锁。

●        wait():释放条件变量的锁,并将调用线程阻塞,直到其他线程调用 notify() 或notify_all()

●        wait_for(predicate,timeout=None):释放内部锁,并使调用线程阻塞。 predicate 应该是一个可调用对象而且它的返回值可被解释为一个布尔值,如果为 True,则线程被唤醒,False就继续等待。 timeout 参数给出最大等待时间。

●        notify():唤醒一个等待的线程。如果有多于一个线程在等待,则随机选择一 个唤醒。

 ●       notify_all():唤醒所有等待的线程。 

4.4.1、使用默认锁

from threading import Thread, Condition
import time
def producer(condition):
    while True:
        condition.acquire()
        condition.notify_all()  # 通知所有等待的消费者
        condition.release()
        time.sleep(3)  # 模拟生产者的工作周期
def consumer(condition, number):
    while True:
        condition.acquire()
        print(f'{number}正在等待condition')
        condition.wait()  # 等待条件被满足
        print(f'{number}已释放condition')
# 证明condition自带的是RLock,而不是Lock
# condition.release()
if __name__ == '__main__':
    condition = Condition()
    producer_thread = Thread(target=producer, args=(condition,))
    producer_thread.start()
    consumers = [
        Thread(target=consumer, args=(condition, 1)),
        Thread(target=consumer, args=(condition, 2)),
        Thread(target=consumer, args=(condition, 3)),
        Thread(target=consumer, args=(condition, 4)),
        Thread(target=consumer, args=(condition, 5))
    ]
    for c in consumers:
        c.start()
    producer_thread.join()
    for c in consumers:
        c.join()

4.4.2、使用with管理

from threading import Thread, Condition
import time
def producer(condition):
    while True:
        time.sleep(3)  # 模拟生产者的工作周期
        with condition:  # 使用with语句自动管理锁
            condition.notify_all()  # 通知所有等待的消费者
def consumer(condition, number):
    while True:
        with condition:  # 使用with语句自动管理锁
            print(f'{number}正在等待condition')
            condition.wait()  # 等待条件被满足
            print(f'{number}已释放condition')
if __name__ == '__main__':
    condition = Condition()
    producer_thread = Thread(target=producer, args=(condition,))
    producer_thread.start()
    consumers = [
        Thread(target=consumer, args=(condition, 1)),
        Thread(target=consumer, args=(condition, 2)),
        Thread(target=consumer, args=(condition, 3)),
        Thread(target=consumer, args=(condition, 4)),
        Thread(target=consumer, args=(condition, 5))
    ]
    for c in consumers:
        c.start()
    producer_thread.join()
    for c in consumers:
        c.join()

4.4.3、使用自定义锁 

from threading import Thread, Condition, Lock
import time
def producer(condition):
    while True:
        with condition:  # 使用with语句自动管理锁
            condition.notify_all()  # 通知所有等待的消费者
        time.sleep(3)  # 模拟生产者的工作周期
def consumer(condition, number):
    while True:
        with condition:  # 使用with语句自动管理锁
            print(f'{number}抢到了锁,正在等待condition')
            condition.wait()  # 等待条件被满足
            print(f'condition已经触发,{number}释放了锁,')
if __name__ == '__main__':
    lock = Lock()
    condition = Condition(lock)
    producer_thread = Thread(target=producer, args=(condition,))
    producer_thread.start()
    consumers = [
        Thread(target=consumer, args=(condition, 1)),
        Thread(target=consumer, args=(condition, 2)),
        Thread(target=consumer, args=(condition, 3)),
        Thread(target=consumer, args=(condition, 4)),
        Thread(target=consumer, args=(condition, 5))
    ]
    for c in consumers:
        c.start()
    producer_thread.join()
    for c in consumers:
        c.join()

4.5、 屏障

        在Python的 threading模块中, Barrier(屏障)是一种同步机制,用于让一组线 程在某个点上同步。当所有线程都到达屏障点时,它们将继续执行;如果任何线程没 有到达屏障点,则所有线程都会被阻塞,直到所有线程都到达。

import threading

barrier = threading.Barrier(parties, action=None, timeout=None)

parties:屏障点上需要等待的线程数量。

action(可选):当所有线程到达屏障点时,可以执行的一个函数。

timeout(可选):默认的超时时间,如果wait没有指定时间将使用这个时间。 

Barrier的方法与属性: 

●        wait(timeout):阻塞线程,直到屏障被释放。如果所有线程都到达屏障点, 屏障将被释放,所有线程继续执行;如果任何线程没有到达,所有线程将被阻 塞。如果提供了timeout,这里的timeout会优先于创建Barrier对象时提供的 timeout参数。该函数会返回一个整数,取值在0~(parties-1)之间。

●        reset():重置Barrier为默认的初始状态。如果Barrier中仍有线程等待释放, 将会引发异常。

●        abort():使Barrier处于破损状态,这将导致任何现有和未来对wait()方法的调 用失败并引发异常。

●        parties:冲出Barrier所需要的线程数量。

●        n_waiting:当前时刻正在Barrier中阻塞的线程数量。

●        broken:一个布尔值,表示Barrier是否为破损态。 

import threading
import time
import random
def worker(barrier):
    print(f"Worker {threading.current_thread().name} is waiting for the barrier.")
    seconds = random.randint(0, 5)
    print(f"Worker {threading.current_thread().name} is waiting{seconds}.")
    time.sleep(seconds)
    barrier.wait()
    print(f"Worker {threading.current_thread().name} has passed the barrier.")
if __name__ == '__main__':
    barrier = threading.Barrier(5)
    workers = [
        threading.Thread(target=worker, args=(barrier,)),
        threading.Thread(target=worker, args=(barrier,)),
        threading.Thread(target=worker, args=(barrier,)),
        threading.Thread(target=worker, args=(barrier,)),
        threading.Thread(target=worker, args=(barrier,)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

五、线程池

        线程池维护一个工作线程的集合,用于执行任务。当任务到达时,线程池可以选择一 个空闲的线程来处理任务,而不是为每个任务都创建一个新的线程。

降低资源消耗:线程的创建和销毁开销较大,通过重用线程,可以降低这些开 销。

提高响应速度:不需要等待线程的创建就能立即执行任务。

提高线程的可管理性:线程池可以统一管理线程的创建、销毁、数量等内容。

from concurrent.futures import  ThreadPoolExecutor
ThreadPoolExecutor(max_workers=None, thread_name_prefix='',initializer=None, initargs=())

max_workers:线程池中线程的最大数量。如果设置为 None或未指定,为 min(32, os.cpu_count() + 4), 这个默认值会保留至少 5 个工作线程用于 I/O 密 集型任务。 对于那些释放了 GIL 的 CPU 密集型任务,它最多会使用 32 个 CPU 核心。这样能够避免在多核机器上不知不觉地使用大量资源。

thread_name_prefix:线程名称的前缀,有助于调试时识别线程。

initializer:一个可选的可调用对象,每个工作线程在启动时都会调用它。这 可以用来执行线程的初始化操作,例如设置线程局部存储。

initargs:一个元组,其中包含传递给 initializer的可调用对象的参数。 

线程池的方法: 

1. submit(fn, *args, **kwargs): 提交一个可调用的函数 kwargs到线程池中执行。这个方法返回一个 fn和必要的参数 args和 Future对象。

2. map(func, *iterables, timeout=None, chunksize=1): 它允许你将一个函数 func 应用于多个可迭代对象 iterables 中的元素,并且并行地在多个线程上 执行这些函数调用。 timeout是可选参数,用于设置阻塞等待每个任务完成的最 大秒数。如果 timeout 被触发,将引发 concurrent.futures.TimeoutError。 chunksize是可选参数,用于指定每次 切割几个参数给func,不过只对进程池有效果,对线程池无效。返回的结果是一 个迭代器。

3. shutdown(wait=True): 关闭线程池,停止接收新任务。如果 wait参数为 True,则等待所有已提交的任务完成。当使用with语句创建线程池时,with语 句会在结束后自动调用shutdown。 

submit 

from concurrent.futures import  ThreadPoolExecutor
def calculate_square(number):
    return number * number
# 数字列表
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 使用with语句创建ThreadPoolExecutor实例
with ThreadPoolExecutor(max_workers=5) as executor:
    results = []
    for num in numbers:
        result = executor.submit(calculate_square, num)
        results.append(result)
    for res in results:
        print(res.result())

map

from concurrent.futures import ThreadPoolExecutor
import time
def square(n):
    """计算给定数字的平方"""
    time.sleep(0.1)  # 模拟耗时操作
    return n * n
if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=4) as executor:  # 创建包含4个线程的线程池
        numbers = range(10)
        results = list(executor.map(square, numbers))  # 并行计算每个数字的平方
        print(results)

六、思维导图


http://www.kler.cn/a/535426.html

相关文章:

  • 数据结构与算法学习笔记----博弈论
  • IOPS与吞吐量、读写块大小及延迟之间的关系
  • 在Debian 12上安装VNC服务器
  • Python自动化测试selenium指定截图文件名方法
  • 笔试-业务逻辑4
  • 自定义多功能输入对话框:基于 Qt 打造灵活交互界面
  • 尚硅谷spring框架视频教程——学习笔记二(数据库、事务、webflux)
  • [实验日志] VS Code 连接服务器上的 Python 解释器进行远程调试
  • node.js的require()
  • 低至3折,百度智能云千帆宣布全面支持DeepSeek-R1/V3调用
  • Web3.0 技术应用溯源系统建设
  • MS17-010(永恒之蓝1.0)漏洞远程控制win7系统操作实战小白通俗易懂
  • 如何使用sqlalchemy的orm模式构建表结构1对1,1对多,多对多的关系
  • 如何打造一个更友好的网站结构?
  • Vue组件开发——进阶篇
  • OS10 固件更新步骤-U 盘方式
  • SQL 中的谓词逻辑
  • PHP商会招商项目系统小程序
  • CentOS服务器部署Docker+Jenkins持续集成环境
  • 微信小程序调用企业微信客户服务插件联通企业微信客服
  • 【Elasticsearch】geotile grid聚合
  • Python Bug修复案例分析:列表切片引发的内存泄漏问题
  • DeepSeek R1 Distill Llama 70B(免费版)API使用详解
  • CSS Position(定位)详解及举例说明
  • C#迭代器和Unity的Coroutine原理
  • Vue el-input密码输入框 按住显示密码,松开显示*;阻止浏览器密码回填,自写密码输入框;校验输入非汉字内容;文本框聚焦到内容末尾;