【Linux】线程池
线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景
- 需要大量线程完成任务,且完成的时间比较短。
- 对性能要求苛刻的应用。
- 接收突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
组成
线程池的本质是生产消费模型。既然是生产消费模型,也就需要一个队列来存放任务。
这个队列在多线程环境下属于公共资源,访问公共资源就需要加锁。所以一个线程池的组成就是:线程、任务队列、锁。
代码
LockGuard.hpp
这个是管理锁的类。
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *lock_p = nullptr): lock_p_(lock_p)
{}
void lock()
{
if(lock_p_) pthread_mutex_lock(lock_p_);
}
void unlock()
{
if(lock_p_) pthread_mutex_unlock(lock_p_);
}
~Mutex()
{}
private:
pthread_mutex_t *lock_p_;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex): mutex_(mutex)
{
mutex_.lock();
}
~LockGuard()
{
mutex_.unlock();
}
private:
Mutex mutex_;
};
Thread.hpp
这个是封装的线程类
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>
namespace ThreadNs
{
typedef std::function<void *(void *)> func_t;
const int num = 1024;
class Thread
{
private:
// 在类内创建线程,想让线程执行对应的方法,需要将方法设置成为static
static void *start_routine(void *args) // 类内成员,有缺省参数
{
Thread *_this = static_cast<Thread *>(args);
return _this->callback();
}
public:
Thread()
{
char namebuffer[num];
snprintf(namebuffer, sizeof namebuffer, "thread-%d", threadnum++);
name_ = namebuffer;
}
void start(func_t func, void *args = nullptr)
{
func_ = func;
args_ = args;
int n = pthread_create(&tid_, nullptr, start_routine, this);
assert(n == 0);
(void)n;
}
void join()
{
int n = pthread_join(tid_, nullptr);
assert(n == 0);
(void)n;
}
std::string threadname()
{
return name_;
}
~Thread()
{
}
void *callback() { return func_(args_);}
private:
std::string name_;
func_t func_;
void *args_;
pthread_t tid_;
static int threadnum;
};
int Thread::threadnum = 1;
}
ThreadPool.hpp
线程池类
#pragma once
#include "Thread.hpp"
#include "LockGuard.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
using namespace ThreadNs;
const int gnum = 10;
template <class T>
class ThreadPool;
template <class T>
class ThreadData
{
public:
ThreadPool<T> *threadpool;
std::string name;
public:
ThreadData(ThreadPool<T> *tp, const std::string &n) : threadpool(tp), name(n)
{
}
};
template <class T>
class ThreadPool
{
private:
static void *handlerTask(void *args)
{
ThreadData<T> *td = (ThreadData<T> *)args;
while (true)
{
T t;
{
LockGuard lockguard(td->threadpool->mutex());
while (td->threadpool->isQueueEmpty())
{
td->threadpool->threadWait();
}
t = td->threadpool->pop();
}
std::cout << td->name << " 获一取了个任务: " << t.toTaskString() << " 并处理完成,结果是:" << t() << std::endl;
}
delete td;
return nullptr;
}
public:
void lockQueue() { pthread_mutex_lock(&_mutex); }
void unlockQueue() { pthread_mutex_unlock(&_mutex); }
bool isQueueEmpty() { return _task_queue.empty(); }
void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
T pop()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
pthread_mutex_t *mutex()
{
return &_mutex;
}
public:
ThreadPool(const int &num = gnum) : _num(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 0; i < _num; i++)
{
_threads.push_back(new Thread());
}
}
void run()
{
for (const auto &t : _threads)
{
ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
t->start(handlerTask, td);
std::cout << t->threadname() << " start ..." << std::endl;
}
}
void push(const T &in)
{
LockGuard lockguard(&_mutex);
_task_queue.push(in);
pthread_cond_signal(&_cond);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
for (const auto &t : _threads)
delete t;
}
private:
int _num; //线程的数量
std::vector<Thread *> _threads;
std::queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
Task.hpp
任务类
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
class Task
{
using func_t = std::function<int(int,int,char)>;
public:
Task()
{}
Task(int x, int y, char op, func_t func)
:_x(x), _y(y), _op(op), _callback(func)
{}
std::string operator()()
{
int result = _callback(_x, _y, _op);
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callback;
};
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
break;
}
return result;
}
main.cc
测试结果
单例模式版线程池
单例模式
某些类, 只应该具有一个对象(实例), 就称之为单例。单例模式有俩种实现方式:饿汉和懒汉
饿汉方式实现单例模式
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
懒汉方式实现单例模式
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
这里存在一个严重的问题:线程不安全。
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
懒汉方式实现单例模式(线程安全版本)
// 懒汉模式, 线程安全、
template <typename T>
class Singleton
{
volatile static T *inst; // 需要设置 volatile 关键字, 否则可能被编译器优化. static
std::mutex lock;
public:
static T *GetInstance()
{
if (inst == NULL)
{ // 双重判定空指针, 降低锁冲突的概率, 提高性能.
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
if (inst == NULL)
{
inst = new T();
}
lock.unlock();
}
return inst;
}
};