随笔 - 6  文章 - 11  trackbacks - 0
<2011年2月>
303112345
6789101112
13141516171819
20212223242526
272812345
6789101112

常用链接

留言簿(1)

随笔档案

搜索

  •  

最新评论

阅读排行榜

评论排行榜

      在C++中要进行并发处理,不可避免要使用多线程,在传统的教科书中,大家都是采用最原始的多线程技术,应用逻辑和线程并发策略紧密绑定。
      在一个典型的服务器程序中,客户端的请求往往包含了很多不同的逻辑命令,如在一个线程处理函数中,需要根据客户端的命令代码处理不同的业务逻辑:

int thrad_main(int cmd_id,char *data){
   switch(cmd_id)
   {
   case 1:
      ...
      break;
   case 2:
      ...
      break;
   }
}

   如此这般,业务处理逻辑和线程逻辑紧密耦合,这是一种很“丑陋”的代码。
   如何通过一种优雅的方法,分离并发逻辑和业务逻辑,通过通用的并发框架,业务逻辑设计者只需要关心自己的逻辑代码,交给“线程池”去处理即可,而不需要去关心如何创建线程,等待线程结果这些琐碎的“小事”?

   很简单,高手出招,必谈模式,下面是一种常用的并发模式,领导者/追随者线程池模型:

   在一组预先分配的线程中通过“互斥”锁来同步线程之间的行为,“线程”们通过“民主选举”选出一位代表“领导者”站在最前端接收请求,拿到“任务”后,就从身后的候选“继任者”中选出一个线程代替自己作为“领导者”,自己则变成“工作者”就跑到后面默默去执行处理命令,这个“任务”是一个包含待处理数据和处理逻辑的自说明性任务,也就是说所有的线程不必事先知道怎么处理接收到的任务,因为他所拿到的“任务包”中就包含了如何处理任务的说明。就像一个“代工工厂”的工人一样,无需任何文化基础,会干活就行。
   那如何实现自说明任务呢?我们定义了一种称为“Method_Request”的对象,它包含一个接口“virtual int call (void) = 0;”,线程池接受的任务就是这种Method_Request对象的实例,比如一个通知线程池结束工作的Method_Request可以定义为如下的类:
1     class ExitRequest : public ACE_Method_Request
2     {
3     public:
4         virtual int call (void){
5             return -1;  // Cause exit.
6         }
7     };
8 
      我们重载call接口,添加处理这个请求的逻辑代码,由于仅仅实现通知线程池结束工作的操作,我们返回一个特殊值“-1”,即可只是线城池:“工作完成了,你赶快洗洗睡吧!”,线程池会检查Method_Request对象的返回值,如果是“0”就是处理正常完成,继续等待下一个任务,如果是“-1”,就关闭所有线程。

      再来一个复杂点的例子,派生的Method_Request不仅有处理逻辑,还包括了需要处理的数据:
 1 
 2 class M2M_EventRequest : public ACE_Method_Request
 3 {
 4     // Lua解释器,每个事件使用自己单独的脚本上下文
 5     LuaVM::ALEE_LuaService & m_svcs;
 6     ALEE_ScriptList_t & m_cmds;
 7 
 8     // 事件内容
 9     std::string m_type_name;
10     xml_event_t m_xml_event;
11 
12     // 调试信息
13     DebugInfo_ptr m_debug;
14 
15 public:
16     M2M_EventRequest(
17         LuaVM::ALEE_LuaService & svcs,
18         ALEE_ScriptList_t &cmds,
19         string const & type_name,
20         xml_event_t event);
21 
22     M2M_EventRequest(
23         LuaVM::ALEE_LuaService & svcs,
24         ALEE_ScriptList_t &cmds,
25         string const & type_name,
26         xml_event_t event,
27         DebugInfo_ptr debug);
28 
29     virtual ~M2M_EventRequest (void);
30 
31     virtual int call (void);
32 };
33 
      这个Method_Request的功能是,命令线程池调用Lua解析器处理一段脚本代码,详细逻辑就不解释了,仅仅是一个示例,我们的重点在于线程池的实现。
      下面就公布这个“万能线程池的”实现,其实这是一个基于ACE的线程库实现的“领导者/追随者”模式,我在其基础上进行了改进,增加了自适应功能,可以根据请求队列的负载,自动调整线程池中的线程数目。
      闲话少说,上代码,看得懂的童鞋恭喜你内力深厚,还望多提宝贵意见,看不懂得小盆友也可以努力学习,提高自己:
// LeaderFollower.h
 1 #pragma once
 2 
 3 #include "dllmain.h"
 4 #include <map>
 5 #include <ace/Synch.h>    // ACE_Thread_Mutex
 6 #include <ace/Task.h>    // ACE_Task
 7 
 8 // 线程状态
 9 enum LF_Status_t
10 {
11     TH_LEADER_ACTIVE,
12     TH_FOLLOWER,
13     TH_WORKER,
14     TH_READY,
15     TH_STOP,
16 };
17 
18 struct LF_StatusTime_t
19 {
20     LF_Status_t    status;
21     ACE_Time_Value working_tv;
22     ACE_Time_Value start_time;
23     ACE_Time_Value stop_time;
24     ACE_Time_Value work_start;
25     ACE_Time_Value work_time;
26 };
27 
28 typedef std::map<ACE_thread_t,LF_StatusTime_t>  LF_StatusTimeList_t;
29 
30 class LF_Follower;
31 
32 // 领导者-追随者线程池 模式实现
33 class CPPXCORBA_API LeaderFollower
34 {
35 public:
36     LeaderFollower(void);
37     ~LeaderFollower(void);
38 
39 protected:
40     LF_Follower * make_follower(void);
41     int     become_leader(void);
42     int     elect_new_leader(void);
43     bool leader_active(void);
44     void set_active_leader(ACE_thread_t leader);
45 
46 private:
47     ACE_Unbounded_Queue<LF_Follower*>   m_followers;
48     ACE_Thread_Mutex                    m_followers_lock;
49     ACE_Thread_Mutex                    m_leader_lock;
50     ACE_thread_t                        m_current_leader;
51 
52     //////////////////////////////////////////////////////////////////////////
53     /// 线程池状态监控
54 public:
55     const LF_StatusTimeList_t & get_status(voidconst;
56     const float get_load_rate(voidconst;
57 
58 protected:
59     void set_status(LF_Status_t status);
60     void set_worktime(ACE_Time_Value work_time);
61 
62 private:
63     LF_StatusTimeList_t m_status_time_list;
64     ACE_Thread_Mutex    m_status_lock;
65 };
66 

// LeaderFollower.cpp
  1 #include "stdafx.h"
  2 #include "LeaderFollower.h"
  3 #include "../cppx.core/dllmain.h"
  4 
  5 // 追随者标记
  6 class LF_Follower
  7 {
  8     ACE_Condition<ACE_Thread_Mutex> m_cond;
  9     ACE_thread_t                    m_owner;
 10 
 11 public:
 12     LF_Follower(ACE_Thread_Mutex &leader_lock) : m_cond(leader_lock) {
 13         m_owner = ACE_Thread::self();
 14     }
 15     int wait(void){
 16         return m_cond.wait();
 17     }
 18     int signal(void){
 19         return m_cond.signal();
 20     }
 21     ACE_thread_t owner(void){
 22         return m_owner;
 23     }
 24 
 25 };
 26 
 27 //////////////////////////////////////////////////////////////////////////
 28 LeaderFollower::LeaderFollower(void) :
 29 m_current_leader(0)
 30 {
 31 }
 32 
 33 LeaderFollower::~LeaderFollower(void)
 34 {
 35 }
 36 
 37 LF_Follower * 
 38 LeaderFollower::make_follower( void )
 39 {
 40     ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, 0);
 41     
 42     LF_Follower *fw;
 43     ACE_NEW_RETURN(fw, LF_Follower(m_leader_lock), 0);
 44     m_followers.enqueue_tail(fw);
 45     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) make_follower \t: Now has %d followers.\n"), m_followers.size()));
 46     return fw;
 47 }
 48 
 49 int 
 50 LeaderFollower::become_leader( void )
 51 {
 52     ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
 53 
 54     if( leader_active() && m_current_leader != ACE_Thread::self() ){
 55         while(leader_active()){
 56             set_status(TH_FOLLOWER);
 57             auto_ptr<LF_Follower> fw(make_follower());
 58             fw->wait();         // Wait until told to do so.
 59         }
 60     }
 61 
 62     // Mark yourself as the active leader.
 63     set_active_leader(ACE_Thread::self());
 64     set_status(TH_LEADER_ACTIVE);
 65     //ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) become_leader \t: Becoming the leader.\n")));
 66     return 0;
 67 }
 68 
 69 int 
 70 LeaderFollower::elect_new_leader( void )
 71 {
 72     ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
 73 
 74     set_active_leader(0);
 75 
 76     // Wake up a follower
 77     if!m_followers.is_empty() ){
 78         ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, -1);
 79 
 80         // Get the old follower.
 81         LF_Follower *fw;
 82         if( m_followers.dequeue_head(fw) != 0 )
 83             return -1;
 84 
 85         //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Resigning and electing %d.\n"), fw->owner()));
 86         return (fw->signal() == 0? 0 : -1;
 87     }
 88 
 89     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Oops no followers left\n")));
 90     return -1;
 91 }
 92 
 93 bool 
 94 LeaderFollower::leader_active( void )
 95 {
 96     return (m_current_leader != 0);
 97 }
 98 
 99 void 
100 LeaderFollower::set_active_leader( ACE_thread_t leader )
101 {
102     m_current_leader = leader;
103 }
104 
105 void LeaderFollower::set_worktime( ACE_Time_Value work_time )
106 {
107     ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
108     LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
109     info.working_tv = work_time;
110 }
111 
112 void LeaderFollower::set_status( LF_Status_t status )
113 {
114     ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
115     LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
116     switch(status)
117     {
118     case TH_READY:
119         info.start_time = ACE_OS::gettimeofday();
120         break;
121     case TH_STOP:
122         info.stop_time = ACE_OS::gettimeofday();
123         break;
124     case TH_WORKER:
125         info.work_start = ACE_OS::gettimeofday();
126         break;
127     case TH_LEADER_ACTIVE:
128     case TH_FOLLOWER:
129         if( info.status == TH_WORKER )
130             info.work_time += ACE_OS::gettimeofday() - info.work_start;
131         break;
132     }
133     info.status = status;
134 }
135 
136 const LF_StatusTimeList_t & 
137 LeaderFollower::get_status( void ) const
138 {
139     return m_status_time_list;
140 }
141 
142 const float 
143 LeaderFollower::get_load_rate( void ) const
144 {
145     ACE_Time_Value work_time,run_time;
146     foreach(const LF_StatusTimeList_t::value_type & info,get_status()){
147         if( info.second.status != TH_STOP ){
148             work_time += info.second.work_time;
149             run_time += ACE_OS::gettimeofday() - info.second.start_time;
150         }
151     }
152     return (float)work_time.usec()/run_time.usec()*100;
153 }
154 

// LF_ThreadPool.h
 1 #pragma once
 2 
 3 #include "LeaderFollower.h"
 4 
 5 #include <ace/Task.h>
 6 #include <ace/Activation_Queue.h>
 7 #include <ace/Method_Request.h>
 8 
 9 class CPPXCORBA_API LF_ThreadPool :
10     public ACE_Task_Base,
11     public LeaderFollower
12 {
13     class ExitRequest : public ACE_Method_Request
14     {
15     public:
16         virtual int call (void){
17             return -1;  // Cause exit.
18         }
19     };
20 
21     bool m_bShutdown;
22     bool m_bRunning;
23     ACE_Activation_Queue m_activation_queue_;
24 
25     static const size_t ScheduleTime = 10;
26     static const size_t MinThreadNum = 10;
27     static const size_t MaxThreadNum = 20;
28 
29 public:
30     LF_ThreadPool(void);
31     ~LF_ThreadPool(void);
32 
33     virtual int svc(void);
34 
35     int start_stread_pool( void );
36     int stop_thread_pool( void );
37     int post_request( ACE_Method_Request *request );
38 
39     int get_queue_load(void){ return m_activation_queue_.method_count(); }
40     int get_max_thread(void){ return MaxThreadNum; }
41     int get_min_thread(void){ return MinThreadNum; }
42 
43 private:
44     int _fork_new_thread( void );
45     int _post_exit_request(void);
46 };
47 

// LF_ThreadPool.cpp
  1 #include "stdafx.h"
  2 #include "LF_ThreadPool.h"
  3 
  4 LF_ThreadPool::LF_ThreadPool(void) :
  5 m_bShutdown(false),
  6 m_bRunning(false)
  7 {
  8 }
  9 
 10 LF_ThreadPool::~LF_ThreadPool(void)
 11 {
 12 }
 13 
 14 int LF_ThreadPool::svc( void )
 15 {
 16     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread started.\t: %d working threads left.\n"),thr_count()));
 17 
 18     // 线程开始运行
 19     m_bRunning = true;
 20 
 21     set_status(TH_READY);
 22 
 23     while(true){
 24         // Block until this thread is the leader.
 25         become_leader();
 26 
 27         // 设置线程空闲时间,空闲线程将会自动退出
 28         ACE_Time_Value tv(ScheduleTime);
 29         tv += ACE_OS::gettimeofday();
 30 
 31         // 从队列获取下一个请求,并获得所有权
 32         auto_ptr<ACE_Method_Request> request(m_activation_queue_.dequeue(&tv));
 33         if( request.get() == 0 ){                                               // 长时间没有请求,dequeue超时返回
 34             if( elect_new_leader() == 0 && thr_count() > MinThreadNum )         // 成功选择新的领导者,且工作线程数大于最少线程数
 35                 break;                                                          // 结束当前线程
 36             if( thr_count() < MinThreadNum && thr_count() < MaxThreadNum )      // 工作线程数小于最少线程数,创建新的线程
 37                 _fork_new_thread();
 38             continue;                                                           // 继续担当领导者(优先成为领导者),或返回线程池等待
 39         }
 40 
 41         // Elect a new leader then process the request
 42         if( elect_new_leader() != 0 || thr_count() < MinThreadNum )             // 没有空余线程可成为领导者,或者线程池容量调整
 43             if!m_bShutdown )                                                  // 且没有调度关闭
 44                 if( thr_count() < MaxThreadNum )                                // 未达到线程数上线
 45                     _fork_new_thread();                                         // 创建新的线程
 46 
 47         // Invoke the method request.
 48         set_status(TH_WORKER);
 49 
 50         ACE_Time_Value tv_start,tv_finish,tv_working;
 51         tv_start = ACE_OS::gettimeofday();
 52 
 53         int result = request->call();
 54 
 55         tv_finish = ACE_OS::gettimeofday();
 56         tv_working = tv_finish - tv_start;
 57         set_worktime(tv_working);
 58 
 59         if( result == -1 ){
 60             if( thr_count() > 1 )                                                // If received a ExitMethod, Notify the next Thread(if exists) to exit too.
 61                 _post_exit_request();
 62             break;
 63         }
 64     }
 65 
 66     // 剩下最后一个线程,线程池停止
 67     if( thr_count() == 1 )
 68         m_bRunning = false;
 69 
 70     set_status(TH_STOP);
 71     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread stoped.\t: %d working threads left.\n"),thr_count()-1));
 72     return 0;
 73 }
 74 
 75 int LF_ThreadPool::start_stread_pool( void )
 76 {
 77     m_bShutdown = false;
 78     return activate(THR_NEW_LWP| THR_JOINABLE,MinThreadNum);
 79 }
 80 
 81 int LF_ThreadPool::stop_thread_pool( void )
 82 {
 83     // 线程池已停止
 84     if!m_bRunning )
 85         return 0;
 86 
 87     m_bShutdown = true;
 88     _post_exit_request();
 89     return wait();
 90 }
 91 
 92 int LF_ThreadPool::post_request( ACE_Method_Request *request )
 93 {
 94     ACE_TRACE (ACE_TEXT ("SvcThreadPool::enqueue"));
 95     return m_activation_queue_.enqueue (request);
 96 }
 97 
 98 int LF_ThreadPool::_fork_new_thread( void )
 99 {
100     return activate(THR_NEW_LWP| THR_JOINABLE,1,1);
101 }
102 
103 int LF_ThreadPool::_post_exit_request( void )
104 {
105     return post_request(new ExitRequest);
106 }
107 

      怎么样?很简单吧?什么?怎么用?Oh My Lady GaGa!还是告诉你吧:
1 m_pool.post_request(new M2M_EventRequest(m_lua_svc,m_lua_scripts,type_name,xml_event,*iter));
      需要线程池出来干活的时候,创建一个请求对象,扔给他就行了!

      好了,代码就是最好的文档,C++开源社区给了我成长的土壤,希望能对后来者有所帮助。
      把这些东西贴出来,是为了整理自己的大脑,免得这些曾经顶着熊猫眼熬出来的东西,尘封在茫茫的代码海洋中,取之于前辈,还之于后人。也希望有更多的高手能够慷慨布道,壮大我们的C++社区。
posted on 2011-02-28 15:46 风雷九州 阅读(4061) 评论(3)  编辑 收藏 引用

FeedBack:
# re: 一个基于ACE的负载自适应万能线程池实现 2011-02-28 17:25 true
常见的一种需求:把一个客户端的处理始终绑定到某一个线程,就是说各个请求之间是有时序要求的。线程池的自适应要考虑一下这个问题  回复  更多评论
  
# re: 一个基于ACE的负载自适应万能线程池实现 2011-02-28 17:49 风雷九州
@true

嗯,这个当初也有考虑到,是为了线程的上下文缓存尽量少切换,在数据处理中为了充分发挥CPU的缓存性能,还需要考虑线程的优先执行CPU等。

这些措施在性能要求十分苛刻的情况下时必须要考虑的,我的方案目前仅是实现了客户端请求与线程的分离,使程序的架构更灵活,能够满足一般的服务器并发性能要求即可。

@true

我理解错了,你说的可能是,需求要求客户端的请求是有固定的时序的,但是我的方案并不是用来处理客户端并发的,线程池处理的是大量的设备消息,这些消息通常大量并发到达,而且相互之间没有什么关系,故不需要某个特定的线程来处理。

设备的状态是保存在单独的状态服务中的的,任何一个线程接到处理任务都能够处理设备的状态逻辑,多线程之间是通过“读写锁”共享状态服务的。

除非是考虑到线程执行上下文切换的代价带来的性能损失,否则逻辑上是不关心某个请求是被哪个线程处理的。  回复  更多评论
  
# re: 一个基于ACE的负载自适应万能线程池实现 2011-02-28 22:19 liquanhai
非常感谢,学习了  回复  更多评论
  

只有注册用户登录后才能发表评论。
【推荐】超50万行VC++源码: 大型组态工控、电力仿真CAD与GIS源码库
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理