当前位置: 首页 > article >正文

linux线程 | 同步与互斥 | 线程池以及知识点补充

创作活动​​​​​​​

​​​​​​​

        前言:本节内容是linux的线程的相关知识。本篇首先会实现一个简易的线程池, 然后再将线程池利用单例的懒汉模式改编一下。 然后再谈一些小的知识点,比如自旋锁, 读者写者问题等等。 那么, 现在开始我们的学习吧。

        ps:本节内容设计到了单例模式, 建议了解单例设计模式以及多线程, 生产消费者模型的友友们进行观看哦。

目录

 线程池

 什么是线程池

线程池的应用场景

代码实现 

准备文件

makefile

Task.h

ThreadPool.h

main.cpp

运行结果

单例模式

常见的锁

自旋锁

自旋锁接口

读者写者问题 

概念

 接口

理解


 线程池

 什么是线程池

        线程池就是一种线程的使用模式。 线程的过多会带来调度的开销, 进而影响缓存局部性和整体的性能。 而线程池维护者多个线程, 等待着监督管理者分配可并发执行的任务。 这避免了在处理短时间任务时创建与销毁线程的代价。 线程池不仅能够保证内核的充分利用, 还能防止过度调用。 

线程池的应用场景

  •         需要大量的线程来完成任务,且完成任务的时间比较短。 单个任务小, 任务数量大更加适合!对于任务很大(时间很长)的场景, 线程池的有点就不明显了。 因为如果一个任务很大的话, 那这个任务比创建线程的时间长的多, 就不明显。 
  •         对于性能要求比较苛刻, 比如要求服务器迅速相应客户请求。 
  •         接受突发性的大量请求, 但不至于使服务器因此产生大量线程的引用。 突发性大量客户请求, 在没有线程池情况下, 将产生大量线程, 虽然理论上大部分操作系统县城数据最大值不是问题, 短时间内产生大量线程可能使内存到达极限, 出现错误。 

代码实现 

准备文件

makefile

main.exe:main.cpp
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -rf main.exe

Task.h

#include <iostream>
using namespace std;
#include <vector>
#include <string>

//加减乘除
string opers = "+-*/%";



//Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:
    //构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算
    Task(int x, int y, char op)
        : data1_(x), data2_(y), op_(op)
    {}

    ~Task() {}

    //执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。 
    void run()
    {
        switch (op_)
        {
        case '+':
            cout << data1_ << "+" << data2_ << "=" << data1_ + data2_ << endl;
            break;
        case '-':
            cout << data1_ << "-" << data2_ << "=" << data1_ - data2_ << endl;
            break;
        case '*':
            cout << data1_ << "*" << data2_ << "=" << data1_ * data2_ << endl;
            break;
        case '/':
            if (data2_ == 0)
            {
                cout << "error, " << data1_ << '/' << data2_ << " is error!" << endl;
            }
            else
            {
                cout << data1_ << "/" << data2_ << "=" << data1_ / data2_ << endl;
            }
            break;
        case '%':
            if (data2_ == 0)
            {
                cout << "error, " << data1_ << '%' << data2_ << " is error!" << endl;
            }
            else
            {
                cout << data1_ << "%" << data2_ << "=" << data1_ % data2_ << endl;
            }
            break;
        default:
            cout << "default" << endl;
            break;
        }
    }

    //仿函数, 为了方便我们的对象能够像函数一样使用。 
    void operator()()
    {
        run();
    }

private:
    //每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_
    int data1_;
    int data2_;

    char op_;
};

ThreadPool.h


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>

//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{
    pthread_t tid_;
    string name_;   
};


template<class T>
class ThreadPool
{
    static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)

private:
    //加锁解锁
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

    //唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }

public:

    //线程要执行的函数
    static void* HandlerTask(void* args)
    {
        ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
        while(true) 
        {
            tp->Lock();
            while (tp->tasks_.empty())
            {
                tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。
            }
            //否则就去拿到tasks里面的任务
            T t = tp->tasks_.front();
            tp->tasks_.pop();
            
            //
            tp->Unlock();
            t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    

        }
    }

    //运行这个线程池, 也就是先将线程创建出来。 然后去运行线程
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name_ = "thread-";
            threads_[i].name_ += to_string(i);   
            pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
        }
    }

    //主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理
    void Push(const T& t)
    {
        Lock();
        tasks_.push(t);
        Wakeup();

        Unlock();
    }

    //线程池地初始化, 就是将锁和条件变量都初始化一下
    ThreadPool(int num = defaultnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }

    //析构
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }

private:
    vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,
    queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 

    pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 
    pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 

};

main.cpp

#include"ThreadPool.h"
#include"Task.h"

int main()
{
    //运行线程池
    ThreadPool<Task>* tp = new ThreadPool<Task>();
    tp->Start();    

    srand(time(nullptr) ^ getpid());
    while (true)
    {
        //构建任务  
        int x = rand() % 10 + 1;
        usleep(10);
        int y = rand() % 5;
        char op = opers[rand()% opers.size()];
        Task t(x, y, op);
        
        //交给线程池处理
        //主线程给线程池发送任务, 其实就相当于主线程时生产者。 
        tp->Push(t);



        sleep(1);
    }
    return 0;
}

运行结果

单例模式

        利用单例模式来创建线程池

主要改动就是ThreadPool.h


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>

//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{
    pthread_t tid_;
    string name_;   
};


template<class T>
class ThreadPool
{
    static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)

private:
    //加锁解锁
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

    //唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }



public:

    //线程要执行的函数
    static void* HandlerTask(void* args)
    {
        ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
        while(true) 
        {
            tp->Lock();
            while (tp->tasks_.empty())
            {
                tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。
            }
            //否则就去拿到tasks里面的任务
            T t = tp->tasks_.front();
            tp->tasks_.pop();
            
            //
            tp->Unlock();
            t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    

        }
    }

    //运行这个线程池, 也就是先将线程创建出来。 然后去运行线程
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name_ = "thread-";
            threads_[i].name_ += to_string(i);   
            pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
        }
    }

    //主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理
    void Push(const T& t)
    {
        Lock();
        tasks_.push(t);
        Wakeup();

        Unlock();
    }

    //线程池地初始化, 就是将锁和条件变量都初始化一下
    ThreadPool(int num = defaultnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }

    //析构
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }

    //获取单例
    //改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么
    //要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。 
    //解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够
    //进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连
    //第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!
    static ThreadPool<T>* GetInstance(int num = defaultnum)
    {
        if (tp_ == nullptr)
        {
            pthread_mutex__lock(&lock_);
            
            if (tp_ == nullptr) 
            {
                tp_ = new ThreadPool<T>(num);
            }

            pthread_mutex_unlock(&lock_);
        }
        
        return tp_;
    }



private:
    //构造函数私有化, 只有Getinstance里面才能创建。 
    ThreadPool(int num = defaultnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }

    //单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。 
    ThreadPool(const ThreadPool<T>& tp) = delete;
    const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;

    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }


private:
    vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,
    queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 

    pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 

    pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 

    

    pthread_mutex_t lock_;         //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。
    static ThreadPool<T>* tp_;    //tp指针, 这就是唯一个单例对象。 

};

template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;    

 运行结果和上次一样:

常见的锁

  •         悲观锁:我们平时写的锁都是悲观锁。 就是总是担心其他线程来修改数据, 所以在线程访问数据之前都要先加锁。 就比如互斥锁和信号量。
  •         乐观锁:猜测线程在访问资源的时候不会有其他的线程,因此不上锁。但是在更新数据之前会判断其他线程在更新前有没有对数据进行修改。 
  •         CAS:当需要更新时,判断当前内存值和之前取得的值是否相等。 如果相等则用新值更新, 若不相等则失效。
  •         自旋锁:这个要着重谈一下, 后面很长一段论述都是在谈他。

自旋锁

在讲自旋锁之前, 博主要将在讲解进程等待的时候的故事再重新讲一下:       

        就是假设小王今天吃饭的时候想起来明天要考试了。 然后小王就很慌, 很慌怎么办呢。 所以小王就想到了他的好朋友小明。 小明是一个努力型学霸, 然后小王就找到他让他带他复习明天的考试, 小王请吃饭。 所以呢, 小明就很痛快地答应了。 但是小明要先读一个小时的书, 小明一听可以啊。 之后小明就去楼上读书了, 小王呢, 要知道我们现在是非常强调效率的!所以小王也不在原地等, 他说他去学校门口的网吧上会网, 然后他就去上网了。 小明读完书之后就给小王打了一个电话, 小王接到电话后立马又回来接着小明去吃饭了。——这是故事一

        在小明的帮助下, 小王疯狂的复习了一天一夜。 最后小王考试的时候成功考了60分, 过了。 几天之后小王又从别人的口中得知两天后又要考网络, 然后小王就又去请教小明。 小明就说, 好的, 马上我就从楼上下来, 我俩一起去自习室。 请问, 小王这个时候还回去网吧上网吗? 不会的!小王不去网吧了! 但是呢,过了十秒钟小王就给小明打了电话问下来了没有, 小明说马上。 又过了十秒钟, 小王又打电话问下来了没有, 小明说马上。 又过了十秒钟......, 过了很多个十秒钟之后, 小明终于下来了。 于是两个人愉快的又去吃饭了。 ——这是故事二。

        在上面的故事当中, 我们把小明当成临界区代码, 那么小王在楼下等小明和去网吧等小明取决于什么? 是不是就是取决于小明的时常? 我们前面一直都在研究临界区的线程安全问题。 但是我们几乎没有研究过线程在临界区待得时间的长短的问题。 等待什么呢? 等待刚刚进去的线程什么时候出来。 那么, 我们如果一个线程在临界区内待得时间非常长。 那么其他现车给在申请锁之后最好的做法是不是就是挂起? 如果一个线程在临界区待得时间非常短, 我们此时可以选择让其他线程在此时不要选择挂起, 而是处于一种自旋状态。 而我们的小王每十秒就打个电话的过程其实就是自选的过程。 所以我么以前学习的所有锁, 全部都是挂起等待锁。 一个线程进入, 其他的线程竞争失败, 失败的时候他们就把自己挂起等待了。 挂起等待要不要花时间? 挂起的时候就是将我们的执行流放到我们的等待队列里面, 然后唤醒的时候又把它们从等待队列里面拆下来放到cpu里面去执行。 这个过程来回对数据结构做迁移, 是要花时间的。 ——所以, 什么是自旋, 自旋就是不把自己挂起, 而是由线程不断地去周而复始的去申请锁。 如果申请锁成功则进入, 失败则返回重新检测锁的状态。 

自旋锁接口

        其实我们可以自己实现自旋锁, 就是使用trylock, trylock就是尝试检测这个锁, 如果这个锁没有申请成功, 就出错返回。 但是lock就是直接挂起, 所以我们想要实现自旋锁就可以使用while循环然后trylock。 

        也可以使用系统给我们提供的接口

        可以自己使用spin_lock, 这个函数就会对一个锁返回申请, 直到申请成功。 spin_trylock就是和mutex_trylock一样,都是申请失败了就直接返回, 想要自选需要自己加。

读者写者问题 

概念

        生活中有哪些场景是读者写者问题呢?

        我们在初中小学画的板报、作家写的小说、甚至博主写的这篇博客都是读者写者的问题。 就比如我们写一篇博客, 我们写博客的人就是写者, 然后读这篇博客的人就是读者。 

        对于读者写者问题来说, 也是要遵守321原则——三种关系、两种角色、一个交易场所。

  •         三种关系:写者和读者(互斥竞争)、读者和读者(共享关系)、写者和读者(互斥和同步)
  •         两种角色:写者和读者
  •         一个交易场所:数据交换的地点

        为什么读者和读者之间是共享关系, 为什么消费者和消费者之间确是互斥关系呢?

  •         因为写者把数据写进去了, 读者并不会把数据拿走!!!
  •         但是生产者生产了数据之后,消费者是会把数据拿走的!!!

 接口

pthread_rwlock_t是读写锁的类型, 然后上图两个函数就是对锁进行初始化和对锁进行销毁。

rdlock是以读者的身份进行加锁, 另外我们的wlock就是以写者的方式进行加锁。

 

然后呢, 我们的解锁时使用统一的接口进行解锁。

理解

        一般而言, 读多写少。 如果有一个写者在写, 任何读者都不能进来。 而一个读者正在读, 任何写者都不能进来。  所以, 正常来讲, 读者争锁的能力比写者要强。 所以在我们的读者写者当中, 我们往往会出现读者很多, 而写者很少的情况。 所以竞争锁的时候我们的读者竞争到锁的概率是非常的高的, 进而导致写者长时间得不到锁而产生饥饿问题。

        饥饿问题是一个中性的现象还是一个偏向编译的现象呢? 在读者写者模型这里是一个比较偏向中性的词语。 因为他就是一个事实, 因为读者本来就多, 读者本来就多写者本来就少, 所以就注定了根据这种场景是默认的现象。 那么这种默认的行为我们叫做读者优先。 

现在我们实现一下读者优先的伪代码

首先就是读者的加锁和解锁操作:
lock(&rlock);
reader_count++;
if (reader_count == 1) lock(&wlock);  //reader_count == 1说明这是第一个读者, 从这里开始就让读者进不来!!!
unlock(&rlock);

//读者进来了之后, 就进行读取写者的数据
//都读取完成之后就离开
lock(&rlock);
reader_count--;
if (reader_count == 0) unlock(wlock); //如果是最后一个读者, 就让写者进来,就可以写数据了。 
unlock(&rlock);

然后是写者的加锁和解锁,写者的比较简单,因为写者少, 竞争锁的能力弱
lock(&wlock);

//写入

unlock(&wlock);

  ——————以上就是本节全部内容哦, 如果对友友们有帮助的话可以关注博主, 方便学习更多知识哦!!!   


http://www.kler.cn/news/359633.html

相关文章:

  • 轻帆云移动端智能语音提单:一键触达,智能工单新体验,助力高效运营
  • 分布式篇(分布式事务)(持续更新迭代)
  • 出栈序列的合法性判断
  • PyCharm借助MobaXterm跳板机连接服务器
  • laydate.laydate.render()开始日期和结束日期选择器互相限制选择值动态生效
  • 了解 ChatGPT 中的公平性问题
  • 基于MATLAB的混沌序列图像加密程序
  • 深入理解计算机系统--计算机系统漫游
  • 专题:数组(已完结)
  • 企业一级流程架构规划方法
  • 如何在Docker中运行Squid
  • 3.Three.js程序基本框架结构和API说明
  • 微软运用欺骗性策略大规模打击网络钓鱼活动
  • 【自用】做完项目怎么使用git仓库进行代码的管理/上传/下载
  • 爬虫学习——26.JS逆向(2)
  • 安装配置sqoop(超详细)
  • 【RabbitMQ】RabbitMQ 的七种工作模式介绍
  • python+大数据+基于Spark的共享单车数据存储系统【内含源码+文档+部署教程】
  • 常见加密算法
  • .net framework 3.5sp1组件如何启动?