当前位置: 首页 > article >正文

深入理解C++ 线程池:动手实践与源码解析

在当今多核处理器横行的时代,多任务处理已然成为各类软件提升性能的关键 “武器”。想象一下,你正使用一款图片处理软件,它需要同时对多张图片进行滤镜添加、尺寸调整等操作;又或者是一个网络服务器,瞬间要应对来自四面八方的海量用户请求。面对这些场景,如果为每个任务单独创建一个线程,任务结束后再销毁线程,频繁的线程创建与销毁操作,将会带来巨大的开销,就如同让一个短跑运动员不停地进行百米冲刺,很快就会体力不支。

这时候,线程池就宛如一位智能的任务指挥官,威风凛凛地闪亮登场!它预先创建好一组线程,让这些线程时刻准备着,就像一群待命的士兵。当有任务到来时,线程池会依据自身的调度策略,迅速为任务分配一个空闲线程,让任务得以立即执行,避免了因等待线程创建而造成的时间浪费。而且,在任务完成后,线程并不会被销毁,而是回到线程池中,等待下一次出征,如此循环往复,极大地提高了线程的复用率,减少了资源的消耗。

今天,咱们就一起深入 C++ 的世界,不仅要透彻理解线程池的工作原理,还要手把手地用代码实现一个线程池,同时对其源码进行细致解析,让线程池不再神秘,成为我们编程路上的得力助手。

一、线程池是什么?

在并发编程的世界里,线程池是一个至关重要的概念。简单来说,线程池就是一个可以容纳多个线程的池子,它的主要作用是管理和复用线程,以提高程序的性能和效率。

为什么我们需要线程池呢?这是因为线程的创建和销毁是有一定开销的。如果我们每次有任务需要执行时都去创建一个新线程,任务完成后又销毁这个线程,那么在任务量较大的情况下,这种频繁的创建和销毁操作会消耗大量的系统资源,导致程序的性能下降。而线程池通过预先创建一定数量的线程,并将这些线程复用,避免了频繁的线程创建和销毁,从而大大减少了系统开销,提高了任务的执行效率。

举个例子,假设有一个 Web 服务器,它需要处理大量的客户端请求。如果没有线程池,每次有新的请求到来时,服务器都要创建一个新线程来处理这个请求,请求处理完后再销毁线程。这样当并发请求量很大时,服务器会忙于创建和销毁线程,而无暇处理真正的业务逻辑,导致响应速度变慢。但如果使用了线程池,服务器在启动时就创建好一定数量的线程放在线程池中,当有请求到来时,直接从线程池中取出一个空闲线程来处理请求,请求处理完后,线程又回到线程池中等待下一个任务。这样不仅提高了响应速度,还降低了系统资源的消耗。

线程池适用于很多场景,比如上述的 Web 服务器场景,还有数据库连接池、文件处理、计算密集型任务等。在这些场景中,通过合理使用线程池,可以有效地提高系统的并发处理能力和性能。

二、为什么要用C++实现线程池

在探讨为何使用 C++ 实现线程池之前,先来了解 C++ 这门语言的独特魅力。C++ 作为一门强大的编程语言,在系统级编程领域占据着重要地位。它具备高效的执行效率,能够直接操作硬件资源,这使得它在对性能要求极高的场景中表现出色。例如,在操作系统内核开发、大型游戏开发、高性能服务器开发等领域,C++ 都有着广泛的应用 。

与其他编程语言相比,C++ 在性能优化方面有着显著的优势。它允许开发者对内存进行精细的控制,通过手动管理内存,避免了像一些高级语言中自动垃圾回收机制带来的额外开销。同时,C++ 的编译机制能够生成高度优化的机器码,充分利用硬件的特性,从而实现程序的高效运行。

那么,在 C++ 开发中,线程池又扮演着怎样重要的角色呢?

在服务器开发领域,线程池是提高并发处理能力的关键组件。以 Web 服务器为例,它需要同时处理大量客户端的请求。如果为每个请求都创建一个新线程,当并发请求量达到一定程度时,系统资源会被大量消耗,导致服务器性能急剧下降。而使用 C++ 实现的线程池,可以预先创建一定数量的线程,这些线程可以复用,大大减少了线程创建和销毁的开销,提高了服务器的响应速度和吞吐量。

在游戏开发中,线程池同样发挥着不可或缺的作用。现代游戏通常包含复杂的物理模拟、人工智能计算、图形渲染等任务。这些任务需要大量的计算资源,并且对实时性要求极高。通过使用线程池,游戏开发者可以将不同的任务分配到多个线程中并行执行,充分利用多核处理器的性能,从而提升游戏的运行效率和流畅度。例如,在一款大型 3D 游戏中,物理模拟和 AI 计算可以分别由线程池中的不同线程来处理,与主线程的图形渲染任务并行进行,避免了因单个任务耗时过长而导致的游戏卡顿。

除此之外,在一些对性能要求苛刻的科学计算、数据分析等领域,C++ 实现的线程池也能够帮助开发者充分利用系统资源,提高计算效率,快速得到计算结果。

C++ 的强大性能和对系统资源的精细控制能力,使得它成为实现线程池的理想选择。而线程池在 C++ 开发的各种高性能场景中,又能够极大地提升程序的并发处理能力和整体性能,两者相辅相成,共同为开发者打造高效、稳定的软件系统提供了有力支持。

三、C++实现线程池的核心原理

要深入理解 C++ 实现线程池的过程,首先得剖析其核心原理。线程池主要由几个关键部分协同工作,包括线程队列、任务队列、互斥锁、条件变量等,它们各自承担着独特的职责,共同构建起线程池高效运行的基础 。

3.1线程队列

线程队列,就像是一个随时待命的团队,其中包含了预先创建好的多个线程。这些线程在创建后并不会立即执行具体的任务,而是进入一种等待状态,随时准备接受任务的分配。它们就像训练有素的士兵,在军营中等待着出征的命令。在 C++ 中,我们可以使用std::vector<std::thread>来创建和管理这个线程队列。例如:

std::vector<std::thread> threads;
for (size_t i = 0; i < threadCount; ++i) {
    threads.emplace_back([this] { this->worker(); });
}

在这段代码中,threadCount表示我们希望创建的线程数量,通过循环创建了threadCount个线程,并将它们添加到threads向量中。每个线程都执行worker函数,这个函数就是线程的工作逻辑所在。

任务队列

任务队列则是存储待执行任务的地方,它像是一个任务仓库。当有新的任务到来时,就会被添加到这个队列中等待处理。任务队列可以使用std::queue来实现,为了确保在多线程环境下的安全访问,还需要配合互斥锁和条件变量。比如:

std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;

这里定义了一个tasks任务队列,用于存储类型为std::function<void()>的任务,也就是可以调用且无返回值的函数对象。queueMutex是互斥锁,用于保护任务队列,防止多个线程同时访问导致数据不一致。condition是条件变量,用于线程间的同步,当有新任务添加到队列时,通过条件变量通知等待的线程。

3.2互斥锁

互斥锁的作用至关重要,它就像一把锁,用来保护共享资源,确保同一时间只有一个线程能够访问任务队列。当一个线程想要访问任务队列(比如添加任务或取出任务)时,它必须先获取互斥锁。如果此时互斥锁已经被其他线程持有,那么这个线程就会被阻塞,直到互斥锁被释放。在 C++ 中,使用std::mutex来实现互斥锁,例如:

std::mutex mutex;
mutex.lock();
// 访问任务队列的代码
mutex.unlock();

在这段代码中,mutex.lock()用于获取互斥锁,当获取到锁后,就可以安全地访问任务队列。访问完成后,通过mutex.unlock()释放互斥锁,让其他线程有机会获取锁并访问任务队列。为了避免忘记解锁导致死锁,更推荐使用std::lock_guard或std::unique_lock,它们会在作用域结束时自动释放锁,例如:

{
    std::unique_lock<std::mutex> lock(mutex);
    // 访问任务队列的代码
} // lock自动析构,释放锁

3.3条件变量

条件变量主要用于线程间的同步和通信。它与互斥锁配合使用,当任务队列中没有任务时,工作线程可以通过条件变量进入等待状态,释放互斥锁,让出 CPU 资源。当有新任务添加到任务队列时,就可以通过条件变量通知等待的线程,让它们醒来并获取互斥锁,从任务队列中取出任务执行。例如:

std::condition_variable condition;
std::unique_lock<std::mutex> lock(mutex);
while (tasks.empty()) {
    condition.wait(lock);
}
auto task = std::move(tasks.front());
tasks.pop();

在这段代码中,condition.wait(lock)会使线程进入等待状态,并释放lock锁。当其他线程调用condition.notify_one()或condition.notify_all()通知时,等待的线程会被唤醒,重新获取lock锁,然后继续执行后续代码,从任务队列中取出任务。

3.4协同工作流程

线程池的工作流程是一个有序且高效的协作过程。当有新任务到来时,任务会被添加到任务队列中。这个过程中,需要先获取互斥锁,以保证任务队列的线程安全。添加任务后,通过条件变量通知等待的线程有新任务到来。

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

在这段代码中,enqueue函数用于将任务添加到任务队列。首先,通过std::bind和std::make_shared创建一个包装了任务的std::packaged_task,并获取其对应的std::future用于获取任务执行结果。然后,在临界区内(通过std::unique_lock自动管理锁)将任务添加到任务队列tasks中。最后,通过condition.notify_one()通知一个等待的线程有新任务。

而工作线程在启动后,会不断地尝试从任务队列中获取任务并执行。它们首先获取互斥锁,检查任务队列是否为空。如果为空,就通过条件变量等待,直到有新任务被添加。当获取到任务后,线程会执行任务,执行完成后再次回到获取任务的循环中。如果线程池停止,且任务队列为空,线程就会退出。

void ThreadPool::worker() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            condition.wait(lock, [this] { return stop ||!tasks.empty(); });
            if (stop && tasks.empty()) return;
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}

在worker函数中,线程首先创建一个std::function<void()>类型的变量task用于存储从任务队列中取出的任务。然后,在临界区内(通过std::unique_lock自动管理锁),使用condition.wait等待条件满足,条件是stop为true(表示线程池停止)或者任务队列不为空。当条件满足且stop为true且任务队列为空时,线程返回退出。否则,从任务队列中取出任务并移动到task中。最后,在临界区外执行任务task()。这样,通过线程队列、任务队列、互斥锁和条件变量的紧密协作,线程池实现了高效的任务管理和并发执行 。

四、C++实现线程池的步骤

4.1创建任务类

任务类在整个线程池体系中扮演着关键的角色,它主要负责封装任务函数以及与之相关的参数,使得任务能够以一种统一、规范的形式被线程池管理和调度。

在 C++ 中,我们可以通过以下方式来定义一个任务类:

class Task {
public:
    // 使用模板来接受任意可调用对象及其参数
    template<class F, class... Args>
    Task(F&& f, Args&&... args) : func(std::bind(std::forward<F>(f), std::forward<Args>(args)...)) {}

    // 定义任务的执行函数
    void execute() {
        if (func) {
            func();
        }
    }

private:
    std::function<void()> func;
};

在上述代码中,我们利用了 C++ 的模板特性和std::function、std::bind来实现任务的封装。std::function<void()>类型的成员变量func用于存储可调用对象,通过std::bind将传入的函数f和参数args绑定成一个无参的可调用对象,赋值给func。execute函数则是任务的执行入口,当调用execute时,会执行绑定好的函数。

例如,假设有一个简单的加法函数:

int add(int a, int b) {
    return a + b;
}

我们可以创建一个任务对象来执行这个加法操作:

Task task(add, 3, 5);
task.execute(); // 执行任务,相当于调用add(3, 5)

这样,通过任务类的封装,我们可以将各种不同的任务以统一的方式进行管理和执行,为线程池的任务调度提供了基础。

4.2构建线程池类

线程池类是整个线程池实现的核心部分,它负责管理线程队列、任务队列以及协调线程的工作。下面是线程池类的基本框架:

class ThreadPool {
public:
    // 构造函数,初始化线程池
    ThreadPool(size_t numThreads);

    // 析构函数,清理线程池资源
    ~ThreadPool();

    // 添加任务到任务队列
    template<class F, class... Args>
    void enqueue(F&& f, Args&&... args);

private:
    // 线程执行的函数
    void worker();

    // 线程队列
    std::vector<std::thread> threads;

    // 任务队列
    std::queue<std::unique_ptr<Task>> tasks;

    // 互斥锁,保护任务队列
    std::mutex queueMutex;

    // 条件变量,用于线程同步
    std::condition_variable condition;

    // 线程池停止标志
    bool stop;
};

在这个线程池类中:

成员变量

threads是一个std::vector<std::thread>类型的线程队列,用于存储线程对象,每个线程都将执行worker函数。

tasks是一个std::queue<std::unique_ptr<Task>>类型的任务队列,用于存储任务对象,这里使用std::unique_ptr来管理任务对象的生命周期,确保内存安全。

queueMutex是一个互斥锁,用于保护任务队列,防止多个线程同时访问任务队列导致数据不一致。

condition是一个条件变量,与互斥锁配合使用,用于线程间的同步。当任务队列中没有任务时,工作线程可以通过条件变量进入等待状态,当有新任务添加到任务队列时,通过条件变量通知等待的线程。

stop是一个布尔类型的标志,用于控制线程池的停止。当stop为true时,线程池将停止接受新任务,并在处理完现有任务后关闭所有线程。

构造函数

ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        threads.emplace_back([this] { this->worker(); });
    }
}

构造函数接受一个参数numThreads,表示线程池中的线程数量。在构造函数中,通过循环创建numThreads个线程,并将它们添加到threads队列中。每个线程都执行worker函数,[this] { this->worker(); }是一个 lambda 表达式,它捕获了this指针,使得线程能够访问线程池类的成员函数和变量。

析构函数

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& thread : threads) {
        thread.join();
    }
}

析构函数用于清理线程池的资源。首先,在一个临界区内(通过std::unique_lock自动管理锁)将stop标志设置为true,表示线程池要停止。然后,通过condition.notify_all()通知所有等待的线程,让它们有机会检查stop标志并退出。最后,通过循环调用thread.join()等待所有线程执行完毕,释放线程资源。

添加任务函数

template<class F, class... Args>
void ThreadPool::enqueue(F&& f, Args&&... args) {
    auto task = std::make_unique<Task>(std::forward<F>(f), std::forward<Args>(args)...);
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.push(std::move(task));
    }
    condition.notify_one();
}

enqueue函数是一个模板函数,用于将任务添加到任务队列中。它接受一个可调用对象f和一系列参数args,通过std::make_unique创建一个Task对象,并将其添加到任务队列tasks中。在添加任务时,先获取互斥锁,确保任务队列的线程安全。如果线程池已经停止(stop为true),则抛出异常。添加任务后,释放互斥锁,并通过condition.notify_one()通知一个等待的线程有新任务到来。

4.3实现关键函数

在上述线程池类中,worker函数和enqueue函数是实现线程池功能的关键。

worker函数

void ThreadPool::worker() {
    while (true) {
        std::unique_ptr<Task> task;
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            condition.wait(lock, [this] { return stop ||!tasks.empty(); });
            if (stop && tasks.empty()) {
                return;
            }
            task = std::move(tasks.front());
            tasks.pop();
        }
        task->execute();
    }
}

worker函数是线程执行的主体函数。它在一个无限循环中运行,不断尝试从任务队列中获取任务并执行。

具体步骤如下:

  • 首先创建一个std::unique_ptr<Task>类型的变量task,用于存储从任务队列中取出的任务。

  • 使用std::unique_lock<std::mutex>来获取互斥锁,进入临界区,保护任务队列的访问。

  • 使用condition.wait等待条件满足,条件是stop为true(表示线程池停止)或者任务队列不为空。condition.wait会自动释放互斥锁,使线程进入等待状态,直到被condition.notify_one()或condition.notify_all()唤醒。当线程被唤醒时,会重新获取互斥锁,继续执行后续代码。

  • 检查stop标志和任务队列是否为空,如果stop为true且任务队列为空,说明线程池已经停止且没有任务了,此时线程返回,结束执行。

  • 从任务队列中取出第一个任务,并将其移动到task变量中,然后将任务从任务队列中移除。

  • 退出临界区,释放互斥锁,执行任务的execute函数,完成任务的执行。

  • 循环回到开始,继续等待下一个任务。

enqueue函数

template<class F, class... Args>
void ThreadPool::enqueue(F&& f, Args&&... args) {
    auto task = std::make_unique<Task>(std::forward<F>(f), std::forward<Args>(args)...);
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.push(std::move(task));
    }
    condition.notify_one();
}

enqueue函数用于将任务添加到任务队列中。具体步骤如下:

  • 使用std::make_unique创建一个Task对象,将传入的可调用对象f和参数args封装到任务对象中。这里使用std::forward来实现完美转发,确保参数的左值 / 右值特性不变。

  • 使用std::unique_lock<std::mutex>获取互斥锁,进入临界区,保护任务队列的访问。

  • 检查线程池是否已经停止,如果stop为true,说明线程池已经停止,此时抛出std::runtime_error异常,提示不能在停止的线程池中添加任务。

  • 将创建好的任务对象通过std::move移动到任务队列tasks中,std::move用于将一个对象的所有权转移给另一个对象,避免不必要的拷贝。

  • 退出临界区,释放互斥锁。

  • 通过condition.notify_one()通知一个等待的线程有新任务到来,被通知的线程会在condition.wait处被唤醒,然后尝试从任务队列中获取任务并执行。

通过以上关键函数的实现,线程池能够有效地管理线程和任务,实现任务的并发执行,提高程序的性能和效率。

五、代码示例与解析

5.1完整代码展示

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    void worker();

    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;

    std::mutex queueMutex;
    std::condition_variable condition;

    bool stop;
};


ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        threads.emplace_back([this] {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queueMutex);
                    condition.wait(lock, [this] { return stop ||!tasks.empty(); });
                    if (stop && tasks.empty())
                        return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                task();
            }
        });
    }
}


ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& thread : threads) {
        thread.join();
    }
}


template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

5.2代码逐行解析

包含头文件

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

这些头文件提供了实现线程池所需的各种工具和数据结构。vector用于存储线程队列,queue用于实现任务队列,thread用于线程操作,mutex和condition_variable用于线程同步,functional用于处理可调用对象,future用于获取异步任务的结果。

线程池类定义

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    void worker();

    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;

    std::mutex queueMutex;
    std::condition_variable condition;

    bool stop;
};

公有成员函数

  • ThreadPool(size_t numThreads):构造函数,用于初始化线程池,接受线程数量作为参数。

  • ~ThreadPool():析构函数,用于清理线程池资源,停止所有线程并等待它们结束。

  • template<class F, class... Args> auto enqueue(F&& f, Args&&... args) ->std::future<typename std::result_of<F(Args...)>::type>:模板函数,用于将任务添加到任务队列中,并返回一个std::future对象,以便获取任务的执行结果。

私有成员函数:void worker():线程执行的函数,每个线程都会调用这个函数,从任务队列中获取任务并执行。

私有成员变量

  • std::vector<std::thread> threads:线程队列,存储线程对象。

  • std::queue<std::function<void()>> tasks:任务队列,存储可调用对象,即任务。

  • std::mutex queueMutex:互斥锁,用于保护任务队列,确保线程安全。

  • std::condition_variable condition:条件变量,用于线程同步,当任务队列有新任务时通知等待的线程。

  • bool stop:线程池停止标志,当stop为true时,线程池停止接受新任务,并在处理完现有任务后关闭所有线程。

构造函数实现

ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        threads.emplace_back([this] {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queueMutex);
                    condition.wait(lock, [this] { return stop ||!tasks.empty(); });
                    if (stop && tasks.empty())
                        return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                task();
            }
        });
    }
}
  • ThreadPool::ThreadPool(size_t numThreads) : stop(false):构造函数初始化列表,将stop标志初始化为false。

  • for (size_t i = 0; i < numThreads; ++i):循环创建numThreads个线程。

  • threads.emplace_back([this] {... });:使用emplace_back将新线程添加到threads队列中,每个线程执行一个 lambda 表达式。

  • while (true):线程的主循环,不断尝试从任务队列中获取任务并执行。

  • std::function<void()> task;:定义一个变量task,用于存储从任务队列中取出的任务。

  • std::unique_lock<std::mutex> lock(queueMutex);:创建一个std::unique_lock对象,自动管理queueMutex锁,进入临界区。

  • condition.wait(lock, [this] { return stop ||!tasks.empty(); });:线程等待条件变量condition,当stop为true或者任务队列不为空时,线程被唤醒。condition.wait会自动释放lock锁,使线程进入等待状态,直到被通知唤醒,唤醒后会重新获取lock锁。

  • if (stop && tasks.empty()) return;:如果stop为true且任务队列为空,说明线程池已经停止且没有任务了,线程返回,结束执行。

  • task = std::move(tasks.front()); tasks.pop();:从任务队列中取出第一个任务,并将其移动到task变量中,然后将任务从任务队列中移除。

  • task();:执行任务。

析构函数实现

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& thread : threads) {
        thread.join();
    }
}
  • std::unique_lock<std::mutex> lock(queueMutex); stop = true;:在临界区内,将stop标志设置为true,表示线程池要停止。

  • condition.notify_all();:通知所有等待的线程,让它们有机会检查stop标志并退出。

  • for (std::thread& thread : threads) { thread.join(); }:通过循环调用thread.join()等待所有线程执行完毕,释放线程资源。

添加任务函数实现

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}
  • using return_type = typename std::result_of<F(Args...)>::type;:使用std::result_of获取函数F的返回值类型,并将其命名为return_type。

  • auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));:使用std::make_shared创建一个std::packaged_task对象,将传入的函数f和参数args绑定在一起,并包装成一个可调用对象,用于异步执行任务。std::forward用于完美转发,确保参数的左值 / 右值特性不变。

  • std::future<return_type> res = task->get_future();:获取std::packaged_task对象的std::future,用于获取任务的执行结果。

  • std::unique_lock<std::mutex> lock(queueMutex);:创建一个std::unique_lock对象,自动管理queueMutex锁,进入临界区。

  • if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");:检查线程池是否已经停止,如果stop为true,说明线程池已经停止,抛出std::runtime_error异常,提示不能在停止的线程池中添加任务。

  • tasks.emplace([task]() { (*task)(); });:将任务添加到任务队列中,[task]() { (*task)(); }是一个 lambda 表达式,用于执行std::packaged_task对象。

  • condition.notify_one();:通知一个等待的线程有新任务到来,被通知的线程会在condition.wait处被唤醒,然后尝试从任务队列中获取任务并执行。

  • return res;:返回std::future对象,以便调用者获取任务的执行结果。

六、线程池的测试与优化

6.1测试线程池

为了验证我们实现的线程池是否正确且稳定,编写测试代码是必不可少的环节。通过测试,我们可以检查线程池的各项功能,如添加任务、执行任务、线程复用等是否符合预期。以下是一个简单的测试示例:

#include <iostream>
#include <chrono>
#include <thread>
#include "ThreadPool.h"  // 假设线程池定义在这个头文件中

// 定义一个简单的任务函数
void simpleTask(int num) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Task " << num << " executed by thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    ThreadPool pool(4);  // 创建一个包含4个线程的线程池

    // 添加多个任务到线程池
    for (int i = 0; i < 10; ++i) {
        pool.enqueue(simpleTask, i);
    }

    // 等待一段时间,让任务有足够时间执行
    std::this_thread::sleep_for(std::chrono::seconds(3));

    return 0;
}

在这个测试代码中:

  • 首先定义了一个simpleTask函数,它接受一个整数参数num,在函数内部,线程会休眠 1 秒钟,然后输出任务编号和执行该任务的线程 ID。

  • 在main函数中,创建了一个包含 4 个线程的线程池pool。

  • 通过循环调用pool.enqueue方法,向线程池中添加 10 个任务,每个任务都执行simpleTask函数,并传入不同的参数i。

  • 最后,主线程休眠 3 秒钟,这是为了给线程池中的线程足够的时间来执行任务。在实际应用中,可能需要更复杂的同步机制来确保所有任务都执行完毕。

运行这个测试代码后,可以观察输出结果,确认每个任务是否都被正确执行,以及任务是否是由线程池中的不同线程执行的,从而验证线程池的功能是否正常。

6.2性能优化

尽管我们已经实现了一个基本的线程池,但在实际应用中,还需要对其性能进行优化,以满足不同场景的需求。下面分析一些常见的性能瓶颈,并提出相应的优化建议。

调整线程数量:线程池中的线程数量是一个关键参数,它直接影响着线程池的性能。如果线程数量过少,可能导致任务等待时间过长,无法充分利用系统资源;而线程数量过多,则会增加线程上下文切换的开销,甚至可能导致系统资源耗尽。因此,需要根据任务的类型和系统的硬件配置来合理调整线程数量。

  • 对于 CPU 密集型任务:由于这类任务主要消耗 CPU 资源,过多的线程会导致频繁的上下文切换,降低性能。一般来说,线程数量可以设置为 CPU 核心数或略小于 CPU 核心数。例如,在一个具有 4 个 CPU 核心的系统中,对于 CPU 密集型任务,线程池的线程数量可以设置为 4 或 3。

  • 对于 I/O 密集型任务:这类任务在执行过程中大部分时间都在等待 I/O 操作完成,CPU 利用率相对较低。因此,可以适当增加线程数量,以充分利用 CPU 资源。通常,线程数量可以设置为 CPU 核心数的 2 - 3 倍。比如,在同样具有 4 个 CPU 核心的系统中,对于 I/O 密集型任务,线程池的线程数量可以设置为 8 - 12。

优化任务队列的数据结构:任务队列是线程池中的重要组成部分,其数据结构的选择会影响任务的添加和获取效率。在前面的实现中,我们使用了std::queue作为任务队列,它是一个基于链表的队列,在多线程环境下,链表的操作可能会带来一定的性能开销。

  • 可以考虑使用无锁队列:无锁队列利用原子操作来实现线程安全,避免了传统锁机制带来的开销,能够提高任务队列在高并发场景下的性能。例如,concurrent_queue是一个开源的无锁队列实现,它基于 CAS(Compare - And - Swap)操作,在多线程环境下具有较高的性能。

  • 根据任务特性选择合适的队列:如果任务具有优先级之分,可以使用优先级队列(如std::priority_queue)来存储任务,这样可以确保高优先级的任务优先被执行。在一个实时系统中,可能会有一些紧急任务需要立即处理,使用优先级队列就能满足这种需求。

减少锁的竞争:在多线程环境下,锁的竞争是导致性能下降的一个重要因素。在线程池的实现中,互斥锁用于保护任务队列的访问,当多个线程同时尝试访问任务队列时,就会产生锁竞争。

  • 使用细粒度锁:可以将任务队列按照一定的规则进行划分,每个部分使用单独的互斥锁进行保护。这样,不同的线程可以同时访问不同部分的任务队列,减少锁的竞争。例如,将任务队列按照任务类型划分为多个子队列,每个子队列都有自己的互斥锁。

  • 采用无锁数据结构:除了前面提到的无锁队列,还可以使用其他无锁数据结构来替代传统的加锁方式。例如,std::atomic类型可以用于实现一些简单的无锁数据结构,如原子计数器,它可以在多线程环境下高效地进行计数操作,而不需要使用锁。

通过以上性能优化措施,可以显著提升线程池的性能和效率,使其能够更好地适应各种复杂的应用场景。


http://www.kler.cn/a/561787.html

相关文章:

  • 是德科技keysight N5173B信号发生器,是一款经济高效的仪器
  • Java多线程中的死锁问题
  • Docker 部署 Jenkins持续集成(CI)工具
  • Java23种设计模式案例
  • smolagents学习笔记系列(五)Tools-in-depth-guide
  • 804 唯一摩斯密码词
  • 【leetcode hot 100 1】两数之和
  • 钉钉合同审批对接腾讯电子签,实现合同全流程自动化管理
  • 【删边问题——Tarjan求割边】
  • 宿主机的 root 是否等于 Docker 容器的 root?
  • Ajax数据采集与分析详解
  • 开源分布式存储系统在云原生数据库领域的实践与应用
  • 自定义提交按钮触发avue-form绑定的submit事件
  • AI前端开发:ScriptEcho如何降低编程培训学习成本
  • python绑定udp时使用127.0.0.1作为ip,无法sendto,报错Invalid argument
  • 无限宽度神经网络的神经正切核(Neural Tangent Kernel, NTK)
  • 多线程进阶 : 八股文面试题 一 [Java EE 多线程 锁和死锁相关问题]
  • vscode设置自动换行
  • 【WordPress】发布文章时自动通过机器人推送到钉钉
  • Pi币今日成交价格飙升,XBIT去中心化交易所助力新浪潮