当前位置: 首页 > article >正文

基于阻塞队列及环形队列的生产消费模型

目录

条件变量函数

等待条件满足

阻塞队列

升级版

信号量

POSIX信号量

环形队列


条件变量函数

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 
 参数: 
 cond:要在这个条件变量上等待 
 mutex:互斥量,后面详细解释 

pthread_cond_wait:第二个参数必须是正在使用的互斥锁

a.pthread_cond_wait:该函数调用时,会以原子性的方式将锁释放,并将自己挂起

b.pthread_cond_wait:该函数被唤醒返回的时候,会自动从新获取锁

阻塞队列

blockqueue.hpp

#pragma once
#include<iostream>
#include<string>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
static const int gmaxcap=5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap=gmaxcap)
        :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
            pthread_cond_wait(&_pcond,&_mutex);//生产条件不满足
        _q.push(in);
        //阻塞队列中一定有数据
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty)
            pthread_cond_wait(&_ccond,&_mutex);
        *out=_q.front();
        _q.pop();
        //队列中一定有一个空位置
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;
};


task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class Task
{
    using func_t=function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

升级版

blockqueue.hpp

#pragma once
#include<iostream>
#include<string>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
static const int gmaxcap=5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap=gmaxcap)
        :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
            pthread_cond_wait(&_pcond,&_mutex);//生产条件不满足
        _q.push(in);
        //阻塞队列中一定有数据
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty)
            pthread_cond_wait(&_ccond,&_mutex);
        *out=_q.front();
        _q.pop();
        //队列中一定有一个空位置
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;
};


task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class CalTask
{
    using func_t=function<int(int,int,char)>;
public:
    CalTask()
    {}
    CalTask(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
const string oper="+-*/%";
int mymath(int x,int y,char op)
{
    int result=0;
    switch (op)
    {
    case '+':
        result=x+y;
        break;
    case '-':
        result=x-y;
        break;
    case '*':
        result=x*y;
        break;
    case '/':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x/y;
    }
        break;
    case '%':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x%y;
    }
        break;
    default:
        break;
    }
    return result;
}
class SaveTask
{
    typedef function<void(const string&)> func_t;
public:
    SaveTask()
    {}
    SaveTask(const string& message,func_t func)
        :_message(message),_func(func)
    {}
    void operator()()
    {
        _func(_message);
    }
private:
    string _message;
    func_t _func;
};
void Save(const string& message)
{
    const string target="./log.txt";
    FILE* fp=fopen(target.c_str(),"a+");
    if(!fp)
    {
        cerr<<"fopen error"<<endl;
        return;
    }
    fputs(message.c_str(),fp);
    fputs("\n",fp);
    fclose(fp);
}

MainCp.cc

#include"BlockQueue.hpp"
#include"task.hpp"
#include<sys/types.h>
#include<unistd.h>
#include<ctime>

//
//
template<class C,class S>
class BlockQueues
{

public:
    BlockQueue<C>* c_bq;
    BlockQueue<S>* s_bq;
};
void* consumer(void* bqs_)
{
    BlockQueue<CalTask>* bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->c_bq;
    BlockQueue<SaveTask>* save_bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->s_bq;
    
    while(true)
    {
        /* consumer */
        // int data;
        // bq->pop(&data);
        CalTask t;
        bq->pop(&t);
        string result=t();
        cout<<"消费数据: "<<result<<endl;

        SaveTask save(result,Save);
        save_bq->push(save);
        cout<<"推送保存任务完成..."<<endl;
        sleep(1);
    }
    return nullptr;
}
void* producter(void* bqs_)
{
    BlockQueue<CalTask>* bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->c_bq;
    
    while (true)
    {
        //producer
        int x=rand()%10+1;
        int y=rand()%5;
        int operCode=rand()%oper.size();
        CalTask t(x,y,oper[operCode],mymath);
        bq->push(t);
        cout<<"生产任务: "<<t.toTaskString()<<endl;
        // sleep(1);
    }
    return nullptr;
}
void* saver(void* bqs_)
{
    BlockQueue<SaveTask>* save_bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->s_bq;
    while (true)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();
        cout << "推送保存任务完成..." << endl;
    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(nullptr));
    BlockQueues<CalTask,SaveTask> bqs;

    bqs.c_bq=new BlockQueue<CalTask>();
    bqs.s_bq=new BlockQueue<SaveTask>();
    pthread_t c,p,s;
    pthread_create(&c,nullptr,consumer,&bqs);
    pthread_create(&p,nullptr,producter,&bqs);
    pthread_create(&s,nullptr,saver,&bqs);
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);
    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

./MainCp
生产任务: 9 * 0 = ? 
生产任务: 9 - 4 = ? 
生产任务: 8 - 0 = ? 
生产任务: 3 - 4 = ? 
生产任务: 6 + 1 = ? 
消费数据: 9 * 0 = 0 
推送保存任务完成...
生产任务: 2 - 2 = ? 
推送保存任务完成...
消费数据: 9 - 4 = 5 
推送保存任务完成...
生产任务: 9 - 0 = ? 
推送保存任务完成...
消费数据: 8 - 0 = 8 
推送保存任务完成...
生产任务: 6 * 3 = ? 
推送保存任务完成...
消费数据: 3 - 4 = -1 
推送保存任务完成...
生产任务: 4 * 4 = ? 
推送保存任务完成...
消费数据: 6 + 1 = 7 
推送保存任务完成...
生产任务: 5 % 4 = ? 
推送保存任务完成...
^C
zhangsan@ubuntu:~/practice-using-ubuntu/20241005/blockqueue$ cat log.txt
9 * 0 = 0 
9 - 4 = 5 
8 - 0 = 8 
3 - 4 = -1 
6 + 1 = 7 

信号量

a.信号量的本质就是计数器

b.只有拥有信号量,在未来就一定能拥有临界资源的一部分

申请信号量的本质就是:对临界资源中特点小块资源的预定机制

sem--         申请资源       P        必须保证操作的原子性

sem++       释放资源        V       必须保证操作的原子性

POSIX信号量

环形队列

RingQueue.hpp

#pragma once

#include<iostream>
#include<cassert>
#include<vector>
#include<ctime>
#include<cstdlib>
#include<semaphore.h>
#include<unistd.h>
#include<pthread.h>

static const int gcap=5;

template<class T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        int n=sem_wait(&sem);
        assert(n==0);
    }
    void V(sem_t& sem)
    {
        int n=sem_post(&sem);
        assert(n==0);
    }
public:
    RingQueue(const int& cap=gcap):_queue(cap),_cap(cap)
    {
        int n=sem_init(&_spaceSem,0,_cap);
        assert(n==0);
        n=sem_init(&_dataSem,0,0);
        assert(n==0);
        _productorStep=_consumerStep=0;
        pthread_mutex_init(&_pmutex,nullptr);
        pthread_mutex_init(&_cmutex,nullptr);
    }
    void Push(const T& in)
    {
        
        P(_spaceSem);//productor
        pthread_mutex_lock(&_pmutex);
        _queue[_productorStep++]=in;
        _productorStep%=_cap;
        pthread_mutex_unlock(&_pmutex);//更高效
        V(_dataSem);
        
    }
    void Pop(T* out)
    {
        pthread_mutex_lock(&_cmutex);
        P(_dataSem);
        *out=_queue[_consumerStep++];
        _consumerStep%=_cap;
        V(_spaceSem);
        pthread_mutex_unlock(&_cmutex);
    }
    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }
private:
    vector<T> _queue;
    int _cap;
    sem_t _spaceSem;//生产者->空间资源
    sem_t _dataSem;
    int _productorStep;
    int _consumerStep;
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};

task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class Task
{
    using func_t=function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
const string oper="+-*/%";
int mymath(int x,int y,char op)
{
    int result=0;
    switch (op)
    {
    case '+':
        result=x+y;
        break;
    case '-':
        result=x-y;
        break;
    case '*':
        result=x*y;
        break;
    case '/':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x/y;
    }
        break;
    case '%':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x%y;
    }
        break;
    default:
        break;
    }
    return result;
}

main.cc

#include"RingQueue.hpp"
#include"task.hpp"

using namespace std;

void* ProductorRoutine(void* rq)
{
    RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>* >(rq);
    while (true)
    {
        /* code */
        int x=rand()%100;
        int y=rand()%50;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        ringqueue->Push(t);
        cout<<"生产者派发了一个任务: "<<t.toTaskString()<<endl;
        sleep(1);
    }
}
void* ConsumerRoutine(void* rq)
{
    RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>* >(rq);
    while (true)
    {
        /* code */
        Task t;
        ringqueue->Pop(&t);
        string result=t();
        cout<<"消费者消费了一个任务"<<result<<endl;
    }
    
}
int main()
{
    srand((unsigned int)time(nullptr));
    RingQueue<Task>* rq=new RingQueue<Task>();

    pthread_t p,c;
    pthread_create(&p,nullptr,ProductorRoutine,rq);
    pthread_create(&c,nullptr,ConsumerRoutine,rq);
    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    delete rq;
    return 0;
}


http://www.kler.cn/news/335653.html

相关文章:

  • 第一集---初识计算机系统
  • 如何利用免费音频剪辑软件制作出精彩音频
  • 使用ElasticSearch-dump工具进行ES数据迁移、备份
  • python 实现贪婪合并排序算法
  • 【MySQL】-- 库的操作
  • 数据结构--集合框架
  • 【React】事件机制
  • 复习HTML(基础)
  • whisper 实现语音识别 ASR - python 实现
  • js 如何平拆嵌套数组
  • 【EXCEL数据处理】000013 案例 EXCEL筛选与高级筛选。
  • 消息称苹果iPhone系列将完全放弃LCD屏幕
  • redis-数据类型
  • STM32+ADC+扫描模式
  • Electron Vue框架环境搭建 Vue3环境搭建
  • 在Python中实现多目标优化问题(7)模拟退火算法的调用
  • Django学习笔记十二:程序优化
  • GOM引擎 GEEM2被攻击后触发无敌模式的BUFF脚本范例
  • 使用Mybatis框架的主要优势
  • 【最新华为OD机试E卷-支持在线评测】简单的自动曝光(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)