当前位置: 首页 > article >正文

【13】PyQt多线程多任务管理

目录

多线程&多任务介绍

多线程管理

1. 拷贝依赖

2. 使用示例

多任务管理

1. 拷贝依赖

2. 使用示例


多线程&多任务介绍

多线程&多任务通常是指将一个任务或多个任务运行在子线程,并且子线程可以独立启动,或通过线程池启动。子线程通常用于执行以下类型的任务:

  • 长时间运行的任务:子线程适合处理那些耗时较长的任务,以避免阻塞主线程的执行。例如,进行复杂的计算、图像处理、视频编解码等任务可以放在子线程中执行,以保持应用程序的响应性。
  • 阻塞型任务:如果有一些阻塞型的操作,可能会导致主线程被阻塞,例如进行网络请求、文件读写、数据库查询等。将这些任务放在子线程中执行可以确保主线程的流畅运行,同时避免应用程序的无响应状态。
  • 并行处理任务:如果有多个独立的任务需要同时执行,可以将它们分配给多个子线程来实现并行处理。例如,批量下载多个文件、同时进行多个数据处理任务等。
  • 异步操作:子线程可以用于执行异步操作,例如在后台处理数据、定时任务、监听外部设备的输入等。这样可以保持应用程序的其他部分正常运行,同时处理异步任务。

需要注意的是,使用子线程需要谨慎处理线程间的数据共享和同步问题,以避免出现竞态条件或其他并发问题。在 PyQt 中,可以使用线程间通信机制(如信号槽机制)来安全地传递数据和操作UI元素。

多线程管理

通常线程有两种启动方式,分别对应不同的使用场景:

  • 将任务直接运行在一个独立的线程里

用于执行一次性长期任务(例如:并行任务,多线程下载),长期循环阻塞式接收任务(例如:接收蓝牙、网络、串口消息)

  • 将任务运行在一个线程池的线程里

用于执行频繁触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载),因为线程池可以重复利用其内部维护的N个线程,按需创建,减少资源的申请与释放操作。

1. 拷贝依赖

"""
线程管理器,用于管理线程的创建、启动、停止等。
包含两种线程启动方式:
1. start 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息)
2. start_in_thread_pool 在一个线程池里运行任务,用于执行非定期触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载)

其中包含了一个 Worker 类
	● Worker 类用于创建线程任务
Worker 类继承自 QRunnable 类,用于创建线程任务。
    ● 其中包含了一个 run 方法,用于执行线程任务
    ● 一个 signal_connect 方法,用于连接信号槽
    ● 一个 stop 方法,用于停止线程任务
    ● 一个 emit_msg 方法,用于发送消息。
"""
import inspect
from typing import Callable

from PyQt5.QtCore import pyqtSignal, QObject, QRunnable, QThread, QThreadPool


class WorkerSignals(QObject):
    signal_finished = pyqtSignal()
    signal_error = pyqtSignal(Exception)
    signal_result = pyqtSignal(object)
    signal_msg = pyqtSignal(object)


class Worker(QRunnable):
    def __init__(self, target: Callable, args=None, kwargs=None):
        super().__init__()
        self.setAutoDelete(True)  # 自动删除,避免内存泄漏
        self.__func = target
        self.__args = args if args else ()
        self.__kwargs = kwargs if kwargs else {}
        self.__signals = WorkerSignals()
        self.is_running = False
        self.worker_thread: WorkerThread = None

    def run(self):
        self.is_running = True
        try:
            # 如果func的第一个参数是Worker类型,则将self作为第一个参数传入
            if self.__is_worker_func(self.__func):
                result = self.__func(self, *self.__args, **self.__kwargs)
            else:
                # 否则,直接传入参数
                result = self.__func(*self.__args, **self.__kwargs)
            self.__signals.signal_result.emit(result)
        except Exception as e:
            self.__signals.signal_error.emit(e)
        finally:
            self.is_running = False
            self.__signals.signal_finished.emit()

    def signal_connect(self, msg_handler=None, result_handler=None, finished_handler=None, error_handler=None):
        if msg_handler:
            self.__signals.signal_msg.connect(msg_handler)
        if result_handler:
            self.__signals.signal_result.connect(result_handler)
        if finished_handler:
            self.__signals.signal_finished.connect(finished_handler)
        if error_handler:
            self.__signals.signal_error.connect(error_handler)
        return self

    def stop(self):
        self.is_running = False

    def emit_msg(self, msg):
        self.__signals.signal_msg.emit(msg)

    def start(self, daemon=True):
        """
        1. 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息)
        :return:
        """
        self.worker_thread = WorkerThread(self)
        self.worker_thread.daemon = daemon
        self.worker_thread.start()
        return self.worker_thread

    def start_in_thread_pool(self):
        """
        2. 在一个线程池里运行任务,用于执行非定期的短期任务,避免线程资源浪费(例如:文件写出、网络数据下载)
        :param refresh_worker: 任务
        """
        QThreadPool.globalInstance().start(self)

    @classmethod
    def __is_worker_func(cls, func: Callable):
        """
        判断一个函数是否是worker函数,worker函数的第一个参数必须是Worker类型
        :param func:
        :return:
        """
        sig = inspect.signature(func)
        # 判断第一个参数是否是Worker类型,或者参数名是否是worker
        param_keys = list(sig.parameters.keys())
        if len(param_keys) > 0:
            first_param = sig.parameters[param_keys[0]]
            if first_param.annotation == Worker:
                return True
            if first_param.name == "worker":
                return True

        return False


class WorkerThread(QThread):

    def __init__(self, worker: Worker):
        super().__init__()
        self.__worker = worker

    def run(self):
        self.__worker.run()

2. 使用示例

以下代码分别显示了如下两个应用场景:

  • 使用方式1:单线程循环接收消息示例
  • 使用方式2:利用线程池异步执行多个独立任务示例

代码如下:

# 使用示例
import sys
import os
import time
import threading

from PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QApplication
import requests

from qt_worker import Worker


def long_time_recv_task(worker: Worker, title, start):
    counter = start
    thread_name = threading.currentThread().name
    while worker.is_running:
        # 模拟阻塞(等待网络、串口、蓝牙等)
        time.sleep(1)
        # 模拟收到消息
        worker.emit_msg(f"{title} long time task {counter} : {thread_name}")

        counter += 1
        if counter >= 110:
            break

    return "refresh_worker done: {}".format(counter)


@pyqtSlot(object)
def on_result_received(msg):
    thread_name = threading.currentThread().name
    print(f"result: < {msg} > {thread_name}")


def single_recv_thread_test():
    """
    单线程循环接收消息
    :return:
    """
    worker = Worker(long_time_recv_task, args=("消息接收",), kwargs={"start": 100})
    worker.signal_connect(
        msg_handler=lambda msg: print(msg),
        result_handler=on_result_received,
    )
    worker.start()


def pic_download_task(url):
    """
    下载图片, 保存到pic目录
    :param url: 图片地址
    :return:
    """
    response = requests.get(url)
    if response.status_code != 200:
        print("连接图片服务器失败:", response.status_code)
        return
    file_name = url.split('/')[-1]
    # 如果pic目录不存在,则创建
    if not os.path.exists('pic'):
        os.mkdir('pic')

    with open(f"pic/{file_name}", 'wb') as f:
        f.write(response.content)
        # 返回f的绝对路径
        return "{} -> {}".format(url, os.path.abspath(f.name))


def thread_pool_test():
    """
    利用线程池连续下载多个图片文件
    :return:
    """

    pics = [
        "https://www.baidu.com/img/bd_logo1.png",
        "https://c-ssl.duitang.com/uploads/blog/202305/26/EWSwLxqBhV5zZJa.jpg",
        "https://c-ssl.duitang.com/uploads/blog/202305/26/lGSxjBMefx04z33.jpg",
        "https://c-ssl.duitang.com/uploads/blog/202305/26/XxSLogyQCQd9emB.jpg",
        "https://c-ssl.duitang.com/uploads/item/202002/26/20200226215648_yynrr.jpg",
    ]

    for pic in pics:
        worker = Worker(pic_download_task, args=(pic,))
        worker.signal_connect(result_handler=lambda msg: print("保存成功:", msg))
        worker.start_in_thread_pool()


if __name__ == '__main__':
    app = QApplication(sys.argv)

    # 使用方式1:单线程循环接收消息示例
    single_recv_thread_test()

    # 使用方式2:利用线程池异步执行多个任务示例(下载多个图片文件)
    thread_pool_test()

    sys.exit(app.exec_())

多任务管理

指开启一个长期运行的线程,在线程内部,运行一个循环,循环中阻塞式地接收用户发来的任务(定期或非定期),并及时按照用户预定的函数进行执行(通常要消耗一小段时间)。这个多任务管理器应有如下特点:

  • 不会阻塞主线程(保障主界面操作顺滑)
  • 一旦任务完成,将执行结果返回给任务的发布者。这可以通过回调函数、事件或其他适当的机制来实现。
  • 如果任务执行过程中发生异常,需要将异常信息返回给任务的发布者,以便了解并采取适当的处理措施。
  • 可以保障任务的执行顺序和发布任务的顺序一致(通过队列保证要求)
  • 这个长期任务管理器可以通过调用方法进行停止

1. 拷贝依赖

这段代码定义了一个 TaskWorker 类,该类继承自 QThread 类,并使用队列实现了多个任务的异步执行。

"""
运行一个独立线程,用于执行长期循环发送任务,可以随时执行异步任务,内部维护一个消息队列(例如:发送蓝牙、串口消息)

这段代码定义了一个 TaskWorker 类,该类继承自 QThread 类,并使用队列实现了任务的异步执行。

"""

from queue import Queue

from PyQt5.QtCore import QThread, pyqtSignal


class TaskWorker(QThread):
    taskResult = pyqtSignal(object)
    taskError = pyqtSignal(Exception)
    taskFinished = pyqtSignal()

    def __init__(self, do_task, parent=None):
        super(TaskWorker, self).__init__(parent)
        self.do_task = do_task
        self.task_queue = Queue()
        self.is_running = True

    def run(self):
        while self.is_running:
            # 不断从队列中取出任务并执行,如果没有任务则阻塞
            task_arg = self.task_queue.get()
            # 如果取出的任务为 None,且线程已设置为关闭,则退出线程
            if task_arg is None and not self.is_running:
                break
            try:
                result = self.do_task(task_arg)
                self.taskResult.emit(result)
            except Exception as e:
                self.taskError.emit(e)

        self.taskFinished.emit()

    def signal_connect(self, result_handler=None, finished_handler=None, error_handler=None):

        # Connect the worker's signal to the handler slot
        if result_handler is not None:
            self.taskResult.connect(result_handler)
        if finished_handler is not None:
            self.taskFinished.connect(finished_handler)
        if error_handler is not None:
            self.taskError.connect(error_handler)

        return

    def join_queue(self, task):
        if not self.is_running:
            return

        self.task_queue.put(task)

    def stop(self):
        self.is_running = False
        # Put a None task to the queue to stop the thread
        self.task_queue.put(None)

2. 使用示例

主程序中创建了一个 TaskWorker 实例,将任务添加到任务队列中,并使用手动/定时器定期添加任务。
当任务完成时,TaskWorker 实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。

代码具体步骤如下:

  1. 在主程序中定义一个 do_task 函数,该函数用于执行任务。
  2. 在主程序中定义两个槽函数 on_resulton_error,分别用于处理任务完成和任务出错的信号。
  3. 创建一个 TaskWorker 实例,并传入任务函数,函数里是任务要执行的内容
  4. TaskWorker 实例的信号连接到槽函数。
  5. 运行 TaskWorker 实例的start方法启动线程
  6. 运行主程序,通过各种方式把任务及任务参数通过join_queue加入队列
  7. 等待多个任务执行完成。
"""
主程序中创建了一个 TaskWorker 实例,将任务添加到任务队列中,并使用定时器定期添加任务。
当任务完成时,TaskWorker 实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。

 代码具体步骤如下:
 1. 在主程序中定义一个 do_task 函数,该函数用于执行任务。
 2. 在主程序中定义两个槽函数 on_result 和 on_error,分别用于处理任务完成和任务出错的信号。
 3. 创建一个 TaskWorker 实例。
 4. 将 TaskWorker 实例的信号连接到槽函数。
 5. 将任务添加到任务队列中,并使用定时器定期添加任务。
 6. 运行主程序,等待任务执行完成。
"""
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import pyqtSlot, QTimer
import sys
import threading
import time

from qt_task import TaskWorker


def do_task(task_arg):
    a, b = task_arg
    rst = a / b
    # Simulate a time-consuming task
    time.sleep(1)  # Pause for 1 seconds
    return f"Task signal_result {a} / {b} = {rst}"


@pyqtSlot(object)
def on_result(result):
    # print(threading.currentThread())
    print("on result: ", result)


@pyqtSlot(Exception)
def on_error(e):
    # print(threading.currentThread())
    print("on error: ", e)


if __name__ == '__main__':
    app = QApplication(sys.argv)

    print("main: ", threading.currentThread())

    task_worker = TaskWorker(do_task)
    task_worker.signal_connect(
        result_handler=on_result,
        finished_handler=lambda: print("on finished"),
        error_handler=on_error,
    )
    task_worker.start()

    # Add tasks to the task manager
    task_worker.join_queue((3, 2))
    task_worker.join_queue((4, 2))
    task_worker.join_queue((5, 2))
    task_worker.join_queue((5, 0))

    # Use a timer to add tasks periodically
    timer = QTimer()
    timer.timeout.connect(lambda: task_worker.join_queue((5, 2)))
    timer.start(3000)  # Add a task every 5 seconds

    # 执行一个10秒后的延时任务
    QTimer.singleShot(10000, lambda: task_worker.stop())

    sys.exit(app.exec_())

 

 

 


http://www.kler.cn/a/160514.html

相关文章:

  • Git在版本控制中的应用
  • SpringBoot实战(三十一)集成iText5,实现RSA签署PDF
  • git配置远程仓库的认证信息
  • 曹操为什么总是亲征
  • Leecode热题100-35.搜索插入位置
  • 新版 idea 编写 idea 插件时,启动出现 ClassNotFound
  • 虚拟机配置网络(这里以centos为例)
  • 操作系统的特征
  • Java集合进阶(上)
  • 0基础学java-day14
  • logback整合rabbitmq实现消息记录日志
  • 关于域名、ssl证书的一些问题
  • ThreadX开源助力Microsoft扩大应用范围:对比亚马逊AWS的策略差异
  • 在cmd下查看当前python的版本
  • vue2+typescript使用高德地图2.0版本
  • 物联网安全芯片ACL16 采用 32 位内核,片内集成多种安全密码模块 且低成本、低功耗
  • 【1】基于多设计模式下的同步异步日志系统-项目介绍
  • 5-redis高级-哨兵
  • vue3 Hooks函数使用及常用utils封装
  • LeetCode双指针:第一个错误的版本
  • Redis Reactor事件驱动模型源码
  • Linux-centos上如何配置管理NFS服务器?
  • 数据分析中的绝地反击:如何解救一个陷入困境的数据模型
  • IDEA切换Python虚拟环境
  • Vue3计算属性与监听属性和生命周期
  • Linux网卡命名规则