随笔-150  评论-223  文章-30  trackbacks-0
情景分析
   在网络编程中,通常异步比同步处理更为复杂,但由于异步的事件通知机制,避免了同步方式中的忙等待,提高了吞吐量,因此效率较高,在高性能应用开发中,经常被用到。而在处理异步相关的问题时,状态机模式是一种典型的有效方法,这在libevent、memcached、nginx等开源软件(库)中多次被使用而得到见证。据此,为抛砖引玉,本文展示了使用此方法异步接收变长数据包的实现,这里的变长是指在某一种网络协议中,具有完整意义的数据包长度是不固定的。为了描述方便,我们以一个TCP C/S模式的简单例子为场景分析说明,服务端在某知名端口监听,使用异步IO复用机制epoll ET模式接受连接、接收分析请求、存取数据到内存缓冲中,这种内存缓冲类似于数据库,数据按键值对存取;客户端的请求包括增加、修改和删除三种,每种请求对应的数据包长度不一样。

协议封包
   如下图所示,请求封包各字段从左到右依次为:type字段表示请求类型,占1个字节,值域为{1,2,3},1为增加,2为修改,3为删除;key字段表示键名,占16个字节;val字段表示值,占256个字节;expire表示生存期,占4个字节,单位为秒。显而易见,对于每种类型的请求,其封包长度是固定的。

状态转换

   由于在一条连接上客户端可以先后发送多种不同类型的请求,因此服务端需要接收完整某种请求的包后,才能解析处理。当处于某种特定类型的请求时,接收完它的包,这很容易实现。但当存在多种不同类型的请求时,就需要先识别当前的请求类型,再在这种类型中接收完整的包,然后再识别新的请求类型,继续循环这样的一个过程。因此,这就自然而然地对应到了状态机,如下图所示,有4个状态:1个起始状态prepare,在此状态中识别当前请求类型,转到下一中间状态;3个中间状态addsetdel,分别对应增加修改删除请求,在此状态中不断接收数据,直至接收完整,再转到起始状态。由于从中间状态能转到起始状态,因此就没必要存在结束状态。e1、e2、e3、e4表示不同状态间转化的触发事件。

代码实现

  数据结构定义
   connection类表示一条在客户端和服务端间建立的连接,静态成员函数handle_read被epoll模型当有数据可读时回调,普通成员函数handle_read则做实际的接收处理。
 1  enum read_state { prepare, add, set, del };
 2
 3static const char MSG_TYPE_ADD = 1;
 4static const char MSG_TYPE_SET = 2;
 5static const char MSG_TYPE_DEL = 3;
 6
 7#pragma pack(1)
 8struct msg_add
 9{
10   char key[16];
11   char val[256];
12   uint32_t expire;
13}
;
14
15struct msg_set
16{
17    char key[16];
18    char val[256];
19}
;
20
21struct msg_del
22{
23    char key[16];
24}
;
25#pragma pack()
26
27static const size_t MSG_MAX_SIZE = sizeof(msg_add);
28
29class connection
30{
31public:
32    connection();
33    
34    void recv_add_msg(msg_add* msg);
35    void recv_set_msg(msg_set* msg);
36    void recv_del_msg(msg_get* msg);
37
38    bool send_add_msg(const char* key,const char* val,uint32_t expire);
39    bool send_set_msg(const char* key,const char* val);
40    bool send_del_msg(const char* key);
41    
42private:
43    void reset_state()
44    {  tran_ = 0, size_ = 1; s_ = prepare;}
45    
46    void handle_read();
47    static void handle_read(int fd,short ev,void* arg);    
48    
49private:
50    int sock_;
51    char buf_[MSG_MAX_SIZE];
52    size_t tran_;
53    size_t size_;
54    read_state s_;
55}
;

  服务端异步接收
   最初时处于起始状态prepare,在这个状态中:先接收1个字节,分析请求类型,更新状态,然后继续接收数据。当收到数据read返回时,那么这时已经处于3种中间状态addsetdel之一了,在这个状态中:只要继续收完这种类型的请求包即可解析处理,最后再重设,返回到状态prepare,继续接收下一个请求包。
 1connection::connection()
 2:sock(-1)
 3,tran_(0)
 4,size_(1)
 5,s_(prepare)
 6{
 7}

 8
 9void connection::recv_add_msg(msg_add* msg)
10{
11}

12
13void connection::recv_set_msg(msg_set* msg)
14{
15}

16
17void connection::recv_del_msg(msg_del* msg)
18{
19}

20    
21void connection::handle_read(int fd,short ev,void* arg)
22{
23    static_cast<connection*>(arg)->handle_read();    
24}

25
26void connection::state_machine()
27{
28    switch(s_){
29        case prepare: 
30        if(MSG_TYPE_ADD==buf_[0]){
31            tran_ = 0, size_ = sizeof(msg_add);
32            s_ = add;
33        }
else if(MSG_TYPE_SET==buf_[0]){
34            tran_ = 0, size_ = sizeof(msg_set);
35            s_ = set;
36        }
else if(MSG_TYPE_DEL==buf_[0]){
37            tran_ = 0, size_ = sizeof(msg_del);
38            s_ = del;
39        }
else 
40            assert(false);
41        break;
42        
43        case add:
44        if(tran_ == size_){
45            recv_add_msg(reinterpret_cast<msg_add*>(buf_));
46            reset_state();
47        }

48        break;
49        
50        case set:
51        if(tran_ == size_){
52            recv_set_msg(reinterpret_cast<msg_set*>(buf_));
53            reset_state();
54        }

55        break;
56        
57        case del:
58        if(tran_ == size_){
59            recv_del_msg(reinterpret_cast<msg_del*>(buf_));
60            reset_state();
61        }

62        break;
63    }

64}

65
66void connection::handle_read()
67{
68    ssize_t ret;
69    for(;;){
70        ret = read(sock_,buf_+tran_,size_-tran_);
71        if (ret > 0){
72            tran_ += ret;
73            state_machine();
74        }
else if(ret < 0 && errno == EAGAIN){
75            break;
76        }
else{
77            close(sock_); break;
78        }

79    }

80}

  客户端同步发送
   由于一般大多数的客户端不像服务端要求高性能高并发,因此使用同步方式来发送数据。下面代码忽略了错误处理,为简单方便,发送请求的实现也写在了类connection内,依次为send_add_msg、send_set_msg、send_del_msg成员函数。
 1bool connection::send_iovec(char type,void* msg,size_t len)
 2{
 3    struct iovec iov[2];
 4    iov[0].iov_base = &type;
 5    iov[0].iov_len  = 1;
 6    iov[1].iov_base = msg;
 7    iov[1].iov_len  = len;
 8
 9    return writev(sock_,iov,NUM_ELEMENTS(iov))==1+len;
10}

11
12bool connection::send_add_msg(const char* key,const char* val,uint32_t expire)
13{
14    msg_add msg;
15    strcpy(msg.key,key);
16    strcpy(msg.val,val);
17    msg.expire = expire;
18    return send_iovec(MSG_TYPE_ADD,&msg,sizeof(msg));
19}

20
21bool connection::send_set_msg(const char* key,const char* val)
22{
23    msg_set msg;    
24    strcpy(msg.key,key);
25    strcpy(msg.val,val);    
26    return send_iovec(MSG_TYPE_SET,&msg,sizeof(msg));
27}

28
29bool connection::send_del_msg(const char* key)
30{
31    msg_del msg;
32    strcpy(msg.key,key);
33    return send_iovec(MSG_TYPE_GET,&msg,sizeof(msg));
34}

小结
   虽然以上所述的场景是网络通信,但对于进程间使用管道和字节流套接字的通信,也同样适合。
posted on 2012-09-20 15:48 春秋十二月 阅读(2768) 评论(2)  编辑 收藏 引用 所属分类: Network

评论:
# re: 状态机模式之应用(1): 异步接收变长数据包 2012-09-20 21:42 | haviday
對網絡通訊研究得很深呀  回复  更多评论
  
# re: 状态机模式之应用(1): 异步接收变长数据包 2012-09-21 13:32 | fzy
你这样,逻辑和底层或者中间层发生了强耦合。
  回复  更多评论
  

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理