概述
epoll是linux提供一种多路复用的技术,类似各个平台都支持的select,只是epoll在内核的实现做了更多地优化,可以支持比select更多的文件描述符,当然也支持 socket这种网络的文件描述符。linux上的大并发的接入服务器,目前的实现方式肯定都通过epoll实现。
epoll和线程
有很多开发人员用epoll的时候,会开多个线程来进行数据通信,比如一个线程专门accept(我个人早些年在FreeBSD用kqueue的时候,由于对内部机制没有基本了解也这样搞),一个线程收发,或者接收和发送都用各自独立的线程。
通常情况下,accept独立线程是没必要的,accept对于内核而言,就应用程序从内核的未完成的SYN队列读取一点数据而已。具体参见 accept部分: 
 
收发独立成两个线程也没有必要,因为大部分的应用服务器,通常情况下,启动一个线程收发数据,最大数据的收发量瓶颈在于网卡,而不是CPU;像网游接入服务器配置一个KM的网卡,很少有游戏会申请1G的带宽,那一台机器得有多少数据输入和输出。所以我们通信线程为epoll服务器就够了。
epoll的基本原理
为了让某些朋友能读得更连惯,我还是说一下epoll基本原理。
epoll外部表现和select是一样的。他提供READ, WRITE和ERROR等事件。
大致流程像下面这样:
1. 应用注册感兴趣的事件到内核;
2. 内核在某种条件下,将事件通知应用程序;
3. 应用程序收到事件后,根据事件类型做对应的逻辑处理。
原理部分我再说一下,不容易理解的地方,包括水平触发和边缘触发,WRITE事件的事件利用(这个可以结合参考文献1的kqueue的WRITE事件,原理一致的)和修改事件的细节。
水平触发
READ事件,socket recv buff有数据,将一直向应用程序通知,直到buff为空。
WRITE事件,socket send buff从满的状态到可发送数据,将一直通知应用程序,直到buff满。
边缘触发
READ事件,socket recv buff有数据了,只通知应用一次,不管应用程序有没有调用read api,接下来都不通知了。
WRITE事件,socket send buff从满的状态到可以发送数据,只通知一次。
上面这个解释不知道大家能否理解,也只能这样说了。有疑问的做一下试验。另外,这些细节的东西,前几年固定下来后,这几年做的项目,是直接用的,也就很少在涉及细节,是凭理解和记忆写下的文字,万一有错请指正^-^。
WRITE事件的利用
这个还一下不好描述。大概描述一下,详细看参考文献1。大致这样:
1. 逻辑层写数据到应用层的发送buff,向epoll注册一下WRITE事件;
2. 这时epoll会通知应用程序一个WRITE事件;
3. 在WRITE事件响应函数里,从应用层的发送buff读数据,然后用socket send api发送。
因为我在很多实际项目中,看到大家没有利用epoll的WRITE的事件来发数据,特意地说一下。大部分的项目,是直接轮询应用程序的发送队列,我早期项目也是这么干的。
epoll的修改事件
对于这个我的映像比较深刻。epoll的修改事件比较坑爹,不能单独修改某个事件!怎么说呢?比如epoll里已经注册了READ&WRITE事件,你如果想单单重注册一下WRITE事件而且READ事件不变,epoll的epoll_ctl API是做不到的,你必须同时注册READ&WRITE,这个在下面的代码中可以看到。FreeBSD的kqueue在这一点完全满足我们程序员的要求。
抽象epoll API
我把herm socket epoll封装部分贴出来,让朋友们参考一下epoll的用法。大部分错误抛异常代码被我去掉了。
 
- class Multiplexor 
- { 
- public: 
- Multiplexor(int size, int timeout = -1, bool lt = true); 
- ~Multiplexor(); 
- void Run(); 
- void Register(ISockHandler* eh, MultiplexorMask mask); 
- void Remove(ISockHandler* eh); 
- void EnableMask(ISockHandler* eh, MultiplexorMask mask); 
- void DisableMask(ISockHandler* eh, MultiplexorMask mask); 
- private: 
- inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask) 
- { 
- struct epoll_event evt; 
- evt.data.ptr = eh; 
- evt.events = mask; 
- return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1; 
- } 
- private: 
- int m_epfd; 
- struct epoll_event* m_evts; 
- int m_size; 
- int m_timeout; 
- __uint32_t m_otherMasks; 
- }; 
 class Multiplexor
{
public:
	Multiplexor(int size, int timeout = -1, bool lt = true);
	~Multiplexor();
	void Run();
	void Register(ISockHandler* eh, MultiplexorMask mask);
	void Remove(ISockHandler* eh);
	void EnableMask(ISockHandler* eh, MultiplexorMask mask);
	void DisableMask(ISockHandler* eh, MultiplexorMask mask);
private:
	inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask)
	{
		struct epoll_event evt;
		evt.data.ptr = eh;
		evt.events = mask;
		return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1;
	}
private:
	int m_epfd;
	struct epoll_event* m_evts;
	int m_size;
	int m_timeout;
	__uint32_t m_otherMasks;
};
 
- Multiplexor::Multiplexor(int size, int timeout, bool lt) 
- { 
- m_epfd = epoll_create(size); 
- if (m_epfd == -1) 
- throw HERM_SOCKET_EXCEPTION(ST_OTHER); 
- m_size = size; 
- m_evts = new struct epoll_event[size]; 
- m_timeout = timeout; 
-  
- m_otherMasks = EPOLLERR | EPOLLHUP; 
- if (!lt) 
- m_otherMasks |= EPOLLET; 
- } 
- Multiplexor::~Multiplexor() 
- { 
- close(m_epfd); 
- delete[] m_evts; 
- } 
- void Multiplexor::Run() 
- { 
- int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout); 
- if (fds == -1) 
- { 
- if (errno == EINTR) 
- return; 
- } 
- for (int i = 0; i < fds; ++i) 
- { 
- __uint32_t evts = m_evts[i].events; 
- ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr); 
- int stateType = ST_SUCCESS; 
- if (evts & EPOLLIN) 
- stateType = eh->OnReceive(); 
- if (evts & EPOLLOUT) 
- stateType = eh->OnSend(); 
- if (evts & EPOLLERR || evts & EPOLLHUP) 
- stateType = ST_EXCEPT_FAILED; 
- if (stateType != ST_SUCCESS) 
- eh->OnError(stateType, errno); 
- } 
- } 
- void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask) 
- { 
- MultiplexorMask masks = mask | m_otherMasks; 
- OperateHandler(EPOLL_CTL_ADD, eh, masks); 
- } 
- void Multiplexor::Remove(ISockHandler* eh) 
- { 
-  
- OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK); 
- } 
- void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask) 
- { 
- MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK; 
- OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks); 
- } 
- void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask) 
- { 
- MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask); 
- if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks)) 
- throw HERM_SOCKET_EXCEPTION(ST_OTHER); 
- } 
 Multiplexor::Multiplexor(int size, int timeout, bool lt) 
{
	m_epfd = epoll_create(size);
	if (m_epfd == -1)
		throw HERM_SOCKET_EXCEPTION(ST_OTHER);
	
	m_size = size;
	m_evts = new struct epoll_event[size];
	m_timeout = timeout;
	// sys/epoll.h is no EPOLLRDHUP(0X2000), don't add EPOLLRDHUP
	m_otherMasks = EPOLLERR | EPOLLHUP;
	if (!lt)
		m_otherMasks |= EPOLLET;
}
Multiplexor::~Multiplexor()
{
	close(m_epfd);
	delete[] m_evts;
}
void Multiplexor::Run()
{
	int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout); 
	if (fds == -1)
	{
		if (errno == EINTR)
			return;
	}
	
	for (int i = 0; i < fds; ++i)
	{
		__uint32_t evts = m_evts[i].events;
		ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr);
		int stateType = ST_SUCCESS;
		if (evts & EPOLLIN)
			stateType = eh->OnReceive();
		if (evts & EPOLLOUT)
			stateType = eh->OnSend();
		if (evts & EPOLLERR || evts & EPOLLHUP)
			stateType = ST_EXCEPT_FAILED;
		if (stateType != ST_SUCCESS)
			eh->OnError(stateType, errno);
	}
}
void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask)
{
	MultiplexorMask masks = mask | m_otherMasks;
	OperateHandler(EPOLL_CTL_ADD, eh, masks);
}
void Multiplexor::Remove(ISockHandler* eh)
{
	// Delete fd from epoll, don't need masks
	OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK);
}
void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask)
{
	MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK;
	OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks);
}
void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask)
{
	MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask);
	if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks))
		throw HERM_SOCKET_EXCEPTION(ST_OTHER);
}
上面类就用到epoll_create(), epoll_ctl()和epoll_wait(),以及几种事件。epoll用起来比select清爽一些。
大致用法类似下面这样:
先定义一个Handler
 
- class StreamHandler : public Herm::ISockHandler 
- { 
- public: 
- virtual Herm::Handle GetHandle() const; 
- virtual int OnReceive(int); 
- virtual int OnSend(int); 
- }; 
 class StreamHandler : public Herm::ISockHandler
{  
public:	
	virtual Herm::Handle GetHandle() const;
    virtual int OnReceive(int); 
	virtual int OnSend(int);
};
 
在OnReceive()处理收到数据的动作,在OnSend()。。。。
在通信线程中,大概类似这样的代码,实际看情况。
 
- Multiplexor multiplexor; 
- StreamHandler sh; 
- multiplexor.Register(&sh, READ_EVT); 
- multiplexor.Run(...); 
 Multiplexor multiplexor;
StreamHandler sh;
multiplexor.Register(&sh, READ_EVT);
multiplexor.Run(...);
 
参考文献
1. 
 
FreeBSD上kqueue和epoll是类似的,有兴趣的朋友请参考。
http://blog.csdn.net/herm_lib/article/details/6047038
 
2. 
这里涉及到epoll的通信层如何和逻辑数据交互的问题
 
http://blog.csdn.net/herm_lib/article/details/5980657