小明思考

高性能服务器端计算
posts - 70, comments - 428, trackbacks - 0, articles - 0
  C++博客 :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

用完成端口写的echo server

Posted on 2007-06-06 17:44 小明 阅读(9506) 评论(13)  编辑 收藏 引用 所属分类: Win32Network/ACE
完成端口网上的例子很多,但觉得都挺复杂的

写了一个简化版的,方便学习,也加了注释。

有任何问题,欢迎跟我讨论。

========代码来了=========

#include <winsock2.h>
#include 
<mswsock.h>
#include 
<windows.h>

#include 
<iostream>
using namespace std;

int g_ThreadCount;
HANDLE g_hIOCP 
= INVALID_HANDLE_VALUE;
SOCKET g_ServerSocket 
= INVALID_SOCKET;

// Maximum Buffer Size
#define MAX_BUFF_SIZE                8192

enum IO_OPERATION{IO_READ,IO_WRITE};

struct IO_DATA{
    WSAOVERLAPPED               Overlapped;
    
char                        Buffer[MAX_BUFF_SIZE];
    WSABUF                      wsabuf;
    
int                         nTotalBytes;
    
int                         nSentBytes;
    IO_OPERATION                opCode;
    SOCKET                      activeSocket;
};

DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) {
    LPWSAOVERLAPPED lpOverlapped 
= NULL;
    IO_DATA 
*lpIOContext = NULL; 
    WSABUF buffSend;
    DWORD dwRecvNumBytes 
= 0;
    DWORD dwSendNumBytes 
= 0;
    DWORD dwFlags 
= 0;
    DWORD dwIoSize 
= 0;
       BOOL bSuccess 
= FALSE;
    
int nRet = 0;

       
while1 ) {
         
void * lpCompletionKey = NULL;
         bSuccess 
= GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,
                                             (LPDWORD)
&lpCompletionKey,
                                             (LPOVERLAPPED 
*)&lpOverlapped, 
                                             INFINITE);
         
if!bSuccess )
         {
            cout 
<< "GetQueuedCompletionStatus() failed:"<<GetLastError()<<endl;
            
break;
         }

         lpIOContext 
= (IO_DATA *)lpOverlapped;

         
if(dwIoSize == 0//socket closed?
         {
             cout 
<< "Client disconnect" << endl;
             closesocket(lpIOContext
->activeSocket);
             delete lpIOContext;
             
continue;
         }

         
if(lpIOContext->opCode == IO_READ) // a read operation complete
         {
                lpIOContext
->nTotalBytes  = lpIOContext->wsabuf.len;
                lpIOContext
->nSentBytes   = 0;
                lpIOContext
->opCode = IO_WRITE;
                dwFlags 
= 0;
                nRet 
= WSASend(
                              lpIOContext
->activeSocket,
                              
&lpIOContext->wsabuf, 1&dwSendNumBytes,
                              dwFlags,
                              
&(lpIOContext->Overlapped), NULL);
                
if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
                        cout 
<< "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl;
                        closesocket(lpIOContext
->activeSocket);
                        delete lpIOContext;
                        
continue;
                }
         }
         
else if(lpIOContext->opCode == IO_WRITE) //a write operation complete
         {
                lpIOContext
->nSentBytes  += dwIoSize;
                dwFlags 
= 0;
                
if( lpIOContext->nSentBytes < lpIOContext->nTotalBytes ) {
                    lpIOContext
->opCode = IO_WRITE;
                    
// A Write operation has not completed yet, so post another
                    
// Write operation to post remaining data.
                    buffSend.buf = lpIOContext->Buffer + lpIOContext->nSentBytes;
                    buffSend.len 
= lpIOContext->nTotalBytes - lpIOContext->nSentBytes;
                    nRet 
= WSASend (
                                   lpIOContext
->activeSocket,
                                   
&buffSend, 1&dwSendNumBytes,
                                   dwFlags,
                                   
&(lpIOContext->Overlapped), NULL);

                    
if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
                        cout 
<< "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl;
                        closesocket(lpIOContext
->activeSocket);
                        delete lpIOContext;
                        
continue;
                    }
                } 
else {
                    
// Write operation completed, so post Read operation.
                    lpIOContext->opCode = IO_READ; 
                    dwRecvNumBytes 
= 0;
                    dwFlags 
= 0;
                    lpIOContext
->wsabuf.buf = lpIOContext->Buffer,
                    ZeroMemory(lpIOContext
->wsabuf.buf,MAX_BUFF_SIZE);
                    lpIOContext
->Overlapped.Internal = 0;
                    lpIOContext
->Overlapped.InternalHigh = 0;
                    lpIOContext
->Overlapped.Offset = 0;
                    lpIOContext
->Overlapped.OffsetHigh = 0;
                    lpIOContext
->Overlapped.hEvent = NULL;
                    lpIOContext
->wsabuf.len = MAX_BUFF_SIZE;
                    nRet 
= WSARecv(
                                  lpIOContext
->activeSocket,
                                  
&lpIOContext->wsabuf, 1&dwRecvNumBytes,
                                  
&dwFlags,
                                  
&lpIOContext->Overlapped, NULL);
                    
if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
                        cout 
<< "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl;
                        closesocket(lpIOContext
->activeSocket);
                        delete lpIOContext;
                        
continue;
                    } 
                }
         }
    }
    
return 0;
}

void main (int argc, char * argv[])
{
    { 
// Init winsock2
        WSADATA wsaData;
        ZeroMemory(
&wsaData,sizeof(WSADATA));
        
int retVal = -1;
        
if( (retVal = WSAStartup(MAKEWORD(2,2), &wsaData)) != 0 ) {
            cout 
<< "WSAStartup Failed::Reason Code::"<< retVal << endl;
            
return;
        }
    }

    {  
//Create socket
        g_ServerSocket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);

        
if( g_ServerSocket == INVALID_SOCKET ) {
            cout 
<< "Server Socket Creation Failed::Reason Code::" << WSAGetLastError() << endl;
            
return;
        }
    }

    {   
//bind
        sockaddr_in service;
        service.sin_family 
= AF_INET;
        service.sin_addr.s_addr 
= htonl(INADDR_ANY);
        service.sin_port 
= htons(5000);
        
int retVal = bind(g_ServerSocket,(SOCKADDR *)&service,sizeof(service));
        
if( retVal == SOCKET_ERROR ) {
            cout 
<< "Server Soket Bind Failed::Reason Code::"<< WSAGetLastError() << endl;
            
return;
        }
    }

    {   
//listen
        int retVal = listen(g_ServerSocket, 8);
        
if( retVal == SOCKET_ERROR ) {
            cout 
<< "Server Socket Listen Failed::Reason Code::"<< WSAGetLastError() << endl;
            
return;
        }
    }

    {   
// Create IOCP
        SYSTEM_INFO sysInfo;
        ZeroMemory(
&sysInfo,sizeof(SYSTEM_INFO));
        GetSystemInfo(
&sysInfo);
        g_ThreadCount 
= sysInfo.dwNumberOfProcessors * 1;
        g_hIOCP 
= CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,g_ThreadCount);
        
if (g_hIOCP == NULL) {
            cout 
<< "CreateIoCompletionPort() Failed::Reason::"<< GetLastError() << endl;
            
return;            
        }

           
if (CreateIoCompletionPort((HANDLE)g_ServerSocket,g_hIOCP,0,0== NULL){
            cout 
<< "Binding Server Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
            
return ;    
        }
    }

    {  
//Create worker threads
        for( DWORD dwThread=0; dwThread < g_ThreadCount; dwThread++ )
        {
            HANDLE  hThread;
            DWORD   dwThreadId;

            hThread 
= CreateThread(NULL, 0, WorkerThread, 00&dwThreadId);
            CloseHandle(hThread);
        }
    }

    { 
//accept new connection
        while(1)
        {
            SOCKET ls 
= accept( g_ServerSocket, NULL, NULL );
            
if(ls == SOCKET_ERROR)  break;
            cout 
<< "Client connected." << endl;

            { 
//diable buffer to improve performance
                int nZero = 0;
                setsockopt(ls, SOL_SOCKET, SO_SNDBUF, (
char *)&nZero, sizeof(nZero));
            }

            
if (CreateIoCompletionPort((HANDLE)ls,g_hIOCP,0,0== NULL){
                cout 
<< "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
                closesocket(ls);
            }
            
else { //post a recv request
                IO_DATA * data = new IO_DATA;
                ZeroMemory(
&data->Overlapped,sizeof(data->Overlapped));
                ZeroMemory(data
->Buffer,sizeof(data->Buffer));
                data
->opCode = IO_READ;
                data
->nTotalBytes = 0;
                data
->nSentBytes  = 0;
                data
->wsabuf.buf  = data->Buffer;
                data
->wsabuf.len  = sizeof(data->Buffer);
                data
->activeSocket = ls;
                DWORD dwRecvNumBytes
=0,dwFlags=0;
                
int nRet = WSARecv(ls,&data->wsabuf, 1&dwRecvNumBytes,
                                  
&dwFlags,
                                  
&data->Overlapped, NULL);
                
if(nRet == SOCKET_ERROR  && (ERROR_IO_PENDING != WSAGetLastError())){
                    cout 
<< "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl;
                    closesocket(ls);
                    delete data;
                }
            }
        }
    }

    closesocket(g_ServerSocket);
    WSACleanup();
}

Feedback

# re: 用完成端口写的echo server  回复  更多评论   

2007-06-07 17:08 by 天津大学计算机学院 常兴龙
不错,写得很好

# re: 用完成端口写的echo server  回复  更多评论   

2007-08-18 16:21 by aeeida
&buffSend 里面是 要发送的内容?
&data->wsabuf 里面是 接收到的内容吗?

# re: 用完成端口写的echo server  回复  更多评论   

2007-08-31 11:14 by cocobear
WINDOWS编程
怎么连功能说明也没有啊

# re: 用完成端口写的echo server  回复  更多评论   

2007-12-07 14:52 by ACE user
用ACE_Proactor更方便

# re: 用完成端口写的echo server  回复  更多评论   

2008-07-30 21:00 by ....
这种才是最好的资料,简短的例子.

# re: 用完成端口写的echo server  回复  更多评论   

2008-08-02 23:58 by grwrg
seg

# re: 用完成端口写的echo server  回复  更多评论   

2008-08-02 23:59 by grwrg
srt

# re: 用完成端口写的echo server  回复  更多评论   

2008-11-02 22:48 by Jerson Ju
不错,是我在互联网上看到的最好的一个例子。谢谢。

# re: 用完成端口写的echo server  回复  更多评论   

2008-11-05 15:29 by Guoguo
if( !bSuccess )
{
cout << "GetQueuedCompletionStatus()
failed:"<<GetLastError()<<endl;
break;
}
把break改成Continue出错不会退出.

# re: 用完成端口写的echo server[未登录]  回复  更多评论   

2008-12-26 18:03 by Kevin
else if(lpIOContext->opCode == IO_WRITE)
{
lpIOContext->nSentBytes += dwIoSize;
dwFlags = 0;
if( lpIOContext->nSentBytes < lpIOContext->nTotalBytes )
{
lpIOContext->opCode = IO_WRITE;

这一步,怎么还是投递Write事件呢?这会不会造成Write事件不停的被触发呢?

# re: 用完成端口写的echo server  回复  更多评论   

2009-02-11 22:51 by ddk
既然叫完成端口,GetQueuedCompletionStatus返回后,传输已经completed了,即便lpIOContext->nSentBytes < lpIOContext->nTotalBytes 成立也不应该再发,此时应是有错误产生,closesocket应更合适。

# re: 用完成端口写的echo server  回复  更多评论   

2009-04-02 05:33 by xxq57@163.com
这个程序,要改几个地方,笔者肯定没有去调试。
1.要注释掉下列代码
// if (CreateIoCompletionPort((HANDLE)g_ServerSocket,g_hIOCP,0,0) == NULL)//
// {
// //cout << "Binding Server Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
// return ;
// }
原因:这里加g_ServerSocket服务句柄有意义吗?完成端口是管理客户端来的SOCKET句柄的。
2.ACCEPT后,完成端口再绑定客户端来的句柄时,要把客户端的来的句柄用KEY参数传入,以便在GetQueuedCompletionStatus()的KEY参数中能拿到客户端来的SOCKET,用于线程内部调用。

意见:共享资源是好的,但是没有调试就不负责任了。

# re: 用完成端口写的echo server  回复  更多评论   

2009-04-02 05:51 by xxq57@163.com
除了,我说的几个问题,还是有其他很多问题,建议初学者看:
http://blog.csdn.net/ventry/archive/2008/10/22/3126202.aspx
这个可以调试通过

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理