当前位置: 首页 > article >正文

【Linux】互斥锁、基于阻塞队列、环形队列的生产消费模型、单例线程池

头像
⭐️个人主页:@小羊
⭐️所属专栏:Linux
很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~

动图描述

目录

  • 1、互斥锁
  • 2、生产消费模型
    • 2.1 阻塞队列
    • 2.2 环形队列
  • 3、单例线程池
  • 4、线程安全和重入问题


1、互斥锁

  • 临界资源:多线程执行流共享的资源就叫做临界资源

  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区

  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用

  • 原子性:不会被任何调度机制打断的操作,该操作只有两种状态,要么完成,要么未完成,没有中间态

大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。而多个线程并发的操作共享变量,会带来一些问题。

在这里插入图片描述

内存中的数据是共享的,但是当线程把数据从内存读到CPU中的寄存器中,变成线程的上下文数据,就变成了私有的,而ticketnum--后需要把数据再次写入到内存中,如果线程在写入内存前被切换,多个线程都执行这一操作就可能会出现ticketnum减为负数的情况。

避免类似上述问题,需要解决三个问题:

  1. 代码必须要有互斥行为,当代码进入临界区时,不允许其他线程进入该临界区。
  2. 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  3. 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。

| 线程或进程什么时候被切换?

  1. 时间片耗尽时
  2. 有更高优先级的进程要调度时
  3. 通过sleep,从内核返回用户时,会进行时间片是否到达的检测,进而导致切换

在这里插入图片描述

如果锁对象是全局的或静态的,可以用宏:PTHREAD_MUTEX_INITIALIZER初始化,并且不用我们主动destroy;如果锁对象是局部的,需要用pthread_mutex_init初始化,用pthread_mutex_destroy释放。

  1. 所有对资源的保护,都是对临界区代码的访问,因为资源都是通过代码访问的。
  2. 要保证加锁的细粒度。
  3. 加锁就是找到临界区,对临界区进行加锁。

那么相应的又有一些问题:

  • 锁也是全局的共享资源,谁保证锁的安全?加锁和解锁被设计为原子的。
  • 如果看待锁?加锁本质就是对资源的预定工作,整体使用资源,所以加锁前先要申请锁。
  • 如果申请锁的时候,锁已经被别的线程拿走了怎么办?其他线程阻塞等待。
  • 线程在访问临界区的时候,可不可以被切换?可以,我被切走,其他线程也不能进来,因为我走的时候是带着锁走的,保证了原子性。

lock是原子的,其他线程无法进入,锁是如何实现的?
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据交换(私有和共享),由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

#pragma once

#include <iostream>
#include <pthread.h>

namespace MutexModule
{
    class Mutex
    {
    public:
        Mutex(const Mutex&) = delete;
        const Mutex& operator=(const Mutex&) = delete;
        Mutex()
        {
            int n = pthread_mutex_init(&_lock, nullptr);
        }
        void Lock()
        {
            int n = pthread_mutex_lock(&_lock);
        }
        void Unlock()
        {
            int n = pthread_mutex_unlock(&_lock);
        }
        pthread_mutex_t* LockPtr()
        {
            return &_lock;
        }
        ~Mutex()
        {
            int n = pthread_mutex_destroy(&_lock);
        }
    private:
        pthread_mutex_t _lock;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex& mutex)
            :_mutex(mutex)
        {
            _mutex.Lock();
        }
        ~LockGuard()
        {
            _mutex.Unlock();
        }
    private:
        Mutex& _mutex;
    };
}

2、生产消费模型

2.1 阻塞队列

当一个线程互斥地访问某个变量时,它可能发现在其他线程改变状态之前,它什么都做不了。例如一个线程访问队列时,发现队列为空它只能等待,直到其他线程将一个节点添加到队列中,这种情况就需要条件变量。

生产者消费者模型:

  • 生产者和生产者:互斥
  • 消费者和消费者:互斥
  • 生产者和消费者:互斥 && 同步

总结:3种关系2种角色1个交易区。

在这里插入图片描述

pthread_cond_wait是一个函数,只要是函数就可能会出错,如果这个函数调用出错继续往下执行代码,但是当前阻塞队列中已经满了,再向其中放数据就出错了。还有如果阻塞等待的线程和其他线程所发的信号数量不匹配,也就是出现了伪唤醒,也会出现问题。
为了规避这种问题,这里判断阻塞队列是否为满我们就不同if判断了,而是改用while判断,只要阻塞队列为空,就一直阻塞等待,这样做可以规避很多可能出现的错误。

》:上面实现的生产者消费者模型只是单生产单消费,只保证了生产者和消费者之间的互斥关系,如果增加线程让其变成多生产多消费,该如何修改?
事实上我们只需要增加对应的线程即可,因为我们的临界区只用了一把互斥锁保护,生产者和生产者之间,消费者和消费者之间也可以保证互斥的关系。

》:这里我们实现的生产者消费者模型在访问临界区资源时是互斥的,不难发现它运行的效率不是很高,那我们该怎么保证生产者消费者模型的效率问题呢?
事实上我们看待生产者消费者模型不能只聚焦于临界区这小点来思考,为什么说是一小点呢?生产者要生产数据(实际上是往临界区中放数据),也是需要从外部获取“原料”的,消费者消费数据(实际上是从临界区中取数据),拿到后也需要处理数据,不管是生产者从别的地方获取“原料”还是消费者处理数据,都是需要时间的。也就是说在生产者从外部获取资源的时候,消费者可以随意访问临界区,在消费者处理数据的时候生产者也可以随意访问临界区,这个时候生产者和消费者是并行的,也就是说只有生产者访问临界区相对于消费者访问临界区是串行的,这是生产者消费者模型高效率的关键。

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
#include "Mutex.hpp"
#include "Cond.hpp"

namespace BlockQueueModule
{
    // version 2
    using namespace CondModule;
    using namespace MutexModule;

    template<class T>
    class BlockQueue
    {
    private:
        bool IsFull() {return _cap == _q.size();}
        bool IsEmpty() {return _q.empty();}
        
    public:
        BlockQueue(int cap = 10)
            :_cap(cap), _cwait(0), _pwait(0)
        {}

        void Equeue(const T &data)   //生产者
        {
            LockGuard lockguard(_mutex);//RAII
            while (IsFull())
            {
                std::cout << "生产者进入等待..." << std::endl;
                _pwait++;
                //如果阻塞队列为满,则等待消费者发信号
                //wait首先解锁,让其他线程有访问临界区的机会
                _producter_signal.Wait(_mutex);
                _pwait--;
                std::cout << "生产者被唤醒..." << std::endl;
                //wait被唤醒后重新申请锁,访问临界区
            }

            //走到这里,说明生产者收到了消费者发的信号,临界区一定有空位置,可以生产了
            _q.push(data);

            //生产者放完数据,一定有数据,给消费者发信号可以消费了
            if (_cwait)
            {
                _consumer_signal.Notify();
            }
        }

        void Pop(T *data)         //消费者
        {
            LockGuard lockguard(_mutex);//RAII
            while (IsEmpty())
            {
                std::cout << "消费者进入等待..." << std::endl;
                _cwait++;
                //如果阻塞队列为空,则等待生产者发信号
                _consumer_signal.Wait(_mutex);
                _cwait--;
                std::cout << "消费者被唤醒..." << std::endl;
            }

            //走到这里,说明消费者收到了生产者发的信号,临界区一定有数据,可以消费了
            *data = _q.front();
            _q.pop();

            //消费者取完数据,一定有空位置,给生产者发信号可以生成了
            if (_pwait)
            {
                _producter_signal.Notify();
            }
        }

        ~BlockQueue()
        {}
    private:
        std::queue<T> _q;                  //临界资源
        int _cap;                          //默认临界区大小
        Mutex _mutex;                      //互斥锁
        Cond _producter_signal;            //生产者条件变量
        Cond _consumer_signal;             //消费者条件变量

        int _cwait;     //有多少个消费者在阻塞等待
        int _pwait;     //有多少个生产者在阻塞等待
    };

    // version 1
    // static const int gcap = 10; //默认临界区大小

    // template<class T>
    // class BlockQueue
    // {
    // private:
    //     bool IsFull()
    //     {
    //         return _cap == _q.size();
    //     }
        
    //     bool IsEmpty()
    //     {
    //         return _q.empty();
    //     }
    // public:
    //     BlockQueue(int cap = gcap)
    //         :_cap(cap), _cwait(0), _pwait(0)
    //     {
    //         pthread_mutex_init(&_mutex, nullptr);
    //         pthread_cond_init(&_producter_signal, nullptr);
    //         pthread_cond_init(&_consumer_signal, nullptr);
    //     }

    //     void Equeue(const T &data)   //生产者
    //     {
    //         pthread_mutex_lock(&_mutex);
    //         while (IsFull())
    //         {
    //             std::cout << "生产者进入等待..." << std::endl;
    //             _pwait++;
    //             //如果阻塞队列为满,则等待消费者发信号
    //             //wait首先解锁,让其他线程有访问临界区的机会
    //             pthread_cond_wait(&_producter_signal, &_mutex);
    //             _pwait--;
    //             std::cout << "生产者被唤醒..." << std::endl;
    //             //wait被唤醒后重新申请锁,访问临界区
    //         }

    //         //走到这里,说明生产者收到了消费者发的信号,临界区一定有空位置,可以生产了
    //         _q.push(data);

    //         //生产者放完数据,一定有数据,给消费者发信号可以消费了
    //         if (_cwait)
    //         {
    //             pthread_cond_signal(&_consumer_signal);
    //         }
    //         pthread_mutex_unlock(&_mutex);
    //     }

    //     void Pop(T *data)         //消费者
    //     {
    //         pthread_mutex_lock(&_mutex);
    //         while (IsEmpty())
    //         {
    //             std::cout << "消费者进入等待..." << std::endl;
    //             _cwait++;
    //             //如果阻塞队列为空,则等待生产者发信号
    //             pthread_cond_wait(&_consumer_signal, &_mutex);
    //             _cwait--;
    //             std::cout << "消费者被唤醒..." << std::endl;
    //         }

    //         //走到这里,说明消费者收到了生产者发的信号,临界区一定有数据,可以消费了
    //         *data = _q.front();
    //         _q.pop();

    //         //消费者取完数据,一定有空位置,给生产者发信号可以生成了
    //         if (_pwait)
    //         {
    //             pthread_cond_signal(&_producter_signal);
    //         }
    //         pthread_mutex_unlock(&_mutex);
    //     }

    //     ~BlockQueue()
    //     {
    //         pthread_mutex_destroy(&_mutex);
    //         pthread_cond_destroy(&_producter_signal);
    //         pthread_cond_destroy(&_consumer_signal);
    //     }
    // private:
    //     std::queue<T> _q;                  //临界资源
    //     int _cap;                          //最大容量
    //     pthread_mutex_t _mutex;            //互斥锁
    //     pthread_cond_t _producter_signal;  //生产者条件变量
    //     pthread_cond_t _consumer_signal;   //消费者条件变量

    //     int _cwait;     //有多少个消费者在阻塞等待
    //     int _pwait;     //有多少个生产者在阻塞等待
    // };
}

2.2 环形队列

对于环形队列,重要的一点是为空或为满,指针指向的是同一个位置。如果为空,保证生产者先原子性的生产,如果为满,保证消费者原子性的先消费。这里体现出互斥是通过信号量来实现的。
资源用信号量表示,任何人访问临界资源之前都必须先申请信号量,信号量表示资源的数目。资源分为空间资源和数据资源,对于生产者来说他关注的是空间资源,对于消费者来说他关注的是数据资源。

》:前面的阻塞队列中不管是生产还是消费前都要先判断,为什么环形队列这里没有判断呢?
因为这里信号量本身就是表示资源数目,只要成功,就一定有,不需要判断。

上面已经基本实现了生产者和消费者之间的同步和互斥关系,那么多生产多消费中生产者和生产者,消费者和消费者之间的互斥关系如何保证呢?

事实上上面生产者和消费者之间的同步和互斥关系是通过信号量来保证的,也就是说单生产和单消费这里不需要互斥锁,在这里互斥锁我们只需要用来处理生产者和生产者,消费者和消费者之间的互斥关系就行。而我们知道环形队列中读写位置各自只有一个,所以多线程之间的生产和消费最后还是单生产单消费问题,所以我们只需要用两把锁,一把锁守护生产权利,一把锁守护消费权利,让多线程先竞争这把锁,然后生产或消费。

在这里插入图片描述

》:如上,我们应该在哪个位置加锁更好一点呢?
首先不管在哪个位置加锁,我们都能保证生产者之间,消费者之间都是互斥的关系。其次,多个线程申请锁不管最后谁是赢家,可以肯定的是赢家只有一个,也就是说如果先让多线程申请锁,然后再有这个赢家申请信号量;而如果先让多个线程申请信号量,信号量可以是多个,所以最后赢家可能有多个,然后再让这些个线程去竞争锁,在这些线程竞争锁的同时其他没申请到信号量的线程如果有信号量了也可以同时申请信号量,也就是可以达到并行。
这就好比看电影,如果在前面加锁就像是我们先排队,然后在买票,一次只能进去一个同学;如果在后面加锁就像是我们先买票,然后在排队进入,在买到票的同学排队进入放映厅的过程中后面来的同学都可以先去买票,很明显第二种效率更高。

#pragma once

#include <iostream>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Sem.hpp"

namespace RingQueueModule
{
    using namespace MutexModule;
    using namespace CondModule;
    using namespace SemModule;

    template<class T>
    class RingQueue
    {
    public:
        RingQueue(int cap)
            :_ring(cap), _cap(cap), _p_step(0)
            ,_c_step(0), _spacesem(cap), _datasem(0)
        {}

        void Equeue(const T& data)
        {
            // 1.LockGuard lockguard(_p_lock);
            //先申请信号量,在竞争锁,效率更高
            _spacesem.P();
            LockGuard lockguard(_p_lock);
            _ring[_p_step++] = data;
            _p_step %= _cap;
            _datasem.V();
        }

        void Pop(T *out)
        {
            _datasem.P();
            LockGuard lockguard(_c_lock);
            *out = _ring[_c_step++];
            _c_step %= _cap;
            _spacesem.V();  //释放资源,信号量+1
        }

        ~RingQueue()
        {}

    private:
        std::vector<T> _ring;    //临界资源
        int _cap;                //临界空间大小
        int _p_step;             //生产者位置
        int _c_step;             //消费者位置
        Sem _spacesem;           //空间信号量
        Sem _datasem;            //数据信号量

        Mutex _p_lock;           
        Mutex _c_lock;
    };
}

3、单例线程池

在这里插入图片描述

#pragma once

#include <iostream>
#include <vector>
#include <queue>
#include <memory>
#include "Thread.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Log.hpp"

// 懒汉模式线程池
namespace ThreadPoolModule
{
    using namespace ThreadModule;
    using namespace LogModule;
    using namespace CondModule;
    using namespace LogModule;

    using thread_t = std::shared_ptr<Thread>;
    const static int threadnum = 5;

    template <class T>
    class ThreadPool
    {
    private:
        bool IsEmpty()
        {
            return _taskq.empty();
        }
        // 处理任务
        void HandlerTask(std::string name)
        {
            LOG(LogLevel::DEBUG) << "线程: " << name << ", 进入HandlerTask...";
            while (true)
            {
                T t;
                {
                    LockGuard lockguard(_lock);     // 在任务队列中拿任务需要加锁保护
                    while (IsEmpty() && _isrunning) // 只有任务队列为空 && 线程池在运行,线程才需要等待休眠
                    {
                        _wait_num++;
                        _cond.Wait(_lock);
                        _wait_num--;
                    }
                    if (IsEmpty() && !_isrunning) // 只有任务队列为空 && 线程池退出,所有的线程才不需要等待
                        break;
                    t = _taskq.front();
                    _taskq.pop();
                }
                t(name); // 线程有独立栈,处理任务不需要被保护
            }
            LOG(LogLevel::INFO) << "线程:" << name << ",退出...";
        }

        ThreadPool(int num = threadnum)
            : _num(num), _wait_num(0), _isrunning(false)
        {
            for (int i = 0; i < _num; i++)
            {
                // 非静态成员函数取函数指针需要加&
                _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
                LOG(LogLevel::INFO) << "创建线程:" << _threads.back()->Name();
            }
        }

    public:

		// 不支持拷贝、赋值
        ThreadPool(const ThreadPool<T>&) = delete;
        const ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;

        // 1.将构造函数私有化,让其不能在类外实例化对象,只能在类里面创建一个对象
        // 2.创建单例对象的函数设为静态函数,在类外通过指定类域的方式访问这个静态成员函数,获取单例对象
        // 3.因为构造函数被私有,类外不能创建对象,非静态成员函数只能通过对象访问,不能通过指定类域的方式访问
        static ThreadPool<T> *GetInstance()
        {
            if (_instance == nullptr)
            {
                LockGuard lockguard(_mtx);
                if (_instance == nullptr)
                {
                    LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";
                    _instance = new ThreadPool<T>();
                }
            }
            return _instance;
        }

        void Equeue(const T &in)
        {
            LockGuard lockguard(_lock);
            if (_isrunning)
            {
                _taskq.push(in);
                if (_wait_num > 0)
                {
                    _cond.Notify();
                }
            }
        }

        void Start()
        {
            LockGuard lockguard(_lock);
            if (_isrunning)
                return;
            _isrunning = true;
            for (auto &threadptr : _threads)
            {
                threadptr->Start();
                LOG(LogLevel::INFO) << "启动线程:" << threadptr->Name();
            }
        }

        void Wait()
        {
            for (auto &threadptr : _threads)
            {
                threadptr->Join();
                LOG(LogLevel::INFO) << "回收线程:" << threadptr->Name();
            }
        }

        void Stop()
        {
            LockGuard lockguard(_lock);
            if (_isrunning)
            {
                // 退出线程池必须保证
                // 1.不能再进任务
                _isrunning = false;
                // 2.线程池内的任务必须全部处理完
                // 3.让线程自己退出(被唤醒)
                if (_wait_num > 0)
                {
                    _cond.NotifyAll(); // 将等待中的线程全部唤醒,如果有任务处理完剩余任务,线程正常回收
                }
            }
        }

        ~ThreadPool()
        {}

    private:
        std::vector<thread_t> _threads; // 管理线程
        std::queue<T> _taskq;           // 管理任务
        int _num;                       // 线程个数
        int _wait_num;                  // 正在等待任务的线程个数
        Mutex _lock;
        Cond _cond;      // 等待任务
        bool _isrunning; // 线程池状态

        static ThreadPool<T>* _instance; // 单例对象
        static Mutex _mtx;
    };

    template<class T>
    ThreadPool<T>* ThreadPool<T>::_instance = nullptr;

    template<class T>
    Mutex ThreadPool<T>::_mtx;
}


4、线程安全和重入问题

  • 线程安全:就是多个线程在访问共享资源时,能够正确地执行,不会相互干扰或破坏彼此的执行结果。一般而言,多个线程并发同一段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进行操作,并且没有锁保护的情况下,容易出现该问题。
  • 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

重入可以分为两种情况:

  • 多线程重入函数
  • 信号导致一个执行流重复进入函数

| 可重入和线程安全的联系:

  • 函数是可重入的,那就是线程安全的。
  • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
  • 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。

| 可重入与线程安全区别:

  • 可重入函数是线程安全函数的一种。
  • 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
  • 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数有锁还未释放则会产生死锁,因此是不可重入的。

本篇文章的分享就到这里了,如果您觉得在本文有所收获,还请留下您的三连支持哦~

头像

http://www.kler.cn/a/522297.html

相关文章:

  • CTF从入门到精通
  • CAPL编程常见问题与解决方案深度解析
  • typescript 简介
  • [MySQL]事务的理论、属性与常见操作
  • 08.OSPF 特殊区域及其他特性
  • java小白日记32(注解)
  • “基因合作:生命演化中的共生与目的性”
  • 【2024年华为OD机试】 (A卷,200分)- 开放日活动、取出尽量少的球(JavaScriptJava PythonC/C++)
  • 6. 使用springboot做一个音乐播放器软件项目【1.0版项目完结】附带源码~
  • Android SystemUI——最近任务列表启动(十八)
  • FPGA 26,数码管动态显示,解析与实现( 使用 Xilinx Vivado 实现数码管动态显示 )
  • 计算机网络之计算机网络基本概念
  • 【Leetcode 每日一题】45. 跳跃游戏 II
  • Linux 命令之技巧(Tips for Linux Commands)
  • QT 笔记
  • 深入探讨防抖函数中的 this 上下文
  • 论文笔记(六十五)Schmidt-EKF-based Visual-Inertial Moving Object Tracking
  • LeetCode-175. 组合两个表
  • H2 Database安装部署
  • VMware 中Ubuntu无网络连接/无网络标识解决方法【已解决】
  • PHP Error处理与优化指南
  • volatile之四类内存屏障指令 内存屏障 面试重点 底层源码
  • 多模态论文笔记——TECO
  • 已解决:Win10任务状态栏卡死点击无响应的解决方案
  • 【SAP-PP】生产订单和计划订单
  • DeepSeek-R1试用