Chosen

常用链接

统计

最新评论

C++实现简单的线程池

线程池编程简介:

    在我们的服务端的程序中运用了大量关于池的概念,线程池、连接池、内存池、对象池等等。使用池的概念后可以高效利用服务器端的资源,比如没有大量的线程在系统中进行上下文的切换,一个数据库连接池,也只需要维护一定里的连接,而不是占用很多数据库连接资源。同时它们也避免了一些耗时的操作,比如创建一个线程,申请一个数据库连接,而且可能就只使用那么一次,然后就立刻释放刚申请的资源,效率很低。

    在我的上一篇blog中已经实现一个线程基类了,在这里我们只需要实现一个线程池类ThreadPool和该线程池调度的工作线程类WorkThread即可,而且WorkThread是继承自Thread类的。

 

实现思路:

    一个简单的线程池的实现思路一般如下:

  1. ThreadPool中创建多个线程(WorkThreadk对象),每个线程均处于阻塞状态,等待任务的到来
  1. ThreadPool提供一个提交任务的接口,如post_job(ProcCallback func, void* data); post_job后会立即返回,不会阻塞
  2. ThreadPool维护一个空闲线程队列,当客户程序调用post_job()后,如果空闲队列中有空闲线程,则取出一个线程句柄,并设置任务再给出新任务通知事件即可,处理等待的线程捕捉到事件信号后便开始执行任务,执行完后将该线程句柄重新push到空闲线程队列中
  3. 该线程池采用回调函数方式

首先我们实现一个WorkThread类:

 1 typedef void (APR_THREAD_FUNC *ProcCallBack)(void*);        //回调函数指针
 2 //由线程池调度的工作线程
 3 class WorkThread : public Thread      //Thread类的实现可参考我上一篇的blog: 《C++封装一个简单的线程类》
 4 {
 5     friend class ThreadPool;
 6 public:
 7     WorkThread(ThreadPool* pthr_pool)
 8     {
 9         thr_pool_    = pthr_pool;
10         cb_func_    = NULL;
11         param_        = NULL;
12     }
13     virtual ~WorkThread(){}
14     void    set_job(ProcCallBack func, void* param)
15     {
16         cb_func_    = func;
17         param_        = param;
18         notify();            //通知有新的任务
19     }
20     //实现Thread的run方法,并调用用户指定的函数
21     virtual void run()
22     {
23         if (cb_func_)
24             cb_func_(param_);
25     
26         //reset callback function pointer
27         cb_func_    = NULL;
28         param_        = NULL;
29     
30         //执行完任务,将该线程句柄移到线程池空闲队列
31         thr_pool_->move_to_idle_que(this);
32     }
33     
34 private:
35     ThreadPool*    thr_pool_;        //线程池指针
36     ProcCallBack    cb_func_;        //回调函数地址
37     void*             param_;           //回调函数参数
38 };

WorkThread中,有一个回调函数指针和参数,当有新任务时,会在run()中被调用,执行完后会将该线程移动到空闲线程队列,等待下一次任务的提交。

ThreadPool类定义如下:

 1 class ThreadPool
 2 {
 3     friend class WorkThread;
 4 public:
 5     ThreadPool();
 6     virtual ~ThreadPool();
 7     int    start_thread_pool(size_t thread_num = 5);        //启动thread_num个线程
 8     int    stop_thread_pool();                                      //线束线程池
 9     void    destroy();                                                //销毁线程池所申请的资源
10     void    post_job(ProcCallBack func, void* data);        //提交任务接口,传入回调函数地址和参数
11 
12 protected:
13     WorkThread*    get_idle_thread();                              //从获得空闲队列中取得一个线程句柄
14     void        append_idle_thread(WorkThread* pthread);    //加入到thread_vec_和idl_que_中
15     void        move_to_idle_que(WorkThread* idlethread);    //将线程句柄加入到idle_que_中
16 
17 private:
18     size_t                thr_num_;                      //线程数目
19     vector<WorkThread*>        thr_vec_;        //线程句柄集合
20     BlockQueue<WorkThread*>    idle_que_;     //空闲线程队列
21 
22 private:
23     // not implement
24     ThreadPool(const ThreadPool& );
25     ThreadPool&    operator=(const ThreadPool& );
26 };
      线程池实现的关键是如何创建多个线程,并且当任务来临时可以从线程池中取一个线程(也就是去得到其中一个线程的指针),然后提交任务并执行。还有一点就是当任务执行完后,应该将该线程句柄重新加入到空闲线程队列,所以我们将ThreadPool的指针传入给了WorkThread,thr_pool_最后可以调用move_to_idle_que(this)来将该线程句柄移到空闲队列中。
ThreadPool中一些关键代码的实现:
 1 int ThreadPool::start_thread_pool(size_t thread_num)
 2 {
 3     assert(thread_num != 0);
 4     thr_num_    = thread_num;
 5     int    ret        = 0;
 6     for (size_t i = 0; i < thr_num_; ++i)
 7     {
 8         WorkThread*    pthr = new WorkThread(this);
 9         pthr->set_thread_id(i);
10         if ((ret = pthr->start()) != 0)
11         {
12             printf("start_thread_pool: failed when create a work thread: %d\n", i);
13             delete pthr;
14             return i;
15         }
16         append_idle_thread(pthr);        
17     }
18     return thr_num_;
19 }
20 int ThreadPool::stop_thread_pool()
21 {
22     for (size_t i = 0; i < thr_vec_.size(); ++i)
23     {
24         WorkThread* pthr = thr_vec_[i];
25         pthr->join();
26         delete pthr;
27     }
28     thr_vec_.clear();
29     idle_que_.clear();
30     return 0;
31 }
32 void ThreadPool::destroy()
33 {
34     stop_thread_pool();
35 }
36 void ThreadPool::append_idle_thread(WorkThread* pthread)
37 {
38     thr_vec_.push_back(pthread);
39     idle_que_.push(pthread);
40 }
41 void ThreadPool::move_to_idle_que(WorkThread* idlethread)
42 {
43     idle_que_.push(idlethread);
44 }
45 WorkThread* ThreadPool::get_idle_thread()
46 {
47     WorkThread*    pthr = NULL;
48     if (!idle_que_.empty())
49         pthr = idle_que_.take();
50     return pthr;
51 }
52 void ThreadPool::post_job(ProcCallBack func, void* data)
53 {
54     assert(func != NULL);
55     WorkThread* pthr = get_idle_thread();
56     while (pthr == NULL)
57     {
58         apr_sleep(500000);
59         pthr = get_idle_thread();
60     }
61     pthr->set_job(func, data);
62 }
ThreadPool中的BlockQueue<WorkThread*> 也就是一个线程安全的队列,即对std::deque做了一个包装,在插入和取出元素时加了一个读写锁。


使用示例:
//任务执行函数,必须是ProcCallback类型
void count(void* param)
{
// do some your work, like: 
int* pi = static_cast<int*>(param);
int val = *pi + 1;
printf("val=%d\n", val);
pelete pi;
}
//程序中使用如下:
ThreadPool* ptp = new ThreadPool();
ptp->start_thread_pool(3); //启动3 个线程
ptp->post_job(count, new int(1)); //提交任务
ptp->post_job(count, new int(2));
ptp->post_job(count, new int(3));
//程序线束时
ptp->stop_thread_pool();
其实count()函数就是我们的业务实现代码,有任务时,可以提交给线程池去执行。
结尾:
    其实实现一个线程池或其它什么池并不难,当然线程安全和效率还是要从多写代码的经验中获取。像这个线程池也就是基于预创多个建线程,保保存好它们的线程句柄,当有新任务时取一个线程执行即可,执行完后一定要归还到空闲线程队列中,当然我们可以在线程池中增加一个任务队列,因为当post_job()时,若当时没有空闲线程,有两种方案,一是等待有空闲线程,二是加入到任务队列,当WorkThread线程执行完一个任务后,从任务队列中取一个任务继续执行即可,不会阻塞在post_job()中。
  另外,我们可以封装一些线程安全的队列和map什么的,这样在程序中就不用担心创建一个多线程共享的队列时,还必须创建一个锁,挺麻烦的,比如上面的BlockQueue<Type>直接拿来用就行了。

posted on 2013-10-07 19:22 Choice 阅读(7249) 评论(0)  编辑 收藏 引用 所属分类: C++


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理