当前位置: 首页 > article >正文

小白零基础--CPP多线程

进程

  • 进程就是运行中的程序
  • 线程=进程中的进程

1、C++11 Thread线程库基础

#include <iostream>
#include <thread>
#include<string>



void printthread(std::string msg){
  std::cout<<msg<<std::endl;
  for (int i = 0; i < 1000; i++)
  {
    std::cout<<"my  "<<i<<std::endl;
  }
  
  return;
}



int main(){
   //std::thread t(入口)
   //1、创建线程
    std::thread t(printthread,"hello thread");
   //2、保证等待线程结束,主线程在结束
   // t.join();
   //3、分离线程
   //t.detach();
   //4、joinable 判断是否可以调用join
   bool isjoin = t.joinable();
   if(isjoin){
      t.join();
   }
  std::cout<<"over"<<std::endl;
   system( "pause");
   
    return 0;
}

2、线程函数中的数据未定义错误

2.1 临时变量

错误例子

#include <iostream>
#include <thread>


void foo(int& x){
   x+=1;
}



int main(){
   //
   std::thread t(foo,1);
   t.join();
   system( "pause");
   
    return 0;
}

正确方案

#include <iostream>
#include <thread>


void foo(int& x){
   x+=1;
}



int main(){
   //
   int i=1;
   std::thread t(foo,std::ref(i));
   t.join();



  std::cout<<i<<std::endl;
   system( "pause");

    return 0;
}

2.2 传递指针/引用指向局部变量

2.1 的 i 在 main中,那要是不在呢?

#include <iostream>
#include <thread>

std::thread t;
void foo(int& x){
   x+=1;
}

void externi(){
  
    int i=1;
   t=std::thread (foo,std::ref(i));
}

int main(){
   //
   externi();
   t.join();



 
   system( "pause");

    return 0;
}

会报错
那怎么办呢?

#include <iostream>
#include <thread>

std::thread t;
 int i=1;
void foo(int& x){
   x+=1;
}

void externi(){
  
   
   t=std::thread (foo,std::ref(i));
}

int main(){
   //
   externi();
   t.join();



  std::cout<<i<<std::endl;
   system( "pause");

    return 0;
}

2.3 参数被提前手动释放

智能指针

#include <iostream>
#include <thread>
#include <memory>

class  myclass
{
private:
    /* data */
public:
    void foo(){
        std::cout<<"mememem"<<std::endl;
    }
};



int main(){
   std::shared_ptr<myclass> a=std::make_shared<myclass> ();
   std::thread t(&myclass::foo,a);

   system( "pause");
    return 0;
}

2.4 类的private函数

友元

#include <iostream>
#include <thread>
#include <memory>

class  myclass
{
private:
    friend void foo_thread();
     void foo(){
        std::cout<<"mememem"<<std::endl;
    }
public:
   
};
void foo_thread(){

    std::shared_ptr<myclass> a=std::make_shared<myclass> ();
    std::thread t(&myclass::foo,a);
    t.join();
}
int main(){
    foo_thread();
   system( "pause");
    return 0;
}

3互斥量

3.1数据共享–数据竞争问题

#include <iostream>
#include <thread>

int a = 0;
void func(){
    for (int i = 0; i < 1000000; i++)
    {
        a+=1;
    }
    
}
int main(){
   std::thread t1(func);
   std::thread t2(func);
   t1.join();
   t2.join();
  std::cout<<a<<std::endl;
   system( "pause");
    return 0;
}

3.2 互斥锁

#include <iostream>
#include <thread>
#include <mutex>
int a = 0;

std::mutex mt;
void func(){
    for (int i = 0; i < 1000000; i++)
    { 
        mt.lock();
        a+=1;
        mt.unlock();
    }
    
}
int main(){
   std::thread t1(func);
   std::thread t2(func);
   t1.join();
   t2.join();
  std::cout<<a<<std::endl;
   system( "pause");
    return 0;
}

3.3 理解线程安全

4互斥量死锁

4.1 死锁的概念

#include <iostream>
#include <thread>
#include <mutex>
using   namespace std;

mutex m1,m2;
void func1(){
    for (int i = 0; i < 100000; i++)
    { 
        m1.lock();
        m2.lock();
        m1.unlock();
        m2.unlock();
    }
    
    
}
void func2(){
       for (int i = 0; i < 100000; i++)
    { 
    m1.lock();
    m2.lock();
    m2.unlock();
    m1.unlock();
    }
}
int main(){
   thread t1(func1);
   thread t2(func2);
   t1.join();
   t2.join();
   
   cout<<"over<<"<<endl;
   system( "pause");
    return 0;
}

4.2 解决方案

5 lock_guard 和 std::unique_lock

5.1 lock_guard

#include <iostream>
#include <thread>
#include <mutex>
using   namespace std;
int a=0;
mutex m1;
void func1(){
    for(int i=0;i<10000;i++)
    {lock_guard<mutex> gm(m1);
      a++;
    
    }  
}

int main(){
   thread t1(func1);
   t1.join();
   cout<<a<<endl;
   system( "pause");
    return 0;
}

5.2 std::unique_lock

#include <iostream>
#include <thread>
#include <mutex>
using   namespace std;
int a=0;
mutex m1;
void func1(){
    for(int i=0;i<10000;i++)
    {
    unique_lock<mutex> gm(m1);
    a++;
    }  
}

int main(){
   thread t1(func1);
   t1.join();
   cout<<a<<endl;
   system( "pause");
    return 0;
}

6 call_once

6.1 单例模式

6.2 例子:日志类

#include <iostream>
#include <thread>
#include <mutex>
using   namespace std;
class Log{
public:
    Log(){};
    Log(const Log& log)=delete;
    Log &operator =(const Log& log)=delete;
    static Log& GetInstance(){
             static Log  log;//懒汉模式
             return log;
            //饿汉模式
            /**
            static Log   *log=nullptr;
            if(!log) log = new Log;
            return *log;
              */
    }
    void PrintLog(string msg){
            cout << __TIME__ <<" " <<msg<<endl;
    }
};

int main(){
   
   Log::GetInstance().PrintLog("error");

   system( "pause");
    return 0;
}

6.3 call_once

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

using namespace std;

// 需要一次性初始化的共享资源
class DatabaseConfig {
private:
    string serverAddress;
    int port;

    DatabaseConfig() : serverAddress("127.0.0.1"), port(3306) {
        cout << "数据库配置初始化完成!" << endl;
    }

public:
    static DatabaseConfig& getInstance() {
        static once_flag initFlag;
        static DatabaseConfig* instance = nullptr;

        call_once(initFlag, []() {
            instance = new DatabaseConfig();
        });

        return *instance;
    }

    void showConfig() {
        cout << "Server: " << serverAddress 
             << ":" << port << endl;
    }
};

// 多线程测试函数
void threadTask(int id) {
    this_thread::sleep_for(chrono::milliseconds(100 * id));
    auto& config = DatabaseConfig::getInstance();
    
    cout << "线程" << id << "获取配置:";
    config.showConfig();
}

int main() {
    vector<thread> threads;

    // 创建10个线程竞争访问
    for(int i = 0; i < 10; ++i) {
        threads.emplace_back(threadTask, i);
    }

    // 等待所有线程完成
    for(auto& t : threads) {
        t.join();
    }
   system("pause");
    return 0;
}
  • 合理使用 call_once 可以让多线程代码更简洁、更安全,尤其适合需要一次性初始化的场景

7 condition_variable

7.1 生产者-消费者模式概述

生产者-消费者模式是多线程编程中经典的同步问题,需要满足以下条件:

  1. 生产者线程生成数据并放入共享缓冲区。
  2. 消费者线程从缓冲区取出数据并处理。
  3. 同步要求
    • 缓冲区满时,生产者等待消费者消费数据。
    • 缓冲区空时,消费者等待生产者生产数据。

7.2 核心组件

  1. 共享缓冲区:通常使用队列(std::queue)实现。
  2. 互斥锁(std::mutex:保护对缓冲区的并发访问。
  3. 条件变量(std::condition_variable
    • not_full:生产者等待缓冲区非满。
    • not_empty:消费者等待缓冲区非空。

7.3实现代码

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

using namespace std;

const int BUFFER_SIZE = 5;       // 缓冲区容量
queue<int> buffer;               // 共享缓冲区
mutex mtx;                       // 互斥锁
condition_variable not_full;     // 缓冲区非满条件
condition_variable not_empty;    // 缓冲区非空条件

// 生产者函数
void producer(int id) {
    for (int i = 0; i < 10; ++i) {
        unique_lock<mutex> lock(mtx);
        // 如果缓冲区满,等待消费者消费
        not_full.wait(lock, [] { 
            return buffer.size() < BUFFER_SIZE; 
        });
        // 生产数据
        int data = id * 100 + i;
        buffer.push(data);
        cout << "生产者 " << id << " 生产数据: " << data << endl;
        lock.unlock();
        not_empty.notify_one();  // 通知消费者
        this_thread::sleep_for(chrono::milliseconds(100));
    }
}

// 消费者函数
void consumer(int id) {
    for (int i = 0; i < 10; ++i) {
        unique_lock<mutex> lock(mtx);

        // 如果缓冲区空,等待生产者生产
        not_empty.wait(lock, [] { 
            return !buffer.empty(); 
        });

        // 消费数据
        int data = buffer.front();
        buffer.pop();
        cout << "消费者 " << id << " 消费数据: " << data << endl;

        lock.unlock();
        not_full.notify_one();   // 通知生产者

        this_thread::sleep_for(chrono::milliseconds(200));
    }
}

int main() {
    thread producers[2];
    thread consumers[3];

    // 启动2个生产者线程
    for (int i = 0; i < 2; ++i) {
        producers[i] = thread(producer, i);
    }

    // 启动3个消费者线程
    for (int i = 0; i < 3; ++i) {
        consumers[i] = thread(consumer, i);
    }

    // 等待所有线程结束
    for (auto& t : producers) t.join();
    for (auto& t : consumers) t.join();

    return 0;
}

7.4 代码解析

  1. 共享资源保护

    • 所有对缓冲区的操作(pushpop)均在互斥锁mtx的保护下进行。
    • 使用unique_lock自动管理锁的生命周期。
  2. 条件变量的使用

    • 生产者等待条件not_full.wait(lock, predicate)
      当缓冲区满时(buffer.size() >= BUFFER_SIZE),生产者线程阻塞,直到消费者消费数据后通过not_full.notify_one()唤醒。
    • 消费者等待条件not_empty.wait(lock, predicate)
      当缓冲区空时(buffer.empty()),消费者线程阻塞,直到生产者生产数据后通过not_empty.notify_one()唤醒。
  3. 通知机制

    • 生产者生产数据后调用not_empty.notify_one(),唤醒一个等待的消费者。
    • 消费者消费数据后调用not_full.notify_one(),唤醒一个等待的生产者。

7.5 运行结果示例

生产者 0 生产数据: 0
消费者 0 消费数据: 0
生产者 1 生产数据: 100
消费者 1 消费数据: 100
生产者 0 生产数据: 1
消费者 2 消费数据: 1
...
(输出将展示生产与消费的交替过程)

7.6 关键点总结

  1. 防止虚假唤醒
    条件变量的wait必须配合谓词(如buffer.size() < BUFFER_SIZE)使用,确保即使被意外唤醒也能重新检查条件。

  2. 资源管理

    • unique_lockwait时自动释放锁,唤醒后重新获取锁。
    • 使用notify_one而非notify_all,减少不必要的线程竞争。
  3. 死锁避免

    • 确保在调用notify_one前释放锁(通过lock.unlock())。
    • 避免在持有锁时进行耗时操作(如示例中的sleep_for在锁外执行)。

7.7 扩展场景

  • 多生产者和多消费者
    当前代码已支持多个生产者和消费者,通过调整线程数量即可验证。

  • 动态缓冲区大小
    可将BUFFER_SIZE设为动态值,根据需求调整。

  • 复杂数据类型
    queue<int>替换为自定义数据类型队列,实现更复杂的生产-消费逻辑。

此实现完整展示了如何利用condition_variable实现线程安全的生产者-消费者模式,可直接用于实际项目中的任务队列、线程池等场景。

8 跨平台线程池

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    // 构造函数,启动指定数量的工作线程
    ThreadPool(size_t threads = std::thread::hardware_concurrency())
        : stop(false) {
        for(size_t i = 0; i < threads; ++i)
            workers.emplace_back([this] {
                for(;;) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        
                        if(this->stop && this->tasks.empty())
                            return;
                        
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            });
    }

    // 将任务添加到任务队列,返回一个future以便获取结果
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            // 不允许在停止线程池后添加新任务
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace([task](){ (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    // 析构函数,等待所有任务完成并停止所有线程
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for(std::thread &worker : workers)
            worker.join();
    }

private:
    std::vector<std::thread> workers;     // 工作线程集合
    std::queue<std::function<void()>> tasks; // 任务队列
    
    std::mutex queue_mutex;               // 任务队列互斥锁
    std::condition_variable condition;    // 条件变量
    bool stop;                            // 停止标志
};

// 使用示例
int main() {
    ThreadPool pool(4); // 创建4个工作线程

    // 提交多个任务到线程池
    std::vector<std::future<int>> results;

    for(int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::this_thread::sleep_for(std::chrono::seconds(1));
                return i*i;
            })
        );
    }

    // 获取任务结果
    for(auto && result : results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;

    return 0;
}

9 异步并发 async future packaged task promise

#include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <vector>

int compute(int x) {
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
    return x * x;
}

int main() {
    // 使用 std::async 启动多个异步任务
    std::vector<std::future<int>> futures;
    for (int i = 1; i <= 5; ++i) {
        futures.push_back(std::async(std::launch::async, compute, i));
    }

    // 获取所有任务的结果
    for (auto& future : futures) {
        std::cout << "Result: " << future.get() << std::endl;
    }

    // 使用 std::packaged_task 手动控制任务执行
    std::packaged_task<int(int)> task(compute);
    std::future<int> future = task.get_future();

    std::thread t(std::move(task), 10);
    t.join(); // 等待线程完成

    std::cout << "Packaged Task Result: " << future.get() << std::endl;

    return 0;
}

10 原子操作atomic

#include <iostream>
#include <thread>
#include <atomic>
using namespace std;

atomic<int> a(0); // 使用 atomic<int> 替代普通 int

void func1() {
    for (int i = 0; i < 10000; i++) {
        a++; // 原子操作,无需额外的互斥锁
    }
}

int main() {
    thread t1(func1);
    t1.join();
    cout << a << endl; // 输出最终结果
    system("pause");
    return 0;
}

http://www.kler.cn/a/529351.html

相关文章:

  • 如何实现一个CLI命令行功能 | python 小知识
  • Ruby 模块(Module)
  • Hive存储系统全面测试报告
  • 仿真设计|基于51单片机的温室环境监测调节系统
  • Day31-【AI思考】-关键支点识别与战略聚焦框架
  • MySQL CTE:解锁SQL查询新模式
  • w186格障碍诊断系统spring boot设计与实现
  • 星际智慧农业系统(SAS),智慧农业的未来篇章
  • MP4分析工具
  • 97,【5】buuctf web [极客大挑战 2020]Greatphp
  • python算法和数据结构刷题[5]:动态规划
  • 【Springboot2】热部署开启
  • 【人工智能】 在本地运行 DeepSeek 模型:Ollama 安装指南
  • deep generative model stanford lecture note2 --- autoregressive
  • Windows11 不依赖docker搭建 deepseek-R1 1.5B版本(附 Open WebUi搭建方式)
  • openmv运行时突然中断并且没断联只是跟复位了一样
  • 如何在Intellij IDEA中识别一个文件夹下的多个Maven module?
  • 【单层神经网络】基于MXNet库简化实现线性回归
  • Python sider-ai-api库 — 访问Claude、llama、ChatGPT、gemini、o1等大模型API
  • ollama和deepseek-r1-1.5b和AnythingLLM
  • Java 有很多常用的库
  • 【4. C++ 变量类型详解与创新解读】
  • UI线程用到COM只能选单线程模型
  • [CVPR 2024] AnyDoor: Zero-shot Object-level Image Customization
  • 17.2 图形绘制7
  • ES的机架感知-Rack Awareness