D3D

不知何年何月得常所愿, 得,得,得,得常所愿

  C++博客 :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理 ::
  52 随笔 :: 0 文章 :: 208 评论 :: 0 Trackbacks

GetQueuedCompletionStatus函数有个OVERLAPPED结构,很多资料上都采用不同的结构体来扩展该结构,比如有的资料定义:
typedef struct _OVERLAPPEDPLUS
{
OVERLAPPED ol;
SOCKET s, sclient;
int OpCode;
WSABUF wbuf;
DWORD dwBytes, dwFlags;
}OVERLAPPEDPLUS;

然后,当GetQueuedCompletionStatus(hIocp, &dwBytesXfered,(PULONG_PTR)&PerHandleKey, &Overlap, INFINITE);函数返回时候,人们常用OverlapPlus = CONTAINING_RECORD(Overlap, OVERLAPPEDPLUS, ol)得到一些信息。比如此时端口上完成的是什么操作,数据是什么等,还有,系统如何做到自动填充上述的结构的,也就是说,系统怎么知道在Overlap->OpCode存放的应该是操作类型,如读,写操作,而在Overlap->wbuf存放的应该是读写数据。


Overlap->OpCode,操作类型是在投递WSASend,WSARecv的时候,由你自己指定填充这个字段。

因为是非堵塞的,等于投递到与套接字相关联的完成端口上,系统会把把WSASend对应的缓冲区提交到底层缓冲,也可以把WSARecv投递的缓冲区,用接收到的数据填充,每一个WSASend,WSARecv,都应有新申请一个overlaspped plus结构提交,以存放本次投递的IO操作的相关数据,——单IO操作数据所以工作器线程中,从完成端口队列中get得到一个完成包的时候,可以根据单句柄数据知道在这个完成端口上是哪一个套接字投递的IO操作完成了,从get到的overlapped中得到相关的已经完成IO数据和信息,并作相应的处理。比如投递了1M,完成包却告知只完成512K,那么你就知道要把余下的512K继续投递WSASend,当然上一个WSASend的Overlapped这个时候可以重用到下一个WSASend中,这个是允许的,可以用一个字段存放全部1M,把余下未Send成功512k放到wbuf中,继续投递或者投递WSARecv1M数据,却收到一个512K的完成通知,那么你要继续投递WSARecv,当然前一个WSARecv的overlapped也可以重用,不过需要一些处理,把已经接收到的512K保存到某个字段中,再投递一个512K的请求去接收完成端口内部,对投递的Overlapped的填充,好像只有WSARecv的时候填充WSABUF,其他都是投递IO前,代码中显式填充,并投递的。至于完成了多少个字节,是在lpNumberOfBytes中得到。

对GetQueuedCompletionStatus函数解释:
实现从指定的IOCP获取CP。当CP队列为空时,对此函数的调用将被阻塞,而不是一直等待I/O的完成。当CP队列不为空时,被阻塞的线程将以后进先出(LIFO)顺序被释放。对于IOCP机制,它允许多线程并发调用GetQueuedCompletionStatus函数,最大并发数是在调用CreateIoCompletionPort函数时指定的,超出最大并发数的调用线程,将被阻塞。函数解释如下:  
  声明:  
  BOOL   GetQueuedCompletionStatus(  
          HANDLE   CompletionPort,    
          LPDWORD   lpNumberOfBytes,    
          PULONG_PTR   lpCompletionKey,    
          LPOVERLAPPED   *lpOverlapped,    
          DWORD   dwMilliseconds);  
  调用参数:  
  CompletionPort:指定的IOCP,该值由CreateIoCompletionPort函数创建。  
  lpnumberofbytes:一次完成后的I/O操作所传送数据的字节数。  
  lpcompletionkey:当文件I/O操作完成后,用于存放与之关联的CK。  
  lpoverlapped:为调用IOCP机制所引用的OVERLAPPED结构。  
  dwmilliseconds:用于指定调用者等待CP的时间。  
  返回值:  
  调用成功,则返回非零数值,相关数据存于lpNumberOfBytes、lpCompletionKey、lpCompletionKey变量中。失败则返回零值。

// 111.cpp : 定义控制台应用程序的入口点。
//
//Windows系统里,使用完成端口是高性能的方法之一,比如把完成端口使用到线程池和网络服务器里。现在就通过线程池的方
//法来介绍怎么样使用完成端口,高性能的服务器以后再仔细地介绍怎么样构造它。其实完成端口是一个队列,所有的线程都在等
//消息出现,如果队列里有消息,就每个线程去获取一个消息执行它。先用函数CreateIoCompletionPort来创建一个消息队列,然
//后使用GetQueuedCompletionStatus函数来从队列获取消息,使用函数PostQueuedCompletionStatus来向队列里发送消息。通过这
//三个函数就实现完成端口的消息循环处理。

#include 
"stdafx.h"
#include 
<winsock2.h>

#define PORT 3000
#define DATA_BUFSIZE 8192

typedef 
struct
{
    OVERLAPPED Overlapped;
    WSABUF DataBuf;
    CHAR Buffer[DATA_BUFSIZE];
    DWORD BytesSEND;
    DWORD BytesRECV;

}PER_IO_OPERATION_DATA, 
*LPPER_IO_OPERATION_DATA;


typedef 
struct
{
    SOCKET Socket;

}PER_HANDLE_DATA,
*LPPER_HANDLE_DATA;


DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID) ;

int main(int argc, char* argv[])
{

    SOCKADDR_IN InternetAddr;
    SOCKET Listen,Accept;
    HANDLE CompetionPort;
    SYSTEM_INFO SystenInfo;
    LPPER_HANDLE_DATA PerHandleData;
    LPPER_IO_OPERATION_DATA PerIOData;
    
int i;
    DWORD RecvBytes;
    DWORD Flags;
    DWORD ThreadID;
    WSADATA wsadata;
    DWORD Ret;


    
if (Ret = WSAStartup(0x2020,&wsadata) != 0)
    {
        printf(
"WSAStartup failed with error %d\n",Ret);
        
return 0;
    }

    
//打开一个空的完成端口
    if ((CompetionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0)) == NULL)
    {
        printf(
"CreateIoCompletionPort failed with error %d\n",GetLastError());
        
return 0;
    }

    GetSystemInfo(
&SystenInfo);

    
// 开启cpu个数的2倍+2个的线程
    for (i=0; i < SystenInfo.dwNumberOfProcessors*2  + 2; i++)
    {

        HANDLE ThreadHandle;

        
//创建服务器工作线程,并且向线程传送完成端口
        if ((ThreadHandle = CreateThread(NULL,0,ServerWorkerThread,CompetionPort,0,&ThreadID)) == NULL)
        {
            printf(
"CreateThread failed with error %d\n" ,GetLastError());
            
return 0;
        }
        CloseHandle(ThreadHandle);

    }

    
/*
    第一
    socket()和WSASocket()函数都能返回一个SOCKET套接字;
    socket()主要实现同步传输,Socket库中例程在应用于阻塞套接口时会阻塞。
    WSASocket()用于异步传输,WSASocket()的发送操作和接收操作都可以被重叠使用。

    WSASocket()的接收函数可以被多次调用,发出接收缓冲区,准备接收到来的数据。
    发送函数也可以被多次调用,组成一个发送缓冲区队列。
    socket()却只能发过之后等待回消息才可做下一步操作!
    其次    
    WSASocket()所支持的版本如下:   
    Version:   Requires   Windows   Sockets2.0.   
    Header:   Declared   in   Winsock2.h.   
    socket()所支持老一些的版本,如Windows   Sockets1.0和        
    Version:   Requires Windows   Sockets1.1
    
*/

    
//打开一个服务器socket
    if ((Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
    {
        printf(
"WSASocket() failed with error %d\n", WSAGetLastError());
        
return 0;
    } 

    
//服务器地址信息 
    InternetAddr.sin_family = AF_INET;
    InternetAddr.sin_addr.S_un.S_addr 
= htonl(INADDR_ANY);//0 接收所有数据包
    InternetAddr.sin_port = htons(PORT);//监听 端口

    
//绑定服务和端口
    if (bind(Listen,(LPSOCKADDR)&InternetAddr,sizeof(InternetAddr)) == SOCKET_ERROR)
    {
        printf(
"bind failed with error %d\n",WSAGetLastError());
        
return 0;

    }

    
/*
    int listen(SOCKET s, int users); 
    服务程序可以调用listen函数使其流套接字s处于监听状态。处于监听状态的流套接字s将维
    护一个客户连接请求队列,该队列最多容纳users个客户连接请求。假如该函数执行成功,
    则返回0;如果执行失败,则返回SOCKET_ERROR。
    
*/
    
if (listen(Listen,10== SOCKET_ERROR)
    {
        printf(
"listen failed with error %d\n",WSAGetLastError());
        
return 0;
    }


    
//接收连接并且分发给完成端口
    while (TRUE)
    {
        
/*
        服务程序调用WSAAccept函数从处于监听状态的流套接字Listen的客户连接请求队列中取出排在最前的一个客户请求,
        并且创建一个新的套接字来与客户套接字创建连接通道,如果连接成功,就返回新创建的套接字的描述符
        
*/
        
if ((Accept = WSAAccept(Listen,NULL,NULL,NULL,0)) == SOCKET_ERROR)
        {

            printf(
"WSAAccept failed with error %d\n",WSAGetLastError());
            
return 0;

        }

        
//创建与套接字相关的套接字信息结构(声请内存)
        if ((PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA))) == NULL)
        {
            printf(
"GlobalAlloc failed with error %d\n",GetLastError());
            
return 0;

        }

        printf(
"Socket number %d connected\n",Accept);

        PerHandleData
->Socket = Accept;//结构中存入接收的套接字

        
//将自WSAAccept返回的新套接字句柄同完成端口关联到一起
        if ((CreateIoCompletionPort((HANDLE)Accept,CompetionPort,(DWORD)PerHandleData,0)) == NULL)
        {
            printf(
"CreateIoCompletionPort failed with error%d\n",GetLastError());
            
return 0;

        } 

        
//动态数据交换(Dynamic Data Exchange,DDE)也是一种进程间通信形式
        
//GlobalAlloc在Win16中就已经有了,这个函数返回一个句柄,通过这个句柄,两个进程可以共享一块内存,DDE和剪贴板
        
//就是通过这个函数交换数据的;malloc是一个库函数,这个函数的功能,是通过内部调用了VirtualAlloc完成的,并
        
//且分配的内存不能共享

        
// 创建同下面的WSARecv调用相关的IO套接字信息结构体
        if ((PerIOData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA))) == NULL) 
        { 
            printf(
"GlobalAlloc() failed with error %d\n", GetLastError()); 
            
return 0
        } 



        ZeroMemory(
&(PerIOData->Overlapped),sizeof(OVERLAPPED));
        PerIOData
->BytesRECV = 0;
        PerIOData
->BytesSEND = 0;
        PerIOData
->DataBuf.len = DATA_BUFSIZE;
        PerIOData
->DataBuf.buf = PerIOData->Buffer;
        Flags 
= 0;

        
//从一个套接口接收数据
        if (WSARecv(Accept,&(PerIOData->DataBuf),1,&RecvBytes,&Flags,&(PerIOData->Overlapped),NULL) == SOCKET_ERROR)
        {
            
if (WSAGetLastError() != ERROR_IO_PENDING)
            {
                printf(
"WSARecv() failed with error %d\n",WSAGetLastError());
                
return 0;
            }
        }

    }

    
return 0;

}


DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID) 

    HANDLE CompletionPort 
= (HANDLE) CompletionPortID; 
    DWORD BytesTransferred; 
    LPOVERLAPPED Overlapped; 
    LPPER_HANDLE_DATA PerHandleData; 
    LPPER_IO_OPERATION_DATA PerIoData; 
    DWORD SendBytes, RecvBytes; 
    DWORD Flags; 

    
while(TRUE) 
    { 
        
/*
        对GetQueuedCompletionStatus函数解释:
        实现从指定的IOCP获取CP。当CP队列为空时,对此函数的调用将被阻塞,而不是一直等待I/O的完成。当CP队列不为空时,被
        阻塞的线程将以后进先出(LIFO)顺序被释放。对于IOCP机制,它允许多线程并发调用GetQueuedCompletionStatus函数,
        最大并发数是在调用CreateIoCompletionPort函数时指定的,超出最大并发数的调用线程,将被阻塞。函数解释如下: 
        声明:   
        BOOL   GetQueuedCompletionStatus(   
        HANDLE   CompletionPort,     
        LPDWORD   lpNumberOfBytes,     
        PULONG_PTR   lpCompletionKey,     
        LPOVERLAPPED   *lpOverlapped,     
        DWORD   dwMilliseconds);   
        调用参数:   
        CompletionPort:指定的IOCP,该值由CreateIoCompletionPort函数创建。   
        lpnumberofbytes:一次完成后的I/O操作所传送数据的字节数。   
        lpcompletionkey:当文件I/O操作完成后,用于存放与之关联的CK。   
        lpoverlapped:为调用IOCP机制所引用的OVERLAPPED结构。   
        dwmilliseconds:用于指定调用者等待CP的时间。   
        返回值:   
        调用成功,则返回非零数值,相关数据存于lpNumberOfBytes、lpCompletionKey、lpCompletionKey变量中。失败则返回零值

        
*/
        
if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, 
            (LPDWORD)
&PerHandleData, (LPOVERLAPPED *&PerIoData, INFINITE) == 0
        { 
            printf(
"GetQueuedCompletionStatus failed with error %d\n", GetLastError()); 
            
return 0
        } 

        
// 如果检测到客户端的socket关闭了BytesTransferred=0
        if (BytesTransferred == 0
        { 
            printf(
"Closing socket %d\n", PerHandleData->Socket); 

            
if (closesocket(PerHandleData->Socket) == SOCKET_ERROR) 
            { 
                printf(
"closesocket() failed with error %d\n", WSAGetLastError()); 
                
return 0
            } 

            GlobalFree(PerHandleData); 
            GlobalFree(PerIoData); 
            
continue
        } 

        
if (PerIoData->BytesRECV == 0
        { 
            PerIoData
->BytesRECV = BytesTransferred; 
            PerIoData
->BytesSEND = 0
        } 
        
else 
        { 
            PerIoData
->BytesSEND += BytesTransferred; 
        } 
        
        
//把收到的数据发给客户端
        if (PerIoData->BytesRECV > PerIoData->BytesSEND) 
        { 
            ZeroMemory(
&(PerIoData->Overlapped), sizeof(OVERLAPPED)); 

            PerIoData
->DataBuf.buf = PerIoData->Buffer + PerIoData->BytesSEND; 
            PerIoData
->DataBuf.len = PerIoData->BytesRECV - PerIoData->BytesSEND; 
            
            
if (WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1&SendBytes, 0
                
&(PerIoData->Overlapped), NULL) == SOCKET_ERROR) 
            { 
                
if (WSAGetLastError() != ERROR_IO_PENDING) 
                { 
                    printf(
"WSASend() failed with error %d\n", WSAGetLastError()); 
                    
return 0
                } 
            } 
        } 
        
else 
        {
            
//投递WSARecv
            PerIoData->BytesRECV = 0

            Flags 
= 0
            ZeroMemory(
&(PerIoData->Overlapped), sizeof(OVERLAPPED)); 

            PerIoData
->DataBuf.len = DATA_BUFSIZE; 
            PerIoData
->DataBuf.buf = PerIoData->Buffer; 

            
if (WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1&RecvBytes, &Flags, 
                
&(PerIoData->Overlapped), NULL) == SOCKET_ERROR) 
            { 

                printf(
"%s",PerIoData->DataBuf.buf);

                
if (WSAGetLastError() != ERROR_IO_PENDING) 
                { 
                    printf(
"WSARecv() failed with error %d\n", WSAGetLastError()); 
                    
return 0
                } 
            } 
        } 
    } 
}
posted on 2008-10-17 10:07 王博炜 阅读(880) 评论(8)  编辑 收藏 引用 所属分类: 网络

评论

# re: 完成端口 2008-11-27 10:49 小年
看过你的文章觉得很简洁清楚,很受益。
我最近也研究iocp,感觉千头万绪,似懂非懂。其中遇到一个问题就是如果一次投递多个WSASend的时候会丢数据,我感觉应该是WSASend投递完数据后立即返回,把发送的数据放到缓存中等待发送,可是如果发送过快,就会导致后面的数据覆盖前边没有处理完的数据,导致丢包现象。如果像楼上那样判断数据是否完全发送完毕,如果完成再接着发送下一个数据,这样这个问题就可以解决了。但是如果是这样的话,完成端口的并发性又是如何体现的呢,不知道我上边的理解是否正确。希望大哥指点一下,谢谢~  回复  更多评论
  

# re: 完成端口 2008-11-28 00:08 王博炜
@小年
假设一种情况
我发出两次WSASend,一次投递10字节数据,里面全是a,第二次投递10字节数据,里面全是b
我们期待的顺序应该是一共发出去20字节的数据,前10字节是a,后10字节是b
但是由于WSASend有可能投递不完整,可能第一次只投递出去了5字节,我们在IOCP线程中捕获到了投递不完整,然后投递剩余的数据
那么最终的顺序是不是就变成了,一共20字节数据,前5字节是a,然后10字节的b,最后5字节为a

最好是一次投递一个WSASend(个人理解)  回复  更多评论
  

# re: 完成端口 2008-11-28 10:53 小年
谢谢这么快就回复了~
我昨天又查了一下相关的文档,msdn上有段话是描述iocp的特性的,如下
The most important property of a completion port is the concurrency value. The concurrency value of a completion port is specified when the completion port is created. This value limits the number of runnable threads associated with the completion port. When the total number of runnable threads associated with the completion port reaches the concurrency value, the system blocks the execution of any subsequent threads that specify the completion port until the number of runnable threads associated with the completion port drops below the concurrency value. The most efficient scenario occurs when there are completion packets waiting in the queue, but no waits can be satisfied because the port has reached its concurrency limit. In this case, when a running thread calls GetQueuedCompletionStatus, it will immediately pick up the queued completion packet. No context switches will occur, because the running thread is continually picking up completion packets and the other threads are unable to run.
可能我之前一直理解上有问题,完成端口内置很多处理线程,除了多线程带来的优势,可能在线程切换上节省很多开销,所以节省了很多时间,而不是我理解的一次投递多个WSASend。
  回复  更多评论
  

# re: 完成端口[未登录] 2008-11-28 15:49 王博炜
// 开启cpu个数的2倍+2个的线程
for (i=0; i < SystenInfo.dwNumberOfProcessors*2 + 2; i++)
{

HANDLE ThreadHandle;

//创建服务器工作线程,并且向线程传送完成端口
if ((ThreadHandle = CreateThread(NULL,0,ServerWorkerThread,CompetionPort,0,&ThreadID)) == NULL)
{
printf("CreateThread failed with error %d\n" ,GetLastError());
return 0;
}
CloseHandle(ThreadHandle);

}
开启cpu个数的2倍+2个的线程都在等
消息出现,如果队列里有消息就每个线程去获取一个消息执行它
完成后继续等消息出现
  回复  更多评论
  

# re: 完成端口[未登录] 2008-11-28 15:54 王博炜
@王博炜
在Completion Port上等待的线程不应该做"服务Completion Port"以外的事情  回复  更多评论
  

# re: 完成端口 2008-11-28 23:35 肥仔
@王博炜
其实投递不完整这种情况,理论上是会出现的,但是实际过程中,连续的小包,我没碰到过投递只成功一部分的情况。
理论上可以这样理解,WSASend只是把数据copy到TCP/IP层的内核缓存就算投递成功,每个socket内核的缓存空间是有限制的,如果投递过多,内核的数据来不及发出去,若被占满,下一次投递,可能只成功了部分,因为内核空间不足。
但是我从实际一般性的应用来看,没有遇到过这种情况。  回复  更多评论
  

# re: 完成端口 2009-02-18 17:32 firefly_liu
你好,如果开启了两个以上线程,但特殊情况下,只有一个连接,那么,我测试了一下,两个线程都能接受到GetQueuedCompletionStataus的,但是一个能正确反映出BytesTransferred是几个字节,而另一个却是我定义的缓存的size,而且两个线程都工作,只是有一个出错了,请指教在这里线程如何调度的呢?  回复  更多评论
  

# re: 完成端口 2009-12-30 17:06 tech_study_00
投递WSASend可以投递多个
WSASend(.., buf1, strlen(buf1)...);
WSASend(.., buf2, strlen(buf2)...);

WSASend(.., bufn, strlen(bufn)...);

需要注意的是,当调用GetQueuedCompletionStatus后得到已经发送的通知时,需要检查该包是否已经全部发送完成,如果没有发送完成,需要继续发完该包,判断发送是否全部结束的标准就是:待发送的总字节数和已经发送完的总字节数是否相等。

  回复  更多评论
  


专题:Android  iPad jQuery Chrome OS

博客园首页  IT新闻  知识库  学英语  C++程序员招聘
标题  
姓名  
主页
验证码 *
内容(提交失败后,可以通过“恢复上次提交”恢复刚刚提交的内容)  
  登录  使用高级评论  新用户注册  返回页首  恢复上次提交      
[使用Ctrl+Enter键可以直接提交]
每天10分钟,轻松学英语
网站导航: