Linux:线程池和单例模式
一、普通线程池
1.1 线程池概念
线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价(用空间换时间的一种策略)。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 *
线程池的应用场景:
* 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
* 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
* 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误.
* 线程池示例(也是一个生产消费模型):
* 1. 创建固定数量线程池,循环从任务队列中获取任务对象,
* 2. 获取到任务对象后,执行任务对象中的任务接口
1.2 线程池的实现
线程池:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 5;
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid)
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}
return "None";
}
public:
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
std::cout << name << " run, "
<< "result: " << t.GetResult() << std::endl;
}
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
private:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
std::vector<ThreadInfo> threads_;
std::queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};
如果我们将线程的方法写在类内,我们的phread方法没法传参数!!因为类成员函数默认会携带this指针!! 所以我们不想要这个this指针,就必须把这个成员函数变成静态的!!
但是还不够!!因为如果我们把他定义成静态成员函数,那么他是无法使用类内的非静态成员函数的!!因此我们如果要解决这个问题,我们就可以将this对象通过参数传递过去,这样的话我们就可以在静态函数内通过这个对象去调用类内的非静态成员方法了!!
既然我们这些函数都写在类内了,那么我们就可以将一些函数封装一下,从而增加代码的可读性!
主函数:
#include <iostream>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"
pthread_spinlock_t slock;
int main()
{
// pthread_spin_init(&slock, 0);
// pthread_spin_destroy(&slock);
// 如果获取单例对象的时候,也是多线程获取的呢?
std::cout << "process runn..." << std::endl;
sleep(3);
ThreadPool<Task> *tp = new ThreadPool<Task>(5);
srand(time(nullptr) ^ getpid());
while(true)
{
//1. 构建任务
int x = rand() % 10 + 1;
usleep(10);
int y = rand() % 5;
char op = opers[rand()%opers.size()];
Task t(x, y, op);
tp->Push(t);
//2. 交给线程池处理
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
}
任务函数:
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
二、c++模式封装线程库
C++其实是提供了线程库的,但是他的底层也是用的原生线程库做封装,所以也必须指定链接。
#pragma once
#include <iostream>
#include <string>
#include <ctime>
#include <pthread.h>
typedef void (*callback_t)();//函数指针
static int num = 1;
class Thread
{
public:
static void *Routine(void *args)
{
Thread* thread = static_cast<Thread*>(args);
thread->Entery();
return nullptr;
}
public:
Thread(callback_t cb):tid_(0), name_(""), start_timestamp_(0), isrunning_(false),cb_(cb)
{}
void Run()
{
name_ = "thread-" + std::to_string(num++);
start_timestamp_ = time(nullptr);
isrunning_ = true;
pthread_create(&tid_, nullptr, Routine, this);
}
void Join()
{
pthread_join(tid_, nullptr);
isrunning_ = false;
}
std::string Name()
{
return name_;
}
uint64_t StartTimestamp()
{
return start_timestamp_;
}
bool IsRunning()
{
return isrunning_;
}
void Entery()
{
cb_();
}
~Thread()
{}
private:
pthread_t tid_;
std::string name_;
uint64_t start_timestamp_;//启动的时间戳
bool isrunning_;//是否启动了
callback_t cb_;//线程方法!
};
问题:如果我们的线程方法想带参数怎么办??
——>改一下构造函数,多增加一个类对象,将参数传递给这个类对象,然后再在方法里面将这个类对象传递给线程函数
主函数:
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"
using namespace std;
void Print()
{
while(true)
{
printf("haha, 我是一个封装的线程...\n");
sleep(1);
}
}
int main()
{
std::vector<Thread> threads;
for(int i = 0 ;i < 10; i++)
{
threads.push_back(Thread(Print));
}
for(auto &t : threads)
{
t.Run();
}
for(auto &t : threads)
{
t.Join();
}
// Thread t(Print);
// t.Run();
// cout << "是否启动成功: " << t.IsRunning() << endl;
// cout << "启动成功时间戳: " << t.StartTimestamp() << endl;
// cout << "线程的名字: " << t.Name() << endl;
// t.Join();
return 0;
}
三、线程安全的单例模式
3.1 单例模式和设计模式
单例模式是一种 "经典的, 常用的, 常考的" 设计模式.
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是设计模式!
单例模式的特点 :
某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.
3.2 饿汉模式和懒汉模式
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度
例子1:我们申请内存的时候,首先是在地址空间上申请,而当我们真正要使用的时候,才会发生缺页中断从而建立虚拟地址和物理地址的映射关系!!
例子2:我们打开文件的时候,文件的属性必然会先被加载起来,但是文件的内容则是我们需要使用的时候才会加载进来!!
饿汉方式实现单例模式
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例.
懒汉方式实现单例模式
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
——>所以我们必须加锁!! 可是我们我们会发现其实也就第一次调用的时候可能会出现这种情况,但是后续再次调用, 基本上都是直接返回,所以加锁就没有什么意义了,还大大降低效率!!——>解决方法,外面再加一层判断!!
// 懒汉模式, 线程安全
template <typename T>
class Singleton {
volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T* GetInstance() {
if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
if (inst == NULL) {
inst = new T();
}
lock.unlock();
}
return inst;
}
};
注意事项:
1. 加锁解锁的位置
2. 双重 if 判定, 避免不必要的锁竞争
3. volatile关键字防止过度优化
3.3 懒汉模式的线程池修改
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 5;
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid)
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}
return "None";
}
public:
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
std::cout << name << " run, "
<< "result: " << t.GetResult() << std::endl;
}
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
static ThreadPool<T> *GetInstance()
{
if (nullptr == tp_) // ???
{
pthread_mutex_lock(&lock_);
if (nullptr == tp_)
{
std::cout << "log: singleton create done first!" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
private:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b=c
private:
std::vector<ThreadInfo> threads_;
std::queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *tp_;
static pthread_mutex_t lock_;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
四、STL、智能指针和线程安全
STL中的容器是否是线程安全的?
——> 不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.
而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.
智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.
五、其他各种锁
悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行 锁等),当其他线程想要访问数据时,被阻塞挂起。(我们使用的一般都是这个)
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前, 会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
自旋锁,公平锁,非公平锁?
自旋锁的介绍:
讲个故事,张三发现明天要考试了,非常慌,于是打电话找到了李四,让李四帮他复习一下,李四说我目前还在看书,还得等我一个小时,于是这个时候张三就先去学校旁边的网吧打了一个小时的游戏,等打完回来正好李四下来了 于是一起去复习了 最后考了60分。
第二次又考试了,这次张三还是一样打电话给李四,这个时候李四说他上个厕所就下来了,这个时候你想的是他一会就下来了!那我还是在这等等吧,就不去网吧了。
所以这个时候张三决策的依据就是李四究竟要让他等多久
——> 当前其他申请不到锁的进程是应该阻塞还是应该重复申请,取决于其执行临界区代码的时长
所以我们以前学的锁叫做挂起等待锁,而需要不断申请直到获得的锁叫做自旋锁!!
实现方式:trylock加锁就是如果当前没有加锁成功,就直接返回! 所以我们只要在外围封装一个while循环,那么该线程就会一直申请锁直到申请成功!!
但其实pthread库给我们实现了一个自旋锁!!
第一个就是相当于帮我们封装了这个while循环,他会一直申请直到申请到锁。
第二个就跟前面学的差不多,只要申请失败了就会返回!!
六、读者写者问题、
6.1 引入
读者写者问题本身也是生产消费者模型 遵循321原则,但是其中最大的一个差别就是 读和读是共享关系!!因为读的过程并不会影响到数据!!(比如我们学校的黑板报或者博客)
线程库为我们提供了读写锁
一般来说都是读的多写的少,所以读的竞争能力比写的竞争能力大很多,所以可能会导致写较大概率的饥饿问题!!(中性现象)
有两种策略:读者优先和写者优先 优先就是当两者一块来的时候,让其中一方先进去
线程库的读写锁默认是读者优先!!
6.2 读写锁接口
设置读写优先
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/* pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和 PTHREAD_RWLOCK_PREFER_READER_NP 一致 PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁 */
初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr);
销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
6.3 样例代码
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
volatile int ticket = 1000;
pthread_rwlock_t rwlock;
void * reader(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_rdlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
void * writer(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_wrlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, --ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
struct ThreadAttr
{
pthread_t tid;
std::string id;
};
std::string create_reader_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread reader ", std::ios_base::ate);
oss << i;
return oss.str();
}
std::string create_writer_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread writer ", std::ios_base::ate);
oss << i;
return oss.str();
}
void init_readers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_reader_id(i);
pthread_create(&vec[i].tid, nullptr, reader, (void *)vec[i].id.c_str());
}
}
void init_writers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_writer_id(i);
pthread_create(&vec[i].tid, nullptr, writer, (void *)vec[i].id.c_str());
}
}
void join_threads(std::vector<ThreadAttr> const& vec)
{
// 我们按创建的 逆序 来进行线程的回收
for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=
vec.rend(); ++it) {
pthread_t const& tid = it->tid;
pthread_join(tid, nullptr);
}
}
void init_rwlock()
{
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿
pthread_rwlock_init(&rwlock, nullptr);
#endif
}
int main()
{
// 测试效果不明显的情况下,可以加大 reader_nr
// 但也不能太大,超过一定阈值后系统就调度不了主线程了
const std::size_t reader_nr = 1000;
const std::size_t writer_nr = 2;
std::vector<ThreadAttr> readers(reader_nr);
std::vector<ThreadAttr> writers(writer_nr);
init_rwlock();
init_readers(readers);
init_writers(writers);
join_threads(writers);
join_threads(readers);
pthread_rwlock_destroy(&rwlock);
}
makefile
main: main.cpp g++ -std=c++11 -Wall -Werror $^ -o $@ -lpthread