当前位置: 首页 > article >正文

Linux_线程同步生产者消费者模型

同步的相关概念

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。

同步的意义
互斥保证安全性,安全不一定合理或高效!!同步主要是在保证安全的前提下,让系统变得更加合理和高效!

条件变量

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。条件变量是一个用来进行线程同步的特性,内部要维护线程队列。

条件变量函数

pthread_cond_init函数

// 初始化局部的条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t
*restrict attr);

// 使用宏值初始化(全局),类比pthread_mutex_init
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • cond 是指向条件变量对象的指针,该对象将被初始化。
  • attr 是指向条件变量属性的指针,用于指定条件变量的属性。如果此参数为 NULL,则使用默认属性。在大多数应用中,通常传递 NULL

函数成功时返回 0;出错时返回错误码。

pthread_cond_wait函数

// 使线程在条件变量上等待,直到该条件变量被另一个线程的信号
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict
mutex);
  • cond 是指向条件变量对象的指针。
  • mutex 是指向互斥锁对象的指针,该互斥锁必须在调用 pthread_cond_wait 之前被当前线程持有(即锁定状态)。

pthread_cond_signal函数

// 唤醒一个线程
int pthread_cond_signal(pthread_cond_t *cond);

pthread_cond_broadcast函数 

// 唤醒所有线程
int pthread_cond_broadcast(pthread_cond_t *cond);

一个Demo样例

// 初始化全局的锁和条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void* active(void* args)
{
    std::string name = static_cast<const char*>(args);
    while(true)
    {
        pthread_mutex_lock(&mutex);
        // 让每一个线程去条件变量那里去等待
        pthread_cond_wait(&cond, &mutex);    
        printf("%s is active!\n", name.c_str());
        
        pthread_mutex_unlock(&mutex);
    }
}
int main()
{
    pthread_t tid1, tid2, tid3;
    pthread_create(&tid1, nullptr, active, (void*)"thread-1");
    pthread_create(&tid2, nullptr, active, (void*)"thread-2");
    pthread_create(&tid3, nullptr, active, (void*)"thread-3");

    while(true)
    {
        printf("main wakeup thread...\n");
        // 我唤醒一个线程,一个线程去做相应的工作
        pthread_cond_signal(&cond);
        sleep(1);
    }

    pthread_join(tid1, nullptr);
    pthread_join(tid2, nullptr);
    pthread_join(tid3, nullptr);
    return 0;    
}

将上面的pthread_cond_signal换成下面这句代码,线程就会全部被唤醒。

pthread_cond_broadcast(&cond);

生产者消费者模型

什么是生产者消费者模型

生产者消费者模型(Producer-Consumer Model)是一种经典的多线程同步问题,用于描述两个或多个线程之间共享有限资源的场景。在这个模型中,生产者负责生成数据并将其放入缓冲区(Buffer),消费者则从缓冲区中取出数据并消费。生产者和消费者通过缓冲区进行通信,但它们的执行速度可能不同,因此需要通过同步机制来协调它们的行为。

为什么要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的

生产者消费者模型的优点:解耦、支持并发、支持忙闲不均

1分钟记住生产者消费者模型

生产者消费者模型可简单理解为321原则

  • 三种关系(生产和生产(互斥),消费和消费(互斥),生产和消费(互斥&&同步))
  • 两种角色(生产线程,消费线程)
  • 一个缓冲区(一段内存空间,如阻塞队列)

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

Blocking Queue的Demo代码

设计思路:

(1)定义阻塞队列

阻塞队列是生产者和消费者之间的通信桥梁。它需要支持以下操作:

  • 入队(put):生产者调用,将数据放入队列。如果队列已满,则阻塞生产者线程,直到有空间可用。
  • 出队(take):消费者调用,从队列中取出数据。如果队列为空,则阻塞消费者线程,直到有数据可用。

(2)生产者逻辑

生产者负责生成数据并将其放入阻塞队列。

  1. 生成数据。

  2. 调用阻塞队列的put方法将数据放入队列。

  3. 如果队列已满,生产者线程会被阻塞,直到有空间可用。

(3)消费者逻辑

消费者负责从阻塞队列中取出数据并消费。

  1. 调用阻塞队列的take方法从队列中取出数据。

  2. 如果队列为空,消费者线程会被阻塞,直到有数据可用。

  3. 消费数据

// BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>

static const int gcap = 10;
template <typename T>
class BlockQueue
{
private:
    bool IsFull() { return _q.size() == _cap; }
    bool IsEmpty() { return _q.empty(); }
public:
    BlockQueue(int cap = gcap)
        : _cap(cap)
        , _cwait_num(0)
        , _pwait_num(0)
    {
        // 基于RAII设计初始化和销毁
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_productor_cond, nullptr);
        pthread_cond_init(&_consumer_cond, nullptr);
    }
    void Put(const T &in) // 生产者
    {
        pthread_mutex_lock(&_mutex);
        // 为了防止伪唤醒,我们这里使用while进行判断
        // 伪唤醒是指将线程唤醒之后,线程没有资源可以访问,然后继续去等待
        while (IsFull())
        {
            _pwait_num++; // 若空间满了,就让等待的生产者数量++
            // 若满了,则生产者不能生产,必须在条件变量下等待
            // 临界区里面抱着锁等待是不被允许的!!!所以pthread_cond_wait
            // 被调用的时候:除了指明自己在那个条件变量下等待,还会释放自己抱着的锁
            pthread_cond_wait(&_productor_cond, &_mutex);
            // 返回时,线程被唤醒 && 重新申请并持有锁(它会在临界区内醒来!)
            // 才能继续往下运行
            _pwait_num--; // 生产者被唤醒,数量--
        }
        // while(IsFull())条件不满足 || 线程被唤醒, 退出循环
        _q.push(in);
        // 肯定有数据,唤醒消费者
        if (_cwait_num)
            pthread_cond_signal(&_consumer_cond);
        pthread_mutex_unlock(&_mutex);
    }
    void Take(T *out) // 消费者
    {
        pthread_mutex_lock(&_mutex);
        while (IsEmpty())
        {
            _cwait_num++; // 若数据空了,就让等待的消费者数量++
            pthread_cond_wait(&_consumer_cond, &_mutex);
            _cwait_num--; // 消费者被唤醒,数量--
        }
        *out = _q.front();
        _q.pop();
        // 肯定有空间,唤醒生产者
        if (_pwait_num)
            pthread_cond_signal(&_productor_cond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_productor_cond);
        pthread_cond_destroy(&_consumer_cond);
    }

private:
    std::queue<T> _q;               // 使用普通队列作为保存数据的容器
    int _cap;                       // bq最大容量
    pthread_mutex_t _mutex;         // 互斥
    pthread_cond_t _productor_cond; // 生产者条件变量
    pthread_cond_t _consumer_cond;  // 消费者条件变量

    int _cwait_num;                 // 在条件变量下等待的消费者数量
    int _pwait_num;                 // 在条件变量下等待的生产者数量
};
// Main.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

void* Consumer(void* args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        sleep(2);

        int data;
        // 1.从bq中拿到数据
        bq->Take(&data);
        // 2.做各种处理
        printf("Consumer, 消费了一个数据: %d\n", data);
    }
}
void* Productor(void* args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
    // 1.从外部获取数据
    int data = 10;
    while(true)
    {
        // 2.生产到bq中
        printf("Productor, 生产了一个数据: %d\n", data);
        bq->Put(data);
        data++;
    }
}
int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(4);// 共享资源
    // 单生产,单消费
    pthread_t c, p;
    // 最后一个参数传递bq,保证同一个线程看到同一个队列
    pthread_create(&c, nullptr, Consumer, bq);
    pthread_create(&p, nullptr, Productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bq;
    return 0;
}

让消费者睡眠2秒,生产者正常运行

让生产者睡眠2秒,消费者正常运行

ps. 上面代码是单生产单消费模式的,若想改为多生产多消费的,直接添加线程就可以。因为在生产者消费者模型的321原则中,21已经有了,3中的其中一种关系(生产和消费)已经有了,只需要补充完剩下的两种就可以。而我们设计的生产者与生产者或者消费者与消费者之间本身就只有一把锁,所以它们天然的就存在互斥关系!

基于环形队列的生产者消费者模型

认识POSIX信号量以及接口

信号量本身就是一个计数器,它用来描述你所申请资源数目的多少

信号量原理:一元信号量(或二元信号量)主要用于实现资源的互斥访问。当一个线程(或进程)获取了信号量(将其值从1减为0),它便获得了对某个共享资源的独占访问权,其他试图获取该信号量的线程将被阻塞,直到信号量被释放(其值从0加回1)。

信号量对公共资源使用时可以整体使用,也可以不整体使用。整体使用就是把整个资源看作一份。当二元信号量的值为1时,表示资源未被占用,线程可以获取信号量并进入临界区;当信号量的值为0时,表示资源已被占用,其他线程必须等待。(如上面的阻塞队列)。

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步

信号量的两个重要操作

1. P操作(Wait操作)

P操作(也称为sem_wait)用于等待信号量,其目的是减少信号量的值,并根据信号量的当前值决定是否阻塞当前线程或进程。

2. V操作(Post操作)

V操作(也称为sem_post)用于释放信号量,其目的是增加信号量的值,并可能唤醒等待该信号量的线程或进程。

 sem_init函数
// 初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数

  • sem:指向要初始化的信号量对象的指针。
  • pshared:控制信号量的作用域。如果pshared的值非0,则信号量在进程间共享;如果为0,则信号量仅在调用进程内的线程间共享
  • value:信号量的初始值。这个值必须是非负的。

返回值

成功时,sem_init 返回0。失败时,返回-1,并设置errno以指示错误

sem_destroy函数
// 销毁信号量
int sem_destroy(sem_t *sem);

参数

  • sem:指向要销毁的信号量对象的指针。

返回值

  • 成功时,sem_destroy 返回 0。失败时,返回 -1,并设置 errno 以指示错误。
sem_wait函数
// 等待信号量
int sem_wait(sem_t *sem); //P()

参数

  • sem:指向要操作的信号量对象的指针。

返回值

  • 成功时返回0。失败时返回-1,并设置errno以指示错误。

该函数用于信号量的P操作,从信号量的值中减去1,如果信号量的值变为0,则当前线程将被阻塞,直到信号量的值被其他线程通过 sem_post 增加。

sem_post函数
// 发布信号量
int sem_post(sem_t *sem);//V()

参数

  • sem:指向要操作的信号量的指针。

返回值

  • 成功时,sem_post返回0。
  • 失败时,返回-1,并设置errno以指示错误。可能的错误包括EINVAL,表示传入的信号量不是一个有效的信号量。

该函数用于信号量的V操作,将信号量的值增加1;唤醒等待线程:如果有线程因为调用sem_wait函数而在该信号量上等待(即信号量的值为0且线程被阻塞),那么sem_post函数可能会唤醒其中一个等待的线程。具体唤醒哪个线程取决于操作系统的线程调度策略。

环形队列的Demo代码

这里环形队列我们使用数组模拟,用模运算来模拟环状特性。

设计思路:

1. 环形队列的结构

环形队列是一个固定大小的数组,通过两个指针(headtail)来表示队列的头部和尾部。队列的头部用于消费者读取数据,队列的尾部用于生产者写入数据。

2. 生产者和消费者线程

2.1 生产者线程

  • 使用信号量P(sem_wait函数)操作进行空间的预定,若没有空间则进行等待

  • 加锁

  • 写入数据:将数据写入 tail 指向的位置。

  • 更新 tail:将 tail 向前移动一位:tail = (tail + 1) % size

  • 解锁

  • 释放一个数据,即消费者有新数据可用

2.2 消费者线程

  • 使用信号量V(sem_post函数)操作进行数据的预定,若没有数据则进行等待

  • 加锁

  • 读取数据:从 head 指向的位置读取数据。

  • 更新 head:将 head 向前移动一位:head = (head + 1) % size

  • 解锁

  • 释放一个空间,即生产者有空闲位置可用

3. 同步机制

为了确保生产者和消费者之间的同步,需要使用互斥锁(mutex)和信号量(semaphore)来控制对共享资源的访问。

  • 互斥锁:用于保护生产者线程和消费者线程,确保同一时间只有一个线程可以修改队列。

  • 信号量:

    • sem_wait:表示等待信号量,从信号量的值中减去1

    • sem_post:表示发布信号量,从信号量的值中加1, 并唤醒等待的信号量

// Sem.hpp
#pragma once
#include <semaphore.h>

int defaultvalue = 1;
class Sem
{
public:
    Sem(int value = defaultvalue)
    {
        int n = ::sem_init(&_sem, 0, value);
        (void)n;
    }
    void P()
    {
        int n = ::sem_wait(&_sem);
        (void)n;
    }
    void V()
    {
        int n = ::sem_post(&_sem);
        (void)n;
    }
    ~Sem()
    {
        int n = ::sem_destroy(&_sem);
        (void)n;
    }

private:
    sem_t _sem;        // 信号量
    int _init_value;
};
// RingBuffer.hpp
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "Sem.hpp"

template <typename T>
class RingBuffer
{
public:
    RingBuffer(int size)
        : _ring(size),    // 给vector初始化一个大小
          _size(size),
          _tail(0),       // 生产者的默认位置
          _head(0),       // 消费者的默认位置
          _datasem(0),    // 默认数据为0
          _spacesem(size) // 默认空间为满
    {
        int n = ::pthread_mutex_init(&_p_lock, nullptr);
        (void)n;
        int m = ::pthread_mutex_init(&_c_lock, nullptr);
        (void)m;
    }
    // 生产者
    void Equeue(const T &in)
    {
        _spacesem.P(); // 预定一个空间
        // 上锁
        pthread_mutex_lock(&_p_lock);
        _ring[_tail] = in; // 生产
        _tail++;
        _tail %= _size; // 维持环形特性
        pthread_mutex_unlock(&_p_lock);

        _datasem.V(); // 释放一个数据
    }
    // 消费者
    void Pop(T *out)
    {
        _datasem.P(); // 预定一个数据
        // 上锁
        pthread_mutex_lock(&_c_lock);
        *out = _ring[_head]; // 取出数据
        _head++;
        _head %= _size;
        pthread_mutex_unlock(&_c_lock);

        _spacesem.V(); // 释放一个空间
    }
    ~RingBuffer()
    {
        int n = ::pthread_mutex_destroy(&_p_lock);
        (void)n;
        int m = ::pthread_mutex_destroy(&_c_lock);
        (void)m;
    }

private:
    std::vector<T> _ring;    // 环, 临界资源
    int _size;               // 总容量
    int _tail;               // 生产者位置
    int _head;               // 消费位置

    Sem _datasem;            // 数据信号量
    Sem _spacesem;           // 空间信号量
    pthread_mutex_t _p_lock; // 生产者锁
    pthread_mutex_t _c_lock; // 消费者锁
};
// Main.cc
#include "RingBuffer.hpp"
#include <string>
#include <unistd.h>

// 生产
void *Consumer(void *args)
{
    RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
    int data = 0;
    while (true)
    {
        // sleep(1);
        ring_buffer->Equeue(data);
    
        std::cout << "生产了一个数据: " << data << std::endl;
        data++;
    }
    return nullptr;
}
// 消费
void *Productor(void *args)
{
    RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);

    while (true)
    {
        sleep(2);
        int data;
        ring_buffer->Pop(&data);
        std::cout << "消费了一个数据: " << data << std::endl;
    }
    return nullptr;
}

int main()
{
    RingBuffer<int> *ring_buffer = new RingBuffer<int>(5); // 共享资源 -> 临界资源
    pthread_t c1, p1, c2, c3, p2;
    pthread_create(&c1, nullptr, Consumer, ring_buffer);
    pthread_create(&c2, nullptr, Consumer, ring_buffer);
    pthread_create(&c3, nullptr, Consumer, ring_buffer);
    pthread_create(&p1, nullptr, Productor, ring_buffer);
    pthread_create(&p2, nullptr, Productor, ring_buffer);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);

    delete ring_buffer;
    return 0;
}

相关细节:

1.为了完成多生产和多消费的模型,消费者和生产者的同步和互斥问题我们的环形队列中已经解决。除此之外,我们还需要维护生产者和生产者 &&消费者和消费者之间的互斥关系。由于环形队列的下标也是属于临界资源的,如果不维持关系内部的互斥关系,是一定会破坏环形队列结构的。所以势必要引入生产者互斥锁和消费者互斥锁。

2.先加锁 or 先申请信号量

一定是先申请信号量,在加锁。先申请信号量,会让所有线程先瓜分好信号量,在其它线程进行等待锁的时候,此时资源已经申请好了,可以直接进入临界区;如果先加锁,在申请信号量,所有线程都会在锁上等待,等待完成之后在去申请信号量,这样解决问题的实际效率会变低。

3.使用信号量实现生产者消费者模型,对资源进行使用,申请时,为什么不判断一下条件是否满足?

信号量本身就是判断条件! 信号量:是一个计数器,是资源的预订机制。预订:可以理解为在外部,可以不判断资源是否满足,就可以知道内部资源的情况! 


http://www.kler.cn/a/527569.html

相关文章:

  • 【计算机网络】设备更换地区后无法访问云服务器问题
  • 缩位求和——蓝桥杯
  • Java内存模型 volatile 线程安全
  • 主流的AEB标准有哪些?
  • [STM32 - 野火] - - - 固件库学习笔记 - - -十二.基本定时器
  • 19.Word:小马-校园科技文化节❗【36】
  • 适合超多氛围灯节点应用的新选择
  • springboot 2.7.6 security mysql redis jwt配置例子
  • 【股票数据API接口36】如何获取股票当天逐笔大单交易数据之Python、Java等多种主流语言实例代码演示通过股票数据接口获取数据
  • 仿真设计|基于51单片机的温室环境监测调节系统
  • C++实现状态模式
  • 如何选择Spring AOP的动态代理?JDK与CGLIB的适用场景
  • python 语音识别
  • 如何在 Kafka 中实现自定义分区器
  • 27.Word:财务软件应用的书稿【10】
  • 数据结构与算法之二叉树: LeetCode LCP 10. 二叉树任务调度 (Ts版)
  • 记忆化搜索(5题)
  • 因果推断与机器学习—用机器学习解决因果推断问题
  • 为AI聊天工具添加一个知识系统 之80 详细设计之21 符号逻辑 之1
  • Contrastive Imitation Learning
  • 基于SpringCloud的广告系统设计与实现(四)
  • vue3项目中编写less
  • 华为Ascend产品
  • STM32CubeMX6.13.0打开后不显示界面,但是任务管理器显示该程序正在运行
  • 深入理解Flexbox:弹性盒子布局详解
  • OpenSource - 通过 system-design-101 掌握架构设计