Linux——信号量和(环形队列消费者模型)
Linux——线程条件变量(同步)-CSDN博客
文章目录
目录
文章目录
前言
一、信号量是什么?
二、信号量
1、主要类型
2、操作
3、应用场景
三、信号量函数
1、sem_init 函数
2、sem_wait 函数
3、sem_post 函数
4、sem_destroy 函数
5、sem_getvalue 函数
6、sem_trywait 函数
7、sem_timedwait 函数
四、环形队列
1、定义与原理
2、操作
五、线程池
基本原理
主要功能
实现方式
六、基于环形队列的消费者模型
1、main函数
2、RingQueue.hpp
3、Task.hpp
编辑
前言
信号量(Semaphore)是一种用于多线程或多进程环境下实现同步和互斥的机制。
一、信号量是什么?
信号量本质上是一个计数器,用于控制对共享资源的访问。它的值表示当前可用的资源数量。当一个线程或进程想要访问某个共享资源时,它需要先检查信号量的值。如果信号量的值大于 0,则表示有可用资源,该线程或进程可以获取资源并将信号量的值减 1;如果信号量的值为 0,则表示没有可用资源,该线程或进程需要等待,直到其他线程或进程释放资源,使信号量的值大于 0。
二、信号量
1、主要类型
- 二进制信号量:也称为互斥信号量,它的值只能是 0 或 1。主要用于实现互斥访问,确保在任何时刻只有一个线程或进程能够访问共享资源,就像一个房间只有一把钥匙,谁拿到钥匙谁才能进入房间使用里面的资源,使用完后把钥匙放回,其他人才有机会拿到钥匙进入。
- 计数信号量:其值可以是任意非负整数,用于控制同时访问共享资源的线程或进程数量。比如有一个停车场有 10 个停车位,就可以用一个初始值为 10 的计数信号量来表示,每有一辆车进入停车场,信号量的值就减 1,当信号量的值为 0 时,表示停车场已满,后续车辆需要等待。
2、操作
- P 操作:也称为 wait 操作或 down 操作。当一个进程或线程执行 P 操作时,它会检查信号量的值。如果信号量的值大于 0,则将信号量的值减 1,然后进程或线程可以继续执行;如果信号量的值为 0,则进程或线程会被阻塞,放入等待队列,直到信号量的值大于 0。
- V 操作:也称为 signal 操作或 up 操作。当一个进程或线程执行 V 操作时,它会将信号量的值加 1。如果此时有其他进程或线程正在等待该信号量(即信号量的值为 0 且有进程在等待队列中),则系统会从等待队列中唤醒一个进程或线程,使其能够执行 P 操作并获取资源。
3、应用场景
- 资源管理:可以用于管理系统中的各种资源,如内存、文件、网络连接等。通过信号量可以确保资源的合理分配和使用,避免资源冲突和过度使用。
- 进程同步:在多个进程或线程协同工作的场景中,信号量可以用于实现进程之间的同步。例如,一个进程需要等待另一个进程完成某个任务后才能继续执行,就可以使用信号量来实现这种等待和唤醒机制。
- 生产者 - 消费者问题:是信号量应用的经典场景。生产者进程生产数据并将其放入缓冲区,消费者进程从缓冲区中取出数据进行消费。通过信号量可以控制生产者和消费者的行为,确保缓冲区不会被过度写入或读取。
三、信号量函数
1、sem_init 函数
- 功能:用于初始化一个信号量。
- 原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 参数:
sem
是指向要初始化的信号量的指针;pshared
指定信号量是否在进程间共享,0 表示仅在线程间共享,非 0 表示在进程间共享;value
是信号量的初始值。 - 返回值:成功时返回 0,失败时返回 - 1,并设置
errno
以指示错误原因。
2、sem_wait 函数
- 功能:对信号量执行 P 操作,即等待信号量变为可用。
- 原型:
int sem_wait(sem_t *sem);
- 参数:
sem
是指向要操作的信号量的指针。 - 返回值:成功时返回 0,若信号量的值为 0,则线程会阻塞直到信号量可用;失败时返回 - 1,并设置
errno
。
3、sem_post 函数
- 功能:对信号量执行 V 操作,释放信号量,使信号量的值加 1。
- 原型:
int sem_post(sem_t *sem);
- 参数:
sem
是指向要操作的信号量的指针。 - 返回值:成功时返回 0,失败时返回 - 1,并设置
errno
。
4、sem_destroy 函数
- 功能:销毁一个信号量,释放相关资源。
- 原型:
int sem_destroy(sem_t *sem);
- 参数:
sem
是指向要销毁的信号量的指针。 - 返回值:成功时返回 0,失败时返回 - 1,并设置
errno
。
5、sem_getvalue 函数
- 功能:获取信号量的当前值。
- 原型:
int sem_getvalue(sem_t *sem, int *sval);
- 参数:
sem
是指向要查询的信号量的指针;sval
是一个整数指针,用于存储信号量的当前值。 - 返回值:成功时返回 0,并将信号量的当前值存储在
sval
指向的位置;失败时返回 - 1,并设置errno
以指示错误原因。
6、sem_trywait 函数
- 功能:尝试对信号量执行 P 操作,但不会阻塞线程。如果信号量的值大于 0,则将信号量的值减 1 并立即返回;如果信号量的值为 0,则立即返回错误,而不会阻塞线程。
- 原型:
int sem_trywait(sem_t *sem);
- 参数:
sem
是指向要操作的信号量的指针。 - 返回值:成功时返回 0,此时表示成功获取信号量并将其值减 1;如果信号量的值为 0,无法获取信号量,则返回 - 1,并将
errno
设置为EAGAIN
。
7、sem_timedwait 函数
- 功能:对信号量执行 P 操作,但会设置一个超时时间。如果在超时时间内信号量变为可用,则获取信号量并返回;如果超时时间已过,信号量仍不可用,则返回错误。
- 原型:
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
- 参数:
sem
是指向要操作的信号量的指针;abs_timeout
是一个指向struct timespec
结构体的指针,用于指定绝对超时时间。 - 返回值:成功时返回 0,若在超时时间内未获取到信号量,则返回 - 1,并将
errno
设置为ETIMEDOUT
。
四、环形队列
1、定义与原理
- 环形队列是一种基于队列的数据结构,它将队列的首尾相连,形成一个环形的存储空间。与普通队列不同,环形队列可以更有效地利用存储空间,避免了普通队列在元素出队后出现的前端空闲空间无法利用的问题。
- 它通过使用两个指针,即队头指针(front)和队尾指针(rear)来管理队列中的元素。当队尾指针到达队列的末尾时,它会重新回到队列的开头,继续存储新元素,从而实现了循环利用空间的功能。
2、操作
- 初始化:创建一个指定大小的数组来存储队列元素,并将队头指针和队尾指针都初始化为 0,表示队列为空。
- 入队操作:当要将一个新元素插入到环形队列中时,首先检查队列是否已满。如果未满,将新元素存储在队尾指针所指向的位置,然后将队尾指针向后移动一位。如果队尾指针已经到达数组的末尾,则将其重新设置为数组的开头位置。
- 出队操作:从环形队列中删除元素时,首先检查队列是否为空。如果不为空,取出队头指针所指向的元素,然后将队头指针向后移动一位。同样,如果队头指针到达数组的末尾,也需要将其重新设置为数组的开头位置。
- 判断队列空满
- 一般采用牺牲一个存储空间的方法来区分队列空和满的情况,即当
(rear + 1) % maxSize == front
时,认为队列已满,其中maxSize
是队列的最大容量;当front == rear
时,认为队列是空的。 - 也可以使用一个计数器来记录队列中元素的个数,当计数器的值为 0 时表示队列为空,当计数器的值等于
maxSize
时表示队列已满。
- 一般采用牺牲一个存储空间的方法来区分队列空和满的情况,即当
五、线程池
线程池是一种多线程处理形式,它将多个线程预先创建并放入一个池中,以方便对线程进行管理和重复利用,从而提高系统性能和资源利用率。以下是关于线程池的详细介绍:
基本原理
- 线程创建与管理:线程池在初始化时会创建一定数量的线程,并将它们放入线程池中。这些线程在创建后不会立即执行具体任务,而是处于等待状态,等待接收任务并执行。
- 任务队列:线程池通常会维护一个任务队列,用于存储待执行的任务。当有新任务到来时,会将任务添加到任务队列中。线程池中的线程会不断从任务队列中获取任务,并执行相应的操作。
- 线程复用:线程执行完一个任务后,不会立即销毁,而是返回到线程池中,继续等待下一个任务。这样可以避免频繁地创建和销毁线程,减少了线程创建和销毁所带来的开销,提高了系统的性能和响应速度。
主要功能
- 提高资源利用率:通过复用线程,避免了因频繁创建和销毁线程而带来的资源浪费,尤其是在处理大量短时间任务时,能显著提高系统资源的利用率。
- 控制并发度:可以限制同时执行的线程数量,避免过多线程同时运行导致系统资源过度消耗,从而保证系统的稳定性和响应能力。
- 简化线程管理:将线程的创建、调度和管理等工作封装在一个线程池中,使得开发者无需直接管理大量的线程,降低了多线程编程的复杂性,提高了代码的可维护性和可读性。
实现方式
- 线程池的组成部分
- 线程集合:存储线程池中的所有线程,一般使用线程数组或线程列表来实现。
- 任务队列:用于存放待执行的任务,通常使用队列数据结构,如阻塞队列来实现。当任务队列满时,新任务可能会被阻塞或根据特定的策略进行处理。
- 线程池管理模块:负责线程池的初始化、线程的创建与销毁、任务的分配与调度等管理工作。它根据任务队列的状态和线程池的配置参数,决定是否需要创建新的线程或回收空闲线程。
- 工作流程
- 任务提交:用户将任务提交到线程池,任务会被放入任务队列中。
- 任务分配:线程池中的线程会不断从任务队列中获取任务。当线程获取到任务后,就开始执行任务。
- 线程管理:线程池管理模块会监控线程的状态,当线程执行完任务后,会将其重新放回线程池中,使其可以继续执行下一个任务。如果线程池中的线程数量超过了最大线程数,或者有空闲线程超过了一定的空闲时间,线程池管理模块会负责销毁这些线程,以释放资源。
六、基于环形队列的消费者模型
1、main函数
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"
using namespace std;
struct ThreadData
{
RingQueue<Task> *rq;
std::string threadname;
};
void *Productor(void *args)
{
// sleep(3);
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while (true)
{
// 1. 获取数据
int data1 = rand() % 10 + 1;
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
Task t(data1, data2, op);
// 2. 生产数据
rq->Push(t);
cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;
sleep(1);
}
return nullptr;
}
void *Consumer(void *args)
{
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
while (true)
{
// 1. 消费数据
Task t;
rq->Pop(&t);
// 2. 处理数据
t();
cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
// sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>(50);
pthread_t c[5], p[3];
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Productor-" + std::to_string(i);
pthread_create(p + i, nullptr, Productor, td);
}
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, Consumer, td);
}
for (int i = 0; i < 1; i++)
{
pthread_join(p[i], nullptr);
}
for (int i = 0; i < 1; i++)
{
pthread_join(c[i], nullptr);
}
return 0;
}
2、RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
const static int defaultcap = 5;
template<class T>
class RingQueue{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&cdata_sem_, 0, 0);
sem_init(&pspace_sem_, 0, cap);
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void Push(const T &in) // 生产
{
P(pspace_sem_);
Lock(p_mutex_); // ?
ringqueue_[p_step_] = in;
// 位置后移,维持环形特性
p_step_++;
p_step_ %= cap_;
Unlock(p_mutex_);
V(cdata_sem_);
}
void Pop(T *out) // 消费
{
P(cdata_sem_);
Lock(c_mutex_); // ?
*out = ringqueue_[c_step_];
// 位置后移,维持环形特性
c_step_++;
c_step_ %= cap_;
Unlock(c_mutex_);
V(pspace_sem_);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<T> ringqueue_;
int cap_;
int c_step_; // 消费者下标
int p_step_; // 生产者下标
sem_t cdata_sem_; // 消费者关注的数据资源
sem_t pspace_sem_; // 生产者关注的空间资源
pthread_mutex_t c_mutex_;
pthread_mutex_t p_mutex_;
};
3、Task.hpp
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};