moduo之线程池ThreadPool
简介
moduo实现了通用业务线程池,其任务使用无参的函数对象,其任务队列使用的是有界的队列
结构
线程池
线程池属性
有线程池名name_
,任务队列的最大数maxQueueSize_
,线程池的运行状态running_
,工作线程启动时的初始化函数threadInitCallback_
void setMaxQueueSize(int maxSize)
{
maxQueueSize_ = maxSize;
}
void setThreadInitCallback(const Task& cb)
{
threadInitCallback_ = cb;
}
ThreadPool::ThreadPool(const string& nameArg)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
name_(nameArg),
maxQueueSize_(0),
running_(false)
{
}
const string& name() const
{
return name_;
}
线程池的开启和结束
开启是start
,结束是stop
,向任务队列中提交任务是run
start
设置线程池的运行状态running_
为true,调整线程池的容量,并且创建指定numThreads
个线程,工作线程的执行函数为runInThread
,工作线程先执行线程初始化函数,然后从任务队列中取出任务执行
void ThreadPool::start(int numThreads)
{
running_ = true;
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1);
threads_.emplace_back(new muduo::Thread(
std::bind(&ThreadPool::runInThread, this), name_+id));
threads_[i]->start();
}
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_();
}
while (running_)
{
Task task(take());
if (task)
{
task();
}
}
}
catch (const Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // rethrow
}
}
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)
{
notEmpty_.wait();
}
Task task;
if (!queue_.empty())
{
task = queue_.front();
queue_.pop_front();
if (maxQueueSize_ > 0)
{
notFull_.notify();
}
}
return task;
}
stop
设置线程池运行状态为false,同时唤醒队列的两个条件变量,等待所有工作线程执行完成
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
running_ = false;
notEmpty_.notifyAll();
notFull_.notifyAll();
}
for (auto& thr : threads_)
{
thr->join();
}
}
run
提交任务到工作队列中,如果队列满则等待直到队列有空闲空间,将任务放入工作队列后,唤醒工作线程
void ThreadPool::run(Task task)
{
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull() && running_)
{
notFull_.wait();
}
if (!running_) return;
assert(!isFull());
queue_.push_back(std::move(task));
notEmpty_.notify();
}
}
线程
线程初始化
设置状态变量,线程执行函数,线程默认名,默认是Thread
+numCreated_
Thread::Thread(ThreadFunc func, const string& n)
: started_(false),
joined_(false),
pthreadId_(0),
tid_(0),
func_(std::move(func)),
name_(n),
latch_(1)
{
setDefaultName();
}
void Thread::setDefaultName()
{
int num = numCreated_.incrementAndGet();
if (name_.empty())
{
char buf[32];
snprintf(buf, sizeof buf, "Thread%d", num);
name_ = buf;
}
}
线程启动
ThreadData
作为线程数据传到创建线程时的参数中,主要包含线程执行函数,线程名,线程id以及创建线程与线程执行同步变量
启动线程的函数为startThread
,其会调用ThreadData
的runInThread
start
启动线程,修改线程状态started_
为true,通过latch_
等待线程启动完成
void Thread::start()
{
assert(!started_);
started_ = true;
// FIXME: move(func_)
detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);
if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))
{
started_ = false;
delete data; // or no delete?
LOG_SYSFATAL << "Failed in pthread_create";
}
else
{
latch_.wait();
assert(tid_ > 0);
}
}
void* startThread(void* obj)
{
ThreadData* data = static_cast<ThreadData*>(obj);
data->runInThread();
delete data;
return NULL;
}
struct ThreadData
{
typedef muduo::Thread::ThreadFunc ThreadFunc;
ThreadFunc func_;
string name_;
pid_t* tid_;
CountDownLatch* latch_;
ThreadData(ThreadFunc func,
const string& name,
pid_t* tid,
CountDownLatch* latch)
: func_(std::move(func)),
name_(name),
tid_(tid),
latch_(latch)
{ }
void runInThread()
{
*tid_ = muduo::CurrentThread::tid();
tid_ = NULL;
latch_->countDown();
latch_ = NULL;
muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();
::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);
try
{
func_();
muduo::CurrentThread::t_threadName = "finished";
}
catch (const Exception& ex)
{
muduo::CurrentThread::t_threadName = "crashed";
fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
muduo::CurrentThread::t_threadName = "crashed";
fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
muduo::CurrentThread::t_threadName = "crashed";
fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());
throw; // rethrow
}
}
};
线程结束
在线程启动后,没有调用join
默认是分离的线程
join
等待线程执行结束
Thread::~Thread()
{
if (started_ && !joined_)
{
pthread_detach(pthreadId_);
}
}
int Thread::join()
{
assert(started_);
assert(!joined_);
joined_ = true;
return pthread_join(pthreadId_, NULL);
}
线程属性
线程运行状态started_
,线程idtid_
,线程名name_
bool started() const
{
return started_;
}
pid_t tid() const
{
return tid_;
}
const string& name() const
{
return name_;
}