【13】PyQt多线程多任务管理
目录
多线程&多任务介绍
多线程管理
1. 拷贝依赖
2. 使用示例
多任务管理
1. 拷贝依赖
2. 使用示例
多线程&多任务介绍
多线程&多任务通常是指将一个任务或多个任务运行在子线程,并且子线程可以独立启动,或通过线程池启动。子线程通常用于执行以下类型的任务:
- 长时间运行的任务:子线程适合处理那些耗时较长的任务,以避免阻塞主线程的执行。例如,进行复杂的计算、图像处理、视频编解码等任务可以放在子线程中执行,以保持应用程序的响应性。
- 阻塞型任务:如果有一些阻塞型的操作,可能会导致主线程被阻塞,例如进行网络请求、文件读写、数据库查询等。将这些任务放在子线程中执行可以确保主线程的流畅运行,同时避免应用程序的无响应状态。
- 并行处理任务:如果有多个独立的任务需要同时执行,可以将它们分配给多个子线程来实现并行处理。例如,批量下载多个文件、同时进行多个数据处理任务等。
- 异步操作:子线程可以用于执行异步操作,例如在后台处理数据、定时任务、监听外部设备的输入等。这样可以保持应用程序的其他部分正常运行,同时处理异步任务。
需要注意的是,使用子线程需要谨慎处理线程间的数据共享和同步问题,以避免出现竞态条件或其他并发问题。在 PyQt 中,可以使用线程间通信机制(如信号槽机制)来安全地传递数据和操作UI元素。
多线程管理
通常线程有两种启动方式,分别对应不同的使用场景:
- 将任务直接运行在一个独立的线程里
用于执行一次性长期任务(例如:并行任务,多线程下载),长期循环阻塞式接收任务(例如:接收蓝牙、网络、串口消息)
- 将任务运行在一个线程池的线程里
用于执行频繁触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载),因为线程池可以重复利用其内部维护的N个线程,按需创建,减少资源的申请与释放操作。
1. 拷贝依赖
"""
线程管理器,用于管理线程的创建、启动、停止等。
包含两种线程启动方式:
1. start 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息)
2. start_in_thread_pool 在一个线程池里运行任务,用于执行非定期触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载)
其中包含了一个 Worker 类
● Worker 类用于创建线程任务
Worker 类继承自 QRunnable 类,用于创建线程任务。
● 其中包含了一个 run 方法,用于执行线程任务
● 一个 signal_connect 方法,用于连接信号槽
● 一个 stop 方法,用于停止线程任务
● 一个 emit_msg 方法,用于发送消息。
"""
import inspect
from typing import Callable
from PyQt5.QtCore import pyqtSignal, QObject, QRunnable, QThread, QThreadPool
class WorkerSignals(QObject):
signal_finished = pyqtSignal()
signal_error = pyqtSignal(Exception)
signal_result = pyqtSignal(object)
signal_msg = pyqtSignal(object)
class Worker(QRunnable):
def __init__(self, target: Callable, args=None, kwargs=None):
super().__init__()
self.setAutoDelete(True) # 自动删除,避免内存泄漏
self.__func = target
self.__args = args if args else ()
self.__kwargs = kwargs if kwargs else {}
self.__signals = WorkerSignals()
self.is_running = False
self.worker_thread: WorkerThread = None
def run(self):
self.is_running = True
try:
# 如果func的第一个参数是Worker类型,则将self作为第一个参数传入
if self.__is_worker_func(self.__func):
result = self.__func(self, *self.__args, **self.__kwargs)
else:
# 否则,直接传入参数
result = self.__func(*self.__args, **self.__kwargs)
self.__signals.signal_result.emit(result)
except Exception as e:
self.__signals.signal_error.emit(e)
finally:
self.is_running = False
self.__signals.signal_finished.emit()
def signal_connect(self, msg_handler=None, result_handler=None, finished_handler=None, error_handler=None):
if msg_handler:
self.__signals.signal_msg.connect(msg_handler)
if result_handler:
self.__signals.signal_result.connect(result_handler)
if finished_handler:
self.__signals.signal_finished.connect(finished_handler)
if error_handler:
self.__signals.signal_error.connect(error_handler)
return self
def stop(self):
self.is_running = False
def emit_msg(self, msg):
self.__signals.signal_msg.emit(msg)
def start(self, daemon=True):
"""
1. 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息)
:return:
"""
self.worker_thread = WorkerThread(self)
self.worker_thread.daemon = daemon
self.worker_thread.start()
return self.worker_thread
def start_in_thread_pool(self):
"""
2. 在一个线程池里运行任务,用于执行非定期的短期任务,避免线程资源浪费(例如:文件写出、网络数据下载)
:param refresh_worker: 任务
"""
QThreadPool.globalInstance().start(self)
@classmethod
def __is_worker_func(cls, func: Callable):
"""
判断一个函数是否是worker函数,worker函数的第一个参数必须是Worker类型
:param func:
:return:
"""
sig = inspect.signature(func)
# 判断第一个参数是否是Worker类型,或者参数名是否是worker
param_keys = list(sig.parameters.keys())
if len(param_keys) > 0:
first_param = sig.parameters[param_keys[0]]
if first_param.annotation == Worker:
return True
if first_param.name == "worker":
return True
return False
class WorkerThread(QThread):
def __init__(self, worker: Worker):
super().__init__()
self.__worker = worker
def run(self):
self.__worker.run()
2. 使用示例
以下代码分别显示了如下两个应用场景:
- 使用方式1:单线程循环接收消息示例
- 使用方式2:利用线程池异步执行多个独立任务示例
代码如下:
# 使用示例
import sys
import os
import time
import threading
from PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QApplication
import requests
from qt_worker import Worker
def long_time_recv_task(worker: Worker, title, start):
counter = start
thread_name = threading.currentThread().name
while worker.is_running:
# 模拟阻塞(等待网络、串口、蓝牙等)
time.sleep(1)
# 模拟收到消息
worker.emit_msg(f"{title} long time task {counter} : {thread_name}")
counter += 1
if counter >= 110:
break
return "refresh_worker done: {}".format(counter)
@pyqtSlot(object)
def on_result_received(msg):
thread_name = threading.currentThread().name
print(f"result: < {msg} > {thread_name}")
def single_recv_thread_test():
"""
单线程循环接收消息
:return:
"""
worker = Worker(long_time_recv_task, args=("消息接收",), kwargs={"start": 100})
worker.signal_connect(
msg_handler=lambda msg: print(msg),
result_handler=on_result_received,
)
worker.start()
def pic_download_task(url):
"""
下载图片, 保存到pic目录
:param url: 图片地址
:return:
"""
response = requests.get(url)
if response.status_code != 200:
print("连接图片服务器失败:", response.status_code)
return
file_name = url.split('/')[-1]
# 如果pic目录不存在,则创建
if not os.path.exists('pic'):
os.mkdir('pic')
with open(f"pic/{file_name}", 'wb') as f:
f.write(response.content)
# 返回f的绝对路径
return "{} -> {}".format(url, os.path.abspath(f.name))
def thread_pool_test():
"""
利用线程池连续下载多个图片文件
:return:
"""
pics = [
"https://www.baidu.com/img/bd_logo1.png",
"https://c-ssl.duitang.com/uploads/blog/202305/26/EWSwLxqBhV5zZJa.jpg",
"https://c-ssl.duitang.com/uploads/blog/202305/26/lGSxjBMefx04z33.jpg",
"https://c-ssl.duitang.com/uploads/blog/202305/26/XxSLogyQCQd9emB.jpg",
"https://c-ssl.duitang.com/uploads/item/202002/26/20200226215648_yynrr.jpg",
]
for pic in pics:
worker = Worker(pic_download_task, args=(pic,))
worker.signal_connect(result_handler=lambda msg: print("保存成功:", msg))
worker.start_in_thread_pool()
if __name__ == '__main__':
app = QApplication(sys.argv)
# 使用方式1:单线程循环接收消息示例
single_recv_thread_test()
# 使用方式2:利用线程池异步执行多个任务示例(下载多个图片文件)
thread_pool_test()
sys.exit(app.exec_())
多任务管理
指开启一个长期运行的线程,在线程内部,运行一个循环,循环中阻塞式地接收用户发来的任务(定期或非定期),并及时按照用户预定的函数进行执行(通常要消耗一小段时间)。这个多任务管理器应有如下特点:
- 不会阻塞主线程(保障主界面操作顺滑)
- 一旦任务完成,将执行结果返回给任务的发布者。这可以通过回调函数、事件或其他适当的机制来实现。
- 如果任务执行过程中发生异常,需要将异常信息返回给任务的发布者,以便了解并采取适当的处理措施。
- 可以保障任务的执行顺序和发布任务的顺序一致(通过队列保证要求)
- 这个长期任务管理器可以通过调用方法进行停止
1. 拷贝依赖
这段代码定义了一个 TaskWorker
类,该类继承自 QThread
类,并使用队列实现了多个任务的异步执行。
"""
运行一个独立线程,用于执行长期循环发送任务,可以随时执行异步任务,内部维护一个消息队列(例如:发送蓝牙、串口消息)
这段代码定义了一个 TaskWorker 类,该类继承自 QThread 类,并使用队列实现了任务的异步执行。
"""
from queue import Queue
from PyQt5.QtCore import QThread, pyqtSignal
class TaskWorker(QThread):
taskResult = pyqtSignal(object)
taskError = pyqtSignal(Exception)
taskFinished = pyqtSignal()
def __init__(self, do_task, parent=None):
super(TaskWorker, self).__init__(parent)
self.do_task = do_task
self.task_queue = Queue()
self.is_running = True
def run(self):
while self.is_running:
# 不断从队列中取出任务并执行,如果没有任务则阻塞
task_arg = self.task_queue.get()
# 如果取出的任务为 None,且线程已设置为关闭,则退出线程
if task_arg is None and not self.is_running:
break
try:
result = self.do_task(task_arg)
self.taskResult.emit(result)
except Exception as e:
self.taskError.emit(e)
self.taskFinished.emit()
def signal_connect(self, result_handler=None, finished_handler=None, error_handler=None):
# Connect the worker's signal to the handler slot
if result_handler is not None:
self.taskResult.connect(result_handler)
if finished_handler is not None:
self.taskFinished.connect(finished_handler)
if error_handler is not None:
self.taskError.connect(error_handler)
return
def join_queue(self, task):
if not self.is_running:
return
self.task_queue.put(task)
def stop(self):
self.is_running = False
# Put a None task to the queue to stop the thread
self.task_queue.put(None)
2. 使用示例
主程序中创建了一个 TaskWorker
实例,将任务添加到任务队列中,并使用手动/定时器定期添加任务。
当任务完成时,TaskWorker
实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。
代码具体步骤如下:
- 在主程序中定义一个
do_task
函数,该函数用于执行任务。- 在主程序中定义两个槽函数
on_result
和on_error
,分别用于处理任务完成和任务出错的信号。- 创建一个
TaskWorker
实例,并传入任务函数,函数里是任务要执行的内容- 将
TaskWorker
实例的信号连接到槽函数。- 运行
TaskWorker
实例的start
方法启动线程- 运行主程序,通过各种方式把任务及任务参数通过
join_queue
加入队列- 等待多个任务执行完成。
"""
主程序中创建了一个 TaskWorker 实例,将任务添加到任务队列中,并使用定时器定期添加任务。
当任务完成时,TaskWorker 实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。
代码具体步骤如下:
1. 在主程序中定义一个 do_task 函数,该函数用于执行任务。
2. 在主程序中定义两个槽函数 on_result 和 on_error,分别用于处理任务完成和任务出错的信号。
3. 创建一个 TaskWorker 实例。
4. 将 TaskWorker 实例的信号连接到槽函数。
5. 将任务添加到任务队列中,并使用定时器定期添加任务。
6. 运行主程序,等待任务执行完成。
"""
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import pyqtSlot, QTimer
import sys
import threading
import time
from qt_task import TaskWorker
def do_task(task_arg):
a, b = task_arg
rst = a / b
# Simulate a time-consuming task
time.sleep(1) # Pause for 1 seconds
return f"Task signal_result {a} / {b} = {rst}"
@pyqtSlot(object)
def on_result(result):
# print(threading.currentThread())
print("on result: ", result)
@pyqtSlot(Exception)
def on_error(e):
# print(threading.currentThread())
print("on error: ", e)
if __name__ == '__main__':
app = QApplication(sys.argv)
print("main: ", threading.currentThread())
task_worker = TaskWorker(do_task)
task_worker.signal_connect(
result_handler=on_result,
finished_handler=lambda: print("on finished"),
error_handler=on_error,
)
task_worker.start()
# Add tasks to the task manager
task_worker.join_queue((3, 2))
task_worker.join_queue((4, 2))
task_worker.join_queue((5, 2))
task_worker.join_queue((5, 0))
# Use a timer to add tasks periodically
timer = QTimer()
timer.timeout.connect(lambda: task_worker.join_queue((5, 2)))
timer.start(3000) # Add a task every 5 seconds
# 执行一个10秒后的延时任务
QTimer.singleShot(10000, lambda: task_worker.stop())
sys.exit(app.exec_())