随笔 - 298  文章 - 377  trackbacks - 0
<2007年6月>
272829303112
3456789
10111213141516
17181920212223
24252627282930
1234567

常用链接

留言簿(34)

随笔分类

随笔档案

文章档案

相册

收藏夹

搜索

  •  

最新评论

阅读排行榜

评论排行榜

以前在书上看过了IOCP,不过一直都没有写过代码。现在写的时候,着时对很多问题摸不着头脑。不过好在CSDN上有许多的对于IOCP问题的讨论帖,让我受益非浅啊,也把心中的一些迷茫解开了,下面给出的是可以运行的IOCP的C/S代码,自已试了在一个机器上开了一百来个客户端,跑起来暂时没出现问题(因为通信内容太简单了^-^)。

IOCP的三个函数:CreateIoCompletionPort、GetQueuedCompletionStatus、PostQueuedCompletionStatus;一个是用来创建想要的IOCP的HANDLE同时也是用来把我们想要的SOCKET绑定到这个HANDLE上,一个是获取IO这个HANDLE上对应的对列的状态,看有没有事件完成,一个是用来通知所有工作线程退出(这个函数我还没用到,关于这个功用是看资料上说的)。

我在写这个代码的时候,最主要的问题就是当通信完成了之后,是怎么样来判断是哪个SOCKET的哪个状态(SEND还是RECV)完成了。《WINDOWS网络编程》这本书里给的代码不是很全的哦,它的配套光盘又没有,不过好在CSDN里CB那块中有个朋友刚好帖出了这一章的代码。通过比较和一夜的思量,算是搞明白啦。主要的就是以下的数据:

1、在第二次CreateIoCompletionPort中,会传进去一个CompletionKey,这个就是要来关联到我们想要的SOCKET上的一些感兴趣的数据内容,当然最好是要一个SOCKET,也可以是其它,看自己程序的需要了。而通过GetQueueCompletionStatus的通过,就可以获得这些数据的地址了。

typedef struct _PER_HANDLE_DATA
{
    SOCKET sock;
}PER_HANDLE_DATA,* LPPER_HANDLE_DATA;

2、第二个主要的数据结构就是这个了,现在真的是佩服当初设计这个结构的人啊(没办法,自己就是没想到这样利用法)。因为在POST操作(SEND或是RECV)是,都要一个OVERLAPPED,所以就把这个OVERLAPPED和要指明这次POST操作类型的代码OperationType(POST_SEND或POST_RECV)以及其它一些数据(比如接发收的缓冲)。这样子,在GetQueueCompletionStatus的时候,通过获取事件,也同时得到了OperationType和缓冲。这样,知道了通信类型,也得到了缓冲数据的缓冲区。这样就可以控制我们的通信了。

这个例子比较简单,没有复杂的数据处理过程(正在设计中,和大家交流交流)。用的是BCB的平台,不过写法上还是和VC里的一模一样的啊。

typedef struct _PER_IO_OPERATION_DATA
{
    OVERLAPPED Overlapped;
    WSABUF DataBuff[1];
    char Buff[24];
    BOOL OperationType;
}PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;

简单的客户端:

//---------------------------------------------------------------------------

#pragma hdrstop
#include <winsock2.h>
#include <stdio.h>
#include <iostream>
using namespace std;
//---------------------------------------------------------------------------

#pragma argsused

SOCKET sockClient;
struct sockaddr_in addrServer;
char buf[24];
int n = 0;
int Init();

int main(int argc, char* argv[])
{
    if(Init() != 0)
        goto theend;

    sockClient = socket(AF_INET,SOCK_STREAM,0);
    if(sockClient == INVALID_SOCKET)
    {
        cout<<"socket 失败"<<endl;
        WSACleanup();
        goto theend;
    }
    memset(&addrServer,0,sizeof(sockaddr_in));
    addrServer.sin_family = AF_INET;
    addrServer.sin_addr.s_addr = inet_addr("127.0.0.1");
    addrServer.sin_port = htons(9090);
    cout<<"连接服务器..."<<endl;
    if(connect(sockClient,(const struct sockaddr *)&addrServer,sizeof(sockaddr)) != 0)
    {
        cout<<"connect 失败"<<endl;
        WSACleanup();
        goto theend;
    }
    cout<<"开始发送测试包"<<endl;
    memset(buf,0,24);
    while(true)
    {
        sprintf(buf,"第%d个包", n);
        cout<<"发送:"<<buf<<endl;
        if(send(sockClient,buf,strlen(buf),0) <= 0)
        {
            cout<<"send失败,可能连接断开"<<endl;
            //break;
            goto theend;
        }
        memset(buf,0,24);

        //接收服务端应答
        if(recv(sockClient,buf,24,0) <= 0)
        {
            cout<<"recv失败,可能连接断开"<<endl;
           //break;
           goto theend;
        }
        cout<<"服务器应答:"<<buf<<endl;
        memset(buf,0,24);

        Sleep(200);
        n++;
    }

   
theend:
    WSACleanup();
    getchar();
    return 0;
}
//---------------------------------------------------------------------------
int Init()
{
    WSAData wsaData;
    if(WSAStartup(MAKEWORD(2,2),&wsaData) != 0)
    {
        cout<<"WSAStartup失败"<<endl;
        return -1;
    }

    if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
    {
        cout<<"SOCKET版本不对"<<endl;
        WSACleanup();
        return -1;
    }
    return 0;
}

服务端。

//---------------------------------------------------------------------------

#pragma hdrstop

//---------------------------------------------------------------------------
#pragma argsused
#pragma comment(lib,"ws2_32.lib")
#include <stdio.h>
#include <memory.h>
#include <winsock2.h>
#include <iostream>
using namespace std;

#define RECV_POSTED 1001
#define SEND_POSTED 1002

int Init();

HANDLE hCompletionPort;
typedef struct _PER_HANDLE_DATA
{
    SOCKET sock;
}PER_HANDLE_DATA,* LPPER_HANDLE_DATA;

typedef struct _PER_IO_OPERATION_DATA
{
    OVERLAPPED Overlapped;
    WSABUF DataBuff[1];
    char Buff[24];
    BOOL OperationType;
}PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPort);

int main(int argc, char* argv[])
{
    LPPER_HANDLE_DATA perHandleData;
    LPPER_IO_OPERATION_DATA ioperdata;
    SYSTEM_INFO siSys;
    SOCKET sockListen;
    struct sockaddr_in addrLocal;
    char buf[24];
    int nRet = 0;
    DWORD nThreadID;
    SOCKET sockAccept;
    DWORD dwFlags;
    DWORD dwRecvBytes;
    int nReuseAddr = 1;

    cout<<"初始环境..."<<endl;
    if(Init() != 0)
        goto theend;

    //创建一个IO完成端口
    cout<<"创建一个IO完成端口"<<endl;
    hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
    if(hCompletionPort == INVALID_HANDLE_VALUE)
    {
        cout<<"创建IO完成端口失败"<<endl;
        goto theend;
    }
    //获取CPU数目
    GetSystemInfo(&siSys);
    //创建一定数目的工作者线程,本例中以一个处理器一个线程搭配
    for(int i = 0;i<(int)siSys.dwNumberOfProcessors*2;i++)//NumberOfProcessors
    {
        HANDLE hThread;
        hThread = CreateThread(NULL,0,ServerWorkerThread,(LPVOID)hCompletionPort,0,&nThreadID);
        cout<<"创建工作者线程"<<i<<endl;
        CloseHandle(hThread);
    }
    //创建监听SOCKET
    cout<<"创建监听SOCKET"<<endl;
    sockListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    if(sockListen == SOCKET_ERROR)
    {
        cout<<"WSASocket错误"<<endl;
        goto theend;
    }

    if(setsockopt(sockListen,SOL_SOCKET,SO_REUSEADDR,(const char *)&nReuseAddr,sizeof(int)) != 0)
    {
        cout<<"setsockopt错误"<<endl;
        goto theend;
    }
    addrLocal.sin_family = AF_INET;
    addrLocal.sin_addr.s_addr = htonl(INADDR_ANY);
    addrLocal.sin_port = htons(9090);
    if(bind(sockListen,(struct sockaddr *)&addrLocal,sizeof(sockaddr_in)) != 0)
    {
        cout<<"bind错误"<<endl;
        int n = WSAGetLastError();
        goto theend;
    }
    //准备监听
    cout<<"准备监听"<<endl;
    if(listen(sockListen,5)!=0)
    {
        cout<<"listen错误"<<endl;
        goto theend;
    }
    while(true)
    {
        //接收用户连接,被和完成端口关联
        sockAccept = WSAAccept(sockListen,NULL,NULL,NULL,0);
        perHandleData = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));
        if(perHandleData == NULL)
            continue;
        cout<<"socket number "<<sockAccept<<"接入"<<endl;
        perHandleData->sock = sockAccept;

        ioperdata = (LPPER_IO_OPERATION_DATA)malloc(sizeof(PER_IO_OPERATION_DATA));
        memset(&(ioperdata->Overlapped),0,sizeof(OVERLAPPED));
        (ioperdata->DataBuff[0]).len = 24;
        (ioperdata->DataBuff[0]).buf = ioperdata->Buff;
        ioperdata->OperationType = RECV_POSTED;
        if( ioperdata == NULL)
        {
            free(perHandleData);
            continue;
        }
        //关联
        cout<<"关联SOCKET和完成端口"<<endl;
        if(CreateIoCompletionPort((HANDLE)sockAccept,hCompletionPort,(DWORD)perHandleData,1) == NULL)
        {
            cout<<sockAccept<<"createiocompletionport错误"<<endl;
            free(perHandleData);
            free(ioperdata);
            continue;
        }
        //投递接收操作
        cout<<"投递接收操作"<<endl;
        WSARecv(perHandleData->sock,ioperdata->DataBuff,1,&dwRecvBytes,&dwFlags,&(ioperdata->Overlapped),NULL);
    }
theend:
    getchar();
    return 0;
}
//---------------------------------------------------------------------------
int Init()
{
    WSAData wsaData;
    if(WSAStartup(MAKEWORD(2,2),&wsaData) != 0)
    {
        cout<<"WSAStartup失败"<<endl;
        return -1;
    }

    if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
    {
        cout<<"SOCKET版本不对"<<endl;
        WSACleanup();
        return -1;
    }
    return 0;
}

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPort)
{
    HANDLE ComPort = (HANDLE)CompletionPort;
    DWORD BytesTransferred;
    LPOVERLAPPED Overlapped;
    LPPER_HANDLE_DATA PerHandleData;
    LPPER_IO_OPERATION_DATA PerIoData;
    DWORD SendBytes,RecvBytes;
    DWORD Flags;
    BOOL bT;

    while(TRUE)
    {
        //等待完成端口上SOCKET的完成
        cout<<"等待完成端口上SOCKET的完成"<<endl;
        bT = GetQueuedCompletionStatus(ComPort,
            &BytesTransferred,(LPDWORD)&PerHandleData,
            (LPOVERLAPPED *)&PerIoData,INFINITE);

        //检查是否有错误产生
        if(BytesTransferred == 0 &&
            (PerIoData->OperationType == RECV_POSTED ||
            PerIoData->OperationType == SEND_POSTED))
        {
            //关闭SOCKET
            cout<<PerHandleData->sock<<"SOCKET关闭"<<endl;
            closesocket(PerHandleData->sock);
            free(PerHandleData);
            free(PerIoData);
            continue;
        }

        //为请求服务
       
        if(PerIoData->OperationType == RECV_POSTED)
        {
            //处理
            cout<<"接收处理"<<endl;
            cout<<PerHandleData->sock<<"SOCKET :"<<PerIoData->Buff<<endl;
            //回应客户端
            ZeroMemory(PerIoData->Buff,24);
            strcpy(PerIoData->Buff,"OK");
            Flags = 0;
            ZeroMemory((LPVOID)&(PerIoData->Overlapped),sizeof(OVERLAPPED));
            PerIoData->DataBuff[0].len = 2;
            PerIoData->DataBuff[0].buf = PerIoData->Buff;
            PerIoData->OperationType = SEND_POSTED;
            WSASend(PerHandleData->sock,PerIoData->DataBuff,
                1,&SendBytes,0,&(PerIoData->Overlapped),NULL);
        }
        else //if(PerIoData->OperationType == SEND_POSTED)
        {
            //发送时的处理
            cout<<"发送处理"<<endl;
            Flags = 0;
            ZeroMemory((LPVOID)&(PerIoData->Overlapped),sizeof(OVERLAPPED));
            ZeroMemory(PerIoData->Buff,24);
            PerIoData->DataBuff[0].len = 24;
            PerIoData->DataBuff[0].buf = PerIoData->Buff;
            PerIoData->OperationType = RECV_POSTED;
            WSARecv(PerHandleData->sock,PerIoData->DataBuff,
                1,&RecvBytes,&Flags,&(PerIoData->Overlapped),NULL);
        }
    }
}


posted on 2007-08-17 11:55 聂文龙 阅读(14479) 评论(7)  编辑 收藏 引用 所属分类: net work

FeedBack:
# re: IOCP的例子  2010-07-04 16:03 adsads
请教下:
WSARecv(perHandleData->sock,ioperdata->DataBuff,1,&dwRecvBytes,&dwFlags,&(ioperdata->Overlapped),NULL);

我碰到几个问题:

1、IO投递,WSARecv,是异步IO,还是同步 我就是想知道你是怎么投递的

是不是一调用中广核WSARecv就马上能够返回的吗?

2、关联数据CreateIoCompletionPort()为什么 我在XP上写的代码 这个函数 在 在关联数据的时候,总是失败呢?错误是87 参数不正确,但我按照其他人写的代码,结果全是一样


如果你真的在实战中使用过,我向你请教,希望你不吝赐教



  回复  更多评论
  
# re: IOCP的例子  2010-07-04 16:05 adsads
还是管IO投递

如果你调用WSARecv函数投递,如果这个函数一直堵塞在这里,那谈何效率


如果你真的是做到异步IO的话,希望你能帮我下


因为这个问题,困扰我很久了

就是说:

如何在IOCP中,使用异步IO


  回复  更多评论
  
# re: IOCP的例子 [未登录] 2013-02-28 19:41 Fairy
在我的机子上 怎么不行 wsarecv总是返回10045   回复  更多评论
  
# re: IOCP的例子  2013-06-04 14:54 tankin
@Fairy
Flag 要修改成 0
因为是一个[in][out] 值  回复  更多评论
  
# re: IOCP的例子  2013-11-11 17:12 good90
WSARecv中dwFlags lz都不初始。。  回复  更多评论
  
# re: IOCP的例子  2013-11-22 12:38 路过的
这代码 简直是误导人 在WSAAccept 这里就开始一直在阻塞主线程了 你还怎么异步  回复  更多评论
  
# re: IOCP的例子  2014-11-30 16:16 apc
@路过的
主线程主要处理连接,没有连接时线程挂起,收、发在工作线程,这怎么不是异步  回复  更多评论
  

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理