25.单例模式实现线程池
一、线程池的概念
1.1 线程池的介绍
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
1.2 线程池的应用场景
• 当需要大量的线程来完成任务,且完成任务的时间比较短。 比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。但对于长时间的任务,比如⼀个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
• 对性能要求苛刻的应用。 比如要求服务器迅速响应客户请求。
• 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。 突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数码最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
1.3 线程池的种类
• 创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口。
• 浮动线程池,其他同上。
此处,我们选择固定线程个数的线程池。
二、线程池的简单实现
//ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <memory>
#include <functional>
#include <unistd.h>
#include "log.hpp" // 引入日志
#include "thread.hpp" // 引入线程
#include "mutex.hpp" // 引入锁
#include "cond.hpp" // 引入条件变量
namespace ThreadPoolModual
{
using namespace ThreadModual;
using namespace CondModul;
using namespace MutexModel;
using namespace LogMudual;
const static int gdefaultthreadnum = 10;
template <typename T>
class ThreadPool
{
private:
// 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用!
void HandlerTask()
{
LOG(LogLevel::INFO) << "entering HandlerTask modual ...";
while (true)
{
LockGuard lock(_mutex);
T t;
{
// 1. 保证队列安全
// 2. 队列中不一定有数据
while (_task_queue.empty() && _isrunning)
{
_waitnum++;
_cond.Wait(_mutex);
_waitnum--;
}
// 2.1 如果线程池已经退出了 && 任务队列是空的
if (_task_queue.empty() && !_isrunning)
break;
// 2.2 如果线程池不退出 && 任务队列不是空的
// 2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后退出
// 3. ⼀定有任务, 处理任务
t = _task_queue.front();
_task_queue.pop();
}
// 4. 处理任务,这个任务属于线程独占的任务
t();
}
}
public:
ThreadPool(int threadnum = gdefaultthreadnum) : _threadnum(threadnum),
_waitnum(0),
_isrunning(false)
{
// 创建线程池
for (int num = 0; num < _threadnum; num++)
{
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this));
LOG(LogLevel::INFO) << "init thread " << _threads.back().Name() << " done";
}
}
void Start()
{
if (_isrunning)
return;
_isrunning = true;
for (auto &thread : _threads)
{
thread.Start();
LOG(LogLevel::INFO) << "start thread " << thread.Name() << "done";
}
}
void Stop()
{
LockGuard lock(_mutex);
if (_isrunning)
{
_isrunning = false;
if (_waitnum > 0)
_cond.NotifyAll();
}
LOG(LogLevel::DEBUG) << "threadpool is exitting...";
}
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
LOG(LogLevel::INFO) << "Recycling " << thread.Name() << " done";
}
}
void Enqueue(const T &t)
{
LockGuard lock(_mutex);
_task_queue.push(t);
if (_waitnum > 0)
_cond.Notify();
}
~ThreadPool()
{
}
private:
int _threadnum;
std::vector<Thread> _threads;
std::queue<T> _task_queue;
Mutex _mutex;
Cond _cond;
int _waitnum;
bool _isrunning;
};
}
//ThreadPool.cc
#include "ThreadPool.hpp"
using namespace ThreadPoolModual;
using task_t = function<void()>;
void func()
{
sleep(1);
}
int main()
{
ENABLE_CONSOLE_LOG_STRATEGY();
std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();
tp->Start();
int cnt = 10;
char c;
while (cnt)
{
tp->Enqueue(func);
cnt--;
sleep(1);
}
tp->Stop();
tp->Wait();
return 0;
}
三、单例模式
对于某些类(如上面所写的线程池)只应该具有一个对象(实例)就称之为单例。
单例模式的实现方式有两种:懒汉方式实现和饿汉方式实现。
3.1 饿汉方式实现单例
吃完饭,立刻洗碗, 这种就是饿汉方式。 因为下一顿吃的时候可以拿着碗就能吃饭。即在程序运行起来之后就加载到内存中。
template <typename T>
class Singleton
{
static T data;
public:
static T* GetInstance()
{
return &data;
}
};
3.2 懒汉方式实现单例
吃完饭,先把碗放下,然后下一顿饭用到这个碗了再洗就是懒汉方式。即延迟实现。
template <typename T>
class Singleton
{
static T* inst;
public:
static T* GetInstance()
{
if (inst == NULL)
inst = new T();
return inst;
}
};
但是这存在着线程不安全的问题。第一次调用GetInstance的时候,如果两个线程同时调用,可能会创建出两份T对象的实例,但是后续再次调用就没有问题了。
// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T* GetInstance()
{
if (inst == NULL) // 双重判定空指针, 降低锁冲突的概率, 提高性能.
{
lock.lock(); // 使⽤互斥锁来保证多线程情况下也只调⽤⼀次 new .
if (inst == NULL)
inst = new T();
lock.unlock();
}
return inst;
}
};
四、线程池的单例模式
//ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <memory>
#include <functional>
#include <unistd.h>
#include "log.hpp" // 引入日志
#include "thread.hpp" // 引入线程
#include "mutex.hpp" // 引入锁
#include "cond.hpp" // 引入条件变量
namespace ThreadPoolModual
{
using namespace ThreadModual;
using namespace CondModul;
using namespace MutexModel;
using namespace LogMudual;
const static int gdefaultthreadnum = 10;
template <typename T>
class ThreadPool
{
private:
// 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用!
void HandlerTask()
{
LOG(LogLevel::INFO) << "entering HandlerTask modual ...";
while (true)
{
LockGuard lock(_mutex);
T t;
{
// 1. 保证队列安全
// 2. 队列中不一定有数据
while (_task_queue.empty() && _isrunning)
{
_waitnum++;
_cond.Wait(_mutex);
_waitnum--;
}
// 2.1 如果线程池已经退出了 && 任务队列是空的
if (_task_queue.empty() && !_isrunning)
break;
// 2.2 如果线程池不退出 && 任务队列不是空的
// 2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后退出
// 3. ⼀定有任务, 处理任务
t = _task_queue.front();
_task_queue.pop();
}
// 4. 处理任务,这个任务属于线程独占的任务
t();
}
}
//私有实现
ThreadPool(int threadnum = gdefaultthreadnum) : _threadnum(threadnum),
_waitnum(0),
_isrunning(false)
{
// 创建线程池
for (int num = 0; num < _threadnum; num++)
{
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this));
LOG(LogLevel::INFO) << "init thread " << _threads.back().Name() << " done";
}
}
// 复制拷贝禁用赋值
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
ThreadPool(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *GetInstance()
{
if (_instance == nullptr)
{
LockGuard lock(_lock);
if (_instance == nullptr)
{
LOG(LogLevel::INFO) << "单例首次启动";
_instance = new ThreadPool<T>();
}
}
return _instance;
}
void Start()
{
if (_isrunning)
return;
_isrunning = true;
for (auto &thread : _threads)
{
thread.Start();
LOG(LogLevel::INFO) << "start thread " << thread.Name() << "done";
}
}
void Stop()
{
LockGuard lock(_mutex);
if (_isrunning)
{
_isrunning = false;
if (_waitnum > 0)
_cond.NotifyAll();
}
LOG(LogLevel::DEBUG) << "threadpool is exitting...";
}
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
LOG(LogLevel::INFO) << "Recycling " << thread.Name() << " done";
}
}
void Enqueue(const T &t)
{
LockGuard lock(_mutex);
_task_queue.push(t);
if (_waitnum > 0)
_cond.Notify();
}
~ThreadPool()
{
}
private:
int _threadnum;
std::vector<Thread> _threads;
std::queue<T> _task_queue;
Mutex _mutex;
Cond _cond;
int _waitnum;
bool _isrunning;
// 添加单例模式
static ThreadPool<T> *_instance;
static Mutex _lock;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;
template <typename T>
Mutex ThreadPool<T>::_lock;
}
//ThreadPool.cc
#include "ThreadPool.hpp"
using namespace ThreadPoolModual;
using task_t = function<void()>;
void func()
{
sleep(1);
}
int main()
{
ENABLE_CONSOLE_LOG_STRATEGY();
int cnt = 10;
while (cnt)
{
ThreadPool<task_t>::GetInstance()->Enqueue(func);
cnt--;
}
ThreadPool<task_t>::GetInstance()->Stop();
ThreadPool<task_t>::GetInstance()->Wait();
return 0;
}