随笔-2  评论-0  文章-0  trackbacks-0

头文件

 1#include <ace/Thread_Mutex.h>
 2#include <ace/Guard_T.h>
 3#include <ace/Svc_Handler.h>
 4#include <ace/Condition_T.h>
 5#include <ace/Atomic_Op.h>
 6
 7#include <queue>
 8
 9// ------------------------------------------------------------------------------------------------------------------------
10
11
12class Svc_Thread_Pool : public ACE_Task_Base
13{
14public:
15
16    enum 
17    {
18        DEFAULT_POOL_SIZE = 8
19        MAX_TIMEOUT = 5,
20    }
;
21
22
23    Svc_Thread_Pool ();
24
25    ~Svc_Thread_Pool ();
26
27    int open (int pool_size = DEFAULT_POOL_SIZE);
28
29    virtual int open (void *args)
30    {
31        return     ACE_Task_Base::open(args);
32    }
;
33
34    int svc ();
35
36    int add_to_pool (ACE_Task_Base * tb);
37
38    bool done ()
39    {
40        return (_shutdown == 0? false : true;
41    }
;
42
43    void shutdown ();
44
45protected:
46    ACE_Thread_Mutex                        _workers_lock;
47    ACE_Condition<ACE_Thread_Mutex>            _cond;
48    std::queue<ACE_Task_Base *>                _workers_queue;
49    ACE_Atomic_Op<ACE_Thread_Mutex, int>    _shutdown;
50}
;
51
52// ------------------------------------------------------------------------------------------------------------------------
53


实现

  1
  2
  3// ------------------------------------------------------------------------------------------------------------------------
  4
  5Svc_Thread_Pool::Svc_Thread_Pool () 
  6                : ACE_Task_Base(ACE_Thread_Manager::instance ()), 
  7                _workers_lock(), //_cond_lock(), 
  8                _cond (/*_cond_lock*/_workers_lock), 
  9                _shutdown (0)
 10{
 11}

 12
 13// ------------------------------------------------------------------------------------------------------------------------
 14
 15Svc_Thread_Pool::~Svc_Thread_Pool ()
 16{
 17}

 18
 19// ------------------------------------------------------------------------------------------------------------------------
 20
 21int Svc_Thread_Pool::open (int pool_size)
 22{
 23    return activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, pool_size);
 24}

 25
 26// ------------------------------------------------------------------------------------------------------------------------
 27
 28int Svc_Thread_Pool::svc ()
 29{
 30    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool svc() started \n")));
 31
 32    while (!done ()) {
 33        // 锁定队列并获取一个 ACE_Task_Base *。
 34
 35        ACE_Task_Base * tb = 0;
 36        assert (_workers_lock.acquire () == 0);
 37        if (_workers_queue.size () != 0{
 38            tb = _workers_queue.front ();
 39            _workers_queue.pop ();
 40            // 解锁 队列
 41            assert (_workers_lock.release () == 0);
 42
 43            // 执行 ACE_Task_Base::svc ()
 44            if (tb) {
 45                // svc 中应该自己解决自己数据的同步问题。并且自己加入 Svc_Thread_Pool 中
 46                // ACE_Task_Base 应该自己保存 Svc_Thread_Pool 的对象指针。
 47                tb->svc ();
 48            }

 49        }

 50
 51        else {
 52            // 解锁 队列
 53            assert (_cond.wait () == 0);
 54            assert (_workers_lock.release () == 0);
 55            
 56
 57            ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool recv msg \n")));
 58        }

 59    }

 60
 61    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool svc() end !!\n")));
 62
 63    return 0;
 64}

 65
 66// ------------------------------------------------------------------------------------------------------------------------
 67
 68int Svc_Thread_Pool::add_to_pool (ACE_Task_Base * tb)
 69{
 70    if (_shutdown != 0{
 71        return -1;
 72    }

 73    // 锁定队列,并加入工作者线程
 74    assert (_workers_lock.acquire () == 0);
 75    _workers_queue.push (tb);
 76    // 通知
 77    assert (_cond.signal () == 0);
 78    // 解锁队列
 79    assert (_workers_lock.release () == 0);
 80
 81    printf ("add_to_pool\n");
 82    return 0;
 83}

 84
 85// ------------------------------------------------------------------------------------------------------------------------
 86
 87void Svc_Thread_Pool::shutdown ()
 88{
 89    _shutdown = 1;
 90    assert (_workers_lock.acquire () == 0);    // lock queue
 91    while (_workers_queue.size () != 0{
 92        _workers_queue.pop ();
 93    }

 94    assert (_cond.broadcast () == 0);
 95    assert (_workers_lock.release () == 0);    // unlock queue
 96
 97    
 98
 99    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t)broadcast\n")));
100
101    //assert (_workers_lock.acquire () == 0);
102    //assert (_cond.broadcast () == 0);
103    //assert (_workers_lock.release () == 0);
104
105
106    this->wait();
107    printf ("shutdown\n");
108}

109
110// ------------------------------------------------------------------------------------------------------------------------
111


测试

 1
 2#include "../cmn/extend_ace/Svc_Thread_Pool.h"
 3
 4Svc_Thread_Pool    tp;
 5
 6class My_Task : public ACE_Task_Base
 7{
 8public:
 9    My_Task ()
10    {
11        j = i ++;
12    }

13
14    int svc ()
15    {
16        // 先锁定 handle_close ().禁止svc的时候被reactor handle_close (),或者禁止调用handle_close,让他在svc中自己调用。
17        // lock
18        //ACE_OS::sleep (0);
19        printf ("%d: thread: %d\n", j, ACE_Thread::self ());
20        //tp.add_to_pool (this);
21        return 0;
22        // unlock
23    }

24    static int i;
25    int            j;
26}
;
27
28int My_Task::i = 0;
29
30int test ()
31{
32    tp.open (8);
33
34    ACE_OS::sleep (3);
35    My_Task        mt1, mt2, mt3, mt4, mt5, mt6, mt7, mt8, mt9, mt10;
36
37    tp.add_to_pool (&mt1);
38    tp.add_to_pool (&mt2);
39    tp.add_to_pool (&mt3);
40    tp.add_to_pool (&mt4);
41    tp.add_to_pool (&mt5);
42    tp.add_to_pool (&mt6);
43    tp.add_to_pool (&mt7);
44    tp.add_to_pool (&mt8);
45    tp.add_to_pool (&mt9);
46    tp.add_to_pool (&mt10);
47
48
49
50    ACE_OS::sleep (6);
51    tp.shutdown ();
52    //ACE_OS::sleep (5);
53
54    //tp.wait();
55    system ("pause");
56    return 0;
57}

58
posted on 2009-06-12 23:21 风化血 阅读(2172) 评论(0)  编辑 收藏 引用

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理