GetQueuedCompletionStatus函数有个OVERLAPPED结构,很多资料上都采用不同的结构体来扩展该结构,比如有的资料定义:
typedef struct _OVERLAPPEDPLUS
{
OVERLAPPED ol;
SOCKET s, sclient;
int OpCode;
WSABUF wbuf;
DWORD dwBytes, dwFlags;
}OVERLAPPEDPLUS;
然后,当GetQueuedCompletionStatus(hIocp, &dwBytesXfered,(PULONG_PTR)&PerHandleKey, &Overlap, INFINITE);函数返回时候,人们常用OverlapPlus = CONTAINING_RECORD(Overlap, OVERLAPPEDPLUS, ol)得到一些信息。比如此时端口上完成的是什么操作,数据是什么等,还有,系统如何做到自动填充上述的结构的,也就是说,系统怎么知道在Overlap->OpCode存放的应该是操作类型,如读,写操作,而在Overlap->wbuf存放的应该是读写数据。
Overlap->OpCode,操作类型是在投递WSASend,WSARecv的时候,由你自己指定填充这个字段。
因为是非堵塞的,等于投递到与套接字相关联的完成端口上,系统会把把WSASend对应的缓冲区提交到底层缓冲,也可以把WSARecv投递的缓冲区,用接收到的数据填充,每一个WSASend,WSARecv,都应有新申请一个overlaspped plus结构提交,以存放本次投递的IO操作的相关数据,——单IO操作数据所以工作器线程中,从完成端口队列中get得到一个完成包的时候,可以根据单句柄数据知道在这个完成端口上是哪一个套接字投递的IO操作完成了,从get到的overlapped中得到相关的已经完成IO数据和信息,并作相应的处理。比如投递了1M,完成包却告知只完成512K,那么你就知道要把余下的512K继续投递WSASend,当然上一个WSASend的Overlapped这个时候可以重用到下一个WSASend中,这个是允许的,可以用一个字段存放全部1M,把余下未Send成功512k放到wbuf中,继续投递或者投递WSARecv1M数据,却收到一个512K的完成通知,那么你要继续投递WSARecv,当然前一个WSARecv的overlapped也可以重用,不过需要一些处理,把已经接收到的512K保存到某个字段中,再投递一个512K的请求去接收完成端口内部,对投递的Overlapped的填充,好像只有WSARecv的时候填充WSABUF,其他都是投递IO前,代码中显式填充,并投递的。至于完成了多少个字节,是在lpNumberOfBytes中得到。
对GetQueuedCompletionStatus函数解释:
实现从指定的IOCP获取CP。当CP队列为空时,对此函数的调用将被阻塞,而不是一直等待I/O的完成。当CP队列不为空时,被阻塞的线程将以后进先出(LIFO)顺序被释放。对于IOCP机制,它允许多线程并发调用GetQueuedCompletionStatus函数,最大并发数是在调用CreateIoCompletionPort函数时指定的,超出最大并发数的调用线程,将被阻塞。函数解释如下:
声明:
BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort,
LPDWORD lpNumberOfBytes,
PULONG_PTR lpCompletionKey,
LPOVERLAPPED *lpOverlapped,
DWORD dwMilliseconds);
调用参数:
CompletionPort:指定的IOCP,该值由CreateIoCompletionPort函数创建。
lpnumberofbytes:一次完成后的I/O操作所传送数据的字节数。
lpcompletionkey:当文件I/O操作完成后,用于存放与之关联的CK。
lpoverlapped:为调用IOCP机制所引用的OVERLAPPED结构。
dwmilliseconds:用于指定调用者等待CP的时间。
返回值:
调用成功,则返回非零数值,相关数据存于lpNumberOfBytes、lpCompletionKey、lpCompletionKey变量中。失败则返回零值。
// 111.cpp : 定义控制台应用程序的入口点。
//
//Windows系统里,使用完成端口是高性能的方法之一,比如把完成端口使用到线程池和网络服务器里。现在就通过线程池的方
//法来介绍怎么样使用完成端口,高性能的服务器以后再仔细地介绍怎么样构造它。其实完成端口是一个队列,所有的线程都在等
//消息出现,如果队列里有消息,就每个线程去获取一个消息执行它。先用函数CreateIoCompletionPort来创建一个消息队列,然
//后使用GetQueuedCompletionStatus函数来从队列获取消息,使用函数PostQueuedCompletionStatus来向队列里发送消息。通过这
//三个函数就实现完成端口的消息循环处理。
#include "stdafx.h"
#include <winsock2.h>
#define PORT 3000
#define DATA_BUFSIZE 8192
typedef struct
{
OVERLAPPED Overlapped;
WSABUF DataBuf;
CHAR Buffer[DATA_BUFSIZE];
DWORD BytesSEND;
DWORD BytesRECV;
}PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA;
typedef struct
{
SOCKET Socket;
}PER_HANDLE_DATA,*LPPER_HANDLE_DATA;
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID) ;
int main(int argc, char* argv[])
{
SOCKADDR_IN InternetAddr;
SOCKET Listen,Accept;
HANDLE CompetionPort;
SYSTEM_INFO SystenInfo;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIOData;
int i;
DWORD RecvBytes;
DWORD Flags;
DWORD ThreadID;
WSADATA wsadata;
DWORD Ret;
if (Ret = WSAStartup(0x2020,&wsadata) != 0)
{
printf("WSAStartup failed with error %d\n",Ret);
return 0;
}
//打开一个空的完成端口
if ((CompetionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0)) == NULL)
{
printf("CreateIoCompletionPort failed with error %d\n",GetLastError());
return 0;
}
GetSystemInfo(&SystenInfo);
// 开启cpu个数的2倍+2个的线程
for (i=0; i < SystenInfo.dwNumberOfProcessors*2 + 2; i++)
{
HANDLE ThreadHandle;
//创建服务器工作线程,并且向线程传送完成端口
if ((ThreadHandle = CreateThread(NULL,0,ServerWorkerThread,CompetionPort,0,&ThreadID)) == NULL)
{
printf("CreateThread failed with error %d\n" ,GetLastError());
return 0;
}
CloseHandle(ThreadHandle);
}
/*
第一
socket()和WSASocket()函数都能返回一个SOCKET套接字;
socket()主要实现同步传输,Socket库中例程在应用于阻塞套接口时会阻塞。
WSASocket()用于异步传输,WSASocket()的发送操作和接收操作都可以被重叠使用。
WSASocket()的接收函数可以被多次调用,发出接收缓冲区,准备接收到来的数据。
发送函数也可以被多次调用,组成一个发送缓冲区队列。
socket()却只能发过之后等待回消息才可做下一步操作!
其次
WSASocket()所支持的版本如下:
Version: Requires Windows Sockets2.0.
Header: Declared in Winsock2.h.
socket()所支持老一些的版本,如Windows Sockets1.0和
Version: Requires Windows Sockets1.1
*/
//打开一个服务器socket
if ((Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
{
printf("WSASocket() failed with error %d\n", WSAGetLastError());
return 0;
}
//服务器地址信息
InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);//0 接收所有数据包
InternetAddr.sin_port = htons(PORT);//监听 端口
//绑定服务和端口
if (bind(Listen,(LPSOCKADDR)&InternetAddr,sizeof(InternetAddr)) == SOCKET_ERROR)
{
printf("bind failed with error %d\n",WSAGetLastError());
return 0;
}
/*
int listen(SOCKET s, int users);
服务程序可以调用listen函数使其流套接字s处于监听状态。处于监听状态的流套接字s将维
护一个客户连接请求队列,该队列最多容纳users个客户连接请求。假如该函数执行成功,
则返回0;如果执行失败,则返回SOCKET_ERROR。
*/
if (listen(Listen,10) == SOCKET_ERROR)
{
printf("listen failed with error %d\n",WSAGetLastError());
return 0;
}
//接收连接并且分发给完成端口
while (TRUE)
{
/*
服务程序调用WSAAccept函数从处于监听状态的流套接字Listen的客户连接请求队列中取出排在最前的一个客户请求,
并且创建一个新的套接字来与客户套接字创建连接通道,如果连接成功,就返回新创建的套接字的描述符
*/
if ((Accept = WSAAccept(Listen,NULL,NULL,NULL,0)) == SOCKET_ERROR)
{
printf("WSAAccept failed with error %d\n",WSAGetLastError());
return 0;
}
//创建与套接字相关的套接字信息结构(声请内存)
if ((PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA))) == NULL)
{
printf("GlobalAlloc failed with error %d\n",GetLastError());
return 0;
}
printf("Socket number %d connected\n",Accept);
PerHandleData->Socket = Accept;//结构中存入接收的套接字
//将自WSAAccept返回的新套接字句柄同完成端口关联到一起
if ((CreateIoCompletionPort((HANDLE)Accept,CompetionPort,(DWORD)PerHandleData,0)) == NULL)
{
printf("CreateIoCompletionPort failed with error%d\n",GetLastError());
return 0;
}
//动态数据交换(Dynamic Data Exchange,DDE)也是一种进程间通信形式
//GlobalAlloc在Win16中就已经有了,这个函数返回一个句柄,通过这个句柄,两个进程可以共享一块内存,DDE和剪贴板
//就是通过这个函数交换数据的;malloc是一个库函数,这个函数的功能,是通过内部调用了VirtualAlloc完成的,并
//且分配的内存不能共享
// 创建同下面的WSARecv调用相关的IO套接字信息结构体
if ((PerIOData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA))) == NULL)
{
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return 0;
}
ZeroMemory(&(PerIOData->Overlapped),sizeof(OVERLAPPED));
PerIOData->BytesRECV = 0;
PerIOData->BytesSEND = 0;
PerIOData->DataBuf.len = DATA_BUFSIZE;
PerIOData->DataBuf.buf = PerIOData->Buffer;
Flags = 0;
//从一个套接口接收数据
if (WSARecv(Accept,&(PerIOData->DataBuf),1,&RecvBytes,&Flags,&(PerIOData->Overlapped),NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n",WSAGetLastError());
return 0;
}
}
}
return 0;
}
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
{
HANDLE CompletionPort = (HANDLE) CompletionPortID;
DWORD BytesTransferred;
LPOVERLAPPED Overlapped;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
DWORD SendBytes, RecvBytes;
DWORD Flags;
while(TRUE)
{
/*
对GetQueuedCompletionStatus函数解释:
实现从指定的IOCP获取CP。当CP队列为空时,对此函数的调用将被阻塞,而不是一直等待I/O的完成。当CP队列不为空时,被
阻塞的线程将以后进先出(LIFO)顺序被释放。对于IOCP机制,它允许多线程并发调用GetQueuedCompletionStatus函数,
最大并发数是在调用CreateIoCompletionPort函数时指定的,超出最大并发数的调用线程,将被阻塞。函数解释如下:
声明:
BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort,
LPDWORD lpNumberOfBytes,
PULONG_PTR lpCompletionKey,
LPOVERLAPPED *lpOverlapped,
DWORD dwMilliseconds);
调用参数:
CompletionPort:指定的IOCP,该值由CreateIoCompletionPort函数创建。
lpnumberofbytes:一次完成后的I/O操作所传送数据的字节数。
lpcompletionkey:当文件I/O操作完成后,用于存放与之关联的CK。
lpoverlapped:为调用IOCP机制所引用的OVERLAPPED结构。
dwmilliseconds:用于指定调用者等待CP的时间。
返回值:
调用成功,则返回非零数值,相关数据存于lpNumberOfBytes、lpCompletionKey、lpCompletionKey变量中。失败则返回零值
*/
if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
(LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
{
printf("GetQueuedCompletionStatus failed with error %d\n", GetLastError());
return 0;
}
// 如果检测到客户端的socket关闭了BytesTransferred=0
if (BytesTransferred == 0)
{
printf("Closing socket %d\n", PerHandleData->Socket);
if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)
{
printf("closesocket() failed with error %d\n", WSAGetLastError());
return 0;
}
GlobalFree(PerHandleData);
GlobalFree(PerIoData);
continue;
}
if (PerIoData->BytesRECV == 0)
{
PerIoData->BytesRECV = BytesTransferred;
PerIoData->BytesSEND = 0;
}
else
{
PerIoData->BytesSEND += BytesTransferred;
}
//把收到的数据发给客户端
if (PerIoData->BytesRECV > PerIoData->BytesSEND)
{
ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
PerIoData->DataBuf.buf = PerIoData->Buffer + PerIoData->BytesSEND;
PerIoData->DataBuf.len = PerIoData->BytesRECV - PerIoData->BytesSEND;
if (WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &SendBytes, 0,
&(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSASend() failed with error %d\n", WSAGetLastError());
return 0;
}
}
}
else
{
//投递WSARecv
PerIoData->BytesRECV = 0;
Flags = 0;
ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
PerIoData->DataBuf.len = DATA_BUFSIZE;
PerIoData->DataBuf.buf = PerIoData->Buffer;
if (WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,
&(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
{
printf("%s",PerIoData->DataBuf.buf);
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return 0;
}
}
}
}
}