posts - 311, comments - 0, trackbacks - 0, articles - 0
  C++博客 :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理
本篇将介绍客户端与游戏逻辑服务器连接建立以后,mangosd如何接收、解析和处理客户端发过来的协议。本篇不再讨论mangosd与客户端的认证及建立最终RC4流加密的过程,想了解这部分内容请看该系列的第一篇

 

 

一、acceptor socket的监听启动及注册

mangosd的main ()函数调用单例对象sMaster的Run ()函数,启动监听socket的代码如下:

 

   1: int Master::Run()
   2: {
   3:     ........
   4:  
   5:     ///- Launch the world listener socket
   6:     uint16 wsport = sWorld.getConfig (CONFIG_UINT32_PORT_WORLD);
   7:     std::string bind_ip = sConfig.GetStringDefault ("BindIP", "0.0.0.0");
   8:  
   9:     if (sWorldSocketMgr->StartNetwork (wsport, bind_ip) == -1)
  10:     {
  11:         sLog.outError ("Failed to start network");
  12:         Log::WaitBeforeContinueIfNeed();
  13:         World::StopNow(ERROR_EXIT_CODE);
  14:         // go down and shutdown the server
  15:     }
  16:  
  17:     sWorldSocketMgr->Wait ();
  18:  
  19:     ........
  20: }

 

mangosd用WorldSocketMgr类来管理socket。StartNetwork ()会调用StartReactiveIO ()来启动监听socket,处理代码如下:

   1: int WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
   2: {
   3:     ........
   4:  
   5:     //(1)
   6:     m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
   7:     m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
   8:  
   9:     // -1 means use default
  10:     m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1);
  11:     m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
  12:     if ( m_SockOutUBuff <= 0 )
  13:     {
  14:         sLog.outError ("Network.OutUBuff is wrong in your config file");
  15:         return -1;
  16:     }
  17:  
  18:     //(2)
  19:     WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;
  20:     m_Acceptor = acc;
  21:  
  22:     ACE_INET_Addr listen_addr (port, address);
  23:     if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
  24:     {
  25:         sLog.outError ("Failed to open acceptor ,check if the port is free");
  26:         return -1;
  27:     }
  28:  
  29:     //(3)
  30:     for (size_t i = 0; i < m_NetThreadsCount; ++i)
  31:         m_NetThreads[i].Start ();
  32:  
  33:     return 0;
  34: }

(1)ReactorRunnable类继承了ACE_Task_Base,ACE_Task_Base 是ACE 中的任务或主动对象“处理结构”的基类。在ACE 中使用了此类来实现主动对象模式。所有希望成为“主动对象”的对象都必须从此类派生。可以把ACE_TASK 看作是更高级的、更为面向对象的线程类[1]。ACE_Task_Base调用时继承类必须重写svc方法,并且在使用时保证调用了activate ()方法。

(2)指定监听地址,端口并把Acceptor绑定到第一个线程的Reactor上。启动Acceptor开始监听网络IO。

(3)启动所有线程,每个线程上有一个单独的ACE_Reactor* m_Reactor;,这里的Reactor使用的是多线程的ACE_TP_Reactor。可以各自单独完成事件的多路复用。

 

 

二、线程体函数

线程体函数ReactorRunnable::svc () 如下:

   1: virtual int svc ()
   2: {
   3:     //(1)
   4:     WorldDatabase.ThreadStart ();
   5:     
   6:     SocketSet::iterator i, t;
   7:     while (!m_Reactor->reactor_event_loop_done ())
   8:     {
   9:         // dont be too smart to move this outside the loop
  10:         // the run_reactor_event_loop will modify interval
  11:         ACE_Time_Value interval (0, 10000);
  12:  
  13:         //(2)
  14:         if (m_Reactor->run_reactor_event_loop (interval) == -1)
  15:             break;
  16:  
  17:         //(3)
  18:         AddNewSockets ();
  19:  
  20:         for (i = m_Sockets.begin (); i != m_Sockets.end ();)
  21:         {
  22:             //(4)
  23:             if ((*i)->Update () == -1)
  24:             {
  25:                 t = i;
  26:                 ++i;
  27:                 (*t)->CloseSocket ();
  28:                 (*t)->RemoveReference ();
  29:                 --m_Connections;
  30:                 m_Sockets.erase (t);
  31:             }
  32:             else
  33:                 ++i;
  34:         }
  35:     }
  36:  
  37:     WorldDatabase.ThreadEnd ();
  38:     DEBUG_LOG ("Network Thread Exitting");
  39:  
  40:     return 0;
  41: }

(1)会调用mysql_thread_init ()函数,初始化与该线程相关的变量。

(2)run_reactor_event_loop ()函数为多路复用的等待函数,当注册的事件发生、运行超时或者出现错误时返回。

(3)AddNewSockets ()函数会将缓存在m_NewSockets里的新到达的socket添加到SocketSet m_Sockets;里,同时检查并处理m_Sockets;里已经closed的socket。

(4)循环每一个WorldSocket,调用其Update ()方法,这里只处理每个socket的handle_output,即每个在此线程上的写事件,向客户端发送数据。下一节详细介绍:

 

 

三、WorldSocket::Update () 方法

Update方法用于处理每个socket的输出:

   1: int WorldSocket::Update (void)
   2: {
   3:     if (closing_)
   4:         return -1;
   5:  
   6:     //(1)
   7:     if (m_OutActive || m_OutBuffer->length () == 0)
   8:         return 0;
   9:  
  10:     return handle_output (get_handle ());
  11: }

(1)m_OutBuffer有数据时才会调用handle_output,handle_output ()用于处理输出,如果输出不能一次性做完,会调用schedule_wakeup_output ()再次激活write事件。当输出处理完毕后则调用cancel_wakeup_output ()取消激活write事件,使reactor恢复到正常的loop ()循环中。详细过程如下:

 

   1: int WorldSocket::handle_output (ACE_HANDLE)
   2: {
   3:     //(1)
   4:     ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1);
   5:  
   6:     if (closing_)
   7:         return -1;
   8:  
   9:     const size_t send_len = m_OutBuffer->length ();
  10:     if (send_len == 0)
  11:         return cancel_wakeup_output (Guard);
  12:  
  13: #ifdef MSG_NOSIGNAL
  14:     ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
  15: #else
  16:     ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len);
  17: #endif // MSG_NOSIGNAL
  18:  
  19:     if (n == 0)
  20:         return -1;
  21:     else if (n == -1)
  22:     {
  23:         if (errno == EWOULDBLOCK || errno == EAGAIN) //----------(2)
  24:             return schedule_wakeup_output (Guard);
  25:  
  26:         return -1;
  27:     }
  28:     else if (n < (ssize_t)send_len) //now n > 0      //----------(3)
  29:     {
  30:         m_OutBuffer->rd_ptr (static_cast<size_t> (n));
  31:  
  32:         // move the data to the base of the buffer
  33:         m_OutBuffer->crunch ();
  34:  
  35:         return schedule_wakeup_output (Guard);
  36:     }
  37:     else //now n == send_len                         //----------(4)
  38:     {
  39:         m_OutBuffer->reset ();
  40:  
  41:         if (!iFlushPacketQueue ())
  42:             return cancel_wakeup_output (Guard);
  43:         else
  44:             return schedule_wakeup_output (Guard);
  45:     }
  46:  
  47:     ACE_NOTREACHED (return 0);
  48: }

(1)对m_OutBuffer加锁。

(2)考虑信号打断的情况等,暂时不能写。

(3)只发送了部分数据则继续wakeup该线程对应的Reactor。

(4)检查m_OutBuffer数据发送完毕同时等待buffer(PacketQueueT m_PacketQueue;)里已经没有数据时,cancel wakeup让Reactor恢复正常。

 

 

四、socket到对应线程的指派

上一节内容可以看到线程内如何处理socket,及新到达的socket。但从第一节中可知只有第一个线程注册为acceptor线程,那么新连接到达时,是如何被指派到对应的“接待”线程的呢?

可以先看一下ACE_Acceptor的处理时序图:

 


图3.1 连接到达处理时序 [2]

 

上图可以看出,当连接到达时,acceptor会调用对应的SVC_HANDLER的open ()函数,在mangosd里就是acceptor对应的int WorldSocket::open (void *a),如下:

   1: int WorldSocket::open (void *a)
   2: {
   3:     ........
   4:  
   5:     // Hook for the manager.
   6:     if (sWorldSocketMgr->OnSocketOpen (this) == -1)
   7:         return -1;
   8:  
   9:     ........
  10: }

 

OnSocketOpen方法:

   1: int WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
   2: {
   3:     ........
   4:  
   5:     // we skip the Acceptor Thread
   6:     size_t min = 1;
   7:  
   8:     MANGOS_ASSERT (m_NetThreadsCount >= 1);
   9:  
  10:     //(1)
  11:     for (size_t i = 1; i < m_NetThreadsCount; ++i)
  12:         if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
  13:             min = i;
  14:  
  15:     return m_NetThreads[min].AddSocket (sock);
  16: }

(1)将WorldSocket均衡的分配给每个线程。AddSocket ()将socket添加到m_NewSockets中做缓存,待该线程自行调用AddNewSockets ()添加到处理队列里。

 

 

总结:

mangosd对socket的处理因为使用了ACE,逻辑处理代码相对比较简单,写事件的异常处理主要涉及

(1)一次不能写完则不断的wakeup Reactor。

(2)信号中断等错误的判断。似乎这里并没有考虑全面(见附录)

(3)使用另一个buffer缓存,因写缓存m_OutBuffer满而带来的多出的数据。

 

References:

[1] http://blog.csdn.net/yecao_kinux/article/details/1546914

[2] http://postfiles12.naver.net/data41/2009/4/11/187/33_kbkim007.jpg?type=w3