Linux从0到1——线程同步和互斥【互斥量/条件变量/信号量/PC模型】
Linux从0到1——线程同步和互斥
- 1. Linux线程互斥
- 1.1 问题引入
- 1.2 互斥相关概念
- 1.3 多执行流并发访问公共资源的数据不一致问题
- 1.4 互斥量(锁)
- 1.5 改进抢票系统
- 1.6 锁的简单封装
- 1.7 锁的实现原理
- 1.8 可重入VS线程安全
- 1.9 死锁
- 2. Linux线程同步
- 2.1 理解同步
- 2.2 条件变量
- 2.3 认识接口
- 2.4 改进抢票系统
- 3. POSIX信号量
- 3.1 理解信号量运用场景
- 3.2 快速认识接口
- 4. 生产者消费者模型
- 4.1 理解
- 4.2 模型实现
- 4.2.1 基于阻塞队列——单生产单消费
- 4.2.2 给生产消费模型传任务,模拟实际应用场景
- 4.2.3 基于阻塞队列,改造多生产多消费
- 4.2.4 基于环形队列——单生产单消费
- 4.2.5 基于环形队列,改造多生产多消费
1. Linux线程互斥
本章内容会直接使用到之前章节自定义封装的线程库,链接在此:https://blog.csdn.net/weixin_73870552/article/details/144543376?spm=1001.2014.3001.5501,如果学过C++11线程库,则可以跳过这部分。
1.1 问题引入
实现一个抢票逻辑,观察现象
#include <iostream>
#include <unistd.h>
#include <string>
#include "Thread.hpp"
int ticket = 10000; // 一共1万张票
std::string GetThreadName()
{
static int number = 1; // 生命周期随进程,且只在当前作用域有效
char name[64];
snprintf(name, sizeof(name), "Thread-%d", number++);
return name;
}
void GetTicket(std::string name)
{
while(true)
{
if(ticket > 0)
{
usleep(1000); // 充当抢票时间
printf("%s get a ticket: %d\n", name.c_str(), ticket);
ticket--;
}
else
{
// ticket == 0了就不抢了
break;
}
// 实际情况,还有后续的动作
}
}
int main()
{
std::string name1 = GetThreadName();
Thread<std::string> t1(GetTicket, name1, name1);
std::string name2 = GetThreadName();
Thread<std::string> t2(GetTicket, name2, name2);
std::string name3 = GetThreadName();
Thread<std::string> t3(GetTicket, name3, name3);
std::string name4 = GetThreadName();
Thread<std::string> t4(GetTicket, name4, name4);
t1.Start();
t2.Start();
t3.Start();
t4.Start();
t1.Join();
t2.Join();
t3.Join();
t4.Join();
return 0;
}
正常来说,ticket
抢到0就应该停止抢票了,可是现在票被抢到了负数,这肯定是出问题了。
1.2 互斥相关概念
1. 临界资源:多线程执行流共享的资源就叫做临界资源
- 上例中,临界资源就是全局变量
ticket
。
2. 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
3. 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
4. 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
1.3 多执行流并发访问公共资源的数据不一致问题
1. 理解多执行流并发访问公共资源的数据不一致问题
int cnt = 0;
cnt++; // 这段代码是原子性的吗?
- 实际上
cnt++
这句代码,会转化成三句汇编代码,1)将0从内存中换入寄存器,2)对寄存器中0数据++,3)将++后的数据1换回内存。所以cnt++
这句代码不是原子的,它可能在3步中的任何一步处被打断。 - 假设现在有两个线程在访问
cnt
,执行cnt++
的动作,第一个线程在执行完步骤1)后,时间片到了,从CPU上被剥离下来,保存自己的硬件上下文数据。第二个线程此时被处理机调度,上CPU执行,一直将cnt
加到了100,时间片才到,被从CPU上剥离,此时内存中,cnt
的值已经变成了100。 - 这时线程一又被调度了,带着自己之前的硬件上下文,将寄存器中
cnt
的值覆盖成0。线程一之前已经执行过步骤1)了,接下来会直接执行步骤2),将cnt++
,值变为1。此时问题来了,线程一将执行步骤3),将1这个值写入内存,cnt
的值由100变为了1。线程二之前做的努力全部白费了。
2. 回顾1.1
- 在1.1例子中的
if
语句处,并发问题已经出现了:
- 当
ticket==1
时,四个线程可能同时执行if
判断操作,也就是说,在ticket--
还没来得及执行的时候,四个线程就都进入临界区了,if
条件均成立。这时ticket
就可能被--
多次,直接干成负数。
1.4 互斥量(锁)
1. 初始化锁
- 静态分配,一般用于创建全局锁:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
- 动态分配,用于创建局部锁:
mutex
:要初始化的锁;attr
:传nullptr
即可。
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
2. 销毁锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
- 使用
PTHREAD_ MUTEX_ INITIALIZER
初始化的互斥量不需要销毁; - 不要销毁一个已经加锁的互斥量;
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁;
3. 加锁和解锁
- 返回值:成功返回0,失败返回错误码。
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
- 调用
pthread_mutex_lock
时,可能会遇到以下情况:- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功;
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么
pthread_ lock
调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
1.5 改进抢票系统
1. 全局锁保护临界区
#include <iostream>
#include <unistd.h>
#include <string>
#include "Thread.hpp"
int ticket = 10000; // 一共1万张票(全局共享资源)
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 定义锁
std::string GetThreadName()
{
static int number = 1; // 生命周期随进程,且只在当前作用域有效
char name[64];
snprintf(name, sizeof(name), "Thread-%d", number++);
return name;
}
// 加锁:
// 1. 我们要尽可能的给少的代码加锁
// 2. 一般加锁,都是给临界区加锁
void GetTicket(std::string name)
{
while(true)
{
pthread_mutex_lock(&mutex); // 申请锁本身是安全的,原子的
if(ticket > 0) // 一个线程在临界区中访问临界资源的时候,可能发生切换
{
usleep(1000); // 充当抢票时间
printf("%s get a ticket: %d\n", name.c_str(), ticket);
ticket--;
pthread_mutex_unlock(&mutex);
}
else
{
// ticket == 0了就不抢了
pthread_mutex_unlock(&mutex);
break;
}
// 实际情况,还有后续的动作
}
}
int main()
{
std::string name1 = GetThreadName();
Thread<std::string> t1(GetTicket, name1, name1);
std::string name2 = GetThreadName();
Thread<std::string> t2(GetTicket, name2, name2);
std::string name3 = GetThreadName();
Thread<std::string> t3(GetTicket, name3, name3);
std::string name4 = GetThreadName();
Thread<std::string> t4(GetTicket, name4, name4);
t1.Start();
t2.Start();
t3.Start();
t4.Start();
t1.Join();
t2.Join();
t3.Join();
t4.Join();
return 0;
}
- 可以发现,运行的时间更长了,但是抢票的逻辑正确了。
在一些OS中,可能会出现票全被一个线程抢完的现象,这样其他两个进程就迟迟不能上处理机调度,出现线程饥饿问题。出现这种问题,单纯的互斥是解决不了的,还需要引入同步机制,让各线程的执行具有一定的顺序性。
2. 局部锁保护临界区
- 将局部锁作为函数参数传递给
GetTicket
。
#include <iostream>
#include <unistd.h>
#include <string>
#include "Thread.hpp"
int ticket = 10000; // 一共1万张票(全局共享资源)
std::string GetThreadName()
{
static int number = 1; // 生命周期随进程,且只在当前作用域有效
char name[64];
snprintf(name, sizeof(name), "Thread-%d", number++);
return name;
}
// 加锁:
// 1. 我们要尽可能的给少的代码加锁
// 2. 一般加锁,都是给临界区加锁
void GetTicket(pthread_mutex_t *mutex)
{
while(true)
{
pthread_mutex_lock(mutex); // 申请锁本身是安全的,原子的
if(ticket > 0) // 一个线程在临界区中访问临界资源的时候,可能发生切换
{
usleep(1000); // 充当抢票时间
printf("get a ticket: %d\n", ticket);
ticket--;
pthread_mutex_unlock(mutex);
}
else
{
// ticket == 0了就不抢了
pthread_mutex_unlock(mutex);
break;
}
// 实际情况,还有后续的动作
}
}
int main()
{
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, nullptr);
std::string name1 = GetThreadName();
Thread<pthread_mutex_t *> t1(GetTicket, name1, &mutex);
std::string name2 = GetThreadName();
Thread<pthread_mutex_t *> t2(GetTicket, name2, &mutex);
std::string name3 = GetThreadName();
Thread<pthread_mutex_t *> t3(GetTicket, name3, &mutex);
std::string name4 = GetThreadName();
Thread<pthread_mutex_t *> t4(GetTicket, name4, &mutex);
t1.Start();
t2.Start();
t3.Start();
t4.Start();
t1.Join();
t2.Join();
t3.Join();
t4.Join();
pthread_mutex_destroy(&mutex);
return 0;
}
1.6 锁的简单封装
1. LockGurad.hpp头文件
#pragma once
#include <pthread.h>
// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
Mutex(pthread_mutex_t *lock):_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock):_mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
2. 改进抢票代码
- 这里使用的是局部锁,记得要
destroy
。
#include <iostream>
#include <unistd.h>
#include <string>
#include "Thread.hpp"
#include "LockGuard.hpp"
...
void GetTicket(pthread_mutex_t *mutex)
{
while (true)
{
// 非临界区代码
{
// 临界区
LockGuard Lockguard(mutex);
if (ticket > 0)
{
usleep(1000); // 充当抢票时间
printf("get a ticket: %d\n", ticket);
ticket--;
}
else
{
break;
}
}
// 实际情况,还有后续的动作
}
}
...
1.7 锁的实现原理
1. 上伪代码
- 为了实现互斥锁操作,大多数体系结构都提供了
swap
或exchange
汇编指令,该指令的作用是把寄存器和内存单元的数据相交换。
2. 理解lock
- 大家可以将
mutex
简单理解成一个结构体:
struct XXX
{
int mutex = 1;
}
movb $0, %al
,表示将数据0,放入寄存器%al
中。xchgb %al, mutex
,表示将寄存器%al
中的数据和内存中mutex
的数据做交换。- 假设现在线程1申请锁成功了,也就是成功执行了
movb $0, %al
,和xchgb %al, mutex
操作。此时内存中mutex
的值已经从1被换成0了,数据1就被换入了线程1的硬件上下文数据中,if
条件成立,return 0
。 - 这时如果线程2再来申请锁,执行
xchgb %al, mutex
操作后,数据0被换入线程2的%al
寄存器,执行if
判断后,线程2被挂起等待。
3. 理解unlock
movb $1, mutex
,其实就是将数据1,写入内存mutex
。这样其他线程再申请锁时,就可以拿到1,进而满足if
条件,成功申请锁了。
1.8 可重入VS线程安全
1. 概念
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
2. 常见的线程不安全的情况
- 不保护共享变量的函数。
- 函数状态随着被调用,状态发生变化的函数。
- 返回指向静态变量指针的函数。
- 调用线程不安全函数的函数。
3. 常见的线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的。
- 类或者接口对于线程来说都是原子操作。
- 多个线程之间的切换不会导致该接口的执行结果存在二义性。
4. 常见不可重入的情况
- 调用了
malloc/free
函数,因为malloc
函数是用全局链表来管理堆的。 - 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构。
- 函数体内使用了静态的数据结构。
5. 常见可重入的情况
- 不使用全局变量或静态变量。
- 不使用
malloc
或者new
开辟出的空间。 - 不调用不可重入函数。
- 不返回静态或全局数据,所有数据都有函数的调用者提供。
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。
6. 可重入与线程安全联系
- 函数是可重入的,那就是线程安全的。
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
7. 可重入与线程安全区别
- 可重入函数是线程安全函数的一种。
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数锁还未释放则会产生死锁,因此是不可重入的。
1.9 死锁
1. 概念
- 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
- 例子:
- A和B互相申请对方的锁,并且都不释放自己已经有的锁,就会陷入无止境的循环等待。
2. 死锁的四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用。
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放。
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺。
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系。
3. 避免死锁
- 破坏死锁的四个必要条件。
- 加锁顺序一致。比如上面的例子中,线程A和线程B,都先申请锁1,再申请锁2。
- 避免锁未释放的场景。
- 资源一次性分配,避免对分配的资源频繁上锁解锁。
2. Linux线程同步
2.1 理解同步
假设现在有一个场景:一群学生在轮流使用一个自习室,这个自习室每次只能容纳下一个人,且只有一把钥匙,只有拿到钥匙的人才能进入。
现在有一个学生已经进去自习了,其他学生都在排队等待这个自习室。这个学生学习完了,准备走了,刚出自习室大门,突然又不想走了,又回去了。就这样来来回回,这名同学一直进进出出,其他同学一直没办法进去学习,而且这个同学也不学习,大家都不愿意了。
这时管理层出了一个规定,一旦你出了自习室的门,就必须把钥匙交出去,给后面排队的学生。你要是还想自习,就必须去队伍末尾重新排队。就这样,大家的自习行为有了一定的顺序性。
钥匙就是我们之前说的“锁”,而学生就是一个一个的“执行流”,让这些执行流的执行,具有一定的顺序性,就是“同步”。
2.2 条件变量
1. 故事时刻
- 一个人向盘子中放苹果,一个向盘子中拿苹果。其中这个拿苹果的人,是个瞎子,他看不到,每次想拿苹果时,只能去盘子中摸。如果盘子中有苹果,就可以摸到苹果,将苹果拿走。但是如果盘子中没有苹果,就只能无功而返。
- 瞎子在拿苹果的时候,会把盘子拿走,放苹果的人就放不进去。如果这个瞎子每隔几秒钟,就去盘子里摸一摸有没有苹果,这就会导致放苹果的人即使有苹果也放不进去。
- 这还只是一个瞎子的情况,如果有多个瞎子呢?每个瞎子都抢着去摸苹果,放苹果的人就更放不进去了。
- 为了避免这种情况的发生,聪明的人想到了利用一个铃铛。当放苹果的人,将苹果放入盘子中后,会敲响这个铃铛。瞎子在听到铃铛响后,才会去盘子中摸苹果,铃铛不响,瞎子就不会去摸。这就很好的解决了瞎子抢盘子的问题。
- 在上面这个例子中,瞎子和放苹果的人都是执行流,盘子就是一个临界资源,苹果是数据,铃铛就充当条件变量的角色。
不难看出,条件变量很好的解决了互斥情况下,可能存在的饥饿问题,提高了互斥的效率。
2. 理解条件变量
struct cond
{
// 条件是否成立
int flag;
// 维护一个线程等待队列
tcb_queue;
}
flag
用来标记条件是否成立;tcb_queue
是cond
内部维护的一个等待队列。- 当条件不满足时,线程加入等待队列;当条件满足时,线程从等待队列中被激活。
2.3 认识接口
1. 初始化条件变量
- 静态分配,创建全局条件变量:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- 动态分配,创建局部条件变量:
cond
:要初始化的条件变量;attr
:NULL
。
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
2. 销毁
- 使用
PTHREAD_COND_INITIALIZER
初始化的条件变量不需要销毁。
int pthread_cond_destroy(pthread_cond_t *cond);
3. 等待条件满足
cond
:要在这个条件变量上等待;mutex
:互斥量,后面详细解释。
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
4. 唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒等待队列中的所有线程
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒等待队列头部的一个线程
2.4 改进抢票系统
1. 提出问题
- 一共有1000张票,票抢完之后,线程不
break
,而是打印一句“没票了”。相当于线程轮询检测tickets
,有票了就抢,没有就打印信息,以便之后tickets
又有票了,可以第一时间抢到。
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tickets = 1000; // 1000张票
void *threadRoutine(void *args)
{
std::string name = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&mutex);
if(tickets > 0)
{
std::cout << name << ", get a ticket: " << tickets-- << std::endl;
usleep(1000); // 模拟抢票时间
}
else
{
std::cout << "没有票了, " << name << std::endl;
}
pthread_mutex_unlock(&mutex);
}
}
// 主线程
int main()
{
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, threadRoutine, (void*)"thread-1");
pthread_create(&t2, nullptr, threadRoutine, (void*)"thread-2");
pthread_create(&t3, nullptr, threadRoutine, (void*)"thread-3");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
return 0;
}
- 发现一个现象,
tickets
减到0后,这三个线程还不断去访问tickets
,且频率非常之高。如果我们还有一个线程用来隔一段时间更新一下tickets
,放一些票出来,访问tickets
时,怎么可能抢的过这三个线程?这就是对系统资源极大的浪费,没有票就别老是去访问tickets
了,老老实实等着不好吗。
这不就是之前提到的,瞎子抢盘子,导致放苹果的人放不进去苹果的情况吗?
2. 使用条件变量,升级抢票系统
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tickets = 1000; // 1000张票
void *threadRoutine(void *args)
{
std::string name = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&mutex);
if(tickets > 0)
{
std::cout << name << ", get a ticket: " << tickets-- << std::endl;
usleep(1000); // 模拟抢票时间
}
else
{
std::cout << "没有票了, " << name << std::endl;
pthread_cond_wait(&cond, &mutex); // 等待条件变量满足
}
pthread_mutex_unlock(&mutex);
}
}
// 主线程
int main()
{
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, threadRoutine, (void*)"thread-1");
pthread_create(&t2, nullptr, threadRoutine, (void*)"thread-2");
pthread_create(&t3, nullptr, threadRoutine, (void*)"thread-3");
while(true)
{
// 每隔5s,放1000支票
sleep(5);
pthread_mutex_lock(&mutex);
tickets += 1000;
pthread_mutex_unlock(&mutex);
pthread_cond_broadcast(&cond); // 唤醒全部线程
}
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
return 0;
}
3. 关于pthread_cond_wait的一些细节
- 线程在等待时,会自动释放锁;
- 线程被唤醒时,是在临界区内唤醒的,线程在
pthread_cond_wait
返回时,要重新申请并持有锁; - 当线程被唤醒时,重新申请并持有锁本质也是要参与锁的竞争的。
4. 总结
- 单纯的互斥,可以保证数据的安全,但是不一定合理或者高效!
3. POSIX信号量
3.1 理解信号量运用场景
在之前的学习中,我们其实接触过信号量:进程间通信。这里带着大家把之前学过的知识,和今天要讲的知识,平滑的连接一下。
1. 问题引入
- 基于
BlockQueue
实现的PC模型,整个阻塞队列都是临界资源,Push
或Pop
的时候整个队列都是互斥的,任何执行流都进不来。聪明的你可能发现了,但是队头插入和队尾pop
,理论上好像可以并发执行啊,怎么上面的模型做不到呢?所以说这个模型不是很高效。
2. 信号量运用场景
- 就好比一个大自习室中,每进来一个同学或者出去一个同学,都要把这个大的自习室控制起来,保证只有这一个同学能进来或者出去。这不是扯淡吗?凭什么他进来的时候我不能出去。
- 为了解决这个问题,我们设计了预定机制。自习室中只有100个座位,100个座位预定满了,就不让人进了,但是此时里面的人还是可以出去的。出去一个人,就要放出一个预定名额。
- 将一个大的临界区资源,拆分成小的临界区资源管理起来,这就是信号量能做的事。
3.2 快速认识接口
信号量有自己的头文件
<semaphore.h>
。
1. 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared
:0表示线程间共享,非零表示进程间共享;value
:信号量初始值。
2. 销毁信号量
int sem_destroy(sem_t *sem);
3. 等待信号量
- 等待信号量,会将信号量的值减1(就是我们常说的P操作)。
int sem_wait(sem_t *sem); //P()
4. 发布信号量
- 发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。(V操作)
int sem_post(sem_t *sem);//V()
4. 生产者消费者模型
4.1 理解
1. 故事时刻
- 有一个超市,超市里有一个展架,供货商作为生产者,向展架上放商品。顾客作为消费者,从展架上拿商品。
- 展架在同一时间,只能被一个人使用。供货商和供货商不能同时使用一个展架,顾客也不能同时跑到一个展架上去拿东西。当展架上已经被放满货物时,供货商就不能再往展架上放东西了。当展架上没有东西时,顾客就不能去展架上拿东西了。
2. 思考一下这几个角色之间的关系
- 生产者和生产者之间,是竞争关系——互斥;
- 消费者和消费者之间,是竞争关系——互斥;
- 生产者和消费者之间,是同步加互斥关系。
3. 总结:321原则
- 3种关系;
- 2种角色(消费者和生产者可以有多个);
- 1个交易场所(内存空间)。
4.2 模型实现
4.2.1 基于阻塞队列——单生产单消费
1. 阻塞队列的实现(blockQueue.hpp)
- 思路:
- 生产者向队列中放数据,消费者从队列中拿数据,队列就相当于一个公共资源,需要被保护起来,满足互斥。
- 同时还需要维护生产者和消费者之间的同步关系。
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
const int defaultcap = 5; // 默认容量
template<class T>
class BlockQueue
{
public:
BlockQueue(int cap = defaultcap)
:_capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.size() == 0;
}
// 生产
bool Push(const T &in)
{
pthread_mutex_lock(&_mutex);
// 队列满了
while(IsFull()) // 不能用 if(IsFull())
{
// 生产者阻塞等待
pthread_cond_wait(&_p_cond, &_mutex);
}
// 队列不满
_q.push(in);
// 生产一个数据后,唤醒消费者
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
// 消费
bool Pop(T *out)
{
pthread_mutex_lock(&_mutex);
// 队列为空
while(IsEmpty())
{
// 阻塞等待
pthread_cond_wait(&_c_cond, &_mutex);
}
// 队列不为空
*out = _q.front();
_q.pop();
// 消费者消费后,唤醒生产者
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity; // 容量
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 生产者条件变量
pthread_cond_t _c_cond; // 消费者条件变量
};
- 细节:
- 拿
Push
方法来说,pthread_cond_signal(&_c_cond);
,应该放在pthread_mutex_unlock(&_mutex);
的之前,还是之后呢?原则上来说都可以,都不算错,但是习惯将pthread_cond_signal(&_c_cond);
放在之前。 - 判断队列是否为空为满时,不能使用
if
,因为pthread_cond_wait
存在伪唤醒的情况:今天我们是一次唤醒一个线程,如果一次唤醒多个就会出问题。拿生产动作来说,如果当前队列已满,且使用if(IsFull())
判断队列是否为满,肯定会先判断成立,然后让一批线程阻塞在这里。此时如果消费者消费了一个数据,就将唤醒这阻塞的一批线程。这些线程都从pthread_cond_wait
处开始向后执行,向队列中插入多个数据,出现问题。为了避免这种情况,我们使用while
来判断队列是否为空为满,这样一来,只有在队列真正为空为满时,才会跳出while
循环,完美解决伪唤醒问题。
- 拿
2. 测试代码
- 让消费者每隔一秒消费一次:
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args); // 生产者和消费者同时看到了一份公共资源
while(true)
{
sleep(1); // 隔一秒消费一次
// 1. 消费数据
int data = 0;
bq->Pop(&data);
// 2. 处理
std::cout << "consumer data: " << data << std::endl;
}
return nullptr;
}
void *productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args); // 生产者和消费者同时看到了一份公共资源
while(true)
{
// 1. 准备数据
int data = rand() % 10 + 1; // [1, 10]
// 2. 进行生产
bq->Push(data);
std::cout << "productor data: " << data << std::endl;
}
return nullptr;
}
int main()
{
// 种下随机数种子
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c, p; // 消费者和生产者
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
- 事实上,生产者和消费者这两个线程,谁先跑我们是不知道的。但是我们让消费者在运行的时候先等了一秒钟,如果同步逻辑正确,就可以看到如下现象:生产者瞬间将队列生产满,然后等待消费者消费。
- 大家也可以让生产者先跑,自行观察同步现象。
4.2.2 给生产消费模型传任务,模拟实际应用场景
1. 任务模块(Task.hpp)
- 实现一个给线程传计算加减乘除任务,线程跑任务的场景。
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
const int defaultvalue = 0;
enum
{
ok = 0,
div_zero, // 除0错误
mod_zero, // 模0错误
unknow // 未知错误
};
const std::string opers = "+-*/%)(&";
class Task
{
public:
Task()
{
}
Task(int x, int y, char op)
: data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
{
}
void Run()
{
switch (oper)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
{
if (data_y == 0)
code = div_zero;
else
result = data_x / data_y;
}
break;
case '%':
{
if (data_y == 0)
code = mod_zero;
else
result = data_x % data_y;
}
break;
default:
code = unknow;
break;
}
}
std::string PrintTask()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=";
s += std::to_string(result);
s += " [";
s += std::to_string(code);
s += "]";
return s;
}
~Task()
{
}
private:
int data_x;
int data_y;
char oper; // + - * / %
int result;
int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
};
2. 测试程序——单生产单消费
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args); // 生产者和消费者同时看到了一份公共资源
while(true)
{
sleep(1);
Task t;
bq->Pop(&t);
t.Run();
std::cout << "consumer data: " << t.PrintResult() << std::endl;
}
return nullptr;
}
void *productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while(true)
{
// 1. 准备数据
int data1 = rand() % 10;
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % opers.size()];
Task t(data1, data2, oper);
// 2. 进行生产
bq->Push(t);
// for debug
std::cout << "productor data: " << t.PrintTask() << std::endl;
}
return nullptr;
}
int main()
{
// 种下随机数种子
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c, p; // 消费者和生产者
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
3. 测试结果
4.2.3 基于阻塞队列,改造多生产多消费
阻塞队列的实现模块是不用改的,复用单生产单消费的代码即可。下面我们就拿多生产多消费的测试代码测试一下:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <time.h>
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args); // 生产者和消费者同时看到了一份公共资源
while(true)
{
sleep(1);
Task t;
bq->Pop(&t);
t.Run();
std::cout << "consumer data: " << t.PrintResult() << std::endl;
}
return nullptr;
}
void *Productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while(true)
{
// 1. 准备数据
int data1 = rand() % 10;
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % opers.size()];
Task t(data1, data2, oper);
// 2. 进行生产
bq->Push(t);
// for debug
std::cout << "productor data: " << t.PrintTask() << std::endl;
}
return nullptr;
}
int main()
{
// 种下随机数种子
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
pthread_t c[3], p[2]; // 3个消费者,两个生产者
BlockQueue<Task> *rq = new BlockQueue<Task>();
pthread_create(&p[0], nullptr, Productor, (void*)rq);
pthread_create(&p[1], nullptr, Productor, (void*)rq);
pthread_create(&c[0], nullptr, Consumer, (void*)rq);
pthread_create(&c[1], nullptr, Consumer, (void*)rq);
pthread_create(&c[2], nullptr, Consumer, (void*)rq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
return 0;
}
4.2.4 基于环形队列——单生产单消费
1. 算法思路
- 生产者和消费者往一个环形队列中写入数据,需要满足两个条件:
- 生产者不能把消费者套一个圈,出现这种情况意味着队列已经满;
- 消费者不能超过生产者,出现这种情况意味着队列为空了。
- 当队列为满时,只能让消费者向前跑;当队列为空时,只能让生产者向前跑。
2. 代码实现思路
- 用一个
vector
逻辑抽象成环形队列。 - 使用信号量管理资源:
- 对于生产者,空间是资源;
- 对于消费者,数据是资源。
- 伪代码:
- 开始时
sem_space = N
,sem_data = 0
,N表示环形队列容量。
- 开始时
3. 环形队列模块(RingQueue.hpp)
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
const int defaultsize = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = defaultsize)
:_size(size)
,_ringqueue(size)
,_p_step(0)
,_c_step(0)
{
sem_init(&_space_sem, 0, size);
sem_init(&_data_sem, 0, 0);
}
// 生产
void Push(const T &in)
{
P(_space_sem);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
V(_data_sem);
}
// 消费
void Pop(T *out)
{
P(_data_sem);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
}
private:
int _size;
std::vector<T> _ringqueue;
int _p_step; // 消费者位置
int _c_step; // 消费者位置
sem_t _space_sem; // 空间信号量,生产者申请
sem_t _data_sem; // 数据信号量,消费者申请
};
4. 测试程序——单生产单消费
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
void *Productor(void *args)
{
sleep(3); // 生产者先不生产,此时消费者会阻塞等待,观察同步
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
int cnt = 100;
while(true)
{
rq->Push(cnt);
std::cout << "product done, data is :" << cnt << std::endl;
cnt--;
sleep(1);
}
}
void *Consumer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while(true)
{
int data = 0;
rq->Pop(&data);
std::cout << "consumer done, data is: " << data << std::endl;
}
}
int main()
{
pthread_t c, p;
RingQueue<int> *rq = new RingQueue<int>();
pthread_create(&p, nullptr, Productor, (void*)rq);
pthread_create(&c, nullptr, Consumer, (void*)rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
上一节生产者-消费者的例子是基于
queue
实现的,其空间可以动态分配,现在是基于固定大小的环形队列,不能动态分配。
4.2.5 基于环形队列,改造多生产多消费
1. 思考一下,能否复用之前单生产单消费的代码?
- 不行,在单生产单消费的场景中,生产者与生产者之间,消费者和消费者之间,天然就是互斥的,不需要我们去维护这个关系。
- 但是在多生产多消费场景下,我们需要再设计两个锁,维持生产者与生产者之间,消费者与消费者之间的互斥关系。
2. 环形队列改造(RingQueue.hpp)
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
const int defaultsize = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = defaultsize)
:_size(size)
,_ringqueue(size)
,_p_step(0)
,_c_step(0)
{
sem_init(&_space_sem, 0, size);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
// 生产
void Push(const T &in)
{
P(_space_sem);
pthread_mutex_lock(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
pthread_mutex_unlock(&_p_mutex);
V(_data_sem);
}
// 消费
void Pop(T *out)
{
// 先申请信号量,再申请锁
P(_data_sem);
pthread_mutex_lock(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
private:
int _size;
std::vector<T> _ringqueue;
int _p_step; // 消费者位置
int _c_step; // 消费者位置
sem_t _space_sem; // 空间信号量,生产者申请
sem_t _data_sem; // 数据信号量,消费者申请
pthread_mutex_t _p_mutex; // 生产锁
pthread_mutex_t _c_mutex; // 消费锁
};
- 细节:
- 先申请信号量,还是先申请锁?结论是先申请信号量。
- 举一个看电影的例子,先申请信号量,再申请锁,就相当于先并发的把票卖出去,然后再让有票的人一个一个进来;先申请锁,再申请信号量,就相当于先让人一个一个的进来,然后再买票,这时票只能一张一张卖了,这样无疑是比较慢的。
3. 测试程序
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <time.h>
void *Consumer(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args); // 生产者和消费者同时看到了一份公共资源
while(true)
{
sleep(1);
Task t;
rq->Pop(&t);
t.Run();
std::cout << "consumer data: " << t.PrintResult() << std::endl;
}
return nullptr;
}
void *Productor(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while(true)
{
// 1. 准备数据
int data1 = rand() % 10;
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % opers.size()];
Task t(data1, data2, oper);
// 2. 进行生产
rq->Push(t);
// for debug
std::cout << "productor data: " << t.PrintTask() << std::endl;
}
return nullptr;
}
int main()
{
// 种下随机数种子
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
pthread_t c[3], p[2];
RingQueue<Task> *bq = new RingQueue<Task>();
pthread_create(&p[0], nullptr, Productor, (void*)bq);
pthread_create(&p[1], nullptr, Productor, (void*)bq);
pthread_create(&c[0], nullptr, Consumer, (void*)bq);
pthread_create(&c[1], nullptr, Consumer, (void*)bq);
pthread_create(&c[2], nullptr, Consumer, (void*)bq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
return 0;
}