当前位置: 首页 > article >正文

C++常用多线程模式

文章目录

      • 1. Fork - Join模式
      • 2. Producer - Consumer模式
      • 3. Readers - Writers模式
      • 4. Work Thread模式
      • 5. Actor模式
      • 6、 Pipeline模式概述
      • 应用场景
      • C++实现示例
      • 代码解释

1. Fork - Join模式

  • 原理:将一个大任务分解为多个子任务,这些子任务在不同的线程中并行执行,当所有子任务完成后,再将它们的结果合并得到最终结果。
  • 优点:充分利用多核处理器的并行能力,提高计算效率;任务分解和结果合并的过程清晰,便于理解和实现分治算法等。
  • 缺点:需要考虑子任务的划分合理性以及线程间的同步问题,以确保结果的正确性。
  • 适用场景:适用于能有效分解为多个独立子任务的计算密集型任务,如大规模数据处理、科学计算中的矩阵运算等。
#include <iostream>
#include <vector>
#include <thread>
#include <numeric>

// 计算数组某区间的和
void sum_subarray(const std::vector<int>& arr, int start, int end, int& result) {
    result = std::accumulate(arr.begin() + start, arr.begin() + end, 0);
}

int main() {
    std::vector<int> arr(1000);
    std::iota(arr.begin(), arr.end(), 1);

    int mid = arr.size() / 2;
    int left_sum = 0, right_sum = 0;

    // Fork: 创建两个线程分别计算左右子数组的和
    std::thread left_thread(sum_subarray, std::ref(arr), 0, mid, std::ref(left_sum));
    std::thread right_thread(sum_subarray, std::ref(arr), mid, arr.size(), std::ref(right_sum));

    // Join: 等待两个线程完成
    left_thread.join();
    right_thread.join();

    int total_sum = left_sum + right_sum;
    std::cout << "Total sum: " << total_sum << std::endl;

    return 0;
}

2. Producer - Consumer模式

  • 原理:生产者线程负责生产数据并将其放入缓冲区,消费者线程从缓冲区取出数据进行处理。通过缓冲区实现生产者和消费者的解耦,使它们可以以不同的速度运行。
  • 优点:提高了程序的并发度和整体性能,生产者和消费者可以独立优化和扩展,代码可读性和维护性较好。
  • 缺点:需要合理设计缓冲区大小和同步机制,避免缓冲区溢出或不足,同时要处理好生产者和消费者之间的同步与互斥问题。
  • 适用场景:广泛应用于各种数据处理场景,如文件读取与处理、网络数据的接收与解析、消息队列系统等。
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

std::queue<int> task_queue;
std::mutex mtx;
std::condition_variable cv;
bool is_running = true;

// 生产者函数
void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::unique_lock<std::mutex> lock(mtx);
            task_queue.push(i);
            std::cout << "Produced: " << i << std::endl;
        }
        cv.notify_one();
    }
    {
        std::unique_lock<std::mutex> lock(mtx);
        is_running = false;
    }
    cv.notify_all();
}

// 消费者函数
void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return!task_queue.empty() ||!is_running; });
        if (task_queue.empty() &&!is_running) {
            break;
        }
        int task = task_queue.front();
        task_queue.pop();
        std::cout << "Consumed: " << task << std::endl;
    }
}

int main() {
    std::thread producer_thread(producer);
    std::thread consumer_thread(consumer);

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

3. Readers - Writers模式

  • 原理:允许多个读者线程同时访问共享资源,但当有写者线程访问时,所有读者和其他写者都需等待。写者线程具有更高的优先级,以确保数据的一致性。
  • 优点:能有效提高对共享资源的访问效率,在有大量读操作和少量写操作的情况下,能充分利用多核处理器的并行性。
  • 缺点:实现较为复杂,需要精细地控制读写线程的同步与互斥,以避免数据不一致和死锁等问题。
  • 适用场景:常用于数据库系统、文件系统等需要频繁读写共享数据的场景,其中读操作远远多于写操作。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex rw_mutex;
std::condition_variable rw_cv;
int readers = 0;
bool writer_active = false;

// 读者函数
void reader(int id) {
    {
        std::unique_lock<std::mutex> lock(rw_mutex);
        rw_cv.wait(lock, [] { return!writer_active; });
        ++readers;
    }

    std::cout << "Reader " << id << " is reading." << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    {
        std::unique_lock<std::mutex> lock(rw_mutex);
        --readers;
        if (readers == 0) {
            rw_cv.notify_one();
        }
    }
}

// 写者函数
void writer(int id) {
    {
        std::unique_lock<std::mutex> lock(rw_mutex);
        rw_cv.wait(lock, [] { return!writer_active && readers == 0; });
        writer_active = true;
    }

    std::cout << "Writer " << id << " is writing." << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    {
        std::unique_lock<std::mutex> lock(rw_mutex);
        writer_active = false;
        rw_cv.notify_all();
    }
}

int main() {
    std::thread r1(reader, 1);
    std::thread r2(reader, 2);
    std::thread w1(writer, 1);
    std::thread r3(reader, 3);

    r1.join();
    r2.join();
    w1.join();
    r3.join();

    return 0;
}

4. Work Thread模式

  • 原理:有一个任务队列,多个工作线程从队列中获取任务并执行。任务队列可以是优先级队列或普通队列,工作线程根据一定的规则从队列中取出任务进行处理。
  • 优点:实现相对简单,易于扩展工作线程的数量来应对不同的负载需求,能有效利用线程资源,避免线程的频繁创建和销毁。
  • 缺点:任务队列的管理和线程的调度需要一定的开销,可能会出现任务饥饿现象,即某些低优先级任务长时间得不到执行。
  • 适用场景:适用于处理大量异步任务的场景,如网络服务器中的请求处理、分布式系统中的任务调度等。
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>

std::queue<std::function<void()>> work_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;
bool is_working = true;

// 工作线程函数
void worker() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            queue_cv.wait(lock, [] { return!work_queue.empty() ||!is_working; });
            if (work_queue.empty() &&!is_working) {
                break;
            }
            task = std::move(work_queue.front());
            work_queue.pop();
        }
        task();
    }
}

int main() {
    std::vector<std::thread> workers;
    for (int i = 0; i < 3; ++i) {
        workers.emplace_back(worker);
    }

    // 添加任务到队列
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        for (int i = 0; i < 5; ++i) {
            work_queue.emplace([i] {
                std::cout << "Task " << i << " is being executed." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            });
        }
    }
    queue_cv.notify_all();

    // 等待所有任务完成
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        is_working = false;
    }
    queue_cv.notify_all();

    for (auto& t : workers) {
        t.join();
    }

    return 0;
}

5. Actor模式

  • 原理:将整个处理过程分为多个阶段,每个阶段由一个或多个Actor(参与者)负责。数据在这些Actor之间像流水线一样依次传递,每个Actor处理完数据后将其传递给下一个Actor,直到最终处理完成。
  • 优点:提高了系统的吞吐量和响应性,各个阶段可以并行执行,而且易于维护和扩展,每个Actor可以独立开发和测试。
  • 缺点:需要精心设计流水线的各个阶段和数据传递方式,以确保数据的正确流转和系统的稳定性。
  • 适用场景:适用于处理流程较为复杂且可以明确划分为多个阶段的任务,如视频编码解码、图像处理流水线、网络数据包的处理等。
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>

// 第一个阶段 Actor
class FirstActor {
public:
    std::queue<int> input_queue;
    std::mutex mtx;
    std::condition_variable cv;
    bool is_finished = false;

    void run() {
        for (int i = 0; i < 5; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            {
                std::unique_lock<std::mutex> lock(mtx);
                input_queue.push(i);
            }
            cv.notify_one();
        }
        {
            std::unique_lock<std::mutex> lock(mtx);
            is_finished = true;
        }
        cv.notify_all();
    }
};

// 第二个阶段 Actor
class SecondActor {
public:
    FirstActor& first_actor;
    std::queue<int> output_queue;
    std::mutex mtx;
    std::condition_variable cv;
    bool is_finished = false;

    SecondActor(FirstActor& fa) : first_actor(fa) {}

    void run() {
        while (true) {
            int data;
            {
                std::unique_lock<std::mutex> lock(first_actor.mtx);
                first_actor.cv.wait(lock, [this] { return!first_actor.input_queue.empty() || first_actor.is_finished; });
                if (first_actor.input_queue.empty() && first_actor.is_finished) {
                    break;
                }
                data = first_actor.input_queue.front();
                first_actor.input_queue.pop();
            }
            data *= 2;
            {
                std::unique_lock<std::mutex> lock(mtx);
                output_queue.push(data);
            }
            cv.notify_one();
        }
        {
            std::unique_lock<std::mutex> lock(mtx);
            is_finished = true;
        }
        cv.notify_all();
    }
};

// 第三个阶段 Actor
class ThirdActor {
public:
    SecondActor& second_actor;

    ThirdActor(SecondActor& sa) : second_actor(sa) {}

    void run() {
        while (true) {
            int data;
            {
                std::unique_lock<std::mutex> lock(second_actor.mtx);
                second_actor.cv.wait(lock, [this] { return!second_actor.output_queue.empty() || second_actor.is_finished; });
                if (second_actor.output_queue.empty() && second_actor.is_finished) {
                    break;
                }
                data = second_actor.output_queue.front();
                second_actor.output_queue.pop();
            }
            std::cout << "Final result: " << data << std::endl;
        }
    }
};

int main() {
    FirstActor first_actor;
    SecondActor second_actor(first_actor);
    ThirdActor third_actor(second_actor);

    std::thread t1(&FirstActor::run, &first_actor);
    std::thread t2(&SecondActor::run, &second_actor);
    std::thread t3(&ThirdActor::run, &third_actor);

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

6、 Pipeline模式概述

Pipeline(流水线)模式是一种将一个复杂任务分解为多个独立子任务,并让这些子任务像流水线一样依次执行的设计模式。每个子任务负责处理一部分工作,处理完后将结果传递给下一个子任务,最终完成整个复杂任务。这种模式可以提高系统的并发性能和可维护性,因为每个子任务可以独立开发、测试和优化,而且不同的子任务可以并行执行。

应用场景

  • 数据处理流程:例如在图像或视频处理中,一个完整的处理流程可能包括图像读取、降噪、增强、裁剪等多个步骤,每个步骤可以作为一个独立的子任务在流水线上执行。
  • 网络请求处理:在网络服务器中,一个请求可能需要经过接收、解析、验证、业务逻辑处理、响应生成等多个阶段,使用流水线模式可以高效地处理大量请求。

C++实现示例

以下是一个简单的C++示例,模拟一个简单的数据处理流水线,包含三个子任务:数据生成、数据处理和数据输出。

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>

// 第一个阶段:数据生成
class DataGenerator {
public:
    std::queue<int> outputQueue;
    std::mutex mtx;
    std::condition_variable cv;
    bool isFinished = false;

    void run() {
        for (int i = 0; i < 10; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            {
                std::unique_lock<std::mutex> lock(mtx);
                outputQueue.push(i);
            }
            cv.notify_one();
        }
        {
            std::unique_lock<std::mutex> lock(mtx);
            isFinished = true;
        }
        cv.notify_all();
    }
};

// 第二个阶段:数据处理
class DataProcessor {
public:
    DataGenerator& generator;
    std::queue<int> outputQueue;
    std::mutex mtx;
    std::condition_variable cv;
    bool isFinished = false;

    DataProcessor(DataGenerator& gen) : generator(gen) {}

    void run() {
        while (true) {
            int data;
            {
                std::unique_lock<std::mutex> lock(generator.mtx);
                generator.cv.wait(lock, [this] { return!generator.outputQueue.empty() || generator.isFinished; });
                if (generator.outputQueue.empty() && generator.isFinished) {
                    break;
                }
                data = generator.outputQueue.front();
                generator.outputQueue.pop();
            }
            // 简单的数据处理,这里将数据加倍
            data *= 2;
            {
                std::unique_lock<std::mutex> lock(mtx);
                outputQueue.push(data);
            }
            cv.notify_one();
        }
        {
            std::unique_lock<std::mutex> lock(mtx);
            isFinished = true;
        }
        cv.notify_all();
    }
};

// 第三个阶段:数据输出
class DataOutputter {
public:
    DataProcessor& processor;

    DataOutputter(DataProcessor& proc) : processor(proc) {}

    void run() {
        while (true) {
            int data;
            {
                std::unique_lock<std::mutex> lock(processor.mtx);
                processor.cv.wait(lock, [this] { return!processor.outputQueue.empty() || processor.isFinished; });
                if (processor.outputQueue.empty() && processor.isFinished) {
                    break;
                }
                data = processor.outputQueue.front();
                processor.outputQueue.pop();
            }
            std::cout << "Processed data: " << data << std::endl;
        }
    }
};

int main() {
    DataGenerator generator;
    DataProcessor processor(generator);
    DataOutputter outputter(processor);

    std::thread generatorThread(&DataGenerator::run, &generator);
    std::thread processorThread(&DataProcessor::run, &processor);
    std::thread outputterThread(&DataOutputter::run, &outputter);

    generatorThread.join();
    processorThread.join();
    outputterThread.join();

    return 0;
}

代码解释

  1. DataGenerator类:负责生成数据,将生成的数据放入输出队列outputQueue,并通过条件变量cv通知下一个阶段有新数据可用。
  2. DataProcessor类:从DataGenerator的输出队列中获取数据,对数据进行处理(这里简单地将数据加倍),然后将处理后的数据放入自己的输出队列,并通知下一个阶段。
  3. DataOutputter类:从DataProcessor的输出队列中获取数据,并将其输出到控制台。
  4. main函数:创建三个线程分别运行三个阶段的任务,并等待所有线程执行完毕。

通过这种方式,数据在三个阶段之间依次传递,形成了一个简单的流水线。每个阶段可以独立运行,提高了系统的并发性能。


http://www.kler.cn/a/594849.html

相关文章:

  • 文档处理控件Aspose典型应用案例:助力企业实现高效智能文档处理
  • 深入浅出Qt容器类:QList、QMap、QHash等常用容器性能对比测试
  • 鸿蒙保姆级教学
  • Bigemap Pro 的三种地图下载方式
  • Elasticsearch Sql 查询
  • 尚硅谷爬虫(解析_xpath的基本使用)笔记
  • 大模型的应用与微调:如何调用 LLM?从 OpenAI API 到本地部署
  • Linux 安全与存储管理指南
  • 腾讯云大模型知识引擎×DeepSeek:股票分析低代码应用实践
  • let const var 底层区域别,es6还有AO 对象和GO对象吗
  • rust学习笔记17-异常处理
  • Redis 管道(Pipeline)深度解析:原理、场景与实战
  • 多包管理工具
  • 删除菜品接口
  • 软考高级信息系统管理工程师通关100题(21-40)附记忆口诀
  • 支持向量机SVM的MATLAB分类预测实现
  • 华为IPD研发管理体系的3大核心框架解析
  • HOW - 平时如何保持学习和成长?
  • vscode/windsurf/trae无法识别junit的@Test注解解决办法
  • 如何在 C++ 中运行 DeepSeek R1 LLM