当前位置: 首页 > article >正文

moduo之线程池ThreadPool

简介

moduo实现了通用业务线程池,其任务使用无参的函数对象,其任务队列使用的是有界的队列

结构

1
0..n
ThreadPool
- MutexLock mutex_
- Condition notEmpty_
- Condition notFull_
- deque<Task> queue_
- size_t maxQueueSize_
- string name_
- Task threadInitCallback_
- bool running_
- vector<ThreadPtr> threads_
-bool isFull()
-void runInThread()
-Task take()
+void setMaxQueueSize(int maxSize)
+void setThreadInitCallback(const Task& cb)
+const string& name()
+size_t queueSize()
+void start(int numThreads)
+void stop()
+void run(Task f)
Thread
- bool started_
- bool joined_
- pthread_t pthreadId_
- pid_t tid_
- ThreadFunc func_
- string name_
- CountDownLatch latch_
- static AtomicInt32 numCreated_
-void setDefaultName()
+void start()
+int join()
+bool started()
+pid_t tid()
+const string& name()
+static int numCreated()

线程池

线程池属性

有线程池名name_,任务队列的最大数maxQueueSize_,线程池的运行状态running_,工作线程启动时的初始化函数threadInitCallback_

void setMaxQueueSize(int maxSize) 
{ 
	maxQueueSize_ = maxSize; 
}

void setThreadInitCallback(const Task& cb)
{ 
	threadInitCallback_ = cb; 
}

ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),
    notEmpty_(mutex_),
    notFull_(mutex_),
    name_(nameArg),
    maxQueueSize_(0),
    running_(false)
{
}

const string& name() const
 { 
 	return name_; 
 }

线程池的开启和结束

开启是start,结束是stop,向任务队列中提交任务是run
start设置线程池的运行状态running_ 为true,调整线程池的容量,并且创建指定numThreads个线程,工作线程的执行函数为runInThread,工作线程先执行线程初始化函数,然后从任务队列中取出任务执行

void ThreadPool::start(int numThreads)
{
  running_ = true;
  threads_.reserve(numThreads);
  for (int i = 0; i < numThreads; ++i)
  {
    char id[32];
    snprintf(id, sizeof id, "%d", i+1);
    threads_.emplace_back(new muduo::Thread(
          std::bind(&ThreadPool::runInThread, this), name_+id));
    threads_[i]->start();
  }
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();
  }
}

void ThreadPool::runInThread()
{
  try
  {
    if (threadInitCallback_)
    {
      threadInitCallback_();
    }
    while (running_)
    {
      Task task(take());
      if (task)
      {
        task();
      }
    }
  }
  catch (const Exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
    abort();
  }
  catch (const std::exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    abort();
  }
  catch (...)
  {
    fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
    throw; // rethrow
  }
}

ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  while (queue_.empty() && running_)
  {
    notEmpty_.wait();
  }
  Task task;
  if (!queue_.empty())
  {
    task = queue_.front();
    queue_.pop_front();
    if (maxQueueSize_ > 0)
    {
      notFull_.notify();
    }
  }
  return task;
}

stop设置线程池运行状态为false,同时唤醒队列的两个条件变量,等待所有工作线程执行完成

void ThreadPool::stop()
{
  {
  MutexLockGuard lock(mutex_);
  running_ = false;
  notEmpty_.notifyAll();
  notFull_.notifyAll();
  }
  for (auto& thr : threads_)
  {
    thr->join();
  }
}

run提交任务到工作队列中,如果队列满则等待直到队列有空闲空间,将任务放入工作队列后,唤醒工作线程

void ThreadPool::run(Task task)
{
  if (threads_.empty())
  {
    task();
  }
  else
  {
    MutexLockGuard lock(mutex_);
    while (isFull() && running_)
    {
      notFull_.wait();
    }
    if (!running_) return;
    assert(!isFull());

    queue_.push_back(std::move(task));
    notEmpty_.notify();
  }
}

线程

线程初始化

设置状态变量,线程执行函数,线程默认名,默认是Thread+numCreated_

Thread::Thread(ThreadFunc func, const string& n)
  : started_(false),
    joined_(false),
    pthreadId_(0),
    tid_(0),
    func_(std::move(func)),
    name_(n),
    latch_(1)
{
  setDefaultName();
}

void Thread::setDefaultName()
{
  int num = numCreated_.incrementAndGet();
  if (name_.empty())
  {
    char buf[32];
    snprintf(buf, sizeof buf, "Thread%d", num);
    name_ = buf;
  }
}

线程启动

ThreadData作为线程数据传到创建线程时的参数中,主要包含线程执行函数,线程名,线程id以及创建线程与线程执行同步变量
启动线程的函数为startThread,其会调用ThreadDatarunInThread
start启动线程,修改线程状态started_为true,通过latch_等待线程启动完成

void Thread::start()
{
  assert(!started_);
  started_ = true;
  // FIXME: move(func_)
  detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);
  if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))
  {
    started_ = false;
    delete data; // or no delete?
    LOG_SYSFATAL << "Failed in pthread_create";
  }
  else
  {
    latch_.wait();
    assert(tid_ > 0);
  }
}

void* startThread(void* obj)
{
  ThreadData* data = static_cast<ThreadData*>(obj);
  data->runInThread();
  delete data;
  return NULL;
}

struct ThreadData
{
  typedef muduo::Thread::ThreadFunc ThreadFunc;
  ThreadFunc func_;
  string name_;
  pid_t* tid_;
  CountDownLatch* latch_;

  ThreadData(ThreadFunc func,
             const string& name,
             pid_t* tid,
             CountDownLatch* latch)
    : func_(std::move(func)),
      name_(name),
      tid_(tid),
      latch_(latch)
  { }

  void runInThread()
  {
    *tid_ = muduo::CurrentThread::tid();
    tid_ = NULL;
    latch_->countDown();
    latch_ = NULL;

    muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();
    ::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);
    try
    {
      func_();
      muduo::CurrentThread::t_threadName = "finished";
    }
    catch (const Exception& ex)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
      fprintf(stderr, "reason: %s\n", ex.what());
      fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
      abort();
    }
    catch (const std::exception& ex)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
      fprintf(stderr, "reason: %s\n", ex.what());
      abort();
    }
    catch (...)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());
      throw; // rethrow
    }
  }
};

线程结束

在线程启动后,没有调用join默认是分离的线程
join等待线程执行结束

Thread::~Thread()
{
  if (started_ && !joined_)
  {
    pthread_detach(pthreadId_);
  }
}

int Thread::join()
{
  assert(started_);
  assert(!joined_);
  joined_ = true;
  return pthread_join(pthreadId_, NULL);
}

线程属性

线程运行状态started_,线程idtid_,线程名name_

bool started() const 
{ 
	return started_; 
}

pid_t tid() const 
{ 
	return tid_; 
}

const string& name() const 
{ 
	return name_; 
}

http://www.kler.cn/a/408761.html

相关文章:

  • python Flask指定IP和端口
  • Vue进阶面试题(三)
  • mybatis学习(三)
  • php:使用Ratchet类实现分布式websocket服务
  • html+js实现图片的放大缩小等比缩放翻转,自动播放切换,顺逆时针旋转
  • opencv undefined reference to `cv::noarray()‘ 。window系统配置opencv,找到opencv库,但连接不了
  • JavaScript中的箭头函数以及编写优化
  • Java安卓导航栏设计开发(实战篇)——第十一期
  • mysql-分析并解决mvcc更新丢失问题
  • shell完结
  • git标签和分支
  • 如何在WPF中嵌入其它程序
  • 数据结构--链表实现栈和队列
  • 构建功能完备的Flask Web应用
  • Flink转换算子——flatMap/map/filter/keyby/reduce综合案例
  • meterpreter常用命令 上
  • Python爬虫:如何优雅地获取1688商品详情接口
  • 使用windows窗口展示go-echarts图表
  • Stable Diffusion中的自注意力替换技术与Diffusers实现
  • React中Ant Design组件日期编辑回显
  • 【FPGA开发】Vivado自定义封装IP核,绑定总线
  • ajax (一)
  • timm库加载的模型可视化
  • 【Python-办公自动化】实现自动化输出模板表格报告
  • MongoDB 中设置登录账号密码可以通过以下步骤实现
  • 基于SSM的婚庆管理系统+LW示例参考