﻿<?xml version="1.0" encoding="utf-8" standalone="yes"?><rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:trackback="http://madskills.com/public/xml/rss/module/trackback/" xmlns:wfw="http://wellformedweb.org/CommentAPI/" xmlns:slash="http://purl.org/rss/1.0/modules/slash/"><channel><title>C++博客-the journey is the reward...</title><link>http://www.cppblog.com/adapterofcoms/</link><description /><language>zh-cn</language><lastBuildDate>Thu, 09 Apr 2026 05:36:55 GMT</lastBuildDate><pubDate>Thu, 09 Apr 2026 05:36:55 GMT</pubDate><ttl>60</ttl><item><title>A TCP Server Framework Based on Event Poll (epoll): A Brief Look at epoll</title><link>http://www.cppblog.com/adapterofcoms/archive/2010/08/03/122063.html</link><dc:creator>adapterofcoms</dc:creator><author>adapterofcoms</author><pubDate>Tue, 03 Aug 2010 06:37:00 GMT</pubDate><guid>http://www.cppblog.com/adapterofcoms/archive/2010/08/03/122063.html</guid><wfw:comment>http://www.cppblog.com/adapterofcoms/comments/122063.html</wfw:comment><comments>http://www.cppblog.com/adapterofcoms/archive/2010/08/03/122063.html#Feedback</comments><slash:comments>1</slash:comments><wfw:commentRss>http://www.cppblog.com/adapterofcoms/comments/commentRss/122063.html</wfw:commentRss><trackback:ping>http://www.cppblog.com/adapterofcoms/services/trackbacks/122063.html</trackback:ping><description><![CDATA[<span style="FONT-SIZE: 10pt">&nbsp;&nbsp;&nbsp;epoll (event poll), on Linux kernel 2.6.x; pthread, nptl-2.12<br>&nbsp;&nbsp;&nbsp;LT/ET: ET can also deliver an event more than once, though far less often than LT. It is EPOLLONESHOT that provides the real foundation for "one connection&nbsp;VS one thread in worker thread pool, with no dependence on any connection-data-queue". Most epoll_wait handling I have seen follows the pattern below. It is very textbook, because that is how the man-pages write their example.<br>man-pages epoll_wait handle:<br>#define <strong>MAX_EVENTS</strong> 10<br>struct epoll_event&nbsp;events[MAX_EVENTS];<br>for (;;) <br>{<br>&nbsp;&nbsp; <strong>nfds</strong> = epoll_wait(epollfd, events, <strong>MAX_EVENTS</strong>, -1);<br>&nbsp;&nbsp; 
<strong>for (i = 0; i &lt; nfds; ++i)<br></strong>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; handle <strong>events[i];<br></strong>&nbsp;&nbsp; ......<br>}<br>What gets registered there with <strong>epoll_ctl_<font style="FONT-FAMILY: " color=#ff0000>add</font></strong> is of course EPOLLIN|<strong>EPOLLET</strong>. Beyond that, I cannot tell whether the code above is run by a single thread or by a pool of threads (in threadpool). Hopefully not a single thread! If it is, does it really finish these <strong>MAX_EVENTS</strong> events before moving on to the next <strong>MAX_EVENTS</strong>? Is that not slow? But if it is a pool of threads, have you considered how to preserve the logical integrity of the request-data in one connection? A single request-data-packet may arrive split across several sends, so with the pattern above you really have to design this carefully.<br>My own epoll_wait handling pattern is as follows:<br>struct epoll_event <strong>activeEvent</strong>;<br>for(;;)<br>{<br>&nbsp;&nbsp; epoll_wait(epollfd, <strong>&amp;activeEvent</strong>, <strong><font style="FONT-FAMILY: " color=#ff0000>1/*Surprised? This code is never run by a single thread, but by a whole pool of them*/</font></strong>, timeout);<br>&nbsp;&nbsp; if&nbsp;handle activeEvent success<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>epoll_ctl_<font style="FONT-FAMILY: " color=#ff0000>mod</font></strong> EPOLLIN|EPOLLET|<strong><font style="FONT-FAMILY: " color=#ff0000>EPOLLONESHOT<br></font></strong>&nbsp; ......<br>}<br>The code above is of course run by a pool of threads in threadpool, and what is registered with <strong>epoll_ctl_<font style="FONT-FAMILY: " color=#ff0000>add</font></strong> is EPOLLIN|EPOLLET|<strong><font style="FONT-FAMILY: " color=#ff0000>EPOLLONESHOT<br></font></strong>because my design principle is to strictly follow <strong>one connection VS one thread in worker thread pool.<br></strong>So the basic model of the server framework below is:<br>One connection VS one thread in worker thread pool, worker thread performs epollWorkerRoutine.<br>epollWorkerRoutine has the following responsibilities:<br>1. Handle the request; when busy, add epollWorkerThreads without exceeding maxThreads; post/MOD the EPOLLIN|<strong>EPOLLONESHOT</strong> Interested Event to epoll.<br>2. On timeout, check whether the pool is idle and how many epollWorkerThreads currently exist; when idle, keep the count or reduce it down to minThreads.<br>3. Manage the lifecycle of every Accepted-socket. This relies on the system's keepalive probes; if you would rather implement an application-level "heartbeat", just set QSS_SIO_KEEPALIVE_VALS_TIMEOUT back to the system default of 2 hours. No list of all connections is maintained here; you can of course find every socket fd under /proc/&lt;pid&gt;/fd (pid as returned by getpid()).<br>4. Non-blocking socket operations on Linux still rely on recv and send, unlike WSARecv + overlapped on Windows, which returns immediately even without setting O_NONBLOCK through fcntl. I implemented the send action as blocking here (internalBlockingSender). By the same reasoning, a non-blocking send can still leave the response data logically fragmented and out of order, especially if you use the textbook pattern above with multiple threads, in which case it becomes a complete mess. Of course, you can use a response-data-queue to send data asynchronously and in the right order.<br><br>Below is a brief look at epoll programming, based on the source code:<br><strong>socketserver.h<br></strong>#ifndef __Q_SOCKET_SERVER__<br>#define __Q_SOCKET_SERVER__<br>#include &lt;errno.h&gt;<br>#include &lt;stdint.h&gt;<br>#include &lt;sys/socket.h&gt;<br>#include &lt;netinet/in.h&gt;<br>#include &lt;netinet/tcp.h&gt;<br>#include &lt;arpa/inet.h&gt;<br>#include &lt;sys/types.h&gt;<br>#include &lt;string.h&gt;<br>#include &lt;sys/epoll.h&gt;<br>#include &lt;pthread.h&gt;<br>#include &lt;unistd.h&gt;<br>#include &lt;fcntl.h&gt;<br>#include &lt;stdio.h&gt;<br>#include &lt;stdlib.h&gt;<br>#include &lt;time.h&gt;<br>#define SOCKET_ERROR -1<br>#define INVALID_SOCKET -1<br>typedef int SOCKET;<br>typedef struct sockaddr_in SOCKADDR_IN;<br>typedef unsigned short WORD;<br>typedef unsigned int DWORD;
<p>#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT (30*60)<br>#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5<br>#define QSS_SIO_KEEPALIVE_VALS_COUNT 3<br>#define MAX_THREADS 100<br>#define MAX_THREADS_MIN&nbsp; 10<br>#define MIN_WORKER_WAIT_TIMEOUT&nbsp; (20*1000)<br>#define MAX_WORKER_WAIT_TIMEOUT&nbsp; (60*MIN_WORKER_WAIT_TIMEOUT)<br>#define MAX_THREADPOOLS&nbsp; 32</p>
<p>#define MAX_BUF_SIZE 1024<br>/* <strong>ulimit -n opened FDs per process</strong>. Remember to raise this limit; otherwise you are still capped the way select is and lose the benefit of epoll. */<br>#define BLOCKING_SEND_TIMEOUT 20</p>
<p>typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose<br>typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);<br>typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR</p>
<p>typedef struct {<br>&nbsp; WORD passive;<br>&nbsp; WORD port;//uint16_t<br>&nbsp; WORD minThreads;<br>&nbsp; WORD maxThreads;<br>&nbsp; pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE<br>&nbsp; volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped<br>&nbsp; int&nbsp; workerWaitTimeout;//wait timeout<br>&nbsp; volatile int workerCounter;<br>&nbsp; volatile int currentBusyWorkers;<br>&nbsp; volatile int CSocketsCounter;<br>&nbsp; CSocketLifecycleCallback cslifecb;<br>&nbsp; InternalProtocolHandler protoHandler;<br>&nbsp; SOCKET server_s;<br>&nbsp; SOCKADDR_IN serv_addr;<br>&nbsp; int epollFD;//main epoller.<br>&nbsp; int BSendEpollFD;//For blocking send.<br>}QSocketServer;</p>
<p>typedef struct {<br>&nbsp; SOCKET client_s;<br>&nbsp; SOCKADDR_IN client_addr;<br>&nbsp; uint32_t curEvents;</p>
<p>&nbsp; char buf[MAX_BUF_SIZE];<br>&nbsp; DWORD numberOfBytesTransferred;<br>&nbsp; char * data;</p>
<p>&nbsp; int BSendEpollFDRelated;<br>&nbsp; pthread_mutex_t writableLock;<br>&nbsp; pthread_cond_t&nbsp; writableMonitor;<br>}QSSEPollEvent;//for per connection</p>
<p>int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);<br>int startSocketServer(QSocketServer *qss);<br>int shutdownSocketServer(QSocketServer *qss);<br>#endif</p>
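<p>The one-shot pattern described in the introduction can be sketched in isolation. This is a minimal illustration, not part of the framework: the helper names oneshot_ctl and oneshot_demo are hypothetical, and a socketpair stands in for an accepted connection. It shows that once an event has been delivered the fd stays silent, even when more data arrives, until it is re-armed with EPOLL_CTL_MOD.</p>

```c
#include <assert.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: add (EPOLL_CTL_ADD) or re-arm (EPOLL_CTL_MOD)
 * fd for one-shot, edge-triggered read events. */
static int oneshot_ctl(int epfd, int op, int fd)
{
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ev.data.fd = fd;
    return epoll_ctl(epfd, op, fd, &ev);
}

/* Returns 1 when the observed behaviour is: event, silence, event after
 * re-arm. Returns 0 on unexpected behaviour and -1 on a setup error. */
static int oneshot_demo(void)
{
    struct epoll_event ev;
    int sv[2], epfd, first, second, third, ok = -1;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
        return -1;
    epfd = epoll_create1(0);
    if (epfd == -1)
        goto out_sockets;
    if (oneshot_ctl(epfd, EPOLL_CTL_ADD, sv[0]) == -1)
        goto out_epoll;

    if (write(sv[1], "a", 1) != 1)
        goto out_epoll;
    first = epoll_wait(epfd, &ev, 1, 0);   /* event delivered, fd now disarmed */

    if (write(sv[1], "b", 1) != 1)
        goto out_epoll;
    second = epoll_wait(epfd, &ev, 1, 0);  /* nothing: one-shot already fired */

    if (oneshot_ctl(epfd, EPOLL_CTL_MOD, sv[0]) == -1)
        goto out_epoll;
    third = epoll_wait(epfd, &ev, 1, 0);   /* pending data reported again */

    ok = (first == 1 && second == 0 && third == 1);
out_epoll:
    close(epfd);
out_sockets:
    close(sv[0]);
    close(sv[1]);
    return ok;
}
```

<p>Because the fd is disarmed between delivery and re-arm, no second worker can pick up the same connection while the first one is still handling it, which is what makes maxevents = 1 per worker safe.</p>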
<p><strong>qsocketserver_model.c&nbsp; //</strong>The code below still needs a <strong>memory pool</strong> and a <strong>logger</strong> before it is ready for production!<br>&nbsp;</span>#include "socketserver.h"<br>#include &lt;dirent.h&gt;<br>#include &lt;regex.h&gt;<br>#include &lt;sys/stat.h&gt;<br>#define DIGIT_PATTERN_STRING "^[0-9]+$"<br>void *&nbsp; <strong>epollWorkerRoutine</strong>(void *);<br>void *&nbsp; <strong>blockingSendEpollerRoutine</strong>(void *);<br>int isDigitStr(const char *str){<br>&nbsp;&nbsp;&nbsp; int ret=0;//treat a regcomp failure as "not a digit string"<br>&nbsp;&nbsp;&nbsp; regex_t regex;<br>&nbsp;&nbsp;&nbsp; regmatch_t matchs[1];<br>&nbsp;&nbsp;&nbsp; if(!regcomp(&amp;regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*<strong>do not pass 0 here, otherwise nothing matches</strong>*/)){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ret=!regexec(&amp;regex,str, 1,matchs,0);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; regfree(&amp;regex);<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; return ret;<br>}</p>
<p>static int setNonBlocking(int sock)<br>{<br>&nbsp;&nbsp;&nbsp; int opts;<br>&nbsp;&nbsp;&nbsp; opts=fcntl(sock,F_GETFL);<br>&nbsp;&nbsp;&nbsp; if(opts==-1)<br>&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; perror("fcntl(sock,GETFL) failed!\n");<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return opts;<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; opts = opts|O_NONBLOCK;<br>&nbsp;&nbsp;&nbsp; opts=fcntl(sock,F_SETFL,opts);<br>&nbsp;&nbsp;&nbsp; if(opts==-1)<br>&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; perror("fcntl(sock,SETFL,opts) failed!\n");<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return opts;<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; return 1;<br>}</p>
<p>static void adjustQSSWorkerLimits(QSocketServer *qss){<br>&nbsp;&nbsp; //to adjust available size.<br>}<br>typedef struct{<br>&nbsp;QSocketServer * qss;<br>&nbsp;pthread_t th;<br>}QSSWORKER_PARAM;</p>
<p>static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){<br>&nbsp;WORD res=0;<br>&nbsp;if(qss-&gt;workerCounter&lt;qss-&gt;minThreads||(qss-&gt;currentBusyWorkers==qss-&gt;workerCounter&amp;&amp;qss-&gt;workerCounter&lt;qss-&gt;maxThreads))<br>&nbsp;{<br>&nbsp;&nbsp;QSSWORKER_PARAM * pParam=NULL;<br>&nbsp;&nbsp;int i=0;<br>&nbsp;&nbsp;pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;if(qss-&gt;workerCounter+addCounter&lt;=qss-&gt;maxThreads)<br>&nbsp;&nbsp;&nbsp;for(;i&lt;addCounter;i++)<br>&nbsp;&nbsp;&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;pParam=malloc(sizeof(QSSWORKER_PARAM));</p>
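<p>&nbsp;&nbsp;&nbsp;&nbsp;if(pParam){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pParam-&gt;qss=qss;//set before pthread_create: the worker reads it as soon as it starts<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if(pthread_create(&amp;pParam-&gt;th,NULL,epollWorkerRoutine,pParam)==0)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;qss-&gt;workerCounter++,res++;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;else<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;free(pParam);//no thread was created, so nobody else will free it<br>&nbsp;&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;}<br>&nbsp;return res;<br>}</p>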
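<p>When a heap-allocated parameter block is handed to a new thread, every field the thread reads has to be filled in before pthread_create is called, because the new thread may run immediately. A minimal sketch of that hand-off follows; WorkerParam, worker_main, spawn_worker and spawn_and_join are hypothetical names, and an int counter stands in for the QSocketServer pointer.</p>

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct {
    int *shared_counter;   /* stands in for the server pointer (assumption) */
} WorkerParam;

static void *worker_main(void *arg)
{
    WorkerParam *p = arg;
    int *counter = p->shared_counter;   /* copy what is needed ... */
    free(p);                            /* ... then release the block the thread owns */
    __sync_fetch_and_add(counter, 1);   /* GCC/Clang atomic builtin */
    return NULL;
}

/* Returns 0 on success, non-zero on failure. */
static int spawn_worker(int *counter, pthread_t *out)
{
    WorkerParam *p = malloc(sizeof *p);
    int err;
    if (!p)
        return -1;
    p->shared_counter = counter;        /* initialise BEFORE pthread_create */
    err = pthread_create(out, NULL, worker_main, p);
    if (err != 0)
        free(p);                        /* thread never started: still ours */
    return err;
}

/* Starts up to n workers (capped at 16), joins them, returns the counter. */
static int spawn_and_join(int n)
{
    pthread_t th[16];
    int counter = 0, started = 0, i;
    if (n > 16)
        n = 16;
    for (i = 0; i < n; i++)
        if (spawn_worker(&counter, &th[started]) == 0)
            started++;
    for (i = 0; i < started; i++)
        pthread_join(th[i], NULL);
    return counter;
}
```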
<p align=left>static void SOlogger(const char * msg,SOCKET s){<br>&nbsp;perror(msg);<br>&nbsp;&nbsp;&nbsp; if(s&gt;0)<br>&nbsp;&nbsp;&nbsp; close(s);<br>}</p>
<p>static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){<br>&nbsp;&nbsp;&nbsp; QSSEPollEvent *qssEPEvent=event-&gt;data.ptr;<br>&nbsp;&nbsp;&nbsp; int ret;<br>&nbsp;&nbsp;&nbsp; printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent-&gt;client_s,pthread_self());<br>&nbsp;&nbsp;&nbsp; if((ret=recv(qssEPEvent-&gt;client_s,qssEPEvent-&gt;buf,MAX_BUF_SIZE,0))&gt;0){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //sleep(5);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ret=_blockingSender(senderBase,qssEPEvent-&gt;client_s,qssEPEvent-&gt;buf,ret);<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp; printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent-&gt;client_s,pthread_self());<br>&nbsp;return ret;<br>}</p>
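<p>The default handler echoes whatever a single recv returns. As noted in the introduction, one request packet may arrive split across several sends, so a real protocol handler has to reassemble it. The sketch below assumes a hypothetical wire format that is not part of the original framework: a 2-byte big-endian length prefix followed by the payload. FrameAccumulator, frame_init, frame_feed and frame_next are illustrative names.</p>

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define FRAME_BUF_SIZE 1024

typedef struct {
    unsigned char buf[FRAME_BUF_SIZE];
    size_t used;                     /* bytes currently buffered */
} FrameAccumulator;                  /* hypothetical per-connection state */

static void frame_init(FrameAccumulator *fa) { fa->used = 0; }

/* Append the bytes returned by one recv(). Returns 0, or -1 if the buffer
 * would overflow (the caller would then close the connection). */
static int frame_feed(FrameAccumulator *fa, const void *data, size_t n)
{
    if (n > FRAME_BUF_SIZE - fa->used)
        return -1;
    memcpy(fa->buf + fa->used, data, n);
    fa->used += n;
    return 0;
}

/* If a complete frame is buffered and fits in out, copy its payload to out,
 * drop it from the buffer and return the payload length; otherwise -1. */
static int frame_next(FrameAccumulator *fa, void *out, size_t cap)
{
    size_t len;
    if (fa->used < 2)
        return -1;
    len = ((size_t)fa->buf[0] << 8) | fa->buf[1];
    if (fa->used < 2 + len || len > cap)
        return -1;
    memcpy(out, fa->buf + 2, len);
    memmove(fa->buf, fa->buf + 2 + len, fa->used - 2 - len);
    fa->used -= 2 + len;
    return (int)len;
}
```

<p>With EPOLLONESHOT only one worker touches a connection at a time, so an accumulator like this could live in the per-connection structure without extra locking.</p>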
<p>int <strong>createSocketServer</strong>(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)<br>{</p>
<p>&nbsp;QSocketServer * qss=calloc(1,sizeof(QSocketServer));//zeroed, so serv_addr starts out clean<br>&nbsp;if(!qss)<br>&nbsp;&nbsp;return 0;<br>&nbsp;qss-&gt;passive=passive;<br>&nbsp;qss-&gt;port=port;<br>&nbsp;qss-&gt;minThreads=minThreads;<br>&nbsp;qss-&gt;maxThreads=maxThreads;<br>&nbsp;qss-&gt;workerWaitTimeout=workerWaitTimeout;<br>&nbsp;qss-&gt;lifecycleStatus=0;<br>&nbsp;pthread_spin_init(&amp;qss-&gt;g_spinlock,PTHREAD_PROCESS_PRIVATE);<br>&nbsp;qss-&gt;workerCounter=0;<br>&nbsp;qss-&gt;currentBusyWorkers=0;<br>&nbsp;qss-&gt;CSocketsCounter=0;<br>&nbsp;qss-&gt;cslifecb=cslifecb,qss-&gt;protoHandler=protoHandler;<br>&nbsp;if(!qss-&gt;protoHandler)<br>&nbsp;&nbsp;qss-&gt;protoHandler=_InternalProtocolHandler;<br>&nbsp;adjustQSSWorkerLimits(qss);<br>&nbsp;*qss_ptr=qss;<br>&nbsp;return 1;<br>}</p>
<p>int <strong>startSocketServer</strong>(QSocketServer *qss)<br>{<br>&nbsp;&nbsp;&nbsp;&nbsp;if(qss==NULL)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 0;<br>&nbsp;&nbsp;&nbsp; else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;lifecycleStatus==0){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qss-&gt;lifecycleStatus=1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; //bzero(&amp;qss-&gt;serv_addr, sizeof(qss-&gt;serv_addr));</p>
<p>&nbsp;qss-&gt;serv_addr.sin_family=AF_INET;<br>&nbsp;qss-&gt;serv_addr.sin_port=htons(qss-&gt;port);<br>&nbsp;inet_aton("127.0.0.1",&amp;(qss-&gt;serv_addr.sin_addr));<br>&nbsp;//qss-&gt;serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");</p>
<p>&nbsp;qss-&gt;server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);<br>&nbsp;if(qss-&gt;server_s==INVALID_SOCKET)<br>&nbsp;{<br>&nbsp;&nbsp;SOlogger("socket failed.\n",0);<br>&nbsp;&nbsp;return 0;<br>&nbsp;}</p>
<p>&nbsp;if(setNonBlocking(qss-&gt;server_s)==-1)<br>&nbsp;{<br>&nbsp;&nbsp;&nbsp; SOlogger("setNonBlocking server_s failed.\n",qss-&gt;server_s);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 0;<br>&nbsp;}</p>
<p>&nbsp;if(bind(qss-&gt;server_s,(struct sockaddr *)&amp;qss-&gt;serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)<br>&nbsp;{<br>&nbsp;&nbsp;SOlogger("bind failed.\n",qss-&gt;server_s);<br>&nbsp;&nbsp;return 0;<br>&nbsp;}</p>
<p>&nbsp;if(listen(qss-&gt;server_s,SOMAXCONN/*<strong>Windows has this macro too; here it is 128. You can set it smaller, since it affects overhead</strong>*/)==SOCKET_ERROR)<br>{<br>&nbsp;&nbsp;SOlogger("listen failed.\n",qss-&gt;server_s);<br>&nbsp; return 0;<br>}<br>&nbsp;&nbsp;&nbsp; qss-&gt;epollFD=epoll_create1(0);/*<strong>note this is not epoll_create(size); you may not know how to choose size, so just skip it*/</strong><br>&nbsp;&nbsp;&nbsp; if(qss-&gt;epollFD==-1){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; SOlogger("epoll_create1 0, main epollFD&nbsp; failed.\n",qss-&gt;server_s);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; qss-&gt;BSendEpollFD=epoll_create1(0);//<strong>for blocking send</strong>.<br>&nbsp;&nbsp;&nbsp; if(qss-&gt;BSendEpollFD==-1){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss-&gt;server_s);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp; {//ADD ACCEPT EVENT<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; struct epoll_event _epEvent;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qssEPEvent-&gt;client_s=qss-&gt;server_s;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _epEvent.events=qssEPEvent-&gt;curEvents=EPOLLIN|EPOLLET;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _epEvent.data.ptr=qssEPEvent;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(epoll_ctl(qss-&gt;epollFD,EPOLL_CTL_ADD,qss-&gt;server_s,&amp;_epEvent)==-1){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; SOlogger("epoll_ctl server_s to accept failed.\n",qss-&gt;server_s);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; free(qssEPEvent);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; {//startup blocking send epoller.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pParam-&gt;qss=qss;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_create(&amp;pParam-&gt;th,NULL,blockingSendEpollerRoutine,pParam);<br>&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;//initialize worker for epoll events.<br>&nbsp;addQSSWorker(qss,qss-&gt;minThreads);<br>&nbsp;qss-&gt;lifecycleStatus=2;<br>&nbsp;return 1;<br>}</p>
<p>int <strong>shutdownSocketServer</strong>(QSocketServer *qss){<br>&nbsp;&nbsp;&nbsp; //change qss-&gt;lifecycleStatus<br>&nbsp;&nbsp;&nbsp; if(qss==NULL)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 0;<br>&nbsp;&nbsp;&nbsp; else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;lifecycleStatus==2){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qss-&gt;lifecycleStatus=3;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; /*shutdown server-listening socket,<strong>the graceful sequence here is shutdown--notify--&gt;epoll--&gt;close. Remember that shutdown sends EOF</strong>*/<br>&nbsp;&nbsp;&nbsp; shutdown(qss-&gt;server_s,SHUT_RDWR);</p>
<p>&nbsp;&nbsp;&nbsp; // /proc/getpid/fd&nbsp; shutdown all socket cs != serv_s<br>&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; char dirBuf[64];<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; struct dirent * de;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; DIR *pd=NULL;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int sockFD;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; sprintf(dirBuf,"/proc/%d/fd/",getpid());<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pd=opendir(dirBuf);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(pd!=NULL){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while((de=readdir(pd))!=NULL){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(isDigitStr(de-&gt;d_name)){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; sockFD=atoi(de-&gt;d_name);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(isfdtype(sockFD,S_IFSOCK))<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; shutdown(sockFD,SHUT_RDWR);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; closedir(pd);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; /*fstat(ret,&amp;_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;return 1;<br>}<br><br>static int <strong>onAcceptRoutine</strong>(QSocketServer * qss)<br>{<br>&nbsp;&nbsp;&nbsp; SOCKADDR_IN client_addr;<br>&nbsp;&nbsp;&nbsp;&nbsp;unsigned int client_addr_leng=sizeof(SOCKADDR_IN);<br>&nbsp;&nbsp;&nbsp;&nbsp;SOCKET cs;<br>&nbsp;&nbsp;&nbsp;&nbsp;struct epoll_event 
_epEvent;<br>&nbsp;&nbsp;&nbsp;&nbsp;QSSEPollEvent *qssEPEvent=NULL;<br>&nbsp;&nbsp;&nbsp; cs=accept(qss-&gt;server_s,(struct sockaddr *)&amp;client_addr,&amp;client_addr_leng);<br>&nbsp;&nbsp;&nbsp; if(cs==INVALID_SOCKET)<br>&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(errno==EAGAIN||errno==EINTR||errno==ECONNABORTED)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;//transient failure: keep the listening socket registered<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("onAccept failed:%d,%s\n",errno,strerror(errno));<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; epoll_ctl(qss-&gt;epollFD,EPOLL_CTL_DEL,qss-&gt;server_s,NULL);//EINVAL 22&nbsp; Invalid argument<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; if(setNonBlocking(cs)==-1)<br>&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; close(cs);//do not leak the accepted fd<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp; {// set keepalive option<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int keepAlive = 1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&amp;keepAlive, sizeof(keepAlive))||<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&amp;keepIdle, sizeof(keepIdle))||<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&amp;keepInterval, sizeof(keepInterval))||<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&amp;keepCount, sizeof(keepCount)))<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; close(cs);//do not leak the accepted fd<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; qssEPEvent=malloc(sizeof(QSSEPollEvent));<br>&nbsp;&nbsp;&nbsp; if(qssEPEvent==NULL){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; close(cs);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; qssEPEvent-&gt;client_s=cs;<br>&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _epEvent.events=qssEPEvent-&gt;curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qssEPEvent-&gt;BSendEpollFDRelated=0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>_epEvent.data.<span style="COLOR: #ff0000">ptr</span>=qssEPEvent</strong>;<strong>/*again unlike the textbook: the real user data goes in ptr rather than a bare fd*/</strong><br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
if(epoll_ctl(qss-&gt;epollFD,EPOLL_CTL_ADD,cs,&amp;_epEvent)==-1){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; free(qssEPEvent);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; close(cs);//do not leak the accepted fd<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qss-&gt;CSocketsCounter++;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;cslifecb)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qss-&gt;cslifecb(cs,0);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);<br>&nbsp;&nbsp;&nbsp; return 1;<br>}</p>
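<p>The keepalive settings used for connection lifecycle management can be tried on their own. A minimal sketch, assuming Linux: enable_keepalive, read_keepidle and keepalive_demo are hypothetical helper names, and the values mirror the QSS_SIO_KEEPALIVE_VALS_* macros (30 minutes idle, 5 seconds between probes, 3 probes).</p>

```c
#include <assert.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: turn on TCP keepalive probes for fd.
 * Returns 0 on success, -1 if any setsockopt call fails. */
static int enable_keepalive(int fd, int idle_sec, int interval_sec, int probes)
{
    int on = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof on) == -1)
        return -1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle_sec, sizeof idle_sec) == -1)
        return -1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval_sec, sizeof interval_sec) == -1)
        return -1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &probes, sizeof probes) == -1)
        return -1;
    return 0;
}

/* Reads back the idle time the kernel stored, or -1 on error. */
static int read_keepidle(int fd)
{
    int value = -1;
    socklen_t len = sizeof value;
    if (getsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &value, &len) == -1)
        return -1;
    return value;
}

/* Applies the settings to a fresh TCP socket and returns the stored idle time. */
static int keepalive_demo(void)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int idle;
    if (fd == -1)
        return -1;
    if (enable_keepalive(fd, 30 * 60, 5, 3) == -1) {
        close(fd);
        return -1;
    }
    idle = read_keepidle(fd);
    close(fd);
    return idle;
}
```

<p>With these values a silent peer is declared dead roughly 30 minutes plus 3 x 5 seconds after its last traffic, at which point the socket reports an error to epoll and the worker can release the connection.</p>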
<p>typedef struct{<br>&nbsp;&nbsp;&nbsp; QSocketServer * qss;<br>&nbsp;&nbsp;&nbsp; QSSEPollEvent * event;<br>}InternalSenderBase_t;</p>
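<p>The framework's blocking sender waits for writability through a second epoll instance and a condition variable. For comparison only, the sketch below swaps in a simpler technique, poll() on the calling thread, to show the same "send everything or time out" contract on a non-blocking socket. send_all_with_timeout and send_demo are hypothetical names and are not part of the original code.</p>

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Sends up to n bytes on a non-blocking socket, waiting with poll() while
 * the send buffer is full. Returns the number of bytes sent (less than n
 * if the wait timed out) or -1 on a socket error. */
static long send_all_with_timeout(int fd, const char *buf, size_t n, int timeout_ms)
{
    size_t sum = 0;
    while (sum < n) {
        ssize_t r = send(fd, buf + sum, n - sum, MSG_NOSIGNAL);
        if (r > 0) {
            sum += (size_t)r;
            continue;
        }
        if (r == -1 && errno == EINTR)
            continue;
        if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            struct pollfd pfd;
            int pr;
            pfd.fd = fd;
            pfd.events = POLLOUT;
            pfd.revents = 0;
            pr = poll(&pfd, 1, timeout_ms);
            if (pr > 0)
                continue;           /* writable (or error: next send reports it) */
            if (pr == 0)
                break;              /* timed out waiting for buffer space */
            if (errno == EINTR)
                continue;
            return -1;
        }
        return -1;
    }
    return (long)sum;
}

/* Returns 1 if a small send completes and an oversized send to a peer that
 * never reads stops early after the timeout; -1 on a setup error. */
static int send_demo(void)
{
    static char big[1 << 22];       /* 4 MiB, larger than the socket buffer */
    int sv[2], flags;
    long small, partial;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
        return -1;
    flags = fcntl(sv[0], F_GETFL);
    if (flags == -1 || fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == -1) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }
    small = send_all_with_timeout(sv[0], big, 100, 50);
    partial = send_all_with_timeout(sv[0], big, sizeof big, 50);
    close(sv[0]);
    close(sv[1]);
    return small == 100 && partial >= 0 && partial < (long)sizeof big;
}
```

<p>Either way, the response for one connection is written by one thread from start to finish, which is what keeps the bytes of a response in order.</p>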
<p>static int <strong>internalBlockingSender</strong>(void * senderBase,int cs, void * _buf, size_t nbs){<br>&nbsp;&nbsp;&nbsp; InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;<br>&nbsp;&nbsp;&nbsp; char * _sbuf=_buf;<br>&nbsp;&nbsp;&nbsp; int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&amp;errno;</p>
<p>&nbsp;&nbsp;&nbsp; QSSEPollEvent *qssEPEvent=NULL;<br>&nbsp;&nbsp;&nbsp; struct epoll_event _epEvent;</p>
<p>&nbsp;&nbsp;&nbsp; struct timespec sendTimeo;</p>
<p>&nbsp;&nbsp;&nbsp; while(1){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; *errno_ptr=0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while(sum&lt;nbs&amp;&amp;(ret=send(cs,_sbuf,nbs-sum,0))&gt;0)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; sum+=ret,_sbuf+=ret;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(sum==nbs||ret==0)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; break;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; else if(ret==-1){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(errno==EAGAIN&amp;&amp;sum&lt;nbs){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qssEPEvent=sb-&gt;event;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _epEvent.data.ptr=qssEPEvent;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _epEvent.events=<strong>EPOLLOUT|EPOLLET|EPOLLONESHOT</strong>;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qssEPEvent-&gt;<strong>BSendEpollFDRelated</strong>==0){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_mutex_init(&amp;qssEPEvent-&gt;writableLock,NULL);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_cond_init(&amp;qssEPEvent-&gt;writableMonitor,NULL);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qssEPEvent-&gt;<strong>BSendEpollFDRelated</strong>=1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>curEpoll_ctl_opt=EPOLL_CTL_ADD</strong>;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
}else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>curEpoll_ctl_opt=EPOLL_CTL_MOD</strong>;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {//wait writable.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int flag=0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_mutex_lock(&amp;qssEPEvent-&gt;writableLock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(<strong>epoll_ctl</strong>(sb-&gt;qss-&gt;<strong>BSendEpollFD</strong>,curEpoll_ctl_opt,qssEPEvent-&gt;client_s,&amp;_epEvent)==0){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; sendTimeo.tv_nsec=0,sendTimeo.tv_sec=<strong>time(NULL)+BLOCKING_SEND_TIMEOUT</strong>;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int err=<strong>pthread_cond_timedwait</strong>(&amp;qssEPEvent-&gt;writableMonitor,&amp;qssEPEvent-&gt;writableLock,&amp;sendTimeo);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(err)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; flag=-1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; flag=-1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
pthread_mutex_unlock(&amp;qssEPEvent-&gt;writableLock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(flag==-1)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; break;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(errno==EAGAIN&amp;&amp;sum==nbs)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ret=nbs;//it is ok;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; break;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; }//end while.<br>&nbsp;&nbsp;&nbsp; return ret;<br>}<br>void *&nbsp; <strong>blockingSendEpollerRoutine</strong>(void *_param){<br>&nbsp;&nbsp;&nbsp; QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;<br>&nbsp;&nbsp;&nbsp;&nbsp;QSocketServer * qss=pParam-&gt;qss;<br>&nbsp;&nbsp;&nbsp;&nbsp;//pthread_t * curThread=&amp;pParam-&gt;th;<br>&nbsp;&nbsp;&nbsp;&nbsp;struct epoll_event epEvents[qss-&gt;maxThreads];<br>&nbsp;&nbsp;&nbsp;&nbsp;QSSEPollEvent *qssEPEvent=NULL;<br>&nbsp;&nbsp;&nbsp;&nbsp;int pollRes,*errno_ptr=&amp;errno;</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;free(pParam);<br>&nbsp;&nbsp;&nbsp; while(1){</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pollRes=epoll_wait(qss-&gt;BSendEpollFD,epEvents,qss-&gt;maxThreads,-1);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(pollRes&gt;=1){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int i=0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; for(;i&lt;pollRes;i++)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(epEvents[i].events&amp;<strong>EPOLLOUT</strong>){//<strong>this epoll fd should do only what follows; the less it does, the faster!</strong><br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qssEPEvent=epEvents[i].data.ptr;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>pthread_mutex_lock</strong>(&amp;qssEPEvent-&gt;writableLock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>pthread_cond_signal</strong>(&amp;qssEPEvent-&gt;writableMonitor);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>pthread_mutex_unlock</strong>(&amp;qssEPEvent-&gt;writableLock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else if(pollRes==-1){//errno&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; break;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp; return NULL;<br>}<br><br>void *&nbsp; <strong>epollWorkerRoutine</strong>(void * _param){<br>&nbsp;&nbsp;&nbsp;&nbsp;QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;<br>&nbsp;&nbsp;&nbsp;&nbsp;QSocketServer * qss=pParam-&gt;qss;<br>&nbsp;&nbsp;&nbsp;&nbsp;pthread_t * curThread=&amp;pParam-&gt;th;<br>&nbsp;&nbsp;&nbsp;&nbsp;struct epoll_event _epEvent;<br>&nbsp;&nbsp;&nbsp;&nbsp;QSSEPollEvent *qssEPEvent=NULL;<br>&nbsp;&nbsp;&nbsp;&nbsp;InternalSenderBase_t _senderBase;<br>&nbsp;&nbsp;&nbsp;&nbsp;int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&amp;errno;<br>&nbsp;&nbsp;&nbsp; _senderBase.qss=qss;<br>&nbsp;&nbsp;&nbsp; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;free(pParam);<br>&nbsp;&nbsp;&nbsp; while(!exitCode){</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pollRes=epoll_wait(qss-&gt;epollFD,&amp;_epEvent,1,qss-&gt;workerWaitTimeout);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(pollRes==1){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;</strong></p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qssEPEvent-&gt;client_s==qss-&gt;server_s)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {//Accepted Socket.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>onAcceptRoutine</strong>(qss);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; continue;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;<strong>protoHandler</strong>){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _senderBase.event=_epEvent.data.ptr;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qss-&gt;currentBusyWorkers++;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_unlock(&amp;qss-&gt;g_spinlock);</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; addQSSWorker(qss,1);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <strong>handleCode</strong>=qss-&gt;<strong>protoHandler</strong>(&amp;_epEvent,<strong>internalBlockingSender,&amp;_senderBase</strong>);</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qss-&gt;currentBusyWorkers--;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_spin_unlock(&amp;qss-&gt;g_spinlock);</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(<strong>handleCode&gt;0</strong>){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _epEvent.events=EPOLLIN|EPOLLET|<strong>EPOLLONESHOT</strong>;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(<strong>epoll_ctl</strong>(qss-&gt;epollFD,EPOLL_CTL_MOD,qssEPEvent-&gt;client_s,&amp;_epEvent)==-1)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; SOErrOccurred=2;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else if(pollRes==0){//timeout<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;lifecycleStatus&lt;=3&amp;&amp;qss-&gt;currentBusyWorkers==0&amp;&amp;qss-&gt;workerCounter&gt;qss-&gt;minThreads)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;lifecycleStatus&lt;=3&amp;&amp;qss-&gt;currentBusyWorkers==0&amp;&amp;qss-&gt;workerCounter&gt;qss-&gt;minThreads){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; qss-&gt;workerCounter--;//until qss-&gt;workerCounter decrease to qss-&gt;minThreads<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exitCode=2;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else if(qss-&gt;lifecycleStatus&gt;=4)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exitCode=4;</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else if(pollRes==-1){//errno<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exitCode=1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(SOErrOccurred){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;cslifecb)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;qss-&gt;cslifecb(qssEPEvent-&gt;client_s,-1);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;/*if(qssEPEvent)*/{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;epoll_ctl(qss-&gt;epollFD,EPOLL_CTL_DEL,qssEPEvent-&gt;client_s,NULL);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;epoll_ctl(qss-&gt;BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent-&gt;client_s,NULL);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; close(qssEPEvent-&gt;client_s);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qssEPEvent-&gt;BSendEpollFDRelated){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_cond_destroy(&amp;qssEPEvent-&gt;writableMonitor);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pthread_mutex_destroy(&amp;qssEPEvent-&gt;writableLock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
}<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;free(qssEPEvent);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if(<strong>--qss-&gt;CSocketsCounter</strong>==0&amp;&amp;qss-&gt;lifecycleStatus&gt;=3){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//for qss workerSize,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;qss-&gt;lifecycleStatus=4;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;exitCode=3;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }//SOErrOccurred handle;</p>
<p>&nbsp;&nbsp;&nbsp; }//end main while.</p>
<p>&nbsp;&nbsp;&nbsp; if(exitCode!=2){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int clearup=0;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_spin_lock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if(<strong>!--qss-&gt;workerCounter</strong>&amp;&amp;qss-&gt;lifecycleStatus&gt;=4){//clearup QSS<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;clearup=1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_spin_unlock(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if(clearup){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;close(qss-&gt;epollFD);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;close(qss-&gt;BSendEpollFD);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pthread_spin_destroy(&amp;qss-&gt;g_spinlock);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;free(qss);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp; }//exitCode handle;<br>&nbsp;return NULL;<br>}</p>
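<p>The re-arm step that epollWorkerRoutine performs after a successful handler call can be isolated in a small sketch. It assumes Linux and uses a pipe in place of a client socket; the helper names arm_oneshot, rearm_oneshot and oneshot_demo are hypothetical and are not part of the framework. With EPOLLONESHOT the descriptor is disabled once an event has been delivered, so no other worker sees it until EPOLL_CTL_MOD arms it again.</p>

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Register fd for a single edge-triggered read notification. */
static int arm_oneshot(int epfd, int fd)
{
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Re-enable fd after a worker has finished handling its event. */
static int rearm_oneshot(int epfd, int fd)
{
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

/* Returns 1 when the one-shot behaviour is observed, 0 otherwise. */
static int oneshot_demo(void)
{
    int pipefd[2], epfd, ok = 0;
    struct epoll_event ev;

    if (pipe(pipefd) == -1)
        return 0;
    epfd = epoll_create1(0);
    if (epfd != -1 && arm_oneshot(epfd, pipefd[0]) == 0 &&
        write(pipefd[1], "a", 1) == 1 &&
        epoll_wait(epfd, &ev, 1, 0) == 1 &&   /* first event is delivered */
        write(pipefd[1], "b", 1) == 1 &&
        epoll_wait(epfd, &ev, 1, 0) == 0 &&   /* fd is disabled: nothing is reported */
        rearm_oneshot(epfd, pipefd[0]) == 0 &&
        epoll_wait(epfd, &ev, 1, 0) == 1)     /* unread data is reported after MOD */
        ok = 1;
    if (epfd != -1)
        close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return ok;
}
```

<p>Calling oneshot_demo() walks through the same sequence a worker sees: one event, silence while the handler runs, and a fresh event only after the explicit re-arm.</p>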
<p>&nbsp;</p>
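<p>blockingSendEpollerRoutine above signals writableMonitor while holding writableLock. The waiting side is not shown in this part of the listing, so the following is only a sketch of one common way to pair such a signal with a waiter: guard it with a flag so that a signal sent before the waiter blocks is not lost. WritableGate and the gate_* functions are hypothetical names, not framework code.</p>

```c
#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  monitor;
    int             writable;   /* predicate protected by lock */
} WritableGate;

static void gate_init(WritableGate *g)
{
    pthread_mutex_init(&g->lock, NULL);
    pthread_cond_init(&g->monitor, NULL);
    g->writable = 0;
}

/* Poller side: record the state change, then wake one waiter. */
static void gate_signal(WritableGate *g)
{
    pthread_mutex_lock(&g->lock);
    g->writable = 1;
    pthread_cond_signal(&g->monitor);
    pthread_mutex_unlock(&g->lock);
}

/* Sender side: loop on the predicate so spurious or early wake-ups are harmless. */
static void gate_wait(WritableGate *g)
{
    pthread_mutex_lock(&g->lock);
    while (!g->writable)
        pthread_cond_wait(&g->monitor, &g->lock);
    g->writable = 0;            /* consume the notification */
    pthread_mutex_unlock(&g->lock);
}

static void *poller_thread(void *arg)
{
    gate_signal((WritableGate *)arg);
    return NULL;
}

/* Returns 1 once the waiter has been released by the poller thread. */
static int gate_demo(void)
{
    WritableGate g;
    pthread_t th;

    gate_init(&g);
    if (pthread_create(&th, NULL, poller_thread, &g) != 0)
        return 0;
    gate_wait(&g);
    pthread_join(th, NULL);
    pthread_cond_destroy(&g.monitor);
    pthread_mutex_destroy(&g.lock);
    return 1;
}
```

<p>Because gate_wait checks the flag before sleeping, the demo finishes whether the poller thread runs before or after the waiter reaches pthread_cond_wait.</p>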
<br><br><div align=right><a style="text-decoration:none;" href="http://www.cppblog.com/adapterofcoms/" target="_blank">adapterofcoms</a> 2010-08-03 14:37</div>]]></description></item><item><title>A TCP Server Framework Based on I/O Completion Ports: A Brief Look at IOCP</title><link>http://www.cppblog.com/adapterofcoms/archive/2010/06/26/118781.html</link><dc:creator>adapterofcoms</dc:creator><author>adapterofcoms</author><pubDate>Sat, 26 Jun 2010 09:30:00 GMT</pubDate><guid>http://www.cppblog.com/adapterofcoms/archive/2010/06/26/118781.html</guid><wfw:comment>http://www.cppblog.com/adapterofcoms/comments/118781.html</wfw:comment><comments>http://www.cppblog.com/adapterofcoms/archive/2010/06/26/118781.html#Feedback</comments><slash:comments>1</slash:comments><wfw:commentRss>http://www.cppblog.com/adapterofcoms/comments/commentRss/118781.html</wfw:commentRss><trackback:ping>http://www.cppblog.com/adapterofcoms/services/trackbacks/118781.html</trackback:ping><description><![CDATA[&nbsp;&nbsp;&nbsp; If you never post overlapped I/O, an I/O completion port gives you nothing more than a queue.&nbsp;<br>&nbsp;&nbsp;&nbsp; About the NumberOfConcurrentThreads parameter of CreateIoCompletionPort:<br>1. It takes effect only when the second parameter, ExistingCompletionPort, is NULL; it caps how many threads may run concurrently for the port.<br>2. Has anyone set it to a value larger than the number of CPUs? I do not mean merely twice the CPU count, but something like the MAX_THREADS 100 used below, or even larger.<br>MSDN does not say this value has to be twice the number of CPUs, nor does it bring the cost of context switches between threads into the discussion. From the MSDN page on I/O Completion Ports: "If your transaction required a <strong>lengthy computation</strong>, a <strong>larger</strong> <strong>concurrency value</strong> will allow more threads to run. Each completion packet may take longer to finish, <strong>but more completion packets will be processed at the same time</strong>. 
".<br>&nbsp;&nbsp;&nbsp; struct OVERLAPPED is commonly extended like this:<br>typedef struct {<br>&nbsp; WSAOVERLAPPED <strong>overlapped</strong>; //<strong><span style="COLOR: #ff0000">must be first member</span>?</strong>&nbsp;&nbsp;&nbsp;Yes, it must be first, because the code below casts the returned LPOVERLAPPED straight back to QSSOverlapped *. If you are not sure, try it.<br>&nbsp; SOCKET client_s;<br>&nbsp; SOCKADDR_IN client_addr;<br>&nbsp; WORD <strong>optCode</strong>;//<strong>1--read,2--send.</strong>&nbsp; Some people define this member and others do not; the debate is about send/WSASend and whether the sync/async distinction is needed there.&nbsp;The server below, at least, does not use it at all.<br>&nbsp; char buf[MAX_BUF_SIZE];<br>&nbsp; WSABUF <strong>wsaBuf</strong>;//<span style="COLOR: #ff0000"><strong>inited ?</strong></span>&nbsp; Do not forget to initialize this!<br>&nbsp; DWORD numberOfBytesTransferred;<br>&nbsp; DWORD flags;&nbsp;&nbsp;&nbsp;
<p>}QSSOverlapped;//<strong>for per connection<br></strong>The basic idea of the server framework below is:<br>One connection&nbsp;VS one thread in worker thread pool&nbsp;,worker thread performs completionWorkerRoutine.<br>An acceptor thread is dedicated to accepting sockets, associating them with the IOCP, and calling WSARecv: post Recv Completion Packet to IOCP.<br>completionWorkerRoutine has the following responsibilities:<br>1. Handle the request; when busy, increase the number of completionWorkerThreads without exceeding maxThreads; post Recv Completion Packet to IOCP.<br>2. On timeout, check whether the pool is idle and how many completionWorkerThreads exist; when idle, keep the count or reduce it to minThreads.<br>3. Manage the lifecycle of every accepted socket. This relies on the system's keepalive probes; to implement application-level "heartbeat detection" instead, just change QSS_SIO_KEEPALIVE_VALS_TIMEOUT back to the system default of 2 hours.<br><strong>Now a brief look at IOCP through the source code</strong>:<br><strong>socketserver.h<br></strong>#ifndef __Q_SOCKET_SERVER__<br>#define __Q_SOCKET_SERVER__<br>#include &lt;winsock2.h&gt;<br>#include &lt;mstcpip.h&gt;<br>#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT (30*60*1000)<br>#define QSS_SIO_KEEPALIVE_VALS_INTERVAL (5*1000)</p>
<p>#define MAX_THREADS 100<br>#define MAX_THREADS_MIN&nbsp; 10<br>#define MIN_WORKER_WAIT_TIMEOUT&nbsp; (20*1000)<br>#define MAX_WORKER_WAIT_TIMEOUT&nbsp; (60*MIN_WORKER_WAIT_TIMEOUT)</p>
<p>#define MAX_BUF_SIZE 1024<br><br>/*CSocketLifecycleCallback is invoked when a socket is accepted and when a socket is closed or hits an error*/<br>typedef void (*CSocketLifecycleCallback)(SOCKET cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose//note: at OnClose the socket may no longer be usable; it may already have been closed abnormally or have hit another error.<br><br>/*protocol handler callback*/<br>typedef int (*InternalProtocolHandler)(LPWSAOVERLAPPED overlapped);//return -1:SOCKET_ERROR</p>
<p>typedef struct Q_SOCKET_SERVER SocketServer;<br>DWORD initializeSocketServer(SocketServer ** ssp,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,long workerWaitTimeout);<br>DWORD startSocketServer(SocketServer *ss);<br>DWORD shutdownSocketServer(SocketServer *ss);</p>
<p>#endif<br>&nbsp;<strong>qsocketserver.c&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (qss for short; the corresponding OVERLAPPED is abbreviated qssOl.)<br></strong>#include "socketserver.h"<br>#include &lt;stdio.h&gt;<br>#include &lt;stdlib.h&gt;<br>typedef struct {&nbsp; <br>&nbsp; WORD <strong>passive</strong>;//<strong>daemon</strong><br>&nbsp; WORD port;<br>&nbsp; WORD minThreads;<br>&nbsp; WORD maxThreads;<br>&nbsp; volatile long <strong>lifecycleStatus</strong>;//0-created,1-starting, 2-running,3-stopping,4-exitKeyPosted,5-stopped <br>&nbsp; long&nbsp; workerWaitTimeout;//wait timeout&nbsp; <br>&nbsp; CRITICAL_SECTION QSS_LOCK;<br>&nbsp; volatile long <strong>workerCounter</strong>;<br>&nbsp; volatile long <strong>currentBusyWorkers</strong>;<br>&nbsp; volatile long <strong>CSocketsCounter</strong>;//<strong>reference count of accepted sockets<br></strong>&nbsp; CSocketLifecycleCallback cslifecb;<br>&nbsp; InternalProtocolHandler protoHandler;<br>&nbsp; WORD wsaVersion;//=MAKEWORD(2,0);<br>&nbsp; WSADATA wsData;<br>&nbsp; SOCKET server_s;<br>&nbsp; SOCKADDR_IN serv_addr;<br>&nbsp; HANDLE iocpHandle;<br>}QSocketServer;</p>
<p>typedef struct {<br>&nbsp; WSAOVERLAPPED overlapped;&nbsp; <br>&nbsp; SOCKET client_s;<br>&nbsp; SOCKADDR_IN client_addr;<br>&nbsp; WORD optCode;<br>&nbsp; char buf[MAX_BUF_SIZE];<br>&nbsp; WSABUF wsaBuf;<br>&nbsp; DWORD numberOfBytesTransferred;<br>&nbsp; DWORD flags;<br>}QSSOverlapped;</p>
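<p>QSSOverlapped keeps the WSAOVERLAPPED member first so that the pointer handed back by the completion port can be cast directly to the enclosing struct. The sketch below illustrates why the position matters. It is portable C with stand-in types (FakeOverlapped, ConnFirst and ConnLater are hypothetical and only mimic the layout), so it does not need the Windows headers.</p>

```c
#include <stddef.h>

typedef struct { unsigned long internal[4]; } FakeOverlapped; /* stand-in for WSAOVERLAPPED */

typedef struct {
    FakeOverlapped overlapped;  /* first member: offset 0 */
    int            client_s;
} ConnFirst;

typedef struct {
    int            client_s;
    FakeOverlapped overlapped;  /* not first: non-zero offset */
} ConnLater;

/* Plain cast: only valid because overlapped sits at offset 0. */
static ConnFirst *from_overlapped_cast(FakeOverlapped *ov)
{
    return (ConnFirst *)ov;
}

/* offsetof-based recovery works wherever the member is placed. */
static ConnLater *from_overlapped_offset(FakeOverlapped *ov)
{
    return (ConnLater *)((char *)ov - offsetof(ConnLater, overlapped));
}
```

<p>On Windows the offset-based form is what the CONTAINING_RECORD macro provides; the framework here relies on the simpler cast instead.</p>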
<p>DWORD&nbsp; <strong>acceptorRoutine</strong>(LPVOID);<br>DWORD&nbsp; <strong>completionWorkerRoutine</strong>(LPVOID);</p>
<p>static void adjustQSSWorkerLimits(QSocketServer *qss){<br>&nbsp;&nbsp;/*adjust size and timeout.*/<br>&nbsp;&nbsp;/*if(qss-&gt;maxThreads &lt;= 0) {<br>&nbsp;&nbsp;&nbsp;qss-&gt;maxThreads = MAX_THREADS;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } else if (qss-&gt;maxThreads &lt; MAX_THREADS_MIN) {&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;qss-&gt;maxThreads = MAX_THREADS_MIN;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;minThreads &gt;&nbsp; qss-&gt;maxThreads) {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;qss-&gt;minThreads =&nbsp; qss-&gt;maxThreads;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;minThreads &lt;= 0) {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(1 == qss-&gt;maxThreads) {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;qss-&gt;minThreads = 1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } else {<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;qss-&gt;minThreads = qss-&gt;maxThreads/2;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;workerWaitTimeout&lt;MIN_WORKER_WAIT_TIMEOUT) <br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;qss-&gt;workerWaitTimeout=MIN_WORKER_WAIT_TIMEOUT;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(qss-&gt;workerWaitTimeout&gt;MAX_WORKER_WAIT_TIMEOUT)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;qss-&gt;workerWaitTimeout=MAX_WORKER_WAIT_TIMEOUT;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; */<br>}</p>
<p>typedef struct{<br>&nbsp;QSocketServer * qss;<br>&nbsp;HANDLE th;<br>}QSSWORKER_PARAM;</p>
<p>static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){<br>&nbsp;WORD res=0;<br>&nbsp;if(qss-&gt;workerCounter&lt;qss-&gt;minThreads||(qss-&gt;currentBusyWorkers==qss-&gt;workerCounter&amp;&amp;qss-&gt;workerCounter&lt;qss-&gt;maxThreads)){<br>&nbsp;&nbsp;DWORD threadId;<br>&nbsp;&nbsp;QSSWORKER_PARAM * pParam=NULL;<br>&nbsp;&nbsp;int i=0;&nbsp;&nbsp;<br>&nbsp;&nbsp;EnterCriticalSection(&amp;qss-&gt;QSS_LOCK);<br>&nbsp;&nbsp;if(qss-&gt;workerCounter+addCounter&lt;=qss-&gt;maxThreads)<br>&nbsp;&nbsp;&nbsp;for(;i&lt;addCounter;i++)<br>&nbsp;&nbsp;&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;pParam=malloc(sizeof(QSSWORKER_PARAM));<br>&nbsp;&nbsp;&nbsp;&nbsp;if(pParam){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pParam-&gt;th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)completionWorkerRoutine,pParam,CREATE_SUSPENDED,&amp;threadId);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pParam-&gt;qss=qss;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;ResumeThread(pParam-&gt;th);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;qss-&gt;workerCounter++,res++;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;<br>&nbsp;&nbsp;LeaveCriticalSection(&amp;qss-&gt;QSS_LOCK);<br>&nbsp;}&nbsp;&nbsp;<br>&nbsp;return res;<br>}</p>
<p>static void SOlogger(const char * msg,SOCKET s,int clearup){<br>&nbsp;perror(msg);<br>&nbsp;if(s&gt;0)<br>&nbsp;closesocket(s);<br>&nbsp;if(clearup)<br>&nbsp;WSACleanup();<br>}</p>
<p>static int _InternalEchoProtocolHandler(LPWSAOVERLAPPED overlapped){<br>&nbsp;QSSOverlapped *qssOl=(QSSOverlapped *)overlapped;<br>&nbsp;<br>&nbsp;//buf is not NUL-terminated, so bound the print by the byte count<br>&nbsp;printf("numOfT:%lu,WSARecvd:%.*s,\n",(unsigned long)qssOl-&gt;numberOfBytesTransferred,(int)qssOl-&gt;numberOfBytesTransferred,qssOl-&gt;buf);<br>&nbsp;//Sleep(500);&nbsp;<br>&nbsp;return send(qssOl-&gt;client_s,qssOl-&gt;buf,qssOl-&gt;numberOfBytesTransferred,0);<br>}</p>
<p>DWORD <strong>initializeSocketServer</strong>(SocketServer ** ssp,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,long workerWaitTimeout){<br>&nbsp;QSocketServer * qss=malloc(sizeof(QSocketServer));<br>&nbsp;qss-&gt;passive=passive&gt;0?1:0;<br>&nbsp;qss-&gt;port=port;<br>&nbsp;qss-&gt;minThreads=minThreads;<br>&nbsp;qss-&gt;maxThreads=maxThreads;<br>&nbsp;qss-&gt;workerWaitTimeout=workerWaitTimeout;<br>&nbsp;qss-&gt;wsaVersion=MAKEWORD(2,0);&nbsp;<br>&nbsp;qss-&gt;lifecycleStatus=0;<br>&nbsp;InitializeCriticalSection(&amp;qss-&gt;QSS_LOCK);<br>&nbsp;qss-&gt;workerCounter=0;<br>&nbsp;qss-&gt;currentBusyWorkers=0;<br>&nbsp;qss-&gt;CSocketsCounter=0;<br>&nbsp;qss-&gt;cslifecb=cslifecb,qss-&gt;protoHandler=protoHandler;<br>&nbsp;if(!qss-&gt;protoHandler)<br>&nbsp;&nbsp;qss-&gt;protoHandler=_InternalEchoProtocolHandler;&nbsp;<br>&nbsp;adjustQSSWorkerLimits(qss);<br>&nbsp;*ssp=(SocketServer *)qss;<br>&nbsp;return 1;<br>}</p>
<p>DWORD <strong>startSocketServer</strong>(SocketServer *ss){&nbsp;<br>&nbsp;QSocketServer * qss=(QSocketServer *)ss;<br>&nbsp;if(qss==NULL||InterlockedCompareExchange(&amp;qss-&gt;lifecycleStatus,1,0))<br>&nbsp;&nbsp;return 0;&nbsp;<br>&nbsp;qss-&gt;serv_addr.sin_family=AF_INET;<br>&nbsp;qss-&gt;serv_addr.sin_port=htons(qss-&gt;port);<br>&nbsp;qss-&gt;serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");<br>&nbsp;if(WSAStartup(qss-&gt;wsaVersion,&amp;qss-&gt;wsData)){&nbsp;&nbsp;<br>&nbsp; /*<strong>A side note: when WSAStartup is called, I saw it start an extra thread, which exits on its own shortly afterwards</strong>. I wonder what <strong>WSACleanup</strong> does?......*/</p>
<p>&nbsp;&nbsp;SOlogger("WSAStartup failed.\n",0,0);<br>&nbsp;&nbsp;return 0;<br>&nbsp;}<br>&nbsp;qss-&gt;server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);<br>&nbsp;if(qss-&gt;server_s==INVALID_SOCKET){&nbsp;&nbsp;<br>&nbsp;&nbsp;SOlogger("socket failed.\n",0,1);<br>&nbsp;&nbsp;return 0;<br>&nbsp;}<br>&nbsp;if(bind(qss-&gt;server_s,(LPSOCKADDR)&amp;qss-&gt;serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR){&nbsp;&nbsp;<br>&nbsp;&nbsp;SOlogger("bind failed.\n",qss-&gt;server_s,1);<br>&nbsp;&nbsp;return 0;<br>&nbsp;}<br>&nbsp;if(<strong>listen</strong>(qss-&gt;server_s,<strong>SOMAXCONN</strong>)==SOCKET_ERROR)/*A word about <strong>backlog</strong>: many people do not know what value to use. I have seen 1, 5, 50 and 100, and some say that larger values consume more resources. Indeed, passing SOMAXCONN here does not mean Windows will literally use SOMAXCONN; rather, " If set to SOMAXCONN, the underlying service provider responsible for socket <em>s</em> will set the backlog to a maximum <strong>reasonable</strong> value. ". In real deployments the TCP backlog queue that each operating system supports also differs, so it is better to let the operating system pick the value. A server such as Apache defines:<br>#ifndef DEFAULT_LISTENBACKLOG<br>#define DEFAULT_LISTENBACKLOG 511<br>#endif<br>*/<br>&nbsp;&nbsp;&nbsp; {&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <br>&nbsp;&nbsp;SOlogger("listen failed.\n",qss-&gt;server_s,1);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 0;<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;qss-&gt;iocpHandle=<strong>CreateIoCompletionPort</strong>(<strong>INVALID_HANDLE_VALUE</strong>,NULL,0,<strong>/*NumberOfConcurrentThreads--&gt;*/qss-&gt;maxThreads</strong>);<br>&nbsp;//initialize worker for completion routine.<br>&nbsp;addQSSWorker(qss,qss-&gt;minThreads);&nbsp;&nbsp;<br>&nbsp;qss-&gt;lifecycleStatus=2;<br>&nbsp;{<br>&nbsp;&nbsp;QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));<br>&nbsp;&nbsp;pParam-&gt;qss=qss;<br>&nbsp;&nbsp;pParam-&gt;th=NULL;<br>&nbsp;&nbsp;if(qss-&gt;<strong>passive</strong>){<br>&nbsp;&nbsp;&nbsp;DWORD threadId;<br>&nbsp;&nbsp;&nbsp;pParam-&gt;th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)acceptorRoutine,pParam,0,&amp;threadId);&nbsp;<br>&nbsp;&nbsp;}else<br>&nbsp;&nbsp;&nbsp;return 
acceptorRoutine(pParam);<br>&nbsp;}<br>&nbsp;return 1;<br>}</p>
<p>DWORD <strong>shutdownSocketServer</strong>(SocketServer *ss){<br>&nbsp;QSocketServer * qss=(QSocketServer *)ss;<br>&nbsp;if(qss==NULL||InterlockedCompareExchange(&amp;qss-&gt;lifecycleStatus,3,2)!=2)<br>&nbsp;&nbsp;return 0;&nbsp;<br>&nbsp;closesocket(qss-&gt;server_s/*<strong>listen-socket</strong>*/);//<strong>..other accepted-sockets associated with the listen-socket will not be closed,except WSACleanup is called..</strong>&nbsp;<br>&nbsp;if(qss-&gt;CSocketsCounter==0)<br>&nbsp;&nbsp;qss-&gt;lifecycleStatus=4,PostQueuedCompletionStatus(qss-&gt;iocpHandle,0,-1,NULL);<br>&nbsp;WSACleanup();&nbsp;&nbsp;<br>&nbsp;return 1;<br>}</p>
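<p>startSocketServer and shutdownSocketServer both guard their work with InterlockedCompareExchange on lifecycleStatus, so only one caller can perform each transition. The same idea can be sketched in portable C11 with stdatomic.h. The enum names and lifecycle_transition are hypothetical; the numeric states follow the comment on lifecycleStatus in the struct above.</p>

```c
#include <stdatomic.h>

enum { QSS_CREATED = 0, QSS_STARTING = 1, QSS_RUNNING = 2, QSS_STOPPING = 3 };

/* Move *status from 'from' to 'to'; returns 1 only for the caller that wins. */
static int lifecycle_transition(atomic_long *status, long from, long to)
{
    long expected = from;
    /* The swap happens only if *status still equals 'from'. */
    return atomic_compare_exchange_strong(status, &expected, to) ? 1 : 0;
}
```

<p>A second caller that attempts the same transition gets 0 back and can return early, which mirrors the early return 0 in the two functions above.</p>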
<p>DWORD&nbsp; <strong>acceptorRoutine</strong>(LPVOID ss){<br>&nbsp;QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)ss;<br>&nbsp;QSocketServer * qss=pParam-&gt;qss;<br>&nbsp;HANDLE curThread=pParam-&gt;th;<br>&nbsp;QSSOverlapped *qssOl=NULL;<br>&nbsp;SOCKADDR_IN client_addr;<br>&nbsp;int client_addr_leng=sizeof(SOCKADDR_IN);<br>&nbsp;SOCKET cs;&nbsp;<br>&nbsp;free(pParam);<br>&nbsp;while(1){&nbsp;&nbsp;<br>&nbsp;&nbsp;printf("accept starting.....\n");<br>&nbsp;&nbsp;<strong>cs/*Accepted-socket*/</strong>=<strong>accept</strong>(qss-&gt;server_s,(LPSOCKADDR)&amp;client_addr,&amp;client_addr_leng);<br>&nbsp;&nbsp;if(cs==INVALID_SOCKET)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {<br>&nbsp;&nbsp;&nbsp;printf("accept failed:%d\n",GetLastError());&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; break;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{//<strong>SO_KEEPALIVE,SIO_KEEPALIVE_VALS</strong> this uses the system's "<strong>heartbeat detection</strong>", i.e. keepalive probes. linux:setsockopt,SOL_TCP:TCP_KEEPIDLE,TCP_KEEPINTVL,TCP_KEEPCNT<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; struct tcp_keepalive alive,aliveOut;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int so_keepalive_opt=1;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; DWORD outDW;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(!setsockopt(cs,SOL_SOCKET,SO_KEEPALIVE,(char *)&amp;so_keepalive_opt,sizeof(so_keepalive_opt))){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; alive.onoff=TRUE;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; alive.keepalivetime=QSS_SIO_KEEPALIVE_VALS_TIMEOUT;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; alive.keepaliveinterval=QSS_SIO_KEEPALIVE_VALS_INTERVAL;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
if(WSAIoctl(cs,SIO_KEEPALIVE_VALS,&amp;alive,sizeof(alive),&amp;aliveOut,sizeof(aliveOut),&amp;outDW,NULL,NULL)==SOCKET_ERROR){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("WSAIoctl SIO_KEEPALIVE_VALS failed:%d\n",GetLastError());&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; break;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; printf("setsockopt SO_KEEPALIVE failed:%d\n",GetLastError());&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; break;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;<br>&nbsp;&nbsp;}<br>&nbsp;&nbsp;<br>&nbsp;&nbsp;CreateIoCompletionPort((HANDLE)cs,qss-&gt;iocpHandle,cs,0);<br>&nbsp;&nbsp;if(qssOl==NULL){<br>&nbsp;&nbsp;&nbsp;qssOl=malloc(sizeof(QSSOverlapped));&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;}<br>&nbsp;&nbsp;qssOl-&gt;client_s=cs;<br>&nbsp;&nbsp;qssOl-&gt;wsaBuf.len=MAX_BUF_SIZE,qssOl-&gt;wsaBuf.buf=qssOl-&gt;buf,qssOl-&gt;numberOfBytesTransferred=0,qssOl-&gt;flags=0;//initialize WSABuf.<br>&nbsp;&nbsp;memset(&amp;qssOl-&gt;overlapped,0,sizeof(WSAOVERLAPPED));&nbsp;&nbsp;<br>&nbsp;&nbsp;{<br>&nbsp;&nbsp;&nbsp;DWORD lastErr=GetLastError();<br>&nbsp;&nbsp;&nbsp;int ret=0;<br>&nbsp;&nbsp;&nbsp;SetLastError(0);<br>&nbsp;&nbsp;&nbsp;ret=WSARecv(cs,&amp;qssOl-&gt;wsaBuf,1,&amp;qssOl-&gt;numberOfBytesTransferred,&amp;qssOl-&gt;flags,&amp;qssOl-&gt;overlapped,NULL);<br>&nbsp;&nbsp;&nbsp;if(ret==0||(ret==SOCKET_ERROR&amp;&amp;GetLastError()==WSA_IO_PENDING)){<br>&nbsp;&nbsp;&nbsp;&nbsp;InterlockedIncrement(&amp;qss-&gt;<strong>CSocketsCounter</strong>);//<strong>increment the accepted-socket count.</strong><br>&nbsp;&nbsp;&nbsp;&nbsp;if(qss-&gt;cslifecb)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;qss-&gt;cslifecb(cs,0);<br>&nbsp;&nbsp;&nbsp;&nbsp;qssOl=NULL;<br>&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;if(!GetLastError())<br>&nbsp;&nbsp;&nbsp;&nbsp;SetLastError(lastErr);<br>&nbsp;&nbsp;}<br>&nbsp;&nbsp;<br>&nbsp;&nbsp;printf("accept flags:%d ,cs:%d.\n",GetLastError(),cs);<br>&nbsp;}//end while.</p>
<p>&nbsp;if(qssOl)<br>&nbsp;&nbsp;free(qssOl);<br>&nbsp;if(qss)<br>&nbsp;&nbsp;shutdownSocketServer((SocketServer *)qss);<br>&nbsp;if(curThread)<br>&nbsp;&nbsp;CloseHandle(curThread);</p>
<p>&nbsp;return 1;<br>}</p>
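<p>The comment in acceptorRoutine names the Linux counterparts of SIO_KEEPALIVE_VALS: TCP_KEEPIDLE, TCP_KEEPINTVL and TCP_KEEPCNT. A minimal sketch of how those options might be set on Linux follows. enable_keepalive is a hypothetical helper, not part of the framework, and the probe count is an extra parameter that the Windows structure used above does not have.</p>

```c
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

/* Enable keepalive probes on fd. idle_s and interval_s are in seconds,
   unlike SIO_KEEPALIVE_VALS, which takes milliseconds. */
static int enable_keepalive(int fd, int idle_s, int interval_s, int probes)
{
    int on = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)) == -1)
        return -1;
    /* Idle time before the first probe is sent. */
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle_s, sizeof(idle_s)) == -1)
        return -1;
    /* Interval between successive probes. */
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval_s, sizeof(interval_s)) == -1)
        return -1;
    /* Unanswered probes allowed before the connection is dropped. */
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &probes, sizeof(probes)) == -1)
        return -1;
    return 0;
}
```

<p>With the values used in this article, the call would be enable_keepalive(cs, 30*60, 5, probes), where the probe count is whatever the application chooses.</p>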
<p>static int postRecvCompletionPacket(QSSOverlapped * qssOl,int SOErrOccurredCode){&nbsp;<br>&nbsp;int SOErrOccurred=0;&nbsp;<br>&nbsp;DWORD lastErr=GetLastError();<br>&nbsp;SetLastError(0);<br>&nbsp;//SOCKET_ERROR:-1,WSA_IO_PENDING:997<br>&nbsp;if(WSARecv(qssOl-&gt;client_s,&amp;qssOl-&gt;wsaBuf,1,&amp;qssOl-&gt;numberOfBytesTransferred,&amp;qssOl-&gt;flags,&amp;qssOl-&gt;overlapped,NULL)==SOCKET_ERROR<br>&nbsp;&nbsp;&amp;&amp;GetLastError()!=WSA_IO_PENDING)//this case lastError maybe 64, 10054&nbsp;<br>&nbsp;{<br>&nbsp;&nbsp;SOErrOccurred=SOErrOccurredCode;&nbsp;&nbsp;<br>&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;if(!GetLastError())<br>&nbsp;&nbsp;SetLastError(lastErr);&nbsp;<br>&nbsp;if(SOErrOccurred)<br>&nbsp;&nbsp;printf("worker[%d] postRecvCompletionPacket SOErrOccurred=%d,preErr:%d,postedErr:%d\n",GetCurrentThreadId(),SOErrOccurred,lastErr,GetLastError());<br>&nbsp;return SOErrOccurred;<br>}</p>
<p>DWORD&nbsp; <strong>completionWorkerRoutine</strong>(LPVOID ss){<br>&nbsp;QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)ss;<br>&nbsp;QSocketServer * qss=pParam-&gt;qss;<br>&nbsp;HANDLE curThread=pParam-&gt;th;<br>&nbsp;QSSOverlapped * qssOl=NULL;<br>&nbsp;DWORD numberOfBytesTransferred=0;<br>&nbsp;ULONG_PTR completionKey=0;<br>&nbsp;int postRes=0,handleCode=0,exitCode=0,SOErrOccurred=0;&nbsp;<br>&nbsp;free(pParam);<br>&nbsp;while(!exitCode){<br>&nbsp;&nbsp;SetLastError(0);<br>&nbsp;&nbsp;if(GetQueuedCompletionStatus(qss-&gt;iocpHandle,&amp;numberOfBytesTransferred,&amp;completionKey,(LPOVERLAPPED *)&amp;qssOl,qss-&gt;workerWaitTimeout)){<br>&nbsp;&nbsp;&nbsp;if(<strong>completionKey==-1</strong>&amp;&amp;qss-&gt;lifecycleStatus&gt;=4)<br>&nbsp;&nbsp;&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;printf("worker[%d] completionKey -1:%d \n",GetCurrentThreadId(),GetLastError());<br>&nbsp;&nbsp;&nbsp;&nbsp;if(qss-&gt;workerCounter&gt;1)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;PostQueuedCompletionStatus(qss-&gt;iocpHandle,0,-1,NULL);<br>&nbsp;&nbsp;&nbsp;&nbsp;exitCode=1;<br>&nbsp;&nbsp;&nbsp;&nbsp;<strong>break;<br></strong>&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;if(numberOfBytesTransferred&gt;0){&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;InterlockedIncrement(&amp;qss-&gt;currentBusyWorkers);<br>&nbsp;&nbsp;&nbsp;&nbsp;addQSSWorker(qss,1);<br>&nbsp;&nbsp;&nbsp;&nbsp;handleCode=qss-&gt;protoHandler((LPWSAOVERLAPPED)qssOl);&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;InterlockedDecrement(&amp;qss-&gt;currentBusyWorkers);&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;if(handleCode&gt;=0){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;SOErrOccurred=postRecvCompletionPacket(qssOl,1);<br>&nbsp;&nbsp;&nbsp;&nbsp;}else<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;SOErrOccurred=2;&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;}else{<br>&nbsp;&nbsp;&nbsp;&nbsp;printf("worker[%d] numberOfBytesTransferred==0 ***** closesocket servS or cs *****,%d,%d ,ol 
is:%d\n",GetCurrentThreadId(),GetLastError(),completionKey,qssOl==NULL?0:1);<br>&nbsp;&nbsp;&nbsp;&nbsp;SOErrOccurred=3;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;<br>&nbsp;&nbsp;}else{ //GetQueuedCompletionStatus rtn FALSE, lastError 64 ,<strong>995</strong>[<strong>timeout worker thread exit</strong>.] ,WAIT_TIMEOUT:258&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;if(qssOl){<br>&nbsp;&nbsp;&nbsp;&nbsp;SOErrOccurred=postRecvCompletionPacket(qssOl,4);<br>&nbsp;&nbsp;&nbsp;}else {&nbsp;&nbsp;&nbsp;&nbsp;</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;printf("worker[%d] GetQueuedCompletionStatus F:%d \n",GetCurrentThreadId(),GetLastError());<br>&nbsp;&nbsp;&nbsp;&nbsp;if(GetLastError()!=WAIT_TIMEOUT){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;exitCode=2;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;}else{//wait timeout&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if(qss-&gt;lifecycleStatus!=4&amp;&amp;qss-&gt;currentBusyWorkers==0&amp;&amp;qss-&gt;workerCounter&gt;qss-&gt;minThreads){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;EnterCriticalSection(&amp;qss-&gt;QSS_LOCK);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if(qss-&gt;lifecycleStatus!=4&amp;&amp;qss-&gt;currentBusyWorkers==0&amp;&amp;qss-&gt;workerCounter&gt;qss-&gt;minThreads){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;qss-&gt;workerCounter--;//until qss-&gt;workerCounter decrease to qss-&gt;minThreads<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;exitCode=3;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LeaveCriticalSection(&amp;qss-&gt;QSS_LOCK);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;}//end GetQueuedCompletionStatus.</p>
<p>&nbsp;&nbsp;if(<strong>SOErrOccurred</strong>){&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;if(qss-&gt;cslifecb)<br>&nbsp;&nbsp;&nbsp;&nbsp;qss-&gt;cslifecb(qssOl-&gt;client_s,-1);<br>&nbsp;&nbsp;&nbsp;/*if(qssOl)*/{<br>&nbsp;&nbsp;&nbsp;&nbsp;closesocket(qssOl-&gt;client_s);<br>&nbsp;&nbsp;&nbsp;&nbsp;free(qssOl);<br>&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;if(InterlockedDecrement(&amp;qss-&gt;<strong>CSocketsCounter</strong>)==0&amp;&amp;qss-&gt;lifecycleStatus&gt;=3){&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;//last client socket closed while shutting down: post completion key -1 so the workers exit<br>&nbsp;&nbsp;&nbsp;&nbsp;qss-&gt;lifecycleStatus=4,PostQueuedCompletionStatus(qss-&gt;iocpHandle,0,-1,NULL);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<br>&nbsp;&nbsp;&nbsp;&nbsp;exitCode=4;<br>&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;}<br>&nbsp;&nbsp;<strong>qssOl=NULL,numberOfBytesTransferred=0,completionKey=0,SOErrOccurred=0;//reset for the next loop iteration.<br></strong>&nbsp;}//end while.</p>
<p>&nbsp;//final cleanup before the thread exits<br>&nbsp;if(exitCode!=3){&nbsp;<br>&nbsp;&nbsp;int clearup=0;<br>&nbsp;&nbsp;EnterCriticalSection(&amp;qss-&gt;QSS_LOCK);<br>&nbsp;&nbsp;if(!--qss-&gt;workerCounter&amp;&amp;qss-&gt;lifecycleStatus&gt;=4){//the last worker to leave cleans up the QSS<br>&nbsp;&nbsp;&nbsp;&nbsp;clearup=1;<br>&nbsp;&nbsp;}<br>&nbsp;&nbsp;LeaveCriticalSection(&amp;qss-&gt;QSS_LOCK);<br>&nbsp;&nbsp;if(clearup){<br>&nbsp;&nbsp;&nbsp;DeleteCriticalSection(&amp;qss-&gt;QSS_LOCK);<br>&nbsp;&nbsp;&nbsp;CloseHandle(qss-&gt;iocpHandle);<br>&nbsp;&nbsp;&nbsp;free(qss);&nbsp;<br>&nbsp;&nbsp;}<br>&nbsp;}<br>&nbsp;CloseHandle(curThread);<br>&nbsp;return 1;<br>}<br>------------------------------------------------------------------------------------------------------------------------<br>
&nbsp; &nbsp; Telling IOCP LastError values apart and handling each one correctly is the hard part, so pay attention to the <strong>structure of the while loop in completionWorkerRoutine</strong>,<br>which is outlined below:<br>while(!exitCode){<br>&nbsp;&nbsp;&nbsp; if(<strong>GetQueuedCompletionStatus</strong>){/*Inside this branch, as long as the OVERLAPPED you posted was not NULL, the one you get back here is <strong>that same one</strong>.*/<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(<strong>completionKey==-1</strong>){...<strong>break</strong>;}<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(numberOfBytesTransferred&gt;0){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; /*Handle the request here, <strong>and remember to post your OVERLAPPED again!</strong> */<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; /*Here the client or the server has probably called closesocket on the socket, <strong>but the OVERLAPPED is still not NULL, as long as the one you posted was not NULL!</strong>*/<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; }else{/*In this branch GetQueuedCompletionStatus returned <strong>FALSE</strong>, but that does not mean the OVERLAPPED is NULL. <strong>In particular, when the OVERLAPPED is not NULL, do not assume that a LastError means the current socket is useless or has hit a fatal error. With lastError 995, for example, the socket may be perfectly healthy and usable, and you should not close it</strong>.*/<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(OVERLAPPED is not NULL){<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; /*<strong>In this case, go ahead and post again regardless, and check for errors only after posting</strong>.*/<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }else{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; /*No OVERLAPPED was dequeued: either the wait timed out (WAIT_TIMEOUT, 258) or the wait itself failed.*/<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; if(<strong>socket error occurred</strong>){<br><br>&nbsp;&nbsp;&nbsp; }<br>&nbsp;&nbsp;&nbsp; prepare for the next iteration.<br>}&nbsp;<br></p>
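<p>The branch structure outlined above can also be written as a small decision function. What follows is only a sketch in portable C, not part of the original server: it calls no Windows API, and the names <strong>qss_action</strong>, <strong>qss_classify</strong>, <strong>QSS_WAIT_TIMEOUT</strong> and <strong>is_shutdown_key</strong> are hypothetical. It assumes the caller has already called GetQueuedCompletionStatus and GetLastError and passes the results in, with <strong>is_shutdown_key</strong> standing for the test completionKey==-1&amp;&amp;lifecycleStatus&gt;=4.</p>

```c
#include <assert.h>

/* Hypothetical action names: the original routine encodes these outcomes
 * in its exitCode and SOErrOccurred integers instead. */
enum qss_action {
    QSS_SHUTDOWN,       /* shutdown sentinel (completion key -1) dequeued */
    QSS_HANDLE_REQUEST, /* bytes arrived: handle them, then re-post the recv */
    QSS_CLOSE_SOCKET,   /* zero bytes: one side closed the socket */
    QSS_REPOST_RECV,    /* failed wait that still returned our OVERLAPPED */
    QSS_IDLE_TIMEOUT,   /* nothing dequeued before the wait timed out */
    QSS_WORKER_EXIT     /* nothing dequeued and the error is not a timeout */
};

/* Same numeric value as WAIT_TIMEOUT in the Windows headers. */
#define QSS_WAIT_TIMEOUT 258UL

/* ok:              return value of the wait (non-zero means TRUE)
 * ol:              the OVERLAPPED pointer that was dequeued, or NULL
 * bytes:           number of bytes transferred
 * is_shutdown_key: caller-evaluated shutdown-sentinel test (assumption)
 * last_error:      value of GetLastError() after a FALSE return */
enum qss_action qss_classify(int ok, const void *ol, unsigned long bytes,
                             int is_shutdown_key, unsigned long last_error)
{
    if (ok) {
        if (is_shutdown_key)
            return QSS_SHUTDOWN;
        return bytes > 0 ? QSS_HANDLE_REQUEST : QSS_CLOSE_SOCKET;
    }
    /* FALSE with an OVERLAPPED: do not close yet, try to post again and
     * let the result of that post decide whether the socket is dead. */
    if (ol)
        return QSS_REPOST_RECV;
    /* FALSE without an OVERLAPPED: only the error code tells them apart. */
    return last_error == QSS_WAIT_TIMEOUT ? QSS_IDLE_TIMEOUT : QSS_WORKER_EXIT;
}

/* Usage examples for the cases discussed in the text. */
void qss_classify_examples(void)
{
    int fake_overlapped = 0; /* stands in for a real QSSOverlapped */

    assert(qss_classify(1, &fake_overlapped, 512, 0, 0) == QSS_HANDLE_REQUEST);
    assert(qss_classify(1, &fake_overlapped, 0, 0, 0) == QSS_CLOSE_SOCKET);
    /* lastError 995 with an OVERLAPPED: the socket may still be usable. */
    assert(qss_classify(0, &fake_overlapped, 0, 0, 995) == QSS_REPOST_RECV);
    assert(qss_classify(0, 0, 0, 0, QSS_WAIT_TIMEOUT) == QSS_IDLE_TIMEOUT);
}
```

<p>Keeping the decision separate from the I/O calls is a design choice of this sketch only. It makes the rule "a FALSE return with a non-NULL OVERLAPPED means post again, not close" easy to check in isolation.</p>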
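<p><strong>&nbsp;&nbsp;&nbsp; This was written in a hurry, so errors and omissions are inevitable. Corrections and comments are very welcome. Thanks!<br></strong></p>
<p><strong>&nbsp;&nbsp;&nbsp; There is still room to improve the performance of this model!<br></strong></p>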
<br><br><div align=right><a style="text-decoration:none;" href="http://www.cppblog.com/adapterofcoms/" target="_blank">adapterofcoms</a> 2010-06-26 17:30 <a href="http://www.cppblog.com/adapterofcoms/archive/2010/06/26/118781.html#Feedback" target="_blank" style="text-decoration:none;">Post a comment</a></div>]]></description></item></channel></rss>