loop_in_codes

低调做技术__欢迎移步我的独立博客 codemaro.com 微博 kevinlynx

#

实现LUA脚本同步处理事件:LUA的coroutine

author : Kevin Lynx

需求

    受WOW的影响,LUA越来越多地被应用于游戏中。脚本被用于游戏中主要用于策划编写游戏规则相关。实际运用中,
我们会将很多宿主语言函数绑定到LUA脚本中,使脚本可以更多地控制程序运行。例如我们可以绑定NPCDialog之类的函数
到LUA中,然后策划便可以在脚本里控制游戏中弹出的NPC对话框。
    我们现在面临这样的需求:对于宿主程序而言,某些功能是不能阻塞程序逻辑的(对于游戏程序尤其如此),但是为
了方便策划,我们又需要让脚本看起来被阻塞了。用NPCDialog举个例子,在脚本中有如下代码 :

    ret = NPCDialog( "Hello bitch" )
   
if ret == OK then print("OK") end


    对于策划而言,NPCDialog应该是阻塞的,除非玩家操作此对话框,点击OK或者关闭,不然该函数不会返回。而对于
宿主程序C++而言,我们如何实现这个函数呢:

 

    static int do_npc_dialog( lua_State *L )
   
{
       
const char *content = lua_tostring( L, -1 );
       
        lua_pushnumber( ret );
       
return 1;
    }


    显然,该函数不能阻塞,否则它会阻塞整个游戏线程,这对于服务器而言是不可行的。但是如果该函数立即返回,那
么它并没有收集到玩家对于那个对话框的操作。
    综上,我们要做的是,让脚本感觉某个操作阻塞,但事实上宿主程序并没有阻塞。

事件机制

    一个最简单的实现(对于C程序员而言也许也是优美的),就是使用事件机制。我们将对话框的操作结果作为一个事件。
脚本里事实上没有哪个函数是阻塞的。为了处理一些“阻塞”函数的处理结果,脚本向宿主程序注册事件处理器(同GUI事件
处理其实是一样的),例如脚本可以这样:

    function onEvent( ret )
       
if ret == OK then print("OK") end
    end
   
-- register event handler
    SetEventHandler(
"onEvent" )
    NPCDialog(
"Hello bitch")


    宿主程序保存事件处理器onEvent函数名,当玩家操作了对话框后,宿主程序回调脚本中的onEvent,完成操作。
    事实上我相信有很多人确实是这么做的。这样做其实就是把一个顺序执行的代码流,分成了很多块。但是对于sleep
这样的脚本调用呢?例如:

 

    --do job A
    sleep(
10)
   
--do job B
    sleep(
10)
   
--do job C
   


    那么采用事件机制将可能会把代码分解为:

    function onJobA
       
--do job A
        SetEventHandlerB(
"onJobB")
        sleep(
10)
    end
    function onJobB
       
--do job B
        SetEventHandlerC(
"onJobC")
    end
    function onJobC
       
--do job C
    end
   
-- script starts here
    SetEventHandlerA(
"onJobA" )
    sleep(
10)


    代码看起来似乎有点难看了,最重要的是它不易编写,策划估计会抓狂的。我想,对于非专业程序员而言,程序的
顺序执行可能理解起来更为容易。

SOLVE IT

    我们的解决方案,其实只有一句话:当脚本执行到阻塞操作时(如NPCDialog),挂起脚本,当宿主程序某个操作完
成时,让脚本从之前的挂起点继续执行。
    这不是一种假想的功能。我在刚开始实现这个功能之前,以为LUA不支持这个功能。我臆想着如下的操作:
    脚本:
    ret = NPCDialog("Hello bitch")
    if ret == 0 then print("OK") end
    宿主程序:

    static int do_npc_dialog( lua_State *L )
   
{
       
        lua_suspend_script( L );
       
    }


    某个地方某个操作完成了:
    lua_resume_script( L );
    当我实现了这个功能后,我猛然发现,实际情况和我这里想的差不多(有点汗颜)。


认识Coroutine

    coroutine是LUA中类似线程的东西,但是它其实和fiber更相似。也就是说,它是一种非抢占式的线程,它的切换取决
于任务本身,也就是取决你,你决定它们什么时候发生切换。建议你阅读lua manual了解更多。
    coroutine支持的典型操作有:lua_yield, lua_resume,也就是我们需要的挂起和继续执行。
    lua_State似乎就是一个coroutine,或者按照LUA文档中的另一种说法,就是一个thread。我这里之所以用’似乎‘是
因为我自己也无法确定,我只能说,lua_State看起来就是一个coroutine。
    LUA提供lua_newthread用于手工创建一个coroutine,然后将新创建的coroutine放置于堆栈顶,如同其他new出来的
对象一样。网上有帖子说lua_newthread创建的东西与脚本里调用coroutine.create创建出来的东西不一样,但是根据我
的观察来看,他们是一样的。lua_newthread返回一个lua_State对象,所以从这里可以看出,“lua_State看起来就是一个
coroutine”。另外,网上也有人说创建新的coroutine代价很大,但是,一个lua_State的代价能有多大?当然,我没做过
测试,不敢多言。
    lua_yield用于挂起一个coroutine,不过该函数只能用于coroutine内部,看看它的参数就知道了。
    lua_resume用于启动一个coroutine,它可以用于coroutine没有运行时启动之,也可以用于coroutine挂起时重新启动
之。lua_resume在两种情况下返回:coroutine挂起或者执行完毕,否则lua_resume不返回。
    lua_yield和lua_resume对应于脚本函数:coroutine.yield和coroutine.resume,建议你写写脚本程序感受下coroutine,
例如:

    function main()
        print(
"main start")
        coroutine.yield()
        print(
"main end")
    end
    co
=coroutine.create( main );
    coroutine.resume(co)


REALLY SOLVE IT

    你可能会想到,我们为脚本定义一个main,然后在宿主程序里lua_newthread创建一个coroutine,然后将main放进去,
当脚本调用宿主程序的某个’阻塞‘操作时,宿主程序获取到之前创建的coroutine,然后yield之。当操作完成时,再resume
之。
    事实上方法是对的,但是没有必要再创建一个coroutine。如之前所说,一个lua_State看上去就是一个coroutine,
而恰好,我们始终都会有一个lua_State。感觉上,这个lua_State就像是main coroutine。(就像你的主线程)
    思路就是这样,因为具体实现时,还是有些问题,所以我罗列每个步骤的代码。
    初始lua_State时如你平时所做:

    lua_State *L = lua_open();
    luaopen_base( L );


    注册脚本需要的宿主程序函数到L里:

    lua_pushcfunction( L, sleep );
    lua_setglobal( L,
"my_sleep" );


    载入脚本文件并执行时稍微有点不同:

    luaL_loadfile( L, "test.lua" );
lua_resume( L,
0 ); /* 调用resume */


    在你的’阻塞‘函数里需要挂起coroutine:

    return lua_yield( L, 0 );


    注意,lua_yield函数非常特别,它必须作为return语句被调用,否则会调用失败,具体原因我也不清楚。而在这里,
它作为lua_CFunction的返回值,会不会引发错误?因为lua_CFunction约定返回值为该函数对于脚本而言的返回值个数。
实际情况是,我看到的一些例子里都这样安排lua_yield,所以i do what they do。

    在这个操作完成后(如玩家操作了那个对话框),宿主程序需要唤醒coroutine:

    lua_resume( L, 0 );

 

    大致步骤就这些。如果你要单独创建新的lua_State,反而会搞得很麻烦,我开始就是那样的做的,总是实现不了自己
预想中的效果。

相关下载:
    例子程序中,我给了一个sleep实现。脚本程序调用sleep时将被挂起,宿主程序不断检查当前时间,当时间到时,resume
挂起的coroutine。下载例子

 

8.13补充

   可能有时候,我们提供给脚本的函数需要返回一些值给脚本,例如NPCDialog返回操作结果,我们只需要在宿主程序里lua_resume

之前push返回值即可,当然,需要设置lua_resume第二个参数为返回值个数。

2.9.2010
    lua_yield( L, nResults )第二个参数指定返回给lua_resume的值个数。如下:

   lua_pushnumber( L, 3 );
   
return lua_yield( L, 1 );
 ..
   
int ret = lua_resume( L, 0 );
   
if( ret == LUA_YIELD )
   
{
         lua_Number r 
= luaL_checknumber( L, -1 );
   }

posted @ 2008-08-12 16:02 Kevin Lynx 阅读(12728) | 评论 (14)编辑 收藏

为http server加入CGI--网页外观的应用程序

Implement CGI in your httpd

author : Kevin Lynx

Purpose

    为我们的http server加入CGI的功能,并不是要让其成为真正响应CGI query功能的web server。正如klhttpd的开发初衷一样,
更多的时候我们需要的是一个嵌入式(embeded)的web server。

    而加入CGI功能(也许它算不上真正的CGI),则是为了让我们与client拥有更多的交互能力。我们也许需要为我们的应用程序
加入一个具有网页外观的界面,也许这才是我的目的。

Brief

    CGI,common gateway interface,在我看来它就是一个公共约定。客户端浏览器提交表单(form)操作结果给服务器,服务器
解析这些操作,完成这些操作,然后创建返回信息(例如一个静态网页),返回给浏览器。
    因为浏览器遵循了CGI发送请求的标准,那么我们所要做的就是解析收到的请求,处理我们自己的逻辑,返回信息给客户端即可。
Detail...
    如果你要获取CGI的更多信息,你可以在网上查阅相关信息。如果你要获取web server对CGI标准的处理方式,我无法提供给你
这些信息。不过我愿意提供我的臆测:
    处理CGI请求通常都是由脚本去完成(当然也可以用任何可执行程序完成)。就我所获取的资料来看,只要一门语言具备基本的
输入输出功能,它就可以被用作CGI脚本。浏览器以key-value的形式提交query string,一个典型的以GET方式提交query string的
URL类似于:
http://host/cgi-bin/some-script.cgi?name=kevin+lynx&age=22。不必拘泥于我上句话中出现的一些让你产生问号的
名词。我可以告诉你,这里举出的URL(也许更准确的说法是URI)中问号后面的字符串即为一个query string。
    我希望你看过我的上一篇草文<实现自己的http server>,你应该知道一个典型的GET请求时怎样的格式,应该知道什么是initial
line,应该知道HTTP请求有很多method,例如GET、POST、HEAD等。
    一个CGI请求通常使用GET或POST作为其request method。对于一个GET类型的CGI请求,一个典型的request类似于:
    GET /cgi-bin/some-script.cgi?name=kevin+lynx&age=22 HTTP/1.1
    other headers...
    ...
    我上面举例的那个URL则可能出现在你的浏览器的地址栏中,它对我们不重要。
    而对于POST类型的CGI请求呢?query string只是被放到了optional body里,如:
    POST /cgi-bin/some-script.cgi HTTP/1.1
    other heads...
    ...
    name=kevin+lynx&age=22
    不管我说到什么,我希望你不要陷入概念的泥潭,我希望你能明确我在brief里提到的what can we do and how to do。

How about the web browser ?

    我总是想引领你走在实践的路上(on the practice way)。作为一个程序员,你有理由自负,但是你没任何理由小觑任何一个
编程问题。打开你的编译器,现在就敲下main。
    so,我们需要知道如何利用我们的浏览器,让我们更为顺手地实践我们的想法。
    你也许会写:<html><head><title>HTML test</title></head><body>it's a html</body></html>这样的HTML标记语言。但是
这没有利用上CGI。check out the url in the reference section and learn how to write FORMs.
    这个时候你应该明白:

   <html>
      
<head>
        
<title>CGI test</title>
      
</head>
      
<body>
        
<FORM ACTION="./cgi-bin/test.cgi">
           
<INPUT TYPE="text" NAME="name" SIZE=20 VALUE="Your name">
         
</FORM>
      
</body>
    
</html>

    
    加入<FORM>来添加表单,加入<INPUT TYPE="text"加一个edit box控件。那么当你在里面输入名字按下回车键时,浏览器将整理
出一个CGI query string发给服务器。
    我希望你能在浏览器里看到以问号打头的key-value形式的字符串。

So here we go...

    在你自己已有的http server的代码基础上,让浏览器来请求一个包含FORM的网页,例如以上我举例的html。当浏览器将请求发
给服务器的时候,看看服务器这边收到的request。即使你猜到了什么,我也建议你看看,这样你将看到一个真正的CGI query是怎么
样的。
    so you know,CGI请求本质上也就是一连串的key-value字符串。真正的http server会将这些字符串传给CGI脚本。脚本只需要
输出处理结果信息到标准输出,服务器就会收集这些输出然后发给客户端。
    你也许在揣摩,我们的httpd如何地exec一个外部程序或脚本,如何等待脚本执行完(需不需要异步?),然后如何收集标准输
出上的信息,然后是否需要同步地将这些信息送往客户端。
    这很没必要。如果我们的http server真的只是被embeded到其他程序的。我们只需要将query的逻辑处理交给上层模块即可。

What's the next..

    现在你知道什么是CGI query string,你也许稍微思考下就知道接下来你该如何去做(if you smart enough)。记住我们的目标,
只是解析CGI query string,然后处理相关逻辑,返回处理结果信息给客户端。
    所以接下来,呃,接下来,everything is up to you:分析那些恼人的字符串,将其从&的间隔中整理出来,得到一个key-value
的表,将这个表给你的上层模块,上层模块根据query scritp决定执行什么逻辑,根据这个表获取需要的参数,处理,将结果格式
化为HTML之类的东西,then response。
    就这么简单。

Example...

    同样,我提供了源码,从而证明我不是纸上谈兵夸夸其谈。我希望我公开源码的习惯能受到你的赞同。

   下载带CGI的http server.

Reference:

http://hoohoo.ncsa.uiuc.edu/cgi/ (获取服务器需要做的)
http://www.ietf.org/rfc/rfc3875 (rfc cgi)
http://www.cmis.brighton.ac.uk/~mas/mas/courses/html/html2.html (学会写带FORM的HTML)

posted @ 2008-08-05 22:01 Kevin Lynx 阅读(4133) | 评论 (8)编辑 收藏

实现自己的http server

Write your own http server

author : Kevin Lynx

Why write your own?

    看这个问题的人证明你知道什么是http server,世界上有很多各种规模的http server,为什么要自己实现一个?其实没什么
理由。我自己问自己,感觉就是在自己娱乐自己,或者说只是练习下网络编程,或者是因为某日我看到某个库宣称自己附带一个小
型的http server时,我不知道是什么东西,于是就想自己去实现一个。

What's httpd ?

    httpd就是http daemon,这个是类unix系统上的名称,也就是http server。httpd遵循HTTP协议,响应HTTP客户端的request,
然后返回response。
    那么,什么是HTTP协议?最简单的例子,就是你的浏览器与网页服务器之间使用的应用层协议。虽然官方文档说HTTP协议可以
建立在任何可靠传输的协议之上,但是就我们所见到的,HTTP还是建立在TCP之上的。
    httpd最简单的response是返回静态的HTML页面。在这里我们的目标也只是一个响应静态网页的httpd而已(也许你愿意加入CGI
特性)。

More details about HTTP protocol

    在这里有必要讲解HTTP协议的更多细节,因为我们的httpd就是要去解析这个协议。
    关于HTTP协议的详细文档,可以参看rfc2616。但事实上对于实现一个简单的响应静态网页的httpd来说,完全没必要读这么一
分冗长的文档。在这里我推荐<HTTP Made Really Easy>,以下内容基本取自于本文档。

- HTTP协议结构
  HTTP协议无论是请求报文(request message)还是回应报文(response message)都分为四部分:
  * 报文头 (initial line )
  * 0个或多个header line
  * 空行(作为header lines的结束)
  * 可选body
  HTTP协议是基于行的协议,每一行以\r\n作为分隔符。报文头通常表明报文的类型(例如请求类型),报文头只占一行;header line
  附带一些特殊信息,每一个header line占一行,其格式为name:value,即以分号作为分隔;空行也就是一个\r\n;可选body通常
  包含数据,例如服务器返回的某个静态HTML文件的内容。举个例子,以下是一个很常见的请求报文,你可以截获浏览器发送的数据
  包而获得:

    1  GET /index.html HTTP/1.1
    2  Accept-Language: zh-cn
    3  Accept-Encoding: gzip, deflate
    4  User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727; MAXTHON 2.0)
    5  Host: localhost
    6  Connection: Keep-Alive
    7
  我为每一行都添加了行号,第1行就是initial line,2-6行是header lines,7行是一个header line的结束符,没有显示出来。
  以下是一个回应报文:
    1  HTTP/1.1 200 OK
    2  Server: klhttpd/0.1.0
    3  Content-Type: text/html
    4  Content-Length: 67
    5
    6  <head><head><title>index.html</title></head><body>index.html</body>
  第6行就是可选的body,这里是index.html这个文件的内容。

- HTTP request method
  因为我们做的事服务器端,所以我们重点对请求报文做说明。首先看initial line,该行包含几个字段,每个字段用空格分开,例
  如以上的GET /index.html HTTP/1.1就可以分为三部分:GET、/index.html、HTTP/1.1。其中第一个字段GET就是所谓的request
  method。它表明请求类型,HTTP有很多method,例如:GET、POST、HEAD等。

  就我们的目标而言,我们只需要实现对GET和HEAD做响应即可。

  GET是最普遍的method,表示请求一个资源。什么是资源?诸如HTML网页、图片、声音文件等都是资源。顺便提一句,HTTP协议
  中为每一个资源设置一个唯一的标识符,就是所谓的URI(更宽泛的URL)。
  HEAD与GET一样,不过它不请求资源内容,而是请求资源信息,例如文件长度等信息。

- More detail
  继续说说initial line后面的内容:
  对应于GET和HEAD两个method,紧接着的字段就是资源名,其实从这里可以看出,也就是文件名(相对于你服务器的资源目录),例
  如这里的/index.html;最后一个字段表明HTTP协议版本号。目前我们只需要支持HTTP1.1和1.0,没有多大的技术差别。

  然后是header line。我们并不需要关注每一个header line。我只罗列有用的header line :
  - Host : 对于HTTP1.1而言,请求报文中必须包含此header,如果没有包含,服务器需要返回bad request错误信息。
  - Date : 用于回应报文,用于客户端缓存数据用。
  - Content-Type : 用于回应报文,表示回应资源的文件类型,以MIME形式给出。什么是MIME?它们都有自己的格式,例如:
    text/html, image/jpg, image/gif等。
  - Content-Length : 用于回应报文,表示回应资源的文件长度。

body域很简单,你只需要将一个文件全部读入内存,然后附加到回应报文段后发送即可,即使是二进制数据。

- 回应报文
  之前提到的一个回应报文例子很典型,我们以其为例讲解。首先是initial line,第一个字段表明HTTP协议版本,可以直接以请求
  报文为准(即请求报文版本是多少这里就是多少);第二个字段是一个status code,也就是回应状态,相当于请求结果,请求结果
  被HTTP官方事先定义,例如200表示成功、404表示资源不存在等;最后一个字段为status code的可读字符串,你随便给吧。

  回应报文中最好跟上Content-Type、Content-Length等header。

具体实现
    正式写代码之前我希望你能明白HTTP协议的这种请求/回应模式,即客户端发出一个请求,然后服务器端回应该请求。然后继续
这个过程(HTTP1.1是长连接模式,而HTTP1.0是短连接,当服务器端返回第一个请求时,连接就断开了)。
    这里,我们无论客户端,例如浏览器,发出什么样的请求,请求什么资源,我们都回应相同的数据:

               

/* 阻塞地接受一个客户端连接 */
        SOCKET con 
= accept( s, 00 ); 
        
/* recv request */
        
char request[1024= 0 };
        ret 
= recv( con, request, sizeof( request ), 0 );
        printf( request );
        
/* whatever we recv, we send 200 response */
        
{
            
char content[] = "<head><head><title>index.html</title></head><body>index.html</body>";
            
char response[512];
            sprintf( response, 
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: %d\r\n\r\n%s", strlen( content ), content );
            ret 
= send( con, response, strlen( response ), 0 );
        }

        closesocket( con ); 

 

    程序以最简单的阻塞模式运行,我们可以将重点放在协议的分析上。运行程序,在浏览器里输入http://localhost:8080/index.html
,然后就可以看到浏览器正常显示content中描述的HTML文件。假设程序在8080端口监听。

   现在你基本上明白了整个工作过程,我们可以把代码写得更全面一点,例如根据GET的URI来载入对应的文件然后回应给客户端。
其实这个很简单,只需要从initial line里解析出(很一般的字符串解析)URI字段,然后载入对应的文件即可。例如以下函数:

void http_response( SOCKET con, const char *request )
{
    
/* get the method */
    
char *token = strtok( request, " " );
    
char *uri = strtok( 0" " );
    
char file[64];
    sprintf( file, 
".%s", uri ); 

    
{
        
/* load the file content */
        FILE 
*fp = fopen( file, "rb" );
        
if( fp == 0 )
        
{
            
/* response 404 status code */
            
char response[] = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
            send( con, response, strlen( response ), 
0 );
        }

        
else
        
{
            
/* response the resource */
            
/* first, load the file */
            
int file_size ;
            
char *content;
            
char response[1024];
            fseek( fp, 
0, SEEK_END );
            file_size 
= ftell( fp );
            fseek( fp, 
0, SEEK_SET );
            content 
= (char*)malloc( file_size + 1 );
            fread( content, file_size, 
1, fp );
            content[file_size] 
= 0

            sprintf( response, 
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: %d\r\n\r\n%s", file_size, content );
            send( con, response, strlen( response ), 
0 );
            free( content );
        }

    }

}
 



其他

    要将这个简易的httpd做完善,我们还需要注意很多细节。包括:对不支持的method返回501错误;对于HTTP1.1要求有Host这个
header;为了支持客户端cache,需要添加Date header;支持HEAD请求等。

    相关下载中我提供了一个完整的httpd library,纯C的代码,在其上加上一层资源载入即可实现一个简单的httpd。在这里我将
对代码做简要的说明:
    evbuffer.h/buffer.c : 取自libevent的buffer,用于缓存数据;
    klhttp-internal.h/klhttp-internal.c :主要用于处理/解析HTTP请求,以及创建回应报文;
    klhttp-netbase.h/klhttp-netbase.c :对socket api的一个简要封装,使用select模型;
    klhttp.h/klhttp.c :库的最上层,应用层主要与该层交互,这一层主要集合internal和netbase。
    test_klhttp.c :一个测试例子。

相关下载:
    klhttpd
    文中相关代码

参考资料:

http://www.w3.org/Protocols/rfc2616/rfc2616.html
http://jmarshall.com/easy/http/
http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html

posted @ 2008-07-30 16:14 Kevin Lynx 阅读(29673) | 评论 (21)编辑 收藏

自己实现memcached客户端库

Kevin Lynx

7.21.2008

What's memcached ?

memcached是一个以key-value的形式缓存数据的缓存系统。通过将数据缓存到内存中,从而提高数据的获取速度。
memcached以key-value的形式来保存数据,你可以为你每一段数据关联一个key,然后以后可以通过这个key获取
这段数据。

memcached是一个库还是什么?memcached其实是一个单独的网络服务器程序。它的网络底层基于libevent,你可以
将其运行在网络中的一台服务器上,通过网络,在遵循memcached的协议的基础上与memcached服务器进行通信。

What do we want to wrap ?

我们需要做什么?我们只需要遵循memcached的协议(参见该文档),封装网络层的通信,让上层可以通过调用诸如
add/get之类的接口即可实现往memcached服务器缓存数据,以及取数据。上层程序员根本不知道这些数据在网络
上存在过。

这个东西,也就是memcached官方所谓的client apis。你可以使用现成的客户端库,但是你也可以将这种重造轮子
的工作当作一次网络编程的练习。it's up to you.:D

Where to start ?

很遗憾,对于windows用户而言,memcached官方没有给出一个可以执行或者可以直接F7即可得到可执行文件的下载
(如果你是vc用户)。幸运的是,已经有人做了这个转换工作。

你可以从http://jehiah.cz/projects/memcached-win32/这里下载到memcached的windows版本,包括可执行程序和
源代码。

我们直接可以运行memcached.exe来安装/开启memcached服务器,具体步骤在以上页面有所提及:

安装:memcached.exe -d install,这会在windows服务里添加一个memcached服务
运行:memcached.exe 
-d start,你也可以通过windows的服务管理运行。

   
然后,你可以在任务管理器里看到一个'memcached'的进程,很占内存,因为这是memcached。

So, here we go ...

通过以上步骤运行的memcached,默认在11211端口监听(是个TCP连接,可以通过netstat查看)。接下来,我们就可
以connect到该端口上,然后send/recv数据了。发送/接收数据只要遵循memcached的协议格式,一切都很简单。

使用最简单的阻塞socket连接memcached服务器:

       SOCKET s = socket( AF_INET, SOCK_STREAM, 0 );
        
struct sockaddr_in addr;
        memset( 
&addr, 0sizeof( addr ) );
        addr.sin_family 
= AF_INET;
        addr.sin_port 
= htons( 11211 );
        addr.sin_addr.s_addr 
= inet_addr( "127.0.0.1" ); 

        ret 
= connect( s, (struct sockaddr*&addr, sizeof( addr ) );
        
if( ret == 0 )
        
{
            printf( 
"connect ok\n" );
        }
 

       
About the protocol

简单地提一下memcached的协议。

可以说,memcached的协议是基于行的协议,因为无论是客户端请求还是服务器端应答,都是以"\r\n"作为结束符。
memcached的协议将数据(send/recv操作的数据)分为两种类型:命令和用户数据。

命令用于服务器和客户端进行交互;而用户数据,很显然,就是用户想要缓存的数据。

关于用户数据,你只需要将其编码成字节流(说白了,只要send函数允许即可),并附带数据结束标志"\r\n"发送即可。

关于命令,memcached分为如下几种命令:存储数据、删除数据、取出数据、其他一些获取信息的命令。其实你换个角度
想想,memcached主要就是将数据存储到内存里,所以命令也多不了哪去,基本就停留在add/get/del上。

关于key,memcached用key来标识数据,每一个key都是一个不超过255个字符的字符串。

到这里,你可以发现memcached对于数据的存储方式(暴露给上层)多少有点像std::map,如果你愿意,你可以将客户端
API封装成map形式。= =#

具体实现

接下来可以看看具体的实现了。

首先看看存储数据命令,存储数据命令有:add/set/replace/append/prepend/cas。存储命令的格式为:
<command name> <key> <flags> <exptime> <bytes> [noreply]\r\n
具体字段的含义参看protocol.txt文件,这里我对set举例,如下代码,阻塞发送即可:

        char cmd[256] ;
        
char data[] = "test data";
        sprintf( cmd, 
"set TestKey 0 0 %d\r\n", strlen( data ) );
        ret 
= send( s, cmd, strlen( cmd ), 0 ); 


注意:noreply选项对于有些memcached版本并不被支持,例如我们使用的1.2.2版本。注意官方的changelog即可。

当你发送了存储命令后,memcached会等待客户端发送数据块。所以我们继续发送数据块:

 

        ret = send( s, data, strlen( data ), 0 );
        ret 
= send( s, "\r\n"20 ); // 数据结束符

 

然后,正常的话,memcached服务器会返回应答信息给客户端。

 

        char reply[256];
        ret 
= recv( s, reply, sizeof( reply ) - 10 );
        reply[ret] 
= 0;
        printf( 
"server reply : %s\n", reply ); 

 

如果存储成功,服务器会返回STORED字符串。memcached所有应答信息都是以字符串的形式给出的。所以可以直接printf出来。

关于其他的操作,我就不在这里列举例子了。我提供了我封装的memcached客户端库的完整代码下载,使用的是阻塞socket,
对应着memcached的协议看,很容易看懂的。

It's a story about a programmer...

最近发觉自己有点极端,要么写纯C的代码,要么写满是template的泛型代码。

 

相关代码下载

posted @ 2008-07-21 15:54 Kevin Lynx 阅读(6265) | 评论 (5)编辑 收藏

libevent 源码分析:min_heap带来的超时机制

author : Kevin Lynx

什么是min heap ?

    首先看什么是heap,heap是这样一种数据结构:1.它首先是一棵完成二叉树;2.父亲节点始终大于(或其他逻辑关系
)其孩子节点。根据父亲节点与孩子节点的这种逻辑关系,我们将heap分类,如果父亲节点小于孩子节点,那么这个heap
就是min heap。
    就我目前所看到的实现代码来看,heap基本上都是用数组(或者其他的连续存储空间)作为其存储结构的。这可以保证
数组第一个元素就是heap的根节点。这是一个非常重要的特性,它可以保证我们在取heap的最小元素时,其算法复杂度为
O(1)。
    原谅我的教科书式说教,文章后我会提供我所查阅的比较有用的关于heap有用的资料。

What Can It Do?
    libevent中的min_heap其实不算做真正的MIN heap。它的节点值其实是一个时间值。libevent总是保证时间最晚的节
点为根节点。
    libevent用这个数据结构来实现IO事件的超时控制。当某个事件(libevent中的struct event)被添加时(event_add),
libevent将此事件按照其超时时间(由用户设置)保存在min_heap里。然后libevent会定期地去检查这个min_heap,从而实现
了超时机制。

实现

    min_heap相关源码主要集中在min_heap.h以及超时相关的event.c中。
    首先看下min_heap的结构体定义:

typedef struct min_heap
{
    
struct event** p;
    unsigned n, a;
}
 min_heap_t;


    p指向了一个动态分配的数组(随便你怎么说,反正它是一个由realloc分配的连续内存空间),数组元素为event*,这也是
heap中的节点类型。这里libevent使用连续空间去保存heap,也就是保存一棵树。因为heap是完成树,所以可以保证其元素在
数组中是连续的。n表示目前保存了多少个元素,a表示p指向的内存的尺寸。
    struct event这个结构体定义了很多东西,但是我们只关注两个成员:min_heap_idx:表示该event保存在min_heap数组中
的索引,初始为-1;ev_timeout:该event的超时时间,将被用于heap操作中的节点值比较。

    接下来看几个与堆操作不大相关的函数:
    min_heap_elem_greater:比较两个event的超时值大小。
    min_heap_size:返回heap元素值数量。
    min_heap_reserve:调整内存空间大小,也就是调整p指向的内存区域大小。凡是涉及到内存大小调整的,都有一个策略问
题,这里采用的是初始大小为8,每次增大空间时以2倍的速度增加。
    看几个核心的:真正涉及到heap数据结构操作的函数:往堆里插入元素、从堆里取出元素:
    相关函数为:min_heap_push、min_heap_pop、min_heap_erase、min_heap_shift_up_、min_heap_shift_down_。

heap的核心操作:

- 往堆里插入元素:
    往堆里插入元素相对而言比较简单,如图所示,每一次插入时都从树的最右最下(也就是叶子节点)开始。然后比较即将插入
的节点值与该节点的父亲节点的值,如果小于父亲节点的话(不用在意这里的比较规则,上下文一致即可),那么交换两个节点,
将新的父亲节点与其新的父亲节点继续比较。重复这个过程,直到比较到根节点。

  heap_add
    libevent实现这个过程的函数主要是min_heap_shift_up_。每一次min_heap_push时,首先检查存储空间是否足够,然后直接
调用min_heap_shift_up_插入。主要代码如下:

void min_heap_shift_up_(min_heap_t* s, unsigned hole_index, struct event* e)
{
    
/* 获取父节点 */
    unsigned parent 
= (hole_index - 1/ 2;
    
/* 只要父节点还大于子节点就循环 */
    
while(hole_index && min_heap_elem_greater(s->p[parent], e))
    
{
        
/* 交换位置 */
        (s
->p[hole_index] = s->p[parent])->min_heap_idx = hole_index;
        hole_index 
= parent;
        parent 
= (hole_index - 1/ 2;
    }

    (s
->p[hole_index] = e)->min_heap_idx = hole_index;



- 从堆里取元素:

    大部分时候,从堆里取元素只局限于取根节点,因为这个节点是最有用的。对于数组存储结构而言,数组第一个元素即为根
节点。取完元素后,我们还需要重新调整整棵树以使其依然为一个heap。
    这需要保证两点:1.依然是完成树;2.父亲节点依然小于孩子节点。
    在具体实现heap的取元素操作时,具体到代码层次,方法都是有那么点微小差别的。libvent里的操作过程大致如图所示(实际上libevent中父节点的时间值小于子节点的时间值,时间值的比较通过evutil_timercmp实现):

heap_remove
    主要过程分为两步:
    1.比较左右孩子节点,选择最大的节点,移到父亲节点上;按照这种方式处理被选择的孩子节点,直到没有孩子节点为止。例如,
    当移除了100这个节点后,我们在100的孩子节点19和36两个节点里选择较大节点,即36,将36放置到100处;然后选择原来的36的
左右孩子25和1,选25放置于原来的36处;
    2.按照以上方式处理后,树会出现一个空缺,例如原先的25节点,因为被移动到原先的36处,25处就空缺了。因此,为了保证完
成性,就将最右最下的叶子节点(也就是连续存储结构中最后一个元素),例如这里的7,移动到空缺处,然后按照插入元素的方式处理
新插入的节点7。
    完成整个过程。

    libevent完成这个过程的函数主要是min_heap_shift_down_:

/* hole_index 为取出的元素的位置,e为最右最下的元素值 */
void min_heap_shift_down_(min_heap_t* s, unsigned hole_index, struct event* e)
{
    
/* 取得hole_index的右孩子节点索引 */
    unsigned min_child 
= 2 * (hole_index + 1);
    
while(min_child <= s->n)
    
{
        
/* 有点恶心的一个表达式,目的就是取两个孩子节点中较大的那个孩子索引 */
        min_child 
-= min_child == s->|| min_heap_elem_greater(s->p[min_child], s->p[min_child - 1]);
        
/* 找到了位置,这里似乎是个优化技巧,不知道具体原理 */
        
if(!(min_heap_elem_greater(e, s->p[min_child])))
            
break;
        
/* 换位置 */
        (s
->p[hole_index] = s->p[min_child])->min_heap_idx = hole_index;
        
/* 重复这个过程 */
        hole_index 
= min_child;
        min_child 
= 2 * (hole_index + 1);
    }

    
/* 执行第二步过程,将最右最下的节点插到空缺处 */
    min_heap_shift_up_(s, hole_index,  e);
}
 


STL中的heap

值得一提的是,STL中提供了heap的相关操作算法,借助于模板的泛化特性,其适用范围非常广泛。相关函数为:
make_heap, pop_heap, sort_heap, is_heap, sort 。其实现原理同以上算法差不多,相关代码在algorithm里。SGI的
STL在stl_heap.h里。

参考资料:

What is a heap?

Heap_(data_structure)

Heap Remove

posted @ 2008-07-18 15:56 Kevin Lynx 阅读(6553) | 评论 (6)编辑 收藏

libevent 源码分析:evbuffer缓冲(附带libevent vs2005完整包下载)

Author : Kevin Lynx

前言

    可以说对于任何网络库(模块)而言,一个缓冲模块都是必不可少的。缓冲模块主要用于缓冲从网络接收到的数据,以及
用户提交的数据(用于发送)。很多时候,我们还需要将网络模块层(非TCP层)的这些缓冲数据拷贝到用户层,而这些内存拷贝
都会消耗时间。
    在这里,我简要分析下libevent的相关代码(event.h和buffer.c)。

结构

    关于libevent的缓冲模块,主要就是围绕evbuffer结构体展开。先看下evbuffer的定义:

/*event.h*/
struct evbuffer {
    u_char 
*buffer;
    u_char 
*orig_buffer; 

    size_t misalign;
    size_t totallen;
    size_t off; 

    
void (*cb)(struct evbuffer *, size_t, size_t, void *);
    
void *cbarg;
}
;


    libevent的缓冲是一个连续的内存区域,其处理数据的方式(写数据和读数据)更像一个队列操作方式:从后写入,从前
读出。evbuffer分别设置相关指针(一个指标)用于指示读出位置和写入位置。其大致结构如图:

evbuffer_str
    orig_buffer指向由realloc分配的连续内存区域,buffer指向有效数据的内存区域,totallen表示orig_buffer指向的内存
区域的大小,misalign表示buffer相对于orig_buffer的偏移,off表示有效数据的长度。

实际运作

    这里我将结合具体的代码分析libevent是如何操作上面那个队列式的evbuffer的,先看一些辅助函数:

evbuffer_drain:
    该函数主要操作一些指标,当每次从evbuffer里读取数据时,libevent便会将buffer指针后移,同时增大misalign,减小off,
而该函数正是做这件事的。说白了,该函数就是用于调整缓冲队列的前向指标。

evbuffer_expand:
    该函数用于扩充evbuffer的容量。每次向evbuffer写数据时,都是将数据写到buffer+off后,buffer到buffer+off之间已被
使用,保存的是有效数据,而orig_buffer和buffer之间则是因为读取数据移动指标而形成的无效区域。
    evbuffer_expand的扩充策略在于,首先判断如果让出orig_buffer和buffer之间的空闲区域是否可以容纳添加的数据,如果
可以,则移动buffer和buffer+off之间的数据到orig_buffer和orig_buffer+off之间(有可能发生内存重叠,所以这里移动调用的
是memmove),然后把新的数据拷贝到orig_buffer+off之后;如果不可以容纳,那么重新分配更大的空间(realloc),同样会移动
数据。
    扩充内存的策略为:确保新的内存区域最小尺寸为256,且以乘以2的方式逐步扩大(256、512、1024、...)。

    了解了以上两个函数,看其他函数就比较简单了。可以看看具体的读数据和写数据:

evbuffer_add:
    该函数用于添加一段用户数据到evbuffer中。很简单,就是先判断是否有足够的空闲内存,如果没有则调用evbuffer_expand
扩充之,然后直接memcpy,更新off指标。

evbuffer_remove:
    该函数用于将evbuffer中的数据复制给用户空间(读数据)。简单地将数据memcpy,然后调用evbuffer_drain移动相关指标。

其他

    回过头看看libevent的evbuffer其实是非常简单的(跟我那个kl_net里的buffer一样),不知道其他人有没有更优的缓冲管理
方案。evbuffer还提供了两个函数:evbuffer_write和evbuffer_read,用于直接在套接字(其他文件描述符)上写/读数据。

    另外,关于libevent,因为官方提供的VC工程文件有问题,很多人在windows下编译不过。金庆曾提供过一种方法。其实主要
就是修改event-config.h文件,修改编译相关配置。这里我也提供一个解决步骤,顺便提供完整包下载:

1. vs2005打开libevent.dsw,转换四个工程(event_test, libevent, signal_test, time_test)
2. 删除libevent项目中所有的文件,重新添加文件,文件列表如下:
   buffer.c
   evbuffer.c
   evdns.c
   evdns.h
   event.c
   event.h
   event_tagging.c
   event-config.h
   event-internal.h
   evhttp.h
   evrpc.h
   evrpc.c
   evrpc-internal.h
   evsignal.h
   evutil.c
   evutil.h
   http.c
   http-internal.h
   log.c
   log.h
   min_heap.h
   strlcpy.c
   strlcpy-internal.h
   tree.h
   win32.c
   config.h
   signal.c
3. 替换event-config.h,使用libevent-iocp中的
4. 项目设置里添加HAVE_CONFIG_H预处理宏
5. 修改win32.c中win32_init函数,加入WSAStartup函数,类似于:
       WSADATA wd;   
        int err;
        struct win32op *winop;
        size_t size;
        if( ( err = WSAStartup( MAKEWORD( 2, 2 ), &wd ) ) != 0 )
            event_err( 1, "winsock startup failed : %d", err );
6. 修改win32.c中win32_dealloc函数,在函数末尾加上WSACleanup的调用:
        WSACleanup();
6. 至此libevent编译成功;
7. 几个例子程序,只需要加入HAVE_CONFIG_H预处理宏,以及连接ws2_32.lib即可;
   (time_test需要修改time-test.c文件,即在包含event.h前包含windows.h)

libevent-1.4.5-stable-vs2005.zip下载

 

posted @ 2008-07-16 14:28 Kevin Lynx 阅读(9418) | 评论 (14)编辑 收藏

建立异步操作组件:队列和线程

6.25.2008

Kevin Lynx

引言

在一个高效的系统中,我们经常会将一些费时的操作转换为异步操作。例如往数据库中写日志。如果数据库
配置在网络上,那么往数据库中插入一些日志信息将非常慢(相对于程序其他部分)。

如何转换为异步?

将类似于以上过程转换为异步操作,一个典型的做法是:建立一个单独的数据库日志线程,一个线程安全的
队列。要写日志时,只需要往队列里放入数据,数据库日志线程则从这个队列里取数据然后完成写操作。

大致的过程类似于:

typedef safe_list<std::string> SafeList;
SafeList gLogList; 

// database thread.read log string.
unsigned int __stdcall DBThread( void *p )
{
    
whiletrue )
    
{
        
// read gLogList and write log string into the database
    }
 

    _endthreadex( 
0 );
    
return 0;
}
 

// other threads. write log string.
void wrtie_log( const std::string &log )
{
    gLogList.push_back( log );
}
 


将他们包装起来

我们很有可能会在同一个系统中多次遇到类似需要转换为异步操作的地方,如果每一次都手动去创建一个队列和一
个线程,那将会多么乏味啊!懒惰的程序员喜欢重用各种代码。所以,我自己觉得很有必要将这一切封装起来。我
们只需要封装这个队列和创建线程的繁琐细节,让应用层全部专注于具体的逻辑处理:

template <typename _NodeType>
class async_operator
{
public:
    
/// the list node type.
    typedef _NodeType node_type;
    
/// the list.
    typedef multi_list<node_type, Mutex, Semaphore> list_type;
    
/// operator signature
    typedef functor<void, TYPE_LIST1( list_type& )> operator_type;
    
/// init function signature, called when the thread starts.
    typedef functor<void> init_type;
    
/// called before the thread exits.
    typedef functor<void> release_type; 

public:
    
///
    
/// start the thread and execute the operation.
    
/// @param op the callback function operate the list node.
    
/// 

    void execute( operator_type &op, init_type &init = init_type(), release_type &release = release_type() ); 

    
///
    
/// exit the thread.It will block until the thread exited.
    
///

    void exit(); 

    
///
    
/// get the list so that you can pust list nodes.
    
///

    list_type &list(); 

private:
    list_type _list;
    operator_type _operator;
    init_type _init;
    release_type _release;
    thread _thread;
}



我利用了已有的组件:线程安全的容器multi_list、包装任意执行体的functor、线程维护类thread。那么,现在,
应用层只需要定义队列节点类型,写应用相关的回调函数(任意可被functor包装的广义函数)。(见附件例子)

之所以为这个组件加上init和release,是因为有些东西(例如COM)需要在线程启动时初始化,而在线程快结束时释放,例如对于
使用COM的应用来说,就需要在线程初始化时CoInitialize,结束时CoUninitialize。

闲说下其他东西

在本文的附件代码里,你可以获取到functor、thread、multi_list这些东西,所以我有必要提一下。

关于functor,你可以参看<实现functor - 增强型的函数指针>,基本上可以看成增强版的C回调函数;至于multi_list,基本上
是一个container adapter (套用下STL的概念),使用条件变量参与线程同步,据说效率要比简单的互斥高点;至于thread,我需要
特别说下:

thread最为重要的就是为其附加了一个windows的消息队列(只要调用PeekMessage之类的函数该队列就存在),本意是可以让其他线
程传送数据到该线程,但是目前只用于线程退出,即其他线程可以在任何时候要求该线程安全地退出(该线程没有阻塞的情况下,
阻塞时获取不到消息)。我不知道这个安全退出策略是否真的有必要存在,但是我讨厌看到各种撇脚的退出方法(例如设置全局标志
变量,增加额外的--没封装前---event对象之类)。

结束

不知道其他人是如何做这种异步转换操作的,在这里我只是起个抛砖引玉的作用,欢迎大家提出意见。

 

例子下载

posted @ 2008-06-25 15:47 Kevin Lynx 阅读(4998) | 评论 (29)编辑 收藏

IOCP与线程

author : Kevin Lynx

 

什么是完成包?

完成包,即IO Completion Packet,是指异步IO操作完毕后OS提交给应用层的通知包。IOCP维护了一个IO操作结果队列,里面
保存着各种完成包。应用层调用GQCS(也就是GetQueueCompletionStatus)函数获取这些完成包。

最大并发线程数

在一个典型的IOCP程序里,会有一些线程调用GQCS去获取IO操作结果。最大并发线程数指定在同一时刻处理完成包的线程数目。
该参数在调用CreateIoCompletionPort时由NumberOfConcurrentThreads指定。

工作者线程

工作者线程一般指的就是调用GQCS函数的线程。要注意的是,工作者线程数和最大并发线程数并不是同一回事(见下文)。工作者
线程由应用层显示创建(_beginthreadex 之类)。工作者线程通常是一个循环,会不断地GQCS到完成包,然后处理完成包。

调度过程

工作者线程以是否阻塞分为两种状态:运行状态和等待状态。当线程做一些阻塞操作时(线程同步,甚至GQCS空的完成队列),线程
处于等待状态;否则,线程处于运行状态。

另一方面,OS会始终保持某一时刻处于运行状态的线程数小于最大并发线程数。每一个调用GQCS函数的线程OS实际上都会进行记录,
当完成队列里有完成包时,OS会首先检查当前处于运行状态的工作线程数是否小于最大并发线程数,如果小于,OS会按照LIFO的顺
序让某个工作者线程从GQCS返回(此工作者线程转换为运行状态)。如何决定这个LIFO?这是简单地通过调用GQCS函数的顺序决定的。

从这里可以看出,这里涉及到线程唤醒和睡眠的操作。如果两个线程被放置于同一个CPU上,就会有线程切换的开销。因此,为了消
除这个开销,最大并发线程数被建议为设置成CPU数量。

从以上调度过程还可以看出,如果某个处于运行状态的工作者线程在处理完成包时阻塞了(例如线程同步、其他IO操作),那么就有
CPU资源处于空闲状态。因此,我们也看到很多文档里建议,工作者线程数为(CPU数*2+2)。

在一个等待线程转换到运行状态时,有可能会出现短暂的时间运行线程数超过最大并发线程数,这个时候OS会迅速地让这个新转换
的线程阻塞,从而减少这个数量。(关于这个观点,MSDN上只说:by not allowing any new active threads,却没说明not allowing
what)

调度原理

这个知道了其实没什么意义,都是内核做的事,大致上都是操作线程control block,直接摘录<Inside IO Completion Ports>:

The list of threads hangs off the queue object. A thread's control block data structure has a pointer in it that
references the queue object of a queue that it is associated with; if the pointer is NULL then the thread is not
associated with a queue.

So how does NT keep track of threads that become inactive because they block on something other than the completion
port" The answer lies in the queue pointer in a thread's control block. The scheduler routines that are executed
in response to a thread blocking (KeWaitForSingleObject, KeDelayExecutionThread, etc.) check the thread's queue
pointer and if its not NULL they will call KiActivateWaiterQueue, a queue-related function. KiActivateWaiterQueue
decrements the count of active threads associated with the queue, and if the result is less than the maximum and
there is at least one completion packet in the queue then the thread at the front of the queue's thread list is
woken and given the oldest packet. Conversely, whenever a thread that is associated with a queue wakes up after
blocking the scheduler executes the function KiUnwaitThread, which increments the queue's active count.

参考资料

<Inside I/O Completion Ports>:
http://technet.microsoft.com/en-us/sysinternals/bb963891.aspx
<I/O Completion Ports>:
http://msdn.microsoft.com/en-us/library/aa365198(VS.85).aspx
<INFO: Design Issues When Using IOCP in a Winsock Server>:
http://support.microsoft.com/kb/192800/en-us/

posted @ 2008-06-23 17:32 Kevin Lynx 阅读(4794) | 评论 (3)编辑 收藏

SGI STL的内存池

stl中各种容器都有一个可选的模板参数:allocator,也就是一个负责内存分配的组件。STL标准规定的allcator
被定义在memory文件中。STL标准规定的allocator只是单纯地封装operator new,效率上有点过意不去。

SGI实现的STL里,所有的容器都使用SGI自己定义的allocator。这个allocator实现了一个small object的内存池。
Loki里为了处理小对象的内存分配,也实现了类似的内存管理机制。

该内存池大致上,就是一大块一大块地从系统获取内存,然后将其分成很多小块以链表的形式链接起来。其内部
有很多不同类型的链表,不同的链表维护不同大小的内存块。每一次客户端要求分配内存时,allcator就根据请求
的大小找到相应的链表(最接近的尺寸),然后从链表里取出内存。当客户端归还内存时,allocator就将这块内存
放回到对应的链表里。

我简单地画了幅图表示整个结构:

allocator

allocator内部维护一个链表数组,数组元素全部是链表头指针。链表A每一个节点维护一个8bytes的内存块,链表
B每一个节点维护一个16bytes的内存块。

当客户端请求分配10bytes的内存时,allocator将10调整为最接近的16bytes(只能大于10bytes),然后发现16bytes
这个链表(链表B)里有可用内存块,于是从B里取出一块内存返回。当客户端归还时,allocator找到对应的链表,将
内存重新放回链表B即可。

大致过程就这么简单,也许有人要说用链表维护一块内存,链表本身就会浪费一些内存(在我很早前接触内存池时,
总会看到类似的论点= =|),其实通过一些简单的技巧是完全可以避免的。例如,这里allocator维护了很多内存块,
反正这些内存本身就是闲置的,因此我们就可以直接在这些内存里记录链表的信息(下一个元素)。

还是写点代码详细说下这个小技巧:

   

struct Obj
    
{
        Obj 
*next;
    }


    
void *mem = malloc( 100 );
    Obj 
*header = (Obj*) mem;
    Obj 
*cur_obj = header;
    Obj 
*next_obj = cur_obj;
    
forint i = 0; ; ++ i )
    
{
        cur_obj 
= next_obj;
        next_obj 
= (Obj*)((char*)next_obj + 10 );
        
if( i == 9 )
        
{
            cur_obj
->next = 0;
            
break;
        }

        
else
        
{
            cur_obj
->next = next_obj;
        }

    }

    free( mem );

 

这样,通过header指针和next域,就可以逐块(这里是10byts)地访问mem所指向的内存,而这些链表的节点,都
是直接保存在这块内存里的,所以完全没有额外消耗。

我用C模仿着SGI的这个allocator写了个可配置的内存池,在其上按照STL的标准包装了一个allocator,可以直接
用于VC自带的STL里。
测试代码稍微测试了下,发现在不同的机器上有明显的差距。

posted @ 2008-06-12 21:26 Kevin Lynx 阅读(7977) | 评论 (10)编辑 收藏

Proactor和Reactor模式_继续并发系统设计的扫盲

6.6.2008

Kevin Lynx

Proactor和Reactor都是并发编程中的设计模式。在我看来,他们都是用于派发/分离IO操作事件的。这里所谓的
IO事件也就是诸如read/write的IO操作。"派发/分离"就是将单独的IO事件通知到上层模块。两个模式不同的地方
在于,Proactor用于异步IO,而Reactor用于同步IO。

摘抄一些关键的东西:

"
Two patterns that involve event demultiplexors are called Reactor and Proactor [1]. The Reactor patterns
involve synchronous I/O, whereas the Proactor pattern involves asynchronous I/O.
"

关于两个模式的大致模型,从以下文字基本可以明白:

"
An example will help you understand the difference between Reactor and Proactor. We will focus on the read
operation here, as the write implementation is similar. Here's a read in Reactor:

* An event handler declares interest in I/O events that indicate readiness for read on a particular socket ;
* The event demultiplexor waits for events ;
* An event comes in and wakes-up the demultiplexor, and the demultiplexor calls the appropriate handler;
* The event handler performs the actual read operation, handles the data read, declares renewed interest in
  I/O events, and returns control to the dispatcher .

By comparison, here is a read operation in Proactor (true async):

* A handler initiates an asynchronous read operation (note: the OS must support asynchronous I/O). In this
  case, the handler does not care about I/O readiness events, but is instead registers interest in receiving
  completion events;
* The event demultiplexor waits until the operation is completed ;
* While the event demultiplexor waits, the OS executes the read operation in a parallel kernel thread, puts
  data into a user-defined buffer, and notifies the event demultiplexor that the read is complete ;
* The event demultiplexor calls the appropriate handler;
* The event handler handles the data from user defined buffer, starts a new asynchronous operation, and returns
  control to the event demultiplexor.

"

可以看出,两个模式的相同点,都是对某个IO事件的事件通知(即告诉某个模块,这个IO操作可以进行或已经完成)。在结构
上,两者也有相同点:demultiplexor负责提交IO操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调handler。
不同点在于,异步情况下(Proactor),当回调handler时,表示IO操作已经完成;同步情况下(Reactor),回调handler时,表示
IO设备可以进行某个操作(can read or can write),handler这个时候开始提交操作。

用select模型写个简单的reactor,大致为:

///
class handler
{
public:
    
virtual void onRead() = 0;
    
virtual void onWrite() = 0;
    
virtual void onAccept() = 0;
}


class dispatch
{
public:
    
void poll()
    
{
        
// add fd in the set.
        
//
        
// poll every fd
        int c = select( 0&read_fd, &write_fd, 00 );
        
if( c > 0 )
        
{
            
for each fd in the read_fd_set
            
{    if fd can read
                    _handler
->onRead();
                
if fd can accept
                    _handler
->onAccept();
            }
 

            
for each fd in the write_fd_set
            
{
                
if fd can write
                    _handler
->onWrite();
            }

        }

    }
 

    
void setHandler( handler *_h )
    
{
        _handler 
= _h;
    }
 

private:
    handler 
*_handler;
}


/// application
class MyHandler : public handler
{
public:
    
void onRead()
    
{
    }
 

    
void onWrite()
    
{
    }
 

    
void onAccept()
    
{
    }

}



在网上找了份Proactor模式比较正式的文档,其给出了一个总体的UML类图,比较全面:

proactor_uml

根据这份图我随便写了个例子代码:

class AsyIOProcessor
{
public:
    
void do_read()
    
{
        
//send read operation to OS
        
// read io finished.and dispatch notification
        _proactor->dispatch_read();
    }
 

private:
    Proactor 
*_proactor;
}


class Proactor
{
public:
    
void dispatch_read()
    
{
        _handlerMgr
->onRead();
    }
 

private:
    HandlerManager 
*_handlerMgr;
}


class HandlerManager
{
public:
    typedef std::list
<Handler*> HandlerList; 

public:
    
void onRead()
    
{
        
// notify all the handlers.
        std::for_each( _handlers.begin(), _handlers.end(), onRead );
    }
 

private:
    HandlerList 
*_handlers;
}


class Handler
{
public:
    
virtual void onRead() = 0;
}


// application level handler.
class MyHandler : public Handler
{
public:
    
void onRead() 
    
{
        
// 
    }

}



Reactor通过某种变形,可以将其改装为Proactor,在某些不支持异步IO的系统上,也可以隐藏底层的实现,利于编写跨平台
代码。我们只需要在dispatch(也就是demultiplexor)中封装同步IO操作的代码,在上层,用户提交自己的缓冲区到这一层,
这一层检查到设备可操作时,不像原来立即回调handler,而是开始IO操作,然后将操作结果放到用户缓冲区(读),然后再
回调handler。这样,对于上层handler而言,就像是proactor一样。详细技法参见这篇文章

其实就设计模式而言,我个人觉得某个模式其实是没有完全固定的结构的。不能说某个模式里就肯定会有某个类,类之间的
关系就肯定是这样。在实际写程序过程中也很少去特别地实现某个模式,只能说模式会给你更多更好的架构方案。

最近在看spserver的代码,看到别人提各种并发系统中的模式,有点眼红,于是才来扫扫盲。知道什么是leader follower模式
reactor, proactor,multiplexing,对于心中的那个网络库也越来越清晰。

最近还干了些离谱的事,写了传说中的字节流编码,用模板的方式实现,不但保持了扩展性,还少写很多代码;处于效率考虑,
写了个static array容器(其实就是template <typename _Tp, std::size_t size> class static_array { _Tp _con[size]),
加了iterator,遵循STL标准,可以结合进STL的各个generic algorithm用,自我感觉不错。基础模块搭建完毕,解析了公司
服务器网络模块的消息,我是不是真的打算用自己的网络模块重写我的验证服务器?在另一个给公司写的工具里,因为实在厌恶
越来越多的重复代码,索性写了几个宏,还真的做到了代码的自动生成:D。

对优雅代码的追求真的成了种癖好.  = =|

posted @ 2008-06-06 13:25 Kevin Lynx 阅读(29408) | 评论 (7)编辑 收藏

仅列出标题
共12页: First 4 5 6 7 8 9 10 11 12