一动不如一静

C++博客 首页 新随笔 联系 聚合 管理
  20 Posts :: 0 Stories :: 10 Comments :: 0 Trackbacks

首先从定义一个reactor开始。
ACE_TP_Reactor select_reactor_one(g_unOneMaxHandle, 0, 0, 0, 1);

上面的这句话触发了一下的一些行为。主要就是给event_handlers_分配了内存。这里就决定了你能支持多少条连接

int
ACE_Select_Reactor_Handler_Repository::open (size_t size)
{
  ACE_TRACE (
" ACE_Select_Reactor_Handler_Repository::open " );
  
this -> max_size_  =  size;
  
this -> max_handlep1_  =   0 ;

#if  defined (ACE_WIN32)
  
//  Try to allocate the memory.*
  ACE_NEW_RETURN ( this -> event_handlers_,
                  ACE_Event_Tuple[size],
                  
- 1 );

  
//  Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }.
   for  (size_t h  =   0 ; h  <  size;  ++ h)
    
{
      ACE_SELECT_REACTOR_HANDLE (h) 
=  ACE_INVALID_HANDLE;   // 对event_handlers_进行初始化
      ACE_SELECT_REACTOR_EVENT_HANDLER ( this , h)  =   0 ;
    }

#else
  
//  Try to allocate the memory.
  ACE_NEW_RETURN ( this -> event_handlers_,
                  ACE_Event_Handler 
* [size],
                  
- 1 );

  
//  Initialize the ACE_Event_Handler * to NULL.
   for  (size_t h  =   0 ; h  <  size;  ++ h)
    ACE_SELECT_REACTOR_EVENT_HANDLER (
this , h)  =   0 ;
#endif  /* ACE_WIN32 */

  
//  Try to increase the number of handles if <size> is greater than   检查给定的size是否超出了ACE_MaxHandle
  
//  the current limit.
   return  ACE::set_handle_limit (static_cast < int >  (size),  1 );
}

接下来就是注册监听了。
acceptor.open(addr);
那么这句话又干了些什么呢?
int
ACE_SOCK_Acceptor::open (
const ACE_Addr &local_sap,
                         
int reuse_addr,
                         
int protocol_family,
                         
int backlog,
                         
int protocol)
{
  ACE_TRACE (
"ACE_SOCK_Acceptor::open");

  
if (local_sap != ACE_Addr::sap_any)
    protocol_family 
= local_sap.get_type ();
  
else if (protocol_family == PF_UNSPEC)
    
{
#if defined (ACE_HAS_IPV6)
      protocol_family 
= ACE::ipv6_enabled () ? PF_INET6 : PF_INET;
#else
      protocol_family 
= PF_INET;
#endif /* ACE_HAS_IPV6 */
    }


  
if (ACE_SOCK::open (SOCK_STREAM,                 
                      protocol_family,
                      protocol,
                      reuse_addr) 
== -1)
    
return -1;
  
else
    
return this->shared_open (local_sap,
                              protocol_family,
                              backlog);
}

int
ACE_SOCK::open (
int type,
                
int protocol_family,
                
int protocol,
                
int reuse_addr)
{
  ACE_TRACE (
"ACE_SOCK::open");
  
int one = 1;

  
this->set_handle (ACE_OS::socket (protocol_family,
                                    type,
                                    protocol));

  
if (this->get_handle () == ACE_INVALID_HANDLE)
    
return -1;
  
else if (protocol_family != PF_UNIX
           
&& reuse_addr
           
&& this->set_option (SOL_SOCKET,
                                SO_REUSEADDR,
                                
&one,
                                
sizeof one) == -1)
    
{
      
this->close ();
      
return -1;
    }

  
return 0;
}

ACE_INLINE ACE_HANDLE
ACE_OS::socket (
int domain,
                
int type,
                
int proto)
{
  ACE_OS_TRACE (
"ACE_OS::socket");
  ACE_SOCKCALL_RETURN (::socket (domain,
                                 type,
                                 proto),
                       ACE_HANDLE,
                       ACE_INVALID_HANDLE);
}


int
ACE_SOCK_Acceptor::shared_open (
const ACE_Addr &local_sap,
                                
int protocol_family,
                                
int backlog)
{
  ACE_TRACE (
"ACE_SOCK_Acceptor::shared_open");
  
int error = 0;

#if defined (ACE_HAS_IPV6)
  ACE_ASSERT (protocol_family 
== PF_INET || protocol_family == PF_INET6);

  
if (protocol_family == PF_INET6)
    
{
      sockaddr_in6 local_inet6_addr;
      ACE_OS::memset (reinterpret_cast
<void *> (&local_inet6_addr),
                      
0,
                      
sizeof local_inet6_addr);

      
if (local_sap == ACE_Addr::sap_any)
        
{
          local_inet6_addr.sin6_family 
= AF_INET6;
          local_inet6_addr.sin6_port 
= 0;
          local_inet6_addr.sin6_addr 
= in6addr_any;
        }

      
else
        local_inet6_addr 
= *reinterpret_cast<sockaddr_in6 *> (local_sap.get_addr ());

      
// We probably don't need a bind_port written here.
      
// There are currently no supported OS's that define
      
// ACE_LACKS_WILDCARD_BIND.
      if (ACE_OS::bind (this->get_handle (),
                        reinterpret_cast
<sockaddr *> (&local_inet6_addr),
                        
sizeof local_inet6_addr) == -1)
        error 
= 1;
    }

  
else
#endif
  
if (protocol_family == PF_INET)
    
{
      sockaddr_in local_inet_addr;                                  
/***************************addr**********************/
      ACE_OS::memset (reinterpret_cast
<void *> (&local_inet_addr),
                      
0,
                      
sizeof local_inet_addr);

      
if (local_sap == ACE_Addr::sap_any)
        
{
          local_inet_addr.sin_port 
= 0;
        }

      
else
        local_inet_addr 
= *reinterpret_cast<sockaddr_in *> (local_sap.get_addr ());
      
if (local_inet_addr.sin_port == 0)
        
{
          
if (ACE::bind_port (this->get_handle (),
                              ACE_NTOHL (ACE_UINT32 (local_inet_addr.sin_addr.s_addr))) 
== -1)
            error 
= 1;
        }

      
else if (ACE_OS::bind (this->get_handle (),                    /***********************bind**********************/
                             reinterpret_cast
<sockaddr *> (&local_inet_addr),
                             
sizeof local_inet_addr) == -1)
        error 
= 1;
    }

  
else if (ACE_OS::bind (this->get_handle (),
                         (sockaddr 
*) local_sap.get_addr (),
                         local_sap.get_size ()) 
== -1)
    error 
= 1;

  
if (error != 0
      
|| ACE_OS::listen (this->get_handle (),  /****************listen ***************************/
                         backlog) 
== -1)
    
{
      error 
= 1;
      
this->close ();
    }


  
return error ? -1 : 0;
}
接下来就是注册了。
return m_Reactor->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
 那么这句话又做了什么呢?
    ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler
  (ACE_Event_Handler 
*handler,
   ACE_Reactor_Mask mask)
{
  ACE_TRACE (
"ACE_Select_Reactor_T::register_handler");
  ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, 
this->token_, -1));
  
return this->register_handler_i (handler->get_handle (), handler, mask);  /**************所以一定要实现get_handle()方法**************/
}


template 
<class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::register_handler_i
  (ACE_HANDLE handle,
   ACE_Event_Handler 
*event_handler,
   ACE_Reactor_Mask mask)
{
  ACE_TRACE (
"ACE_Select_Reactor_T::register_handler_i");

  
// Insert the <handle, event_handle> tuple into the Handler
  
// Repository.
  return this->handler_rep_.bind (handle, event_handler, mask);
}

int
ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
                                             ACE_Event_Handler 
*event_handler,
                                             ACE_Reactor_Mask mask)
{
  ACE_TRACE (
"ACE_Select_Reactor_Handler_Repository::bind");

  
if (handle == ACE_INVALID_HANDLE)
    handle 
= event_handler->get_handle ();

  
if (this->invalid_handle (handle))
    
return -1;

  
// Is this handle already in the Reactor?
  int existing_handle = 0;

#if defined (ACE_WIN32)

  ssize_t assigned_slot 
= -1;

  
for (ssize_t i = 0; i < this->max_handlep1_; ++i)
    
{
      
// If handle is already registered.
      if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
        
{
          
// Cannot use a different handler for an existing handle.
          if (ACE_SELECT_REACTOR_EVENT_HANDLER (this, i) !=
              event_handler)
            
return -1;

          
// Remember location.
          assigned_slot = i;

          
// Remember that this handle is already registered in the
          
// Reactor.
          existing_handle = 1;

          
// We can stop looking now.
          break;
        }

      
else
        
// Here's the first free slot, so let's take it.
        if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE &&
            assigned_slot 
== -1)
          
{
            assigned_slot 
= i;
          }

    }


  
if (assigned_slot > -1)
    
// We found a spot.
    {
      ACE_SELECT_REACTOR_HANDLE (assigned_slot) 
= handle;
      ACE_SELECT_REACTOR_EVENT_HANDLER (
this, assigned_slot) = event_handler;
    }

  
else if (this->max_handlep1_ < this->max_size_)                   // 第一次添加一定走这里
    {
      
// Insert at the end of the active portion.
      ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle;       //event_handlers_[max_handlep1_] .handle_= handle
      ACE_SELECT_REACTOR_EVENT_HANDLER (thisthis->max_handlep1_) = event_handler;  //event_handlers_[max_handlep1_].event_handle_ = event_handler
      ++this->max_handlep1_;                                            // max_handlep1_增加了
    }

  
else
    
{
      
// No more room at the inn!
      errno = ENOMEM;
      
return -1;
    }


#else

  
// Check if this handle is already registered.
  ACE_Event_Handler *current_handler =
    ACE_SELECT_REACTOR_EVENT_HANDLER (
this, handle);

  
if (current_handler)
    
{
      
// Cannot use a different handler for an existing handle.
      if (current_handler != event_handler)
        
return -1;

      
// Remember that this handle is already registered in the
      
// Reactor.
      existing_handle = 1;
    }


  ACE_SELECT_REACTOR_EVENT_HANDLER (
this, handle) = event_handler;

  
if (this->max_handlep1_ < handle + 1)
    
this->max_handlep1_ = handle + 1;

#endif /* ACE_WIN32 */

  
if (this->select_reactor_.is_suspended_i (handle))               // 检查是否在suspend_set_集合中
    {
      
this->select_reactor_.bit_ops (handle,
                                     mask,
                                     
this->select_reactor_.suspend_set_,
                                     ACE_Reactor::ADD_MASK);
    }

  
else
    
{
      
this->select_reactor_.bit_ops (handle,                           //注意这里添加到wait_set_集合中去了,这个是最要关注的地方
                                     mask,                            
                                     
this->select_reactor_.wait_set_,
                                     ACE_Reactor::ADD_MASK);

      
// Note the fact that we've changed the state of the <wait_set_>,
      
// which is used by the dispatching loop to determine whether it can
      
// keep going or if it needs to reconsult select().
      
// this->select_reactor_.state_changed_ = 1;
    }


  
// If new entry, call add_reference() if needed.
  if (!existing_handle)
    event_handler
->add_reference ();

  
return 0;
}


/*this->select_reactor_.wait_set_中包含这样三个变量rd_mask_/wr_mask_/ex_mask_,每一个结构都入下所示
rd_mask_
      size_=0 
      max_handle_=0xffffffff 
      mask_
          fd_count=0 
          fd_array=0x00126da8  数组长度为1024
     nbits_ 256位的固定数值的数组

*/

int
ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,                                 
/*************设置到wait_set_中******************/
                                  ACE_Reactor_Mask mask,
                                  ACE_Select_Reactor_Handle_Set 
&handle_set,
                                  
int ops)
{

  ACE_FDS_PTMF ptmf  
= &ACE_Handle_Set::set_bit;
  u_long omask 
= ACE_Event_Handler::NULL_MASK;

  
// Find the old reactor masks.  This automatically does the work of
  
// the GET_MASK operation.
  if (handle_set.rd_mask_.is_set (handle))
    ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
  
if (handle_set.wr_mask_.is_set (handle))
    ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
  
if (handle_set.ex_mask_.is_set (handle))
    ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);

  
switch (ops)
    
{
    
case ACE_Reactor::GET_MASK:
      
// The work for this operation is done in all cases at the
      
// begining of the function.
      break;
    
case ACE_Reactor::CLR_MASK:
      ptmf 
= &ACE_Handle_Set::clr_bit;
      
// State was changed. we need to reflect that change in the
      
// dispatch_mask I assume that only ACE_Reactor::CLR_MASK should
      
// be treated here  which means we need to clear the handle|mask
      
// from the current dispatch handler
      this->clear_dispatch_mask (handle, mask);
      
/* FALLTHRU */
    
case ACE_Reactor::SET_MASK:
      
/* FALLTHRU */
    
case ACE_Reactor::ADD_MASK:

      
// The following code is rather subtle  Note that if we are
      
// doing a ACE_Reactor::SET_MASK then if the bit is not enabled
      
// in the mask we need to clear the bit from the ACE_Handle_Set.
      
// On the other hand, if we are doing a ACE_Reactor::CLR_MASK or
      
// a ACE_Reactor::ADD_MASK we just carry out the operations
      
// specified by the mask.

      
// READ, ACCEPT, and CONNECT flag will place the handle in the
      
// read set.
      if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
          
|| ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
          
|| ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
        
{
          (handle_set.rd_mask_.
*ptmf) (handle);
        }

      
else if (ops == ACE_Reactor::SET_MASK)
        handle_set.rd_mask_.clr_bit (handle);

      
// WRITE and CONNECT flag will place the handle in the write set
      if (ACE_BIT_ENABLED (mask,
                           ACE_Event_Handler::WRITE_MASK)
          
|| ACE_BIT_ENABLED (mask,
                              ACE_Event_Handler::CONNECT_MASK))
        
{
          (handle_set.wr_mask_.
*ptmf) (handle);
        }

      
else if (ops == ACE_Reactor::SET_MASK)
        handle_set.wr_mask_.clr_bit (handle);

      
// EXCEPT (and CONNECT on Win32) flag will place the handle in
      
// the except set.
      if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
#if defined (ACE_WIN32)
          
|| ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
#endif /* ACE_WIN32 */
          )
        
{
          (handle_set.ex_mask_.
*ptmf) (handle);
        }

      
else if (ops == ACE_Reactor::SET_MASK)
        handle_set.ex_mask_.clr_bit (handle);
      
break;
    
default:
      
return -1;
    }

  
return omask;
}

ACE_INLINE 
void
ACE_Handle_Set::set_bit (ACE_HANDLE handle)
{
  ACE_TRACE (
"ACE_Handle_Set::set_bit");
  
if ((handle != ACE_INVALID_HANDLE)
      
&& (!this->is_set (handle)))
    
{
#if defined (ACE_WIN32)
      FD_SET ((SOCKET) handle,
              
&this->mask_);
      
this->size_++;
#else /* ACE_WIN32 */
#if defined (ACE_HAS_BIG_FD_SET)
      
if (this->size_ == 0)
        FD_ZERO (
&this->mask_);

      
if (handle < this->min_handle_)
        
this->min_handle_ = handle;
#endif /* ACE_HAS_BIG_FD_SET */

      FD_SET (handle,
              
&this->mask_);
      
this->size_++;

      
if (handle > this->max_handle_)
        
this->max_handle_ = handle;
#endif /* ACE_WIN32 */
    }

}


然后就是分发了:

template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::handle_events
  (ACE_Time_Value 
*max_wait_time)
{
  ACE_TRACE (
"ACE_Select_Reactor_T::handle_events");

#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)

  
// Stash the current time -- the destructor of this object will
  
// automatically compute how much time elapsed since this method was
  
// called.
  ACE_Countdown_Time countdown (max_wait_time);

  ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, 
this->token_, -1);

  
if (ACE_OS::thr_equal (ACE_Thread::self (),
                         
this->owner_) == 0 || this->deactivated_)
    
return -1;

  
// Update the countdown to reflect time waiting for the mutex.
  countdown.update ();
#else
  
if (this->deactivated_)
    
return -1;
#endif /* ACE_MT_SAFE */

  
return this->handle_events_i (max_wait_time);
}


template 
<class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
  (ACE_Time_Value 
*max_wait_time)
{
  
int result = -1;

  ACE_SEH_TRY
    
{
      
// We use the data member dispatch_set_ as the current dispatch
      
// set.

      
// We need to start from a clean dispatch_set
      this->dispatch_set_.rd_mask_.reset ();
      
this->dispatch_set_.wr_mask_.reset ();
      
this->dispatch_set_.ex_mask_.reset ();

      
int number_of_active_handles =
        
this->wait_for_multiple_events (this->dispatch_set_,
                                        max_wait_time);

      result 
=
        
this->dispatch (number_of_active_handles,
                        
this->dispatch_set_); 
    }

  ACE_SEH_EXCEPT (
this->release_token ())
    
{
      
// As it stands now, we catch and then rethrow all Win32
      
// structured exceptions so that we can make sure to release the
      
// <token_> lock correctly.
    }


  
return result;
}

template 
<class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::wait_for_multiple_events
(ACE_Select_Reactor_Handle_Set 
&dispatch_set,
 ACE_Time_Value 
*max_wait_time)
{
    
    u_long width 
= 0;
    ACE_Time_Value timer_buf (
0);
    ACE_Time_Value 
*this_timeout;

    
int number_of_active_handles = this->any_ready (dispatch_set);

    
// If there are any bits enabled in the <ready_set_> then we'll
    
// handle those first, otherwise we'll block in <select>.

    
if (number_of_active_handles == 0)
    
{
        
do
        
{
            this_timeout 
=
                
this->timer_queue_->calculate_timeout (max_wait_time,
                
&timer_buf);
            width 
= (u_long) this->handler_rep_.max_handlep1 ();

            dispatch_set.rd_mask_ 
= this->wait_set_.rd_mask_;
            dispatch_set.wr_mask_ 
= this->wait_set_.wr_mask_;
            dispatch_set.ex_mask_ 
= this->wait_set_.ex_mask_;
            number_of_active_handles 
= ACE_OS::select (int (width),
                dispatch_set.rd_mask_,
                dispatch_set.wr_mask_,
                dispatch_set.ex_mask_,
                this_timeout);
        }

        
while (number_of_active_handles == -1 && this->handle_error () > 0);

        
if (number_of_active_handles > 0)
        
{
#if !defined (ACE_WIN32)
            
// Resynchronize the fd_sets so their "max" is set properly.
            dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ());
            dispatch_set.wr_mask_.sync (
this->handler_rep_.max_handlep1 ());
            dispatch_set.ex_mask_.sync (
this->handler_rep_.max_handlep1 ());
#endif /* ACE_WIN32 */
        }

        
else if (number_of_active_handles == -1)
        
{
            
// Normally, select() will reset the bits in dispatch_set
            
// so that only those filed descriptors that are ready will
            
// have bits set.  However, when an error occurs, the bit
            
// set remains as it was when the select call was first made.
            
// Thus, we now have a dispatch_set that has every file
            
// descriptor that was originally waited for, which is not
            
// correct.  We must clear all the bit sets because we
            
// have no idea if any of the file descriptors is ready.
            
//
            
// NOTE: We dont have a test case to reproduce this
            
// problem. But pleae dont ignore this and remove it off.
            dispatch_set.rd_mask_.reset ();
            dispatch_set.wr_mask_.reset ();
            dispatch_set.ex_mask_.reset ();
        }

    }


    
// Return the number of events to dispatch.
    return number_of_active_handles;
}

template 
<class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::dispatch
  (
int active_handle_count,
   ACE_Select_Reactor_Handle_Set 
&dispatch_set)
{
  ACE_TRACE (
"ACE_Select_Reactor_T::dispatch");

  
int io_handlers_dispatched = 0;
  
int other_handlers_dispatched = 0;
  
int signal_occurred = 0;
  
// The following do/while loop keeps dispatching as long as there
  
// are still active handles.  Note that the only way we should ever
  
// iterate more than once through this loop is if signals occur
  
// while we're dispatching other handlers.

  
do
    
{
      
// Note that we keep track of changes to our state.  If any of
      
// the dispatch_*() methods below return -1 it means that the
      
// <wait_set_> state has changed as the result of an
      
// <ACE_Event_Handler> being dispatched.  This means that we
      
// need to bail out and rerun the select() loop since our
      
// existing notion of handles in <dispatch_set> may no longer be
      
// correct.
      
//
      
// In the beginning, our state starts out unchanged.  After
      
// every iteration (i.e., due to signals), our state starts out
      
// unchanged again.

      
this->state_changed_ = false;

      
// Perform the Template Method for dispatching all the handlers.

      
// First check for interrupts.
      if (active_handle_count == -1)
        
{
          
// Bail out -- we got here since <select> was interrupted.
          if (ACE_Sig_Handler::sig_pending () != 0)
            
{
              ACE_Sig_Handler::sig_pending (
0);

              
// If any HANDLES in the <ready_set_> are activated as a
              
// result of signals they should be dispatched since
              
// they may be time critical
              active_handle_count = this->any_ready (dispatch_set);

              
// Record the fact that the Reactor has dispatched a
              
// handle_signal() method.  We need this to return the
              
// appropriate count below.
              signal_occurred = 1;
            }

          
else
            
return -1;
        }


      
// Handle timers early since they may have higher latency
      
// constraints than I/O handlers.  Ideally, the order of
      
// dispatching should be a strategy
      else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
        
// State has changed or timer queue has failed, exit loop.
        break;

      
// Check to see if there are no more I/O handles left to
      
// dispatch AFTER we've handled the timers
      else if (active_handle_count == 0)
        
return io_handlers_dispatched
          
+ other_handlers_dispatched
          
+ signal_occurred;

      
// Next dispatch the notification handlers (if there are any to
      
// dispatch).  These are required to handle multi-threads that
      
// are trying to update the <Reactor>.

      
else if (this->dispatch_notification_handlers
               (dispatch_set,
                active_handle_count,
                other_handlers_dispatched) 
== -1)
        
// State has changed or a serious failure has occured, so exit
        
// loop.
        break;

      
// Finally, dispatch the I/O handlers.
      else if (this->dispatch_io_handlers
               (dispatch_set,
                active_handle_count,
                io_handlers_dispatched) 
== -1)
        
// State has changed, so exit loop.
        break;

      
// if state changed, we need to re-eval active_handle_count,
      
// so we will not end with an endless loop
      if (this->state_changed_)
      
{
          active_handle_count 
= this->dispatch_set_.rd_mask_.num_set ()
              
+ this->dispatch_set_.wr_mask_.num_set ()
              
+ this->dispatch_set_.ex_mask_.num_set ();
      }

    }

  
while (active_handle_count > 0);

  
return io_handlers_dispatched + other_handlers_dispatched + signal_occurred;
}


template 
<class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_handlers
  (ACE_Select_Reactor_Handle_Set 
&dispatch_set,
   
int &number_of_active_handles,
   
int &number_of_handlers_dispatched)
{
  ACE_TRACE (
"ACE_Select_Reactor_T::dispatch_io_handlers");

  
// Handle output events (this code needs to come first to handle the
  
// obscure case of piggy-backed data coming along with the final
  
// handshake message of a nonblocking connection).

  
if (this->dispatch_io_set (number_of_active_handles,
                             number_of_handlers_dispatched,
                             ACE_Event_Handler::WRITE_MASK,
                             dispatch_set.wr_mask_,
                             
this->ready_set_.wr_mask_,
                             
&ACE_Event_Handler::handle_output) == -1)
    
{
      number_of_active_handles 
-= number_of_handlers_dispatched;
      
return -1;
    }


  
// ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("ACE_Select_Reactor_T::dispatch - EXCEPT\n")));
  if (this->dispatch_io_set (number_of_active_handles,
                             number_of_handlers_dispatched,
                             ACE_Event_Handler::EXCEPT_MASK,
                             dispatch_set.ex_mask_,
                             
this->ready_set_.ex_mask_,
                             
&ACE_Event_Handler::handle_exception) == -1)
    
{
      number_of_active_handles 
-= number_of_handlers_dispatched;
      
return -1;
    }


  
// ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("ACE_Select_Reactor_T::dispatch - READ\n")));
  if (this->dispatch_io_set (number_of_active_handles,
                             number_of_handlers_dispatched,
                             ACE_Event_Handler::READ_MASK,
                             dispatch_set.rd_mask_,
                             
this->ready_set_.rd_mask_,
                             
&ACE_Event_Handler::handle_input) == -1)
    
{
      number_of_active_handles 
-= number_of_handlers_dispatched;
      
return -1;
    }


  number_of_active_handles 
-= number_of_handlers_dispatched;
  
return 0;
}

template 
<class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set
  (
int number_of_active_handles,
   
int &number_of_handlers_dispatched,
   
int mask,
   ACE_Handle_Set 
&dispatch_mask,
   ACE_Handle_Set 
&ready_mask,
   ACE_EH_PTMF callback)
{
  ACE_TRACE (
"ACE_Select_Reactor_T::dispatch_io_set");
  ACE_HANDLE handle;

  ACE_Handle_Set_Iterator handle_iter (dispatch_mask);

  
while ((handle = handle_iter ()) != ACE_INVALID_HANDLE &&
         number_of_handlers_dispatched 
< number_of_active_handles)
    
{
      
++number_of_handlers_dispatched;

      
this->notify_handle (handle,
                           mask,
                           ready_mask,
                           
this->handler_rep_.find (handle),
                            callback);

      
// clear the bit from that dispatch mask,
      
// so when we need to restart the iteration (rebuilding the iterator)
      
// we will not dispatch the already dispatched handlers
      this->clear_dispatch_mask (handle, mask);

      
if (this->state_changed_)
        
{

          handle_iter.reset_state ();
          
this->state_changed_ = false;
        }

    }


  
return 0;
}

template 
<class ACE_SELECT_REACTOR_TOKEN> void
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::notify_handle
  (ACE_HANDLE handle,
   ACE_Reactor_Mask mask,
   ACE_Handle_Set 
&ready_mask,
   ACE_Event_Handler 
*event_handler,
   ACE_EH_PTMF ptmf)
{
  ACE_TRACE (
"ACE_Select_Reactor_T::notify_handle");
  
// Check for removed handlers.
  if (event_handler == 0)
    
return;

  
int reference_counting_required =
    event_handler
->reference_counting_policy ().value () ==
    ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

  
// Call add_reference() if needed.
  if (reference_counting_required)
    
{
      event_handler
->add_reference ();
    }


  
int status = (event_handler->*ptmf) (handle);

  
if (status < 0)
    
this->remove_handler_i (handle, mask);
  
else if (status > 0)
    ready_mask.set_bit (handle);

  
// Call remove_reference() if needed.
  if (reference_counting_required)
    
{
      event_handler
->remove_reference ();
    }

}
最后看看,当我们hand_input返回-1的时候,又干了什么呢?

template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T
<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i
  (ACE_HANDLE handle,
   ACE_Reactor_Mask mask)
{
  ACE_TRACE (
"ACE_Select_Reactor_T::remove_handler_i");

  
// Unbind this handle.
  return this->handler_rep_.unbind (handle, mask);
}

// Remove the binding of <ACE_HANDLE>.

int
ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
                                               ACE_Reactor_Mask mask)
{
  ACE_TRACE (
"ACE_Select_Reactor_Handler_Repository::unbind");

  size_t slot 
= 0;
  ACE_Event_Handler 
*event_handler = this->find (handle, &slot);

  
if (event_handler == 0)
    
return -1;

  
// Clear out the <mask> bits in the Select_Reactor's wait_set.
  this->select_reactor_.bit_ops (handle,
                                 mask,
                                 
this->select_reactor_.wait_set_,
                                 ACE_Reactor::CLR_MASK);

  
// And suspend_set.
  this->select_reactor_.bit_ops (handle,
                                 mask,
                                 
this->select_reactor_.suspend_set_,
                                 ACE_Reactor::CLR_MASK);

  
// Note the fact that we've changed the state of the <wait_set_>,
  
// which is used by the dispatching loop to determine whether it can
  
// keep going or if it needs to reconsult select().
  
// this->select_reactor_.state_changed_ = 1;

  
// If there are no longer any outstanding events on this <handle>
  
// then we can totally shut down the Event_Handler.

  
int has_any_wait_mask =
    (
this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
     
|| this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
     
|| this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
  
int has_any_suspend_mask =
    (
this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
     
|| this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
     
|| this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));

  
int complete_removal = 0;

  
if (!has_any_wait_mask && !has_any_suspend_mask)
    
{
      
// The handle has been completed removed.
      complete_removal = 1;

      ACE_SELECT_REACTOR_EVENT_HANDLER (
this, slot) = 0;

#if defined (ACE_WIN32)

      ACE_SELECT_REACTOR_HANDLE (slot) 
= ACE_INVALID_HANDLE;

      
if (this->max_handlep1_ == (int) slot + 1)
        
{
          
// We've deleted the last entry (i.e., i + 1 == the current
          
// size of the array), so we need to figure out the last
          
// valid place in the array that we should consider in
          
// subsequent searches.

          
int i;

          
for (i = this->max_handlep1_ - 1;
               i 
>= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE;
               
--i)
            
continue;

          
this->max_handlep1_ = i + 1;
        }


#else

      
if (this->max_handlep1_ == handle + 1)
        
{
          
// We've deleted the last entry, so we need to figure out
          
// the last valid place in the array that is worth looking
          
// at.
          ACE_HANDLE wait_rd_max =
            
this->select_reactor_.wait_set_.rd_mask_.max_set ();
          ACE_HANDLE wait_wr_max 
=
            
this->select_reactor_.wait_set_.wr_mask_.max_set ();
          ACE_HANDLE wait_ex_max 
=
            
this->select_reactor_.wait_set_.ex_mask_.max_set ();

          ACE_HANDLE suspend_rd_max 
=
            
this->select_reactor_.suspend_set_.rd_mask_.max_set ();
          ACE_HANDLE suspend_wr_max 
=
            
this->select_reactor_.suspend_set_.wr_mask_.max_set ();
          ACE_HANDLE suspend_ex_max 
=
            
this->select_reactor_.suspend_set_.ex_mask_.max_set ();

          
// Compute the maximum of six values.
          this->max_handlep1_ = wait_rd_max;
          
if (this->max_handlep1_ < wait_wr_max)
            
this->max_handlep1_ = wait_wr_max;
          
if (this->max_handlep1_ < wait_ex_max)
            
this->max_handlep1_ = wait_ex_max;

          
if (this->max_handlep1_ < suspend_rd_max)
            
this->max_handlep1_ = suspend_rd_max;
          
if (this->max_handlep1_ < suspend_wr_max)
            
this->max_handlep1_ = suspend_wr_max;
          
if (this->max_handlep1_ < suspend_ex_max)
            
this->max_handlep1_ = suspend_ex_max;

          
++this->max_handlep1_;
        }


#endif /* ACE_WIN32 */

    }


  
int requires_reference_counting =
    event_handler
->reference_counting_policy ().value () ==
    ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

  
// Close down the <Event_Handler> unless we've been instructed not
  
// to.
  if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
    event_handler
->handle_close (handle, mask);

  
// Call remove_reference() if the removal is complete and reference
  
// counting is needed.
  if (complete_removal &&
      requires_reference_counting)
    
{
      event_handler
->remove_reference ();
    }


  
return 0;
}


void
ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
                                              ACE_Reactor_Mask mask)
{
  ACE_TRACE (
"ACE_Select_Reactor_Impl::clear_dispatch_mask");

  
//  Use handle and mask in order to modify the sets
  
// (wait/suspend/ready/dispatch), that way, the dispatch_io_set loop
  
// will not be interrupt, and there will no reason to rescan the
  
// wait_set and re-calling select function, which is *very*
  
// expensive. It seems that wait/suspend/ready sets are getting
  
// updated in register/remove bind/unbind etc functions.  The only
  
// thing need to be updated is the dispatch_set (also can  be found
  
// in that file code as dispatch_mask).  Because of that, we need
  
// that dispatch_set to be member of the ACE_Select_Reactor_impl in
  
// Select_Reactor_Base.h file  That way we will have access to that
  
// member in that function.

  
// We kind of invalidate the iterator in dispatch_io_set because its
  
// an array and index built from the original dispatch-set. Take a
  
// look at dispatch_io_set for more details.

  
// We only need to clr_bit, because we are interested in clearing the
  
// handles that was removed, so no dispatching to these handles will
  
// occur.
  if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
      ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
    
{
      
this->dispatch_set_.rd_mask_.clr_bit (handle);
    }

  
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
    
{
      
this->dispatch_set_.wr_mask_.clr_bit (handle);
    }

  
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
    
{
      
this->dispatch_set_.ex_mask_.clr_bit (handle);
    }


  
// That will make the dispatch_io_set iterator re-start and rescan
  
// the dispatch set.
  this->state_changed_ = true;
}

posted on 2007-02-24 20:30 一动不如一静 阅读(1738) 评论(0)  编辑 收藏 引用 所属分类: ACE

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理