Linux:同步
目录
一、同步概念
条件变量
二、生产者消费者模型
三、环形队列
一、同步概念
互斥用来解决 访问临界资源 的非原子性,通俗来说,由于互斥锁的实现,保证了在用户角度看,同一个时间内访问临界资源的代码只有一个线程在执行。
而同步,用来解决,多线程中,临界资源长时间被同一个线程占用,造成其他线程饥饿的问题。
条件变量
条件变量用来实现同步。接口的使用和互斥类似。
条件变量的作用就是,当一个线程拿到锁,访问临界资源结束后,让这个线程去某个条件变量的队列中去等待,同时释放锁资源。
不使用同步机制造成的现象。
使用同步机制改善。
#include <iostream>
#include <vector>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
void* MCore(void* args)
{
sleep(3);
std::cout << "master 开始工作" << std::endl;
std::string name = static_cast<char*>(args);
while(1)
{
pthread_cond_signal(&gcond);//唤醒其中一个队列首部的线程
std::cout << "master唤醒一个线程" << std::endl;
sleep(1);
}
}
void StartMaster(std::vector<pthread_t>* threads)
{
pthread_t tid;
int n = pthread_create(&tid,nullptr,MCore,(void*)"Master Thread");
if(n == 0)
{
std::cout << "create master success" << std::endl;
}
threads->emplace_back(tid);
}
void* SCore(void* args)
{
std::string name = static_cast<char*>(args);
while(1)
{
//1.加锁
pthread_mutex_lock(&gmutex);
//2.条件变量
pthread_cond_wait(&gcond,&gmutex);
std::cout << "当前被唤醒的线程是:" << name << std::endl;
sleep(1);
//3.解锁
pthread_mutex_unlock(&gmutex);
}
}
void StartSlaver(std::vector<pthread_t>* threads,int n)
{
for(int i = 0;i < n;++i)
{
char* name = new char[64];
snprintf(name,64,"new thread-%d",i+1);
pthread_t tid;
int n = pthread_create(&tid,nullptr,SCore,(void*)name);
if(n == 0)
{
std::cout << "create success:" << name << std::endl;
threads->emplace_back(tid);
}
}
}
void WaitThread(std::vector<pthread_t>& threads)
{
for(auto& tid: threads)
{
pthread_join(tid,nullptr);
}
}
int main()
{
std::vector<pthread_t> threads;
StartMaster(&threads);
StartSlaver(&threads,5);
WaitThread(threads);
return 0;
}
当副线程获取锁资源后,对锁的管理变成不会立刻释放锁,而是去指定的条件变量(队列)下去等待,等待被唤醒,同时释放锁,锁资源同时给到那个被唤醒的进程。
二、生产者消费者模型
- 解释生产者消费者模型
生产者消费者模型,用来解决,并发运行下,多个线程之间的数据传递问题。
- 3个关系
并发运行下:
多个生产者线程之间,既要同步也要互斥
多个消费者线程之间,既要同步也要互斥
生产者和消费者之间,既要同步也要互斥
- 2种角色
生产者、消费者
- 一种交易场所
交易场所本质就是内存中的一种数据结构,交易对象是数据。
- 基于阻塞队列实现生产者消费者模型
交易场所是阻塞队列。
多个生产线程之间要互斥,只能有一个生产线程向阻塞队列放数据。
多个消费线程之间要互斥,只能有一个消费线程向阻塞队列拿数据。
每一个生产线程和每一个消费线程之间要互斥。因此,所有线程都要互斥的访问这个阻塞队列,只需要一把互斥锁即可,但是需要两个条件变量。
基于阻塞队列实现生产者消费者的项目地址
BlockQueue.hpp
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _blockqueue.size() == _cap;
}
bool IsEmpty()
{
return _blockqueue.empty();
}
public:
BlockQueue(int cap)
:_cap(cap)
{
_product_wait_num = 0;
_consum_wait_num = 0;
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_consum_cond,nullptr);
pthread_cond_init(&_product_cond,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consum_cond);
pthread_cond_destroy(&_product_cond);
}
//生产者是向阻塞队列里面加数据
void Enqueue(T & in)
{
pthread_mutex_lock(&_mutex);
//生产者拿到了队列,判断队列是否为满的
//如果是满的,则需要通知消费者来消费,让后让该生产者去等待
while(IsFull())//为什么是while而不是if,保证唤醒的进程争夺锁成功后,要确保队列中有数据再去执行后面的代码
{
_product_wait_num++;//++和--操作
pthread_cond_wait(&_product_cond,&_mutex);
_product_wait_num--;
}
//生产
_blockqueue.push(in);
if(_consum_wait_num > 0)
{
pthread_cond_signal(&_consum_cond);
}
pthread_mutex_unlock(&_mutex);
}
//消费者是从队列里面拿数据
void Pop(T * out)
{
pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
//如果队列为空,消费者就去等待,同时释放锁
_consum_wait_num++;
pthread_cond_wait(&_consum_cond,&_mutex);
_consum_wait_num--;
}
//消费
*out = _blockqueue.front();
_blockqueue.pop();
if(_product_wait_num > 0)
{
pthread_cond_signal(&_product_cond);
}
pthread_mutex_unlock(&_mutex);
}
private:
std::queue<T> _blockqueue;
int _cap;
pthread_mutex_t _mutex;//保护阻塞队列的锁
pthread_cond_t _consum_cond;
pthread_cond_t _product_cond;
int _product_wait_num;
int _consum_wait_num;
};
#endif
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <functional>
#include <unistd.h>
#include <pthread.h>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T)>;
template<typename T>
class Thread
{
public:
Thread(func_t<T> func,T data,const std::string &name = "none-name")
:_func(func),_data(data),_threadname(name),_stop(true)
{}
~Thread()
{}
//这里涉及到了一套解决方案,静态成员函数是类级别的,换句话说,每个对象拿到的函数是同一个,
//因此,通过输入型参数代入this指针,再调函数来实现多个对象各自调用
void Excute()
{
_func(_data);
}
static void* threadroutine(void* args)
{
Thread<T> *self = static_cast<Thread<T>*>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid,nullptr,threadroutine,this);
if(!n)
{
_stop = false;
return true;
}
else{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
private:
pthread_t _tid;
std::string _threadname;
T _data;//此时是阻塞队列,是一个共享资源
func_t<T> _func;
bool _stop;
};
}
#endif
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
//using Task = std::function<void()>;
class Task
{
public:
Task() {}
Task(int a, int b) : _a(a), _b(b), _result(0)
{
}
void Excute()
{
_result = _a + _b;
}
std::string ResultToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
}
std::string DebugToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
}
private:
int _a;
int _b;
int _result;
};
Main.cc
#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"
using namespace ThreadModule;
using blockqueue_t = BlockQueue<Task>;
class ThreadData
{
public:
ThreadData(blockqueue_t & bq,std::string name)
:_bq(bq),who(name)
{}
blockqueue_t &_bq;
std::string who;
};
void Consumer(ThreadData* td)
{
while(1)
{
sleep(1);
//1.从阻塞队列中取一个任务
Task t;
td->_bq.Pop(&t);
//2.处理这个任务
t.Excute();
std::cout << "Consumer Task" << t.ResultToString() << std::endl;
}
}
void Product(ThreadData* td)
{
srand(time(nullptr)^ pthread_self());
while(1)
{
sleep(1);
//1.生成任务
int a = rand() % 10 +1;
usleep(1000);
int b = rand() % 20 + 1;
Task t(a,b);
//2.把任务放到阻塞队列中
td->_bq.Enqueue(t);
std::cout << "Product Task" << t.DebugToString() << std::endl;
}
}
void StartConsumer(std::vector<Thread<ThreadData*>> * threads,int num,blockqueue_t& bq)
{
for(int i = 0;i < num; ++i)
{
std::string name = "thread-" + std::to_string(i+1);
ThreadData* td = new ThreadData(bq,name);
threads->emplace_back(Consumer,td,name);
threads->back().Start();//每新建一个线程就启动它
}
}
void StartProductor(std::vector<Thread<ThreadData*>> * threads,int num,blockqueue_t& bq)
{
for(int i = 0;i < num; ++i)
{
std::string name = "thread-" + std::to_string(i+1);
ThreadData* td = new ThreadData(bq,name);
threads->emplace_back(Product,td,name);
threads->back().Start();//每新建一个线程就启动它
}
}
void WaitAllThread(std::vector<Thread<ThreadData*>>& threads)
{
for(auto& e : threads)
{
e.Join();
}
}
int main()
{
blockqueue_t* bq = new blockqueue_t(5);//队列最多有五个数据
std::vector<Thread<ThreadData*>> threads;
StartConsumer(&threads,1,*bq);
StartProductor(&threads,1,*bq);
WaitAllThread(threads);
}
三、环形队列
- 基于环形队列的生产者消费者模型,不再是条件变量控制,而是信号量机制
信号量机制中,将临界资源视为一个整体,就是基于阻塞队列实现的生产者消费者模型。而将临界资源划分为一块块小的数据块,信号量就是用来表示临界资源的数量,信号量的使用,主要表现为基于环形队列实现的生产者消费者模型。
和System的信号量通信方式有一定关联,此处的信号量机制是POSIX中的信号量机制,更多是用来完成生产消费模型。
RingQueue.hpp
#ifndef __RING_QUEUE_HPP__
#define __RING_QUEUE_HPP__
#include <iostream>
#include <string>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
template<typename T>
class RingQueue
{
private:
void P(sem_t& sg)
{
sem_wait(&sg);//p操作对应的函数,执行该函数,信号量资源如果大于0,则减减
}
void V(sem_t& sg)
{
sem_post(&sg);
}
void Lock(pthread_mutex_t& mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t& mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap)
:_cap(cap)
{
_productor_step = 0;
_consumer_step = 0;
pthread_mutex_init(&_productor_mutex,nullptr);
pthread_mutex_init(&_consumer_mutex,nullptr);
sem_init(&_room_sem,0,_cap);
sem_init(&_data_sem,0,0);
}
~RingQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_productor_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
void Enqueue(const T& in)
{
//生产任务
//P操作申请一个空间
P(_room_sem);
//多个生产者进程之间要互斥
Lock(_productor_mutex);
_ring_queue[_productor_step++] = in;//生产者按照顺序生产
_productor_step %= _cap;
Unlock(_productor_mutex);
V(_data_sem);
}
void Pop(T* out)
{
P(_data_sem);
Lock(_consumer_mutex);
*out = _ring_queue[_consumer_step++];
_consumer_step %= _cap;
Unlock(_consumer_mutex);
V(_room_sem);
}
private:
//1.定义一个环形队列
std::vector<T> _ring_queue;
int _cap;//环形队列的容量上限
//2.生产者和消费者的下标
int _productor_step;
int _consumer_step;
//3.定义信号量
sem_t _room_sem;//生产者关心有几个空间
sem_t _data_sem;//消费者关心有几个数据
//4.定义锁,维护多生产多消费之间的互斥的关系,
//注意生产者和消费者之间不需要互斥,因此需要两把锁
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
};
#endif