当前位置: 首页 > article >正文

Linux:同步

目录

一、同步概念

条件变量

二、生产者消费者模型

三、环形队列


一、同步概念

        互斥用来解决 访问临界资源 的非原子性,通俗来说,由于互斥锁的实现,保证了在用户角度看,同一个时间内访问临界资源的代码只有一个线程在执行。

        而同步,用来解决,多线程中,临界资源长时间被同一个线程占用,造成其他线程饥饿的问题。

条件变量

        条件变量用来实现同步。接口的使用和互斥类似。

        条件变量的作用就是,当一个线程拿到锁,访问临界资源结束后,让这个线程去某个条件变量的队列中去等待,同时释放锁资源。

 

        不使用同步机制造成的现象。

 

        使用同步机制改善。 

 

#include <iostream>
#include <vector>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;

void* MCore(void* args)
{
    sleep(3);
    std::cout << "master 开始工作" << std::endl;
    std::string name = static_cast<char*>(args);
    while(1)
    {
        pthread_cond_signal(&gcond);//唤醒其中一个队列首部的线程
        std::cout << "master唤醒一个线程" << std::endl;
        sleep(1);
    }
        
}
void StartMaster(std::vector<pthread_t>* threads)
{
    pthread_t tid;
    int n = pthread_create(&tid,nullptr,MCore,(void*)"Master Thread");
    if(n == 0)
    {
        std::cout << "create master success" << std::endl;
    }
    threads->emplace_back(tid);
}

void* SCore(void* args)
{
    std::string name = static_cast<char*>(args);
    while(1)
    {
        //1.加锁
        pthread_mutex_lock(&gmutex);
        //2.条件变量
        pthread_cond_wait(&gcond,&gmutex);
        std::cout << "当前被唤醒的线程是:" << name << std::endl;
        sleep(1);
        
        //3.解锁
        pthread_mutex_unlock(&gmutex);
        
    }

}
void StartSlaver(std::vector<pthread_t>* threads,int n)
{
    for(int i = 0;i < n;++i)
    {
        char* name = new char[64];
        snprintf(name,64,"new thread-%d",i+1);
        pthread_t tid;

        int n = pthread_create(&tid,nullptr,SCore,(void*)name);
        if(n == 0)
        {
            std::cout << "create success:" << name << std::endl;
            threads->emplace_back(tid);
        }
    }
}

void WaitThread(std::vector<pthread_t>& threads)
{
    for(auto& tid: threads)
    {
        pthread_join(tid,nullptr);
    }
}
int main()
{
    std::vector<pthread_t> threads;
    StartMaster(&threads);
    StartSlaver(&threads,5);
    WaitThread(threads);

    return 0;
}

        当副线程获取锁资源后,对锁的管理变成不会立刻释放锁,而是去指定的条件变量(队列)下去等待,等待被唤醒,同时释放锁,锁资源同时给到那个被唤醒的进程。

二、生产者消费者模型

  • 解释生产者消费者模型

        生产者消费者模型,用来解决,并发运行下,多个线程之间的数据传递问题。

  • 3个关系

        并发运行下:

        多个生产者线程之间,既要同步也要互斥

        多个消费者线程之间,既要同步也要互斥

        生产者和消费者之间,既要同步也要互斥

  • 2种角色

        生产者、消费者

  • 一种交易场所

        交易场所本质就是内存中的一种数据结构,交易对象是数据。

  • 基于阻塞队列实现生产者消费者模型

        交易场所是阻塞队列。

        多个生产线程之间要互斥,只能有一个生产线程向阻塞队列放数据。

        多个消费线程之间要互斥,只能有一个消费线程向阻塞队列拿数据。

        每一个生产线程和每一个消费线程之间要互斥。因此,所有线程都要互斥的访问这个阻塞队列,只需要一把互斥锁即可,但是需要两个条件变量。

基于阻塞队列实现生产者消费者的项目地址

BlockQueue.hpp

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>

template <typename T>
class BlockQueue
{
    private:
    bool IsFull()
    {
        return _blockqueue.size() == _cap;
    }
    bool IsEmpty()
    {
        return _blockqueue.empty();
    }
public:
BlockQueue(int cap)
:_cap(cap)
{
    _product_wait_num = 0;
    _consum_wait_num = 0;
    pthread_mutex_init(&_mutex,nullptr);
    pthread_cond_init(&_consum_cond,nullptr);
    pthread_cond_init(&_product_cond,nullptr);
}

~BlockQueue()
{
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_consum_cond);
    pthread_cond_destroy(&_product_cond);
}
//生产者是向阻塞队列里面加数据
void Enqueue(T & in)
{
    pthread_mutex_lock(&_mutex);
    //生产者拿到了队列,判断队列是否为满的
    //如果是满的,则需要通知消费者来消费,让后让该生产者去等待
    
    while(IsFull())//为什么是while而不是if,保证唤醒的进程争夺锁成功后,要确保队列中有数据再去执行后面的代码
    {
        _product_wait_num++;//++和--操作

        pthread_cond_wait(&_product_cond,&_mutex);
        _product_wait_num--;
    }
    //生产
    _blockqueue.push(in);
    if(_consum_wait_num > 0)
    {
        pthread_cond_signal(&_consum_cond);
    }
    pthread_mutex_unlock(&_mutex);

}
//消费者是从队列里面拿数据
void Pop(T * out)
{
    pthread_mutex_lock(&_mutex);

    while(IsEmpty())
    {
        //如果队列为空,消费者就去等待,同时释放锁
        _consum_wait_num++;

        pthread_cond_wait(&_consum_cond,&_mutex);
        _consum_wait_num--;
    }

    //消费
    *out = _blockqueue.front();
    _blockqueue.pop();
    if(_product_wait_num > 0)
    {
        pthread_cond_signal(&_product_cond);
    }
    pthread_mutex_unlock(&_mutex);
}
private:

std::queue<T> _blockqueue;
int _cap;
pthread_mutex_t _mutex;//保护阻塞队列的锁
pthread_cond_t _consum_cond;
pthread_cond_t _product_cond;

int _product_wait_num;
int _consum_wait_num;

};


#endif

Thread.hpp

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <functional>

#include <unistd.h>
#include <pthread.h>


namespace ThreadModule
{

    template<typename T>
    using func_t = std::function<void(T)>;

    template<typename T>
    class Thread
    {
    public:
        Thread(func_t<T> func,T data,const std::string &name = "none-name")
        :_func(func),_data(data),_threadname(name),_stop(true)
        {}
        ~Thread()
        {}
        //这里涉及到了一套解决方案,静态成员函数是类级别的,换句话说,每个对象拿到的函数是同一个,
        //因此,通过输入型参数代入this指针,再调函数来实现多个对象各自调用
        void Excute()
        {
            _func(_data);
        }
        static void* threadroutine(void* args)
        {
            Thread<T> *self = static_cast<Thread<T>*>(args);

            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid,nullptr,threadroutine,this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else{
                return false;
            }
        }
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid,nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
    private:
        pthread_t _tid;
        std::string _threadname;
        T  _data;//此时是阻塞队列,是一个共享资源
        func_t<T> _func;
        bool _stop;
    };

}

#endif

Task.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>


//using Task = std::function<void()>;

class Task
{
public:
    Task() {}
    Task(int a, int b) : _a(a), _b(b), _result(0)
    {
    }
    void Excute()
    {
        _result = _a + _b;
    }
    std::string ResultToString()
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
    }
    std::string DebugToString()
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
    }

private:
    int _a;
    int _b;
    int _result;
};

Main.cc

#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"
using namespace ThreadModule;


using  blockqueue_t = BlockQueue<Task>;

class ThreadData
{
    public:
    ThreadData(blockqueue_t & bq,std::string name)
    :_bq(bq),who(name)
    {}

    blockqueue_t &_bq;
    std::string who;
};

void Consumer(ThreadData* td)
{
    while(1)
    {
        sleep(1);
        //1.从阻塞队列中取一个任务
        Task t;
        td->_bq.Pop(&t);

        //2.处理这个任务
        t.Excute();
        std::cout << "Consumer Task" << t.ResultToString() << std::endl;
    }
}
void Product(ThreadData* td)
{
    srand(time(nullptr)^ pthread_self());
    while(1)
    {
        sleep(1);
        //1.生成任务
        int a = rand() % 10 +1;
        usleep(1000);
        int b = rand() % 20 + 1;
        Task t(a,b);

        //2.把任务放到阻塞队列中
        td->_bq.Enqueue(t);
        std::cout << "Product Task" << t.DebugToString() << std::endl;
    }
}

void StartConsumer(std::vector<Thread<ThreadData*>> * threads,int num,blockqueue_t& bq)
{
    for(int i = 0;i < num; ++i)
    {
        std::string name = "thread-" + std::to_string(i+1);
        ThreadData* td = new ThreadData(bq,name);
        threads->emplace_back(Consumer,td,name);
        threads->back().Start();//每新建一个线程就启动它
    }
}
void StartProductor(std::vector<Thread<ThreadData*>> * threads,int num,blockqueue_t& bq)
{
    for(int i = 0;i < num; ++i)
    {
        std::string name = "thread-" + std::to_string(i+1);
        ThreadData* td = new ThreadData(bq,name);

        threads->emplace_back(Product,td,name);
        threads->back().Start();//每新建一个线程就启动它
    }
}
void WaitAllThread(std::vector<Thread<ThreadData*>>&  threads)
{
    for(auto& e : threads)
    {
        e.Join();
    }
}

int main()
{
    blockqueue_t* bq = new blockqueue_t(5);//队列最多有五个数据
    std::vector<Thread<ThreadData*>> threads;
    StartConsumer(&threads,1,*bq);
    StartProductor(&threads,1,*bq);
    WaitAllThread(threads);

}


三、环形队列

  • 基于环形队列的生产者消费者模型,不再是条件变量控制,而是信号量机制

        信号量机制中,将临界资源视为一个整体,就是基于阻塞队列实现的生产者消费者模型。而将临界资源划分为一块块小的数据块,信号量就是用来表示临界资源的数量,信号量的使用,主要表现为基于环形队列实现的生产者消费者模型。

        和System的信号量通信方式有一定关联,此处的信号量机制是POSIX中的信号量机制,更多是用来完成生产消费模型。

RingQueue.hpp

#ifndef __RING_QUEUE_HPP__
#define __RING_QUEUE_HPP__


#include <iostream>
#include <string>
#include <vector>
#include <pthread.h>
#include <semaphore.h>

template<typename T>
class RingQueue
{
    private:
    void P(sem_t& sg)
    {
        sem_wait(&sg);//p操作对应的函数,执行该函数,信号量资源如果大于0,则减减
    }
    void V(sem_t& sg)
    {
        sem_post(&sg);
    }
    void Lock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
    public:
    RingQueue(int cap)
    :_cap(cap)
    {
        _productor_step = 0;
        _consumer_step = 0;

        pthread_mutex_init(&_productor_mutex,nullptr);
        pthread_mutex_init(&_consumer_mutex,nullptr);

        sem_init(&_room_sem,0,_cap);
        sem_init(&_data_sem,0,0);
    }
    ~RingQueue()
    {
        sem_destroy(&_room_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_productor_mutex);
        pthread_mutex_destroy(&_consumer_mutex);
    }
    void Enqueue(const T& in)
    {
        //生产任务
        //P操作申请一个空间
        P(_room_sem);
        //多个生产者进程之间要互斥
        Lock(_productor_mutex);
        _ring_queue[_productor_step++] = in;//生产者按照顺序生产
        _productor_step %= _cap;

        Unlock(_productor_mutex);
        V(_data_sem);

    }
    void Pop(T* out)
    {
        P(_data_sem);
        Lock(_consumer_mutex);

        *out = _ring_queue[_consumer_step++];
        _consumer_step %= _cap;

        Unlock(_consumer_mutex);
        V(_room_sem);
    }
    private:
    //1.定义一个环形队列
    std::vector<T> _ring_queue;
    int _cap;//环形队列的容量上限

    //2.生产者和消费者的下标
    int _productor_step;
    int _consumer_step;

    //3.定义信号量
    sem_t _room_sem;//生产者关心有几个空间
    sem_t _data_sem;//消费者关心有几个数据

    //4.定义锁,维护多生产多消费之间的互斥的关系,
    //注意生产者和消费者之间不需要互斥,因此需要两把锁
    pthread_mutex_t _productor_mutex;
    pthread_mutex_t _consumer_mutex;
    
};


#endif

http://www.kler.cn/a/566708.html

相关文章:

  • 【微知】git log如何将每次提交按照一行查看?(git log --oneline)
  • unity lua属性绑定刷新
  • C++ 设计模式 十一:代理模式 (读书 现代c++设计模式)
  • MySQL 和 Elasticsearch 之间的数据同步
  • docker里面pgadmin4自动备份pg数据库操作
  • NIM平台开发基于提示工程的大语言模型(LLM)应用
  • 笔记20250225
  • IP-----双重发布
  • 使用PDFMiner.six解析PDF数据
  • 【Rust中级教程】题外话:Rust + Python联合编程(基于Maturin)
  • 城电科技|会追日的智能花,光伏太阳花开启绿色能源新篇章
  • 一个便捷的web截图库~
  • 回溯算法(C/C++)
  • mysql中事务的基本概念
  • 开源PDF解析工具olmOCR
  • next.js-学习3
  • USRP4120-通用软件无线电平台
  • C语言一维数组的全面解析
  • CAN总线通信协议学习1——物理层
  • HarmonyOS 5.0应用开发——多线程Worker和@Sendable的使用方法