C++ 技术中心

   :: 首页 :: 联系 ::  :: 管理
  158 Posts :: 0 Stories :: 85 Comments :: 0 Trackbacks

公告

郑重声明:本BLOG所发表的原创文章,作者保留一切权利。必须经过作者本人同意后方可转载,并注名作者(天空)和出处(CppBlog.com)。作者Email:coder@luckcoder.com

留言簿(9)

搜索

  •  

最新随笔

最新评论

评论排行榜

epoll是linux下高并发服务器的完美方案,因为是基于事件触发的,所以比select快的不只是一个数量级。
单线程epoll,触发量可达到15000,但是加上业务后,因为大多数业务都与数据库打交道,所以就会存在阻塞的情况,这个时候就必须用多线程来提速。
业务在线程池内,这里要加锁才行。测试结果2300个/s
测试工具:stressmark
因为加了适用与ab的代码,所以也可以适用ab进行压力测试。
char buf[1000] = {0};
sprintf(buf,"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s","Hello world!\n");
send(socketfd,buf, strlen(buf),0);
#include <iostream>
#include 
<sys/socket.h>
#include 
<sys/epoll.h>
#include 
<netinet/in.h>
#include 
<arpa/inet.h>
#include 
<fcntl.h>
#include 
<unistd.h>
#include 
<stdio.h>
#include 
<pthread.h>

#include 
<errno.h>
 
#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 8006
#define INFTIM 1000
 
//线程池任务队列结构体

struct task{
  
int fd; //需要读写的文件描述符

  
struct task *next; //下一个任务

}
;
 
//用于读写两个的两个方面传递参数

struct user_data{
  
int fd;
  unsigned 
int n_size;
  
char line[MAXLINE];
}
;
 
//线程的任务函数

void * readtask(void *args);
void * writetask(void *args);
 
 
//声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件

struct epoll_event ev,events[20];
int epfd;
pthread_mutex_t mutex;
pthread_cond_t cond1;
struct task *readhead=NULL,*readtail=NULL,*writehead=NULL;
 
void setnonblocking(int sock)
{
     
int opts;
     opts
=fcntl(sock,F_GETFL);
     
if(opts<0)
     
{
          perror(
"fcntl(sock,GETFL)");
          exit(
1);
     }

    opts 
= opts|O_NONBLOCK;
     
if(fcntl(sock,F_SETFL,opts)<0)
     
{
          perror(
"fcntl(sock,SETFL,opts)");
          exit(
1);
     }
 
}

 
int main()
{
     
int i, maxi, listenfd, connfd, sockfd,nfds;
     pthread_t tid1,tid2;
    
     
struct task *new_task=NULL;
     
struct user_data *rdata=NULL;
     socklen_t clilen;
    
     pthread_mutex_init(
&mutex,NULL);
     pthread_cond_init(
&cond1,NULL);
     
//初始化用于读线程池的线程

     pthread_create(
&tid1,NULL,readtask,NULL);
     pthread_create(
&tid2,NULL,readtask,NULL);
    
     
//生成用于处理accept的epoll专用的文件描述符 

     epfd
=epoll_create(256);
 
     
struct sockaddr_in clientaddr;
     
struct sockaddr_in serveraddr;
     listenfd 
= socket(AF_INET, SOCK_STREAM, 0);
     
//把socket设置为非阻塞方式

     setnonblocking(listenfd);
     
//设置与要处理的事件相关的文件描述符

     ev.data.fd
=listenfd;
     
//设置要处理的事件类型

     ev.events
=EPOLLIN|EPOLLET;
     
//注册epoll事件

     epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,
&ev);
    
     bzero(
&serveraddr, sizeof(serveraddr)); 
     serveraddr.sin_family 
= AF_INET; 
     serveraddr.sin_port
=htons(SERV_PORT);
     serveraddr.sin_addr.s_addr 
= INADDR_ANY;
     bind(listenfd,(sockaddr 
*)&serveraddr, sizeof(serveraddr));
     listen(listenfd, LISTENQ);
    
     maxi 
= 0;
     
for ( ; ; ) {
          
//等待epoll事件的发生

          nfds
=epoll_wait(epfd,events,20,500);
          
//处理所发生的所有事件 

        
for(i=0;i<nfds;++i)
        
{
               
if(events[i].data.fd==listenfd)
               
{
                   
                    connfd 
= accept(listenfd,(sockaddr *)&clientaddr, &clilen);
                    
if(connfd<0){
                      perror(
"connfd<0");
                      exit(
1);
                   }

                    setnonblocking(connfd);
                   
                    
char *str = inet_ntoa(clientaddr.sin_addr);
                    
//std::cout<<"connec_ from >>"<<str<<std::endl;

                    
//设置用于读操作的文件描述符

                    ev.data.fd
=connfd;
                    
//设置用于注测的读操作事件

                 ev.events
=EPOLLIN|EPOLLET;
                    
//注册ev

                 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,
&ev);
               }

            
else if(events[i].events&EPOLLIN)
            
{
                    
//printf("reading!/n"); 

                    
if ( (sockfd = events[i].data.fd) < 0continue;
                    new_task
=new task();
                    new_task
->fd=sockfd;
                    new_task
->next=NULL;
                    
//添加新的读任务

                    pthread_mutex_lock(
&mutex);
                    
if(readhead==NULL)
                    
{
                      readhead
=new_task;
                      readtail
=new_task;
                    }
 
                    
else
                    

                     readtail
->next=new_task;
                      readtail
=new_task;
                    }
 
                   
//唤醒所有等待cond1条件的线程

                    pthread_cond_broadcast(
&cond1);
                    pthread_mutex_unlock(
&mutex); 
              }

               
else if(events[i].events&EPOLLOUT)
               

                 
/*
              rdata=(struct user_data *)events[i].data.ptr;
                 sockfd = rdata->fd;
                 write(sockfd, rdata->line, rdata->n_size);
                 delete rdata;
                 //设置用于读操作的文件描述符
                 ev.data.fd=sockfd;
                 //设置用于注测的读操作事件
               ev.events=EPOLLIN|EPOLLET;
                 //修改sockfd上要处理的事件为EPOLIN
               epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
             
*/

               }

                             
          }

         
     }

}


static int count111 = 0;
static time_t oldtime = 0, nowtime = 0;
void * readtask(void *args)
{
   
   
int fd=-1;
   unsigned 
int n;
   
//用于把读出来的数据传递出去

   
struct user_data *data = NULL;
   
while(1){
        
        pthread_mutex_lock(
&mutex);
        
//等待到任务队列不为空

        
while(readhead==NULL)
             pthread_cond_wait(
&cond1,&mutex);
        
        fd
=readhead->fd;
        
//从任务队列取出一个读任务

        
struct task *tmp=readhead;
        readhead 
= readhead->next;
        delete tmp;
        pthread_mutex_unlock(
&mutex);
        data 
= new user_data();
        data
->fd=fd;
        

        
char recvBuf[1024= {0}
        
int ret = 999;
        
int rs = 1;

        
while(rs)
        
{
            ret 
= recv(fd,recvBuf,1024,0);// 接受客户端消息

            
if(ret < 0)
            
{
                
//由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可//读在这里就当作是该次事件已处理过。

                
if(errno == EAGAIN)
                
{
                    printf(
"EAGAIN\n");
                    
break;
                }

                
else{
                    printf(
"recv error!\n");
        
                    close(fd);
                    
break;
                }

            }

            
else if(ret == 0)
            
{
                
// 这里表示对端的socket已正常关闭. 

                rs 
= 0;
            }

            
if(ret == sizeof(recvBuf))
                rs 
= 1// 需要再次读取

            
else
                rs 
= 0;
        }

        
if(ret>0){

        
//-------------------------------------------------------------------------------


            data
->n_size=n;


            count111 
++;

            
struct tm *today;
            time_t ltime;
            time( 
&nowtime );

            
if(nowtime != oldtime){
                printf(
"%d\n", count111);
                oldtime 
= nowtime;
                count111 
= 0;
            }


            
char buf[1000= {0};
            sprintf(buf,
"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s","Hello world!\n");
            send(fd,buf,strlen(buf),
0);
            close(fd);


       }

   }

}



posted on 2013-11-26 15:49 C++技术中心 阅读(4520) 评论(2)  编辑 收藏 引用 所属分类: Linux 编程

Feedback

# re: 高并发的epoll+线程池,业务在线程池内[未登录] 2013-11-27 12:05 春秋十二月
这个例子实现描述的epoll+theadpool方案不太好
1)fd读事件用加锁的queue通知,造成epoll thead和read thread间的同步,及many read threads间的锁竞争
2)fd写事件没有处理,后面的send(fd,buf,strlen(buf),0)也没处理好  回复  更多评论
  

# re: 高并发的epoll+线程池,业务在线程池内 2013-11-27 18:34 老爷
大哥, 你可以封装下啊, 你不觉得看起来挺费劲吗? 看起来费劲, 用起来就更不用说了~  回复  更多评论
  


只有注册用户登录后才能发表评论。
【推荐】超50万行VC++源码: 大型组态工控、电力仿真CAD与GIS源码库
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理