loop_in_codes

低调做技术__欢迎移步我的独立博客 codemaro.com

2013年8月15日 #

记一次堆栈平衡错误

最近在一个使用Visual Studio开发的C++程序中,出现了如下错误:

Run-Time Check Failure #0 - The value of ESP was not properly saved across a function call. This is usually a result of calling a function declared with one calling convention with a function pointer declared with a different calling convention.

这个错误主要指的就是函数调用堆栈不平衡。在C/C++程序中,调用一个函数前会保存当前堆栈信息,目标函数返回后会把堆栈恢复到调用前的状态。函数的参数、局部变量会影响堆栈。而函数堆栈不平衡,一般是因为函数调用方式和目标函数定义方式不一致导致,例如:

void __stdcall func(int a) {
}

int main(int argc, char* argv[]) {
    typedef void (*funcptr)(int);
    funcptr ptr = (funcptr) func;
    ptr(1); // 返回后导致堆栈不平衡
    return 0;
}

__stdcall修饰的函数,其函数参数的出栈由被调用者自己完成,而__cdecl,也就是C/C++函数的默认调用约定,则是调用者完成参数出栈。

Visual Studio在debug模式下会在我们的代码中加入不少检查代码,例如以上代码对应的汇编中,就会增加一个检查堆栈是否平衡的函数调用,当出现问题时,就会出现提示Run-Time Check Failure...这样的错误对话框:

call dword ptr [ptr]  ; ptr(1)
add  esp,4  ; cdecl方式,调用者清除参数
cmp  esi,esp  
call @ILT+1345(__RTC_CheckEsp) (0B01546h) ; 检查堆栈是否平衡

但是我们的程序不是这种低级错误。我们调用的函数是放在dll中的,调用约定显示定义为__stdcall,函数声明和实现一致。大致的结构如下:

IParser *parser = CreateParser();
parser->Begin();
...
...
parser->End();
parser->Release(); // 返回后导致堆栈不平衡

IParser的实现在一个dll里,这反而是一个误导人的信息。parser->Release返回后,堆栈不平衡,并且仅仅少了一个字节。一个字节怎么来的?

解决这个问题主要的手段就是跟反汇编,在关键位置查看寄存器和堆栈的内容。编译器生成的代码是正确的,而我们自己的代码乍看上去也没问题。最后甚至使用最傻逼的调试手段–逐行语句注释查错。

具体查错过程就不细说了。解决问题往往需要更多的冷静,和清晰的思路。最终我使用的方法是,在进入Release之前记录堆栈指针的值,堆栈指针的值会被压入堆栈,以在函数返回后从堆栈弹出,恢复堆栈指针。Release的实现很简单,就是删除一个Parser这个对象,但这个对象的析构会导致很多其他对象被析构。我就逐层地检查,是在哪个函数里改变了堆栈里的内容。

理论上,函数本身是操作不到调用者的堆栈的。而现在看来,确实是被调用函数,也就是Release改写了调用者的堆栈内容。要改变堆栈的内容,只有通过局部变量的地址才能做到。

最终,我发现在调用完以下函数后,我跟踪的堆栈地址内容发生了改变:

call llvm::RefCountedBase<clang::TargetOptions>::Release (10331117h)

因为注意到TargetOptions这个字眼,想起了在parser->Begin里有涉及到这个类的使用,类似于:

TargetOptions TO;
...
TargetInfo *TI = TargetInfo::CreateTargetInfo(m_inst.getDiagnostics(), TO);

这部分初始化代码,是直接从网上复制的,因为并不影响主要逻辑,所以从来没对这块代码深究。查看CreateTargetInfo的源码,发现这个函数将TO这个局部变量的地址保存了下来

而在Release中,则会对这个保存的临时变量进行删除操作,形如:

void Delete() const {
  assert (ref_cnt > 0 && "Reference count is already zero.");
  if (--ref_cnt == 0) delete static_cast<const Derived*>(this);
}

但是,问题并不在于对一个局部变量地址进行deletedelete在调试模式下是做了内存检测的,那会导致一种断言。

TargetOptions包含了ref_cnt这个成员。当出了Begin作用域后,parser保存的TargetOptions的地址,指向的内容(堆栈)发生了改变,也就是ref_cnt这个成员变量的值不再正常。由于一些巧合,主要是代码中各个局部变量、函数调用顺序、函数参数个数(曾尝试去除Begin的参数,可以避免错误提示),导致在调用Release前堆栈指针恰好等于之前保存的TargetOptions的地址。注意,之前保存的TargetOptions的地址,和调用Release前的堆栈指针值相同了。

而在TargetOptionsDelete函数中,进行了--ref_cnt,这个变量是TargetOptions的第一个成员,它的减1,也就导致了堆栈内容的改变。

至此,整个来龙去脉算是摸清。

posted @ 2013-08-15 23:01 Kevin Lynx 阅读(1589) | 评论 (1)编辑 收藏

2013年8月8日 #

Dhtcrawler2换用sphinx搜索

dhtcrawler2最开始使用mongodb自带的全文搜索引擎搜索资源。搜索一些短关键字时很容易导致erlang进程call timeout,也就是查询时间太长。对于像avi这种关键字,搜索时间长达十几秒。搜索的资源数量200万左右。这其中大部分资源只是对root文件名进行了索引,即对于多文件资源而言没有索引单个文件名。索引方式有部分资源是按照字符串子串的形式,没有拆词,非常占用存储空间;有部分是使用了rmmseg(我编译了rmmseg-cpp作为erlang nif库调用 erl-rmmseg)进行了拆词,占用空间小了很多,但由于词库问题很多片里的词汇没拆出来。

很早以前我以为搜索耗时的原因是因为数据库太忙,想部署个mongodb集群出来。后来发现数据库没有任何读写的状态下,查询依然慢。终于只好放弃mongodb自带的文本搜索。于是我改用sphinx。简单起见,我直接下载了coreseek4.1(sphinx的一个支持中文拆词的包装)。

现在,已经导入了200多万的资源进sphinx,并且索引了所有文件名,索引文件达800M。对于avi关键字的搜索大概消耗0.2秒的时间。搜索试试

以下记录下sphinx在dhtcrawler的应用

sphinx简介

sphinx包含两个主要的程序:indexer和searchd。indexer用于建立文本内容的索引,然后searchd基于这些索引提供文本搜索功能,而要使用该功能,可以遵循searchd的网络协议连接searchd这个服务来使用。

indexer可以通过多种方式来获取这些文本内容,文本内容的来源称为数据源。sphinx内置mysql这种数据源,意思是可以直接从mysql数据库中取得数据。sphinx还支持xmlpipe2这种数据源,其数据以xml格式提供给indexer。要导入mongodb数据库里的内容,可以选择使用xmlpipe2这种方式。

sphinx document

xmlpipe2数据源需要按照以下格式提交:

<sphinx:docset>
    <sphinx:schema>
        <sphinx:field name="subject"/>
        <sphinx:field name="files"/>
        <sphinx:attr name="hash1" type="int" bits="32"/>
        <sphinx:attr name="hash2" type="int" bits="32"/>
    </sphinx:schema>
    <sphinx:document id="1">
        <subject>this is the subject</subject>
        <files>file content</files>
        <hash1>111</hash1>
    </sphinx:document>
</sphinx:docset>

该文件包含两大部分:schemadocuments,其中schema又包含两部分:fieldattr,其中由field标识的字段就会被indexer读取并全部作为输入文本建立索引,而attr则标识查询结果需要附带的信息;documents则是由一个个sphinx:document组成,即indexer真正要处理的数据。注意其中被schema引用的属性名。

document一个很重要的属性就是它的id。这个id对应于sphinx需要唯一,查询结果也会包含此id。一般情况下,此id可以直接是数据库主键,可用于查询到详细信息。searchd搜索关键字,其实可以看作为搜索这些document,搜索出来的结果也是这些document,搜索结果中主要包含schema中指定的attr。

增量索引

数据源的数据一般是变化的,新增的数据要加入到sphinx索引文件中,才能使得searchd搜索到新录入的数据。要不断地加入新数据,可以使用增量索引机制。增量索引机制中,需要一个主索引和一个次索引(delta index)。每次新增的数据都建立为次索引,然后一段时间后再合并进主索引。这个过程主要还是使用indexer和searchd程序。实际上,searchd是一个需要一直运行的服务,而indexer则是一个建立完索引就退出的工具程序。所以,这里的增量索引机制,其中涉及到的“每隔一定时间就合并”这种工作,需要自己写程序来协调(或通过其他工具)

sphinx与mongodb

上面提到,一般sphinx document的id都是使用的数据库主键,以方便查询。但mongodb中默认情况不使用数字作为主键。dhtcrawler的资源数据库使用的是资源info-hash作为主键,这无法作为sphinx document的id。一种解决办法是,将该hash按位拆分,拆分成若干个sphinx document attr支持位数的整数。例如,info-hash是一个160位的id,如果使用32位的attr(高版本的sphinx支持64位的整数),那么可以把该info-hash按位拆分成5个attr。而sphinx document id则可以使用任意数字,只要保证不冲突就行。当获得查询结果时,取得对应的attr,组合为info-hash即可。

mongodb默认的Object id也可以按这种方式拆分。

dhtcrawler2与sphinx

dhtcrawler2中我自己写了一个导入程序。该程序从mongodb中读出数据,数据到一定量时,就输出为xmlpipe2格式的xml文件,然后建立为次索引,最后合并进主索引。过程很简单,包含两次启动外部进程的工作,这个可以通过erlang中os:cmd完成。

值得注意的是,在从mongodb中读数据时,使用skip基本是不靠谱的,skip 100万个数据需要好几分钟,为了不增加额外的索引字段,我只好在created_at字段上加索引,然后按时间段来读取资源,这一切都是为了支持程序关闭重启后,可以继续上次工作,而不是重头再来。200万的数据,已经处理了好几天了。

后头数据建立好了,需要在前台展示出来。erlang中似乎只有一个sphinx客户端库:giza。这个库有点老,写成的时候貌似还在使用sphinx0.9版本。其中查询代码包含了版本判定,已经无法在我使用的sphinx2.x版本中使用。无奈之下我只好修改了这个库的源码,幸运的是查询功能居然是正常的,意味着sphinx若干个版本了也没改动通信协议?后来,我为了取得查询的统计信息,例如消耗时间以及总结果,我再一次修改了giza的源码。新的版本可以在我的github上找到:my giza,看起来我没侵犯版本协议吧?

目前dhtcrawler的搜索,先是基于sphinx搜索出hash列表,然后再去mongodb中搜索hash对应的资源。事实上,可以为sphinx的document直接附加这些资源的描述信息,就可以避免去数据库查询。但我想,这样会增加sphinx索引文件的大小,担心会影响搜索速度。实际测试时,发现数据库查询有时候还真的很消耗时间,尽管我做了分页,以使得单页仅对数据库进行少量查询。

xml unicode

在导入xml到sphinx的索引过程中,本身我输出的内容都是unicode的,但有很多资源会导致indexer解析xml出错。出错后indexer直接停止对当前xml的处理。后来查阅资料发现是因为这些无法被indexer处理的xml内容包含unicode里的控制字符,例如 ä (U+00E4)。我的解决办法是直接过滤掉这些控制字符。unicode的控制字符参看UTF-8 encoding table and Unicode characters。在erlang中干这个事居然不复杂:

strip_invalid_unicode(<<>>) ->
    <<>>;
strip_invalid_unicode(<<C/utf8, R/binary>>) ->
    case is_valid_unicode(C) of
        true ->
            RR = strip_invalid_unicode(R),
            <<C/utf8, RR/binary>>;
        false ->
            strip_invalid_unicode(R)
    end;
strip_invalid_unicode(<<_, R/binary>>) ->
    strip_invalid_unicode(R).
    
is_valid_unicode(C) when C < 16#20 ->
    false;
is_valid_unicode(C) when C >= 16#7f, C =< 16#ff ->
    false;
is_valid_unicode(_) ->
    true.

posted @ 2013-08-08 23:04 Kevin Lynx 阅读(1358) | 评论 (0)编辑 收藏

2013年7月20日 #

磁力搜索第二版-dhtcrawler2

上篇

下载使用

目前为止dhtcrawler2相对dhtcrawler而言,数据库部分调整很大,DHT部分基本沿用之前。但单纯作为一个爬资源的程序而言,DHT部分可以进行大幅削减,这个以后再说。这个版本更快、更稳定。为了方便,我将编译好的erlang二进制文件作为git的主分支,我还添加了一些Windows下的批处理脚本,总之基本上下载源码以后即可运行。

项目地址:https://github.com/kevinlynx/dhtcrawler2

使用方法

  • 下载erlang,我测试的是R16B版本,确保erl等程序被加入Path环境变量
  • 下载mongodb,解压即用:

      mongod --dbpath xxx --setParameter textSearchEnabled=true
    
  • 下载dhtcrawler2

      git clone https://github.com/kevinlynx/dhtcrawler2.git
    
  • 运行win_start_crawler.bat

  • 运行win_start_hash.bat
  • 运行win_start_http.bat
  • 打开localhost:8000查看stats

爬虫每次运行都会保存DHT节点状态,早期运行的时候收集速度会不够。dhtcrawler2将程序分为3部分:

  • crawler,即DHT爬虫部分,仅负责收集hash
  • hash,准确来讲叫hash reader,处理爬虫收集的hash,处理过程主要涉及到下载种子文件
  • http,使用hash处理出来的数据库,以作为Web端接口

我没有服务器,但程序有被部署在别人的服务器上:bt.cmhttp://222.175.114.126:8000/

其他工具

为了提高资源索引速度,我陆续写了一些工具,包括:

  • import_tors,用于导入本地种子文件到数据库
  • tor_cache,用于下载种子到本地,仅仅提供下载的功能,hash_reader在需要种子文件时,可以先从本地取
  • cache_indexer,目前hash_reader取种子都是从torrage.com之类的种子缓存站点取,这些站点提供了种子列表,cache_indexer将这些列表导入数据库,hash_reader在请求种子文件前可以通过该数据库检查torrage.com上有无此种子,从而减少多余的http请求

这些工具的代码都被放在dhtcrawler2中,可以查看对应的启动脚本来查看具体如何启动。

OS/Database

根据实际的测试效果来看,当收集的资源量过百万时(目前bt.cm录入近160万资源),4G内存的Windows平台,mongodb很容易就会挂掉。挂掉的原因全是1455,页面文件太小。有人建议不要在Windows下使用mongodb,Linux下我自己没做过测试。

mongodb可以部署为集群形式(replica-set),当初我想把http部分的查询放在一个只读的mongodb实例上,但因为建立集群时,要同步已有的10G数据库,而每次同步都以mongodb挂掉结束,遂放弃。在目前bt.cm的配置中,数据库torrent的锁比例(db lock)很容易上50%,这也让http在搜索时,经常出现搜索超时的情况。

技术信息

dhtcrawler最早的版本有很多问题,修复过的最大的一个问题是关于erlang定时器的,在DHT实现中,需要对每个节点每个peer做超时处理,在erlang中的做法直接是针对每个节点注册了一个定时器。这不是问题,问题在于定时器资源就像没有GC的内存资源一样,是会由于程序员的代码问题而出现资源泄漏。所以,dhtcrawler第一个版本在节点数配置在100以上的情况下,用不了多久就会内存耗尽,最终导致erlang虚拟机core dump。

除了这个问题以外,dhtcrawler的资源收录速度也不是很快。这当然跟数据库和获取种子的速度有直接关系。尤其是获取种子,使用的是一些提供info-hash到种子映射的网站,通过HTTP请求来下载种子文件。我以为通过BT协议直接下载种子会快些,并且实时性也要高很多,因为这个种子可能未被这些缓存网站收录,但却可以直接向对方请求得到。为此,我还特地翻阅了相关协议,并且用erlang实现了(以后的文章我会讲到具体实现这个协议)。

后来我怀疑get_peers的数量会不会比announce_peer多,但是理论上一般的客户端在get_peers之后都是announce_peer,但是如果get_peers查询的peers恰好不在线呢?这意味着很多资源虽然已经存在,只不过你恰好暂时请求不到。实际测试时,发现get_peers基本是announce_peer数量的10倍。

将hash的获取方式做了调整后,dhtcrawler在几分钟以内以几乎每秒上百个新增种子的速度工作。然后,程序挂掉。

从dhtcrawler到今天为止的dhtcrawler2,中间间隔了刚好1个月。我的所有业余时间全部扑在这个项目上,面临的问题一直都是程序的内存泄漏、资源收录的速度不够快,到后来又变为数据库压力过大。每一天我都以为我将会完成一个稳定版本,然后终于可以去干点别的事情,但总是干不完,目前完没完都还在观察。我始终明白在做优化前需要进行详尽的数据收集和分析,从而真正地优化到正确的点上,但也总是凭直觉和少量数据分析就开始尝试。

这里谈谈遇到的一些问题。

erlang call timeout

最开始遇到erlang中gen_server:call出现timeout错误时,我还一直以为是进程死锁了。相关代码读来读去,实在觉得不可能发生死锁。后来发现,当erlang虚拟机压力上去后,例如内存太大,但没大到耗尽系统所有内存(耗进所有内存基本就core dump了),进程间的调用就会出现timeout。

当然,内存占用过大可能只是表象。其进程过多,进程消息队列太长,也许才是导致出现timeout的根本原因。消息队列过长,也可能是由于发生了消息泄漏的缘故。消息泄漏我指的是这样一种情况,进程自己给自己发消息(当然是cast或info),这个消息被处理时又会发送相同的消息,正常情况下,gen_server处理了一个该消息,就会从消息队列里移除它,然后再发送相同的消息,这不会出问题。但是当程序逻辑出问题,每次处理该消息时,都会发生多余一个的同类消息,那消息队列自然就会一直增长。

保持进程逻辑简单,以避免这种逻辑错误。

erlang gb_trees

我在不少的地方使用了gb_trees,dht_crawler里就可能出现gb_trees:get(xxx, nil)这种错误。乍一看,我以为我真的传入了一个nil值进去。然后我苦看代码,以为在某个地方我会把这个gb_trees对象改成了nil。但事情不是这样的,gb_tress使用一个tuple作为tree的节点,当某个节点没有子节点时,就会以nil表示。

gb_trees:get(xxx, nil)类似的错误,实际指的是xxx没有在这个gb_trees中找到。

erlang httpc

dht_crawler通过http协议从torrage.com之类的缓存网站下载种子。最开始我为了尽量少依赖第三方库,使用的是erlang自带的httpc。后来发现程序有内存泄漏,google发现erlang自带的httpc早为人诟病,当然也有大神说在某个版本之后这个httpc已经很不错。为了省事,我直接换了ibrowse,替换之后正常很多。但是由于没有具体分析测试过,加之时间有点远了,我也记不太清细节。因为早期的http请求部分,没有做数量限制,也可能是由于我的使用导致的问题。

某个版本后,我才将http部分严格地与hash处理部分区分开来。相较数据库操作而言,http请求部分慢了若干数量级。在hash_reader中将这两块分开,严格限制了提交给httpc的请求数,以获得稳定性。

对于一个复杂的网络系统而言,分清哪些是耗时的哪些是不大耗时的,才可能获得性能的提升。对于hash_reader而言,处理一个hash的速度,虽然很大程度取决于数据库,但相较http请求,已经快很多。它在处理这些hash时,会将数据库已收录的资源和待下载的资源分离开,以尽快的速度处理已存在的,而将待下载的处理速度交给httpc的响应速度。

erlang httpc ssl

ibrowse处理https请求时,默认和erlang自带的httpc使用相同的SSL实现。这经常导致出现tls_connection进程挂掉的错误,具体原因不明。

erlang调试

首先合理的日志是任何系统调试的必备。

我面临的大部分问题都是内存泄漏相关,所以依赖的erlang工具也是和内存相关的:

  • 使用etop,可以检查内存占用多的进程、消息队列大的进程、CPU消耗多的进程等等:

      spawn(fun() -> etop:start([{output, text}, {interval, 10}, {lines, 20}, {sort, msg_q }]) end).
    
  • 使用erlang:system_info(allocated_areas).检查内存使用情况,其中会输出系统timer数量

  • 使用erlang:process_info查看某个具体的进程,这个甚至会输出消息队列里的消息

hash_writer/crawler

crawler本身仅收集hash,然后写入数据库,所以可以称crawler为hash_writer。这些hash里存在大量的重复。hash_reader从数据库里取出这些hash然后做处理。处理过程会首先判定该hash对应的资源是否被收录,没有收录就先通过http获取种子。

在某个版本之后,crawler会简单地预先处理这些hash。它缓存一定数量的hash,接收到新hash时,就合并到hash缓存里,以保证缓存里没有重复的hash。这个重复率经过实际数据分析,大概是50%左右,即收到的100个请求里,有50个是重复的。这样的优化,不仅会降低hash数据库的压力,hash_reader处理的hash数量少了,也会对torrent数据库有很大提升。

当然进一步的方案可以将crawler和hash_reader之间交互的这些hash直接放在内存中处理,省去中间数据库。但是由于mongodb大量使用虚拟内存的缘故(内存映射文件),经常导致服务器内存不够(4G),内存也就成了珍稀资源。当然这个方案还有个弊端是难以权衡hash缓存的管理。crawler收到hash是一个不稳定的过程,在某些时间点这些hash可能爆多,而hash_reader处理hash的速度也会不太稳定,受限于收到的hash类别(是新增资源还是已存在资源)、种子请求速度、是否有效等。

当然,也可以限制缓存大小,以及对hash_reader/crawler处理速度建立关系来解决这些问题。但另一方面,这里的优化是否对目前的系统有提升,是否是目前系统面临的最大问题,却是需要考究的事情。

cache indexer

dht_crawler是从torrage.com等网站获取种子文件,这些网站看起来都是使用了相同的接口,其都有一个sync目录,里面存放了每天每个月索引的种子hash,例如 http://torrage.com/sync/。这个网站上是否有某个hash对应的种子,就可以从这些索引中检查。

hash_reader在处理新资源时,请求种子的过程中发现大部分在这些服务器上都没有找到,也就是发起的很多http请求都是404回应,这不但降低了系统的处理能力、带宽,也降低了索引速度。所以我写了一个工具,先手工将sync目录下的所有文件下载到本地,然后通过这个工具 (cache indexer) 将这些索引文件里的hash全部导入数据库。在以后的运行过程中,该工具仅下载当天的索引文件,以更新数据库。 hash_reader 根据配置,会首先检查某个hash是否存在该数据库中,存在的hash才可能在torrage.com上下载得到。

种子缓存

hash_reader可以通过配置,将下载得到的种子保存在本地文件系统或数据库中。这可以建立自己的种子缓存,但保存在数据库中会对数据库造成压力,尤其在当前测试服务器硬件环境下;而保存为本地文件,又特别占用硬盘空间。

基于BT协议的种子下载

通过http从种子缓存里取种子文件,可能会没有直接从P2P网络里取更实时。目前还没来得及查看这些种子缓存网站的实现原理。但是通过BT协议获取种子会有点麻烦,因为dht_crawler是根据get_peer请求索引资源的,所以如果要通过BT协议取种子,那么这里还得去DHT网络里查询该种子,这个查询过程可能会较长,相比之下会没有http下载快。而如果通过announce_peer来索引新资源的话,其索引速度会大大降低,因为announce_peer请求比get_peer请求少很多,几乎10倍。

所以,这里的方案可能会结合两者,新开一个服务,建立自己的种子缓存。

中文分词

mongodb的全文索引是不支持中文的。我在之前提到,为了支持搜索中文,我将字符串拆成了若干子串。这样的后果就是字符串索引会稍稍偏大,而且目前这一块的代码还特别简单,会将很多非文字字符也算在内。后来我加了个中文分词库,使用的是rmmseg-cpp。我将其C++部分抽离出来编译成erlang nif,这可以在我的github上找到。

但是这个库拆分中文句子依赖于词库,而这个词库不太新,dhtcrawler爬到的大部分资源类型你们也懂,那些词汇拆出来的比率不太高,这会导致搜索出来的结果没你想的那么直白。当然更新词库应该是可以解决这个问题的,目前还没有时间顾这一块。

总结

一个老外对我说过,”i have 2 children to feed, so i will not do this only for fun”。

你的大部分编程知识来源于网络,所以稍稍回馈一下不会让你丢了饭碗。

我很穷,如果你能让我收获金钱和编程成就,还不会嫌我穿得太邋遢,that’s really kind of you。

posted @ 2013-07-20 16:37 Kevin Lynx 阅读(2238) | 评论 (0)编辑 收藏

2013年6月20日 #

使用erlang实现P2P磁力搜索-实现

上篇,本篇谈谈一些实现细节。

这个爬虫程序主要的问题在于如何获取P2P网络中分享的资源,获取到资源后索引到数据库中,搜索就是自然而然的事情。

DHT

DHT网络本质上是一个用于查询的网络,其用于查询一个资源有哪些计算机正在下载。每个资源都有一个20字节长度的ID用于标示,称为infohash。当一个程序作为DHT节点加入这个网络时,就会有其他节点来向你查询,当你做出回应后,对方就会记录下你。对方还会询问其他节点,当对方开始下载这个infohash对应的资源时,他就会告诉所有曾经询问过的节点,包括你。这个时候就可以确定,这个infohash对应的资源在这个网络中是有效的。

关于这个网络的工作原理,参看:P2P中DHT网络爬虫以及写了个磁力搜索的网页

获取到infohash后能做什么?关键点在于,我们现在使用的磁力链接(magnet url),是和infohash对应起来的。也就是拿到infohash,就等于拿到一个磁力链接。但是这个爬虫还需要建立资源的信息,这些信息来源于种子文件。种子文件其实也是对应到一个资源,种子文件包含资源名、描述、文件列表、文件大小等信息。获取到infohash时,其实也获取到了对应的计算机地址,我们可以在这些计算机上下载到对应的种子文件。

但是我为了简单,在获取到infohash后,从一些提供映射磁力链到种子文件服务的网站上直接下载了对应的种子。dhtcrawler里使用了以下网站:

http://torrage.com
https://zoink.it
http://bt.box.n0808.com

使用这些网站时,需提供磁力哈希(infohash可直接转换),构建特定的URL,发出HTTP请求即可。

   U1 = "http://torrage.com/torrent/" ++ MagHash ++ ".torrent",
    U2 = "https://zoink.it/torrent/" ++ MagHash ++ ".torrent",
    U3 = format_btbox_url(MagHash),

format_btbox_url(MagHash) ->
    H = lists:sublist(MagHash, 2),
    T = lists:nthtail(38, MagHash),
    "http://bt.box.n0808.com/" ++ H ++ "/" ++ T ++ "/" ++ MagHash ++ ".torrent".

但是,以一个节点的身份加入DHT网络,是无法获取大量查询的。在DHT网络中,每个节点都有一个ID。每个节点在查询信息时,仅询问离信息较近的节点。这里的信息除了infohash外还包含节点,即节点询问一个节点,这个节点在哪里。DHT的典型实现中(Kademlia),使用两个ID的xor操作来确定距离。既然距离的计算是基于ID的,为了尽可能获取整个DHT网络交换的信息,爬虫程序就可以建立尽可能多的DHT节点,让这些节点的ID均匀地分布在ID取值区间内,以这样的方式加入网络。

在dhtcrawler中,我使用以下方式产生了N个大致均匀分布的ID:

create_discrete_ids(1) ->
    [dht_id:random()];
create_discrete_ids(Count) ->
    Max = dht_id:max(),
    Piece = Max div Count,
    [random:uniform(Piece) + Index * Piece || Index <- lists:seq(0, Count - 1)].

除了尽可能多地往DHT网络里部署节点之外,对单个节点而言,也有些注意事项。例如应尽可能快地将自己告诉尽可能多的节点,这可以在启动时进行大量的随机infohash的查询。随着查询过程的深入,该节点会与更多的节点打交道。因为DHT网络里的节点实际上是不稳定的,它今天在线,明天后天可能不在线,所以计算你的ID固定,哪些节点与你较近,本身就是个相对概念。节点在程序退出时,也最好将自己的路由信息(与自己交互的节点列表)保存起来,这样下次启动时就可以更快地加入网络。

在dhtcrawler的实现中,每个节点每个一定时间,都会向网络中随机查询一个infohash,这个infohash是随机产生的。其查询目的不在于infohash,而在于告诉更多的节点,以及在其他节点上保持自己的活跃。

handle_event(startup, {MyID}) ->
    timer:apply_interval(?QUERY_INTERVAL, ?MODULE, start_tell_more_nodes, [MyID]).

start_tell_more_nodes(MyID) ->
    spawn(?MODULE, tell_more_nodes, [MyID]).

tell_more_nodes(MyID) ->
    [search:get_peers(MyID, dht_id:random()) || _ <- lists:seq(1, 3)].

DHT节点的完整实现是比较繁琐的,涉及到查询以及繁杂的各种对象的超时(节点、桶、infohash),而超时的处理并不是粗暴地做删除操作。因为本身是基于UDP协议,你得对这些超时对象做进一步的查询才能正确地进一步做其他事情。而搜索也是个繁杂的事情,递归地查询节点,感觉上,你不一定离目标越来越近,由于被查询节点的不确定性(无法确定对方是否在玩弄你,或者本身对方就是个傻逼),你很可能接下来要查询的节点反而离目标变远了。

在我第一次的DHT实现中,我使用了类似transmission里DHT实现的方法,不断无脑递归,当搜索有太久时间没得到响应后终止搜索。第二次实现时,我就使用了etorrent里的实现。这个搜索更聪明,它记录搜索过的节点,并且检查是否离目标越来越远。当远离目标时,就认为搜索是不太有效的,不太有效的搜索尝试几次就可以放弃。

实际上,爬虫的实现并不需要完整地实现DHT节点的正常功能。爬虫作为一个DHT节点的唯一动机仅是获取网络里其他节点的查询。而要完成这个功能,你只需要装得像个正常人就行。这里不需要保存infohash对应的peer列表,面临每一次查询,你随便回复几个节点地址就可以。但是这里有个责任问题,如果整个DHT网络有2000个节点,而你这个爬虫就有1000个节点,那么你的随意回复,就可能导致对方根本找不到正确的信息,这样你依然得不到有效的资源。(可以利用这一点破坏DHT网络)

DHT的实现没有使用第三方库。

种子

种子文件的格式同DHT网络消息格式一样,使用一种称为bencode的文本格式来编码。种子文件分为两类:单个文件和多个文件。

文件的信息无非就是文件名、大小。文件名可能包含utf8编码的名字,为了后面处理的方便,dhtcrawler都会优先使用utf8编码。

   {ok, {dict, Info}} = dict:find(<<"info">>, TD),
    case type(Info) of
        single -> {single, parse_single(Info)};
        multi -> {multi, parse_multi(Info)}
    end.
parse_single(Info) ->
    Name = read_string("name", Info),
    {ok, Length} = dict:find(<<"length">>, Info),
    {Name, Length}.

parse_multi(Info) ->
    Root = read_string("name", Info),
    {ok, {list, Files}} = dict:find(<<"files">>, Info),
    FileInfo = [parse_file_item(Item) || {dict, Item} <- Files],
    {Root, FileInfo}.

数据库

我最开始在选用数据库时,为了不使用第三方库,打算使用erlang自带的mnesia。但是因为涉及到字符串匹配搜索,mnesia的查询语句在我看来太不友好,在经过一些资料查阅后就直接放弃了。

然后我打算使用couchdb,因为它是erlang写的,而我正在用erlang写程序。第一次接触非关系型数据库,发现NoSQL数据库使用起来比SQL类的简单多了。但是在erlang里要使用couchdb实在太折腾了。我使用的客户端库是couchbeam。

因为couchdb暴露的API都是基于HTTP协议的,其数据格式使用了json,所以couchbeam实际上就是对各种HTTP请求、回应和json的包装。但是它竟然使用了ibrowse这个第三方HTTP客户端库,而不是erlang自带的。ibrowse又使用了jiffy这个解析json的库。这个库更惨烈的是它的解析工作都是交给C语言写的动态库来完成,我还得编译那个C库。

couchdb看起来不支持字符串查询,我得自己创建一个view,这个view里我通过翻阅了一些资料写了一个将每个doc的name拆分成若干次查询结果的map。这个map在处理每一次查询时,我都得动态更新之。couchdb是不支持局部更新的,这还不算大问题。然后很高兴,终于支持字符串查询了。这里的字符串查询都是基于字符串的子串查询。但是问题在于,太慢了。每一次在WEB端的查询,都直接导致erlang进程的call超时。

要让couchdb支持字符串查询,要快速,当然是有解决方案的。但是这个时候我已经没有心思继续折腾,任何一个库、程序如果接口设计得如此不方便,那就可以考虑换一个其他的。

我选择了mongodb。同样的基于文档的数据库。2.4版本还支持全文搜索。什么是全文搜索呢,这是一种基于单词的全文搜索方式。hello world我可以搜索hello,基于单词。mongodb会自动拆词。更关键更让人爽的是,要开启这个功能非常简单:设置启动参数、建立索引。没了。mongodb的erlang客户端库mongodb-erlang也只依赖一个bson-erlang库。然后我又埋头苦干,几个小时候我的这个爬虫程序就可以在浏览器端搜索关键字了。

后来我发现,mongodb的全文搜索是不支持中文的。因为它还不知道中文该怎么拆词。恰好我有个同事做过中文拆词的研究,看起来涉及到很复杂的算法。直到这个时候,我他妈才醒悟,我为什么需要基于单词的搜索。我们大部分的搜索其实都是基于子字符串的搜索。

于是,我将种子文件的名字拆分成了若干个子字符串,将这些子字符串以数组的形式作为种子文档的一个键值存储,而我依然还可以使用全文索引,因为全文索引会将整个字符串作为单词比较。实际上,基于一般的查询方式也是可以的。当然,索引还是得建立。

使用mongodb时唯一让我很不爽的是mongodb-erlang这个客户端库的文档太欠缺。这还不算大问题,因为看看源码参数还是可以大概猜到用法。真正悲剧的是mongodb的有些查询功能它是不支持的。例如通过cursor来排序来限制数量。在cursor模块并没有对应的mongodb接口。最终我只好通过以下方式查询,我不明白batchsize,但它可以工作:

search_announce_top(Conn, Count) ->
    Sel = {'$query', {}, '$orderby', {announce, -1}},
    List = mongo_do(Conn, fun() ->
        Cursor = mongo:find(?COLLNAME, Sel, [], 0, Count), 
        mongo_cursor:rest(Cursor)
    end),
    [decode_torrent_item(Item) || Item <- List].

另一个悲剧的是,mongodb-erlang还不支持文档的局部更新,它的update接口直接要求传入整个文档。几经折腾,我可以通过runCommand来完成:

inc_announce(Conn, Hash) when is_list(Hash) ->
    Cmd = {findAndModify, ?COLLNAME, query, {'_id', list_to_binary(Hash)}, 
        update, {'$inc', {announce, 1}},
        new, true},
    Ret = mongo_do(Conn, fun() ->
        mongo:command(Cmd)
    end).

Unicode

不知道在哪里我看到过erlang说自己其实是不需要支持unicode的,因为这门语言本身是通过list来模拟字符串。对于unicode而言,对应的list保存的本身就是整数值。但是为了方便处理,erlang还是提供了一些unicode操作的接口。

因为我需要将种子的名字按字拆分,对于a中文这样的字符串而言,我需要拆分成以下结果:

a
a中
a中文
中
中文
文

那么,在erlang中当我获取到一个字符串list时,我就需要知道哪几个整数合起来实际上对应着一个汉字。erlang里unicode模块里有几个函数可以将unicode字符串list对应的整数合起来,例如:[111, 222, 333]可能表示的是一个汉字,将其转换以下可得到[111222333]这样的形式。

split(Str) when is_list(Str) ->
    B = list_to_binary(Str), % 必须转换为binary
    case unicode:characters_to_list(B) of
        {error, L, D} ->
            {error, L, D};
        {incomplete, L, D} ->
            {incomplete, L, D};
        UL ->
        {ok, subsplit(UL)}
    end.

subsplit([]) ->
    [];

subsplit(L) ->
    [_|R] = L,
    {PreL, _} = lists:splitwith(fun(Ch) -> not is_spliter(Ch) end, L),
    [unicode:characters_to_binary(lists:sublist(PreL, Len)) 
        || Len <- lists:seq(1, length(PreL))] ++ subsplit(R).

除了这里的拆字之外,URL的编码、数据库的存储都还好,没遇到问题。

注意,以上针对数据库本身的吐槽,完全基于我不熟悉该数据库的情况下,不建议作为你工具选择的参考。

erlang的稳定性

都说可以用erlang来编写高容错的服务器程序。看看它的supervisor,监视子进程,自动重启子进程。天生的容错功能,就算你宕个几次,单个进程自动重启,整个程序看起来还稳健地在运行,多牛逼啊。再看看erlang的进程,轻量级的语言特性,就像OOP语言里的一个对象一样轻量。如果说使用OOP语言写程序得think in object,那用erlang你就得think in process,多牛逼多骇人啊。

实际上,以我的经验来看,你还得以传统的思维去看待erlang的进程。一些多线程程序里的问题,在erlang的进程环境中依然存在,例如死锁。

在erlang中,对于一些异步操作,你可以通过进程间的交互将这个操作包装成同步接口,例如ping的实现,可以等到对方回应之后再返回。被阻塞的进程反正很轻量,其包含的逻辑很单一。这不但是一种良好的包装,甚至可以说是一种erlang-style。但这很容易带来死锁。在最开始的时候我没有注意这个问题,当爬虫节点数上升的时候,网络数据复杂的时候,似乎就出现了死锁型宕机(进程互相等待太久,直接timeout)。

另一个容易在多进程环境下出现的问题就是消息依赖的上下文改变问题。当投递一个消息到某个进程,到这个消息被处理之前,这段时间这个消息关联的逻辑运算所依赖的上下文环境改变了,例如某个ets元素不见了,在处理这个消息时,你还得以多线程编程的思维来编写代码。

至于supervisor,这玩意你得端正态度。它不是用来包容你的傻逼错误的。当你写下傻逼代码导致进程频繁崩溃的时候,supervisor屁用没有。supervisor的唯一作用,仅仅是在一个确实本身可靠的系统,确实人品问题万分之一崩溃了,重启它。毕竟,一个重启频率的推荐值,是一个小时4次。

posted @ 2013-06-20 20:40 Kevin Lynx 阅读(2287) | 评论 (1)编辑 收藏

使用erlang实现P2P磁力搜索(开源)

接上回对DHT网络的研究,我用erlang克隆了一个磁力搜索引擎。我这个实现包含了完整的功能,DHT网络的加入、infohash的接收、种子的获取、资源信息的索引、搜索。

如下图:

screenshot

在我的笔记本上,我开启了100个DHT节点,大致均匀地分布在DHT网络里,资源索引速度大概在1小时一万个左右(包含重复资源)。

这个程序包含三大部分:

这两个项目总共包含大概2500行的erlang代码。其中,DHT实现部分将DHT网络的加入包装成一个库,爬虫部分在搜索种子时,暂时没有使用P2P里的种子下载方式,而是使用现成的磁力链转种子的网站服务,这样我只需要使用erlang自带的HTTP客户端就可以获取种子信息。爬虫在获取到种子信息后,将数据存储到mongodb里。WEB端我为了尽量少用第三方库,我只好使用erlang自带的HTTP服务器,因此网页内容的创建没有模板系统可用,只好通过字符串构建,编写起来不太方便。

使用

整个程序依赖了两个库:bson-erlang和mongodb-erlang,但下载依赖库的事都可以通过rebar解决,项目文件里我已经包含了rebar的执行程序。我仅在Windows7上测试过,但理论上在所有erlang支持的系统上都可以。

  • 下载安装mongodb
  • 进入mongodb bin目录启动mongodb,数据库目录保存在db下,需手动建立该目录

      mongod --dbpath db --setParameter textSearchEnabled=true
    
  • 下载erlang,我使用的是R16B版本

  • 下载dhtcrawler,不需要单独下载kdht,待会下载依赖项的时候会自动下载

      git clone git@github.com:kevinlynx/dhtcrawler.git
    
  • cmd进入dhtcrawler目录,下载依赖项前需保证环境变量里有git,例如D:\Program Files (x86)\Git\cmd,需注意不要将bash的目录加入进来,使用以下命令下载依赖项

      rebar get-deps
    
  • 编译

      rebar compile
    
  • 在dhtcrawler目录下,启动erlang

      erl -pa ebin
    
  • 在erlang shell里运行爬虫,erlang语句以点号(.)作为结束

      crawler_app:start().
    
  • erlang shell里运行HTTP服务器

      crawler_http:start().
    
  • 浏览器里输入localhost:8000/index.html,这个时候还没有索引到资源,建议监视网络流量以观察爬虫程序是否正确工作

爬虫程序启动时会读取priv/dhtcrawler.config配置文件,该文件里配置了DHT节点的UDP监听端口、节点数量、数据库地址等,可自行配置。

接下来我会谈谈各部分的实现方法。

posted @ 2013-06-20 14:44 Kevin Lynx 阅读(2733) | 评论 (2)编辑 收藏

2013年6月10日 #

使用ActionScript开发Ice Web客户端

我们目前的项目服务器端使用了Ice来构建。Ice有一套自己的网络协议,客户端和服务器端可以基于此协议来交互。由于Ice使用Slice这种中间语言来描述服务器和客户端的交互接口,所以它可以做到极大限度地屏蔽网络协议这个细节。也就是说,我们只要借助Ice和Slice,我们可以轻松地编写网络程序。

然后,我们的后端现在需要一个运行在Web浏览器上的客户端。要与Ice做交互,如果使用TCP协议的话,得保证是长连接的。但HTTP是短连接的。而另一方面,我们还需要选择一个Ice支持的和Web相关的语言来做这件事情。如果要在浏览器端直接与Ice服务建立连接,可供选择的语言/平台包括:

  • Flash
  • Silverlight

因为我之前用erlang简单写了个Ice的客户端库,所以我对Ice底层协议有一定了解,可以不一定使用Ice支持的语言,所以HTML5其实也是个选择。此外,如果在浏览器端使用Applet,Java可能也是个选择。

其实几个月前在这块的技术选择问题上我就做过简单的研究,当时确定的方案是使用Flash。但是,后来人员招聘上遇到了问题,看起来要招一个会ActionScript和前端页面技术的程序员来做我们这种项目,似乎大材小用,成本显高了。

那么,考虑到团队里有现成的Java程序员,而且看起来招一个会用Java写网站的程序员简单又便宜,似乎是排除技术原因的最好选择。但是,如果不在浏览器端直接连接服务器来做交互,而是让Web服务器端来做中转的话,会面临不少问题:

  • 浏览器端操作结果的获取问题,说白了就是非实时了,得用Ajax等等技术去模拟实时,代价就是不断轮训,也就是通常说的poll
  • Web服务器端需要编写大量代码:对用户操作的映射,结果缓存等等

如果能用Flash包装与服务器交互的部分,而把UI相关的东西留给HTML/JS/CSS去做,那是不是可行一点?如果只是用ActionScript编写与服务器端的交互逻辑代码,我就不需要花时间去系统学习ActionScript,甚至如何用Flash做界面,我甚至不用搞懂这些技术之间的关系。基本上看些Ice for ActionScript的例子代码,就可以完成这件事情。

以下记录一些主要的过程/方法:

ActionScript程序的开发

开发一个嵌入到网页中的FLASH,只需要Flex SDK。SDK里自带了一些编译器相关工具。我不打算使用IDE,因为看起来IDE更复杂。简单的google之后,基本就可以构建出一个Flash文件:

  • 构建基本的程序需要一个mxml文件,这个文件里主要用来捕获Flash在页面上初始化完成这个事件,以初始化内部逻辑
  • 编写ActionScript源码,看起来其文件、类的组织方式很像Java
  • 使用Flex SDK中的mxmlc程序来编译,生成swf文件,例如:

    mxmlc myflexapp.mxml -library-path+=xxx.swc

  • 嵌入到网页中,简单的做法可以借助swfobject.js这个库,嵌入的方式:

     <script type="text/javascript" src="swfobject.js"></script>
    <script type="text/javascript">
        var flashvars = {};
        var params = {};
      params.play = "true";
        params.quality = "high";
        params.bgcolor = "white";
        params.allowscriptaccess = "always";
        params.allowfullscreen = "true";
        var attributes = {};
        attributes.id = "asclient";
        attributes.name = "asclient";
        attributes.align = "middle";
        swfobject.embedSWF("asclient.swf", "flashContent", "1", "1",
            "0", "", 
            flashvars, params, attributes);
        swfobject.createCSS("#flashContent", "display:none;");
    </script>

自然,页面中需加入flashContent这个div:

     <div id="flashContent">
        <p>no flash</p>
    </div>

我的mxml文件也很简单:

<?xml version="1.0" encoding="utf-8"?>
<s:Application 
    xmlns:fx="http://ns.adobe.com/mxml/2009" 
    xmlns:s="library://ns.adobe.com/flex/spark" 
    xmlns:mx="library://ns.adobe.com/flex/mx"
    applicationComplete="doApplicationComplete()" >
    <fx:Script>
    <![CDATA[
       import ASClient.Coordinator;
       import flash.external.ExternalInterface;

       private var _coordinator:Coordinator;

       public function doApplicationComplete():void
       {
            trace("doApplicationComplete");
            _coordinator = new Coordinator();
            _coordinator.reg_methods();
            ExternalInterface.call("as_ready"); 
       } 
     ]]>
    </fx:Script>
</s:Application>

ActionScript日志

我通过日志来调试ActionScript代码。最简单的方式就是通过trace函数来输出日志。要成功输出日志包含以下步骤:

  • 给浏览器安装调试版本的Flash Player
  • 日志是输出到用户目录下的,并且需要手动创建日志目录(Administrator替换为用户名):

      C:\Users\Administrator\AppData\Roaming\Macromedia\Flash Player\Logs
    
  • 用户目录下新建配置文件mm.cfg:

      AS3StaticProfile=0
      AS3Verbose=0
      TraceOutputFileEnable=1 
      TraceOutputBuffered=0
      ErrorReportingEnable=1  
      AS3Trace=0
    
  • 编译DEBUG版本的Flash文件,可以修改flex sdk下的flex-config.xml文件,里面增加debug=true配置即可

在开发过程中需要注意浏览器缓存问题,当编译出新的Flash文件后,浏览器即使页面刷新也可能使用的是缓存里的Flash。当然,最重要的,是通过浏览器来访问这个包含了Flash的网页,Web服务器随意。

Flash Policy文件

在Flash的某个版本后,Flash中如果要向外建立socket连接,是首先要取得目标主机返回的policy文件的。也就是在建立连接前,Flash底层会先向目标主机询问得到一个描述访问权限的文件。

简单来说,目标主机需要在843端口上建立TCP监听,一旦有网络连接,就发送以下内容,内容后需添加0x00用于标示结束。(当然,具体细节还挺多,自行google)

<cross-domain-policy>
     <allow-access-from domain="*" to-ports="*" />
</cross-domain-policy>

最开始我使用的是朋友给的现成的Policy服务,虽然我写的Flash可以成功连接我的Ice服务,但始终要等待2秒以上的时间。google Flash Policy相关内容,可以发现确实存在一个延时,但那是因为目标主机没有在843端口服务。后来我自己用erlang写了个Policy服务,延时就没有了。猜测可能是他的Policy服务没有添加0x00作为结束导致。

ActionScript与JavaScript的交互

既然我使用ActionScript来包装与服务器的交互,那么JavaScript就必然需要和ActionScript通信。这个通信过程也就是在JavaScript中调用ActionScript中的函数,反过来亦然。这个过程很简单:

在JavaScript中调用ActionScript函数:

首先是ActionScript需要注册哪些函数可以被调用:

ExternalInterface.addCallback("service_loadall", loadAll);

通过ExternalInterface.addCallback注册的函数,其实是个closure,所以在类中注册自己的成员函数都可以(因为成员函数会使用this,形成了一个closure)。

然后在JavaScript中调用:

    function asObject() {
        // asclient是嵌入Flash时填入的name和(或?)id
        return window.document.asclient;
    }
    var as = asObject();
    as.service_loadall();

在ActionScript中调用JavaScript中调用则更简单,一句话:

ExternalInterface.call("service_load_done", args);

至于在两者之间的函数参数传递,其类型都可以自动映射。但因为我的应用里数据较为复杂,我就将数据转换为JSON格式,在JavaScript这边操作较为简单。

页面切换

这里我们需要的Web前端页面,更像是一个管理系统,所以页面切换是很有可能的。问题在于,当出现页面跳转时,Flash对象会重新初始化,新的页面无法使用前一个页面建立好的网络连接(或者能?)。为了解决这个问题,服务器当然可以设计一种重登录机制,方便客户端以一种特殊的方式进入系统,绕过正常的登录环节。但是我们使用了Glacier2这个网关,在这个网关上有针对连接的超时管理,这样反复建立新的连接对资源太浪费了。

综上,我想只能通过前端去规避这个问题。例如,前端开发人员依然可以分开设计很多页面,页面里也可以使用正常的链接。我们编写一些JavaScript代码,将页面里的链接替换成对应的JavaScript代码,动态载入新的页面内容,然后对页面内的部分内容进行替换,从而尽可能让页面设计人员编写正常的网页,同时也解决页面切换问题。

这是个蹩脚的方法,但在我有限的前端知识体系下,似乎也只能这样干了。

posted @ 2013-06-10 21:30 Kevin Lynx 阅读(1148) | 评论 (0)编辑 收藏

2013年5月19日 #

P2P中DHT网络爬虫

DHT网络爬虫基于DHT网络构建了一个P2P资源搜索引擎。这个搜索引擎不但可以用于构建DHT网络中活跃的资源索引(活跃的资源意味着该网络中肯定有人至少持有该资源的部分数据),还可以分析出该网络中的热门分享资源。小虾不久前发布了一个这样的搜索引擎:磁力搜索。他也写博客对此稍作了介绍:写了个磁力搜索的网页 - 收录最近热门分享的资源。网络上其实也有其他人做了类似的应用:DHT monitoringCrawling Bittorrent DHT

但是他的这篇文章仅介绍了DHT网络的大致工作原理,并且这个爬虫的具体工作原理也没有提到。对此我查阅了些文章及代码,虽然从原理上梳理出了整个实现方案,但很多细节还是不甚清楚。所以本文仅作概要介绍。

DHT/Magnet/Torrent

在P2P网络中,要通过种子文件下载一个资源,需要知道整个P2P网络中有哪些计算机正在下载/上传该资源。这里将这些提供某个资源下载的计算机定义为peer。传统的P2P网络中,存在一些tracker服务器,这些服务器的作用主要用于跟踪某个资源有哪些关联的peer。下载这个资源当然得首先取得这些peer。

DHT的出现用于解决当tracker服务器不可用时,P2P客户端依然可以取得某个资源的peer。DHT解决这个问题,是因为它将原来tracker上的资源peer信息分散到了整个网络中。这里将实现了DHT协议的计算机定义为节点(node)。通常一个P2P客户端程序既是peer也是节点。DHT网络有多种实现算法,例如Kademlia。

当某个P2P客户端通过种子文件下载资源时,如果没有tracker服务器,它就会向DHT网络查询这个资源对应的peer列表。资源的标识在DHT网络中称为infohash,是一个20字节长的字符串,一般通过sha1算法获得,也就是一个类似UUID的东西。

实际上,种子文件本身就对应着一个infohash,这个infohash是通过种子文件的文件描述信息动态计算得到。一个种子文件包含了对应资源的描述信息,例如文件名、文件大小等。Magnet,这里指的是磁力链接,它是一个类似URL的字符串地址。P2P软件通过磁力链接,会下载到一个种子文件,然后根据该种子文件继续真实资源的下载。

磁力链接中包含的最重要的信息就是infohash。这个infohash一般为40字节或32字节,它其实只是资源infohash(20字节)的一种编码形式。

Kademlia

Kademlia是DHT网络的一种实现。网络上关于这个算法的文章,主要是围绕整个DHT网络的实现原理进行论述。个人觉得这些文章很蛋疼,基本上读了之后对于要如何去实现一个DHT客户端还是没有概念。这里主要可参考P2P中DHT网络介绍,以及BitTorrent网站上的DHT协议描述

Kad的主要目的是用于查询某个资源对应的peer列表,而这个peer列表实际上是分散在整个网络中。网络中节点数量很大,如果要获得peer列表,最简单的做法无非就是依次询问网络中的每个节点。这当然不可行。所以在Kad算法中,设立了一个路由表。每一个节点都有一份路由表。这个是按照节点之间的距离关系构建出来的。节点之间的距离当然也有特定的算法定义,在Kad中通过对两个节点的ID进行异或操作得到。节点的ID和infohash通过相同算法构建,都是20字节长度。节点和infohash之间也有距离关系,实际上表示的是节点和资源的距离关系。

有了这个路由表之后,再通过一个基于距离关系的查找算法,就可以实现不用挨个遍历就找到特定的节点。而查找资源peer这个操作,正是基于节点查找这个过程。

路由表的实现,按我的理解,有点类似一般的hash表结构。在这个表中有160个桶,称为K桶,这个桶的数量在实现上可以动态增长。每个桶保存有限个元素,例如K取值为8,指的就是这个桶最多保存8个元素。每个元素就是一个节点,节点包含节点ID、地址信息以及peer信息。这些桶可以通过距离值索引得到,即距离值会经过一个hash算法,使其值落到桶的索引范围内。

要加入一个DHT网络,需要首先知道这个网络中的任意一个节点。如何获得这个节点?在一些开源的P2P软件中,会提供一些节点地址,例如transmission中提供的dht.transmissionbt.com:6881。

协议

Kad定义了节点之间的交互协议。这些协议支撑了整个DHT网络里信息分布式存储的实现。这些协议都是使用UDP来传送。其协议格式使用一种称为bencode的编码方式来编码协议数据。bencode是一种文本格式的编码,它还用于种子文件内的信息编码。

Kad协议具体格式可参考BitTorrent的定义:DHT Protocol。这些协议包括4种请求:ping,find_node,get_peer,announce_peer。在有些文档中这些请求的名字会有不同,例如announce_peer又被称为store,get_peer被称为find_value。这4种请求中,都会有对应的回应消息。其中最重要的消息是get_peer,其目的在于在网络中查找某个资源对应的peer列表。

值得一提的是,所有这些请求,包括各种回应,都可以用于处理该消息的节点构建路由表。因为路由表本质就是存储网络中的节点信息。

ping

用于确定某个节点是否在线。这个请求主要用于辅助路由表的更新。

find_node

用于查找某个节点,以获得其地址信息。当某个节点接收到该请求后,如果目标节点不在自己的路由表里,那么就返回离目标节点较近的K个节点。这个消息可用于节点启动时构建路由表。通过find_node方式构建路由表,其实现方式为向DHT网络查询自己。那么,接收该查询的节点就会一直返回其他节点了列表,查询者递归查询,直到无法查询为止。那么,什么时候无法继续查询呢?这一点我也不太清楚。每一次查询得到的都是离目标节点更接近的节点集,那么理论上经过若干次递归查询后,就无法找到离目标节点更近的节点了,因为最近的节点是自己,但自己还未完全加入网络。这意味着最后所有节点都会返回空的节点集合,这样就算查询结束?

实际上,通过find_node来构建路由表,以及顺带加入DHT网络,这种方式什么时候停止在我看来并不重要。路由表的构建并不需要在启动时构建完成,在以后与其他节点的交互过程中,路由表本身就会慢慢地得到构建。在初始阶段尽可能地通过find_node去与其他节点交互,最大的好处无非就是尽早地让网络中的其他节点认识自己。

get_peer

通过资源的infohash获得资源对应的peer列表。当查询者获得资源的peer列表后,它就可以通过这些peer进行资源下载了。收到该请求的节点会在自己的路由表中查找该infohash,如果有收录,就返回对应的peer列表。如果没有,则返回离该infohash较近的若干个节点。查询者若收到的是节点列表,那么就会递归查找。这个过程同find_node一样。

值得注意的是,get_peer的回应消息里会携带一个token,该token会用于稍后的announce_peer请求。

announce_peer

该请求主要目的在于通知,通知其他节点自己开始下载某个资源。这个消息用于构建网络中资源的peer列表。当一个已经加入DHT网络的P2P客户端通过种子文件开始下载资源时,首先在网络中查询该资源的peer列表,这个过程通过get_peer完成。当某个节点从get_peer返回peer时,查询者开始下载,然后通过announce_peer告诉返回这个peer的节点。

announce_peer中会携带get_peer回应消息里的token。关于这一点,我有一个疑问是,在P2P中DHT网络介绍文档中提到:

(announce_peer)同时会把自己的peer信息发送给先前的告诉者和自己K桶中的k个最近的节点存储该peer-list信息

不管这里提到的K的最近的节点是离自己最近,还是离资源infohash最近的节点,因为处理announce_peer消息时,有一个token的验证过程。但是这K个节点中,并没有在之前创建对应的token。我通过transmission中的DHT实现做了个数据收集,可以证明的是,announce_peer消息是不仅仅会发给get_peer的回应者的。

DHT爬虫

DHT爬虫是一个遵循Kad协议的假节点程序。具体可以参考小虾发布的那个网站应用:磁力搜索

这个爬虫的实现方式,主要包含以下内容:

  • 通过其他节点的announce_peer发来的infohash确认网络中有某个资源可被下载
  • 通过从网络中获取这个资源的种子文件,来获得该资源的描述

通过累计收集得到的资源信息,就可以提供一个资源搜索引擎,或者构建资源统计信息。以下进一步描述实现细节。整个爬虫的实现依赖了一个很重要的信息,那就是资源的infohash实际上就是一个磁力链接(当然需要包装一下数据)。这意味着一旦我们获得了一个infohash,我们就等于获得了一个种子。

获得资源通知

当爬虫程序加入DHT网络后,它总会收到其他节点发来的announce_peer消息。announce_peer消息与get_peer消息里都带了资源的infohash,但是get_peer里的infohash对应的资源在该网络中不一定存在,即该资源没有任何可用peer。而announce_peer则表示已经确认了该网络中有节点正在下载该资源,也即该资源的数据确实存在该网络中。

所以,爬虫程序需要尽最大努力地获取其他节点发来的announce_peer消息。如果announce_peer消息会发送给离消息发送节点较近的节点,那么,一方面,爬虫程序应该将自己告诉网络中尽可能多的节点。这可以通过一次完整的find_node操作实现。另一方面,爬虫程序内部实现可以部署多个DHT节点,总之目的在于尽可能地让爬虫程序称为其他节点的较近者。

当收集到infohash之后,爬虫程序还需要通过该infohash获得对应资源的描述信息。

获取资源信息

获得资源描述信息,其实就是通过infohash获得对应的种子文件。这需要实现P2P协议里的文件分享协议。种子文件的获取其实就是一个文件下载过程,下载到种子文件之后,就可以获取到资源描述。这个过程一种简单的方法,就是从infohash构建出一个磁力链接,然后交给一个支持磁力下载的程序下载种子。

从infohash构建出磁力链接非常简单,只需要将infohash编码成磁力链接的xt字段即可,构建实现可以从transmission源码里找到:

/* 这个算法其实和printf("%02x", sha1[i])一样 */
void tr_sha1_to_hex (char *out, const unsigned char *sha1)
{
int i;
static const char hex[] = "0123456789abcdef";
for (i=0; i<20; ++i) {
const unsigned int val = *sha1++;
*out++ = hex[val >> 4];
*out++ = hex[val & 0xf];
}
*out = '\0';
}
void appendMagnet(FILE *fp, const unsigned char *info_hash) {
char out[48];
tr_sha1_to_hex(out, info_hash);
fprintf(fp, "magnet:?xt=urn:btih:%s", out);
}

现在你就可以做一个实验,在transmission的DHT实现中,在announce_peer消息的处理代码中,将收到的infohash通过上面的appendMagnet转换为磁力链接输出到日志文件里。然后,可以通过支持磁力链接的程序(例如QQ旋风)直接下载。有趣的是,当QQ旋风开始下载该磁力链接对应的种子文件时,你自己的测试程序能收到QQ旋风程序发出的announce_peer消息。当然,你得想办法让这个测试程序尽可能地让其他节点知道你,这可以通过很多方式实现。

总结

最终的DHT爬虫除了需要实现DHT协议之外,还需要实现P2P文件下载协议,甚至包括一个种子文件解析模块。看起来包含不小的工作量。而如果要包装为一个资源搜索引擎,还会涉及到资源存储以及搜索,更别说前端呈现了。这个时候,如果你使用的语言不包含这些工具库,那实在是太悲剧了。没错,我就没找到一个erlang DHT库(倒是有erlang实现的BT客户端,懒得去剥了)。

UPDATE

通过详细阅读transmission里的DHT实现,一些之前的疑惑随之解开。

announce_peer会发给哪些节点

在一次对infohash的查询过程中,所有对本节点发出的get_peer作出回应的节点(不论这个回应节点回应的是nodes还是peers),当本节点取得peer信息时,就会对所有这些节点发出announce_peer。get_peer的回应消息里,不论是peer还是node,都会携带一个token,这样在将来收到对方的announce_peer时,就可以验证该token。

节点和bucket状态

在本地的路由表中,保存的node是有状态之分的。状态分为三种:good/dubious/bad。good节点基本可以断定该节点是一个在线的并且可以正常回应消息的节点;而bad节点则是可确定的无效节点,通常会尽快从路由表中移除;而dubious则是介于good和bad节点之间,表示可能有问题的节点,需要进一步发送例如ping消息来确认其状态。路由表中应该尽可能保证保存的是good节点,对查询消息的回应里也需携带好的节点。

bucket也是有状态的,当一个bucket中的所有节点在一定时间之内都没有任何活动的时候,该bucket则应该考虑进行状态的确认,确认方式可以随机选择该bucket中的节点进行find_node操作(这也是find_node除了用于启动之外的唯一作用,但具体实现不见得使用这种方式)。没有消息来往的bucket则应该考虑移除。DHT中几乎所有操作都会涉及到bucket的索引,如果索引到一个所有节点都有问题的bucket,那么该操作可能就无法完成。

search在何时停止

首先,某次发起的search,无论是对node还是对peer,都可能导致进一步产生若干个search。这些search都是基于transaction id来标识的。由一次search导致产生的所有子search都拥有相同的transaction id,以使得在该search成功或失败时可以通过该transaction id来删除对应的所有search。transaction id也就是DHT中每个消息消息头”t”的值。

但是search何时停止?transmission中是通过超时机制来停止。在search过程中,如果长时间没有收到跟该search关联的节点发来的回应消息,那么就撤销该search,表示搜索失败。

参考资料

posted @ 2013-05-19 21:51 Kevin Lynx 阅读(4174) | 评论 (0)编辑 收藏

2013年5月9日 #

Erlang使用感受

用erlang也算写了些代码了,主要包括使用RabbitMQ的练习,以及最近写的kl_tservericerl。其中icerl是一个实现了Ice的erlang库。

erlang的书较少,我主要读过<Programming Erlang>和<Erlang/OTP in Action>。其实erlang本身就语言来说的话比较简单,同ruby一样,类似这种本身目标是应用于实际软件项目的语言都比较简单,对应的语法书很快可以翻完。

这里我仅谈谈自己在编写erlang代码过程中的一些感受。

语法

erlang语法很简单,接触过函数式语言的程序员上手会很快。它没有类似common lisp里宏这种较复杂的语言特性。其语法元素很紧凑,不存在一些用处不大的特性。在这之前,我学习过ruby和common lisp。ruby代码写的比common lisp多。但是在学习erlang的过程中我的脑海里却不断出现common lisp里的语法特性。这大概是因为common lisp的语法相对ruby来说,更接近erlang。

编程模式

erlang不是一个面向对象的语言,它也不同common lisp提供多种编程模式。它的代码就是靠一个个函数组织出来的。面向对象语言在语法上有一点让我很爽的是,其函数调用更自然。erlang的接口调用就像C语言里接口的调用一样:

func(Obj, args)
Obj->func(args)

即需要在函数第一个参数传递操作对象。但是面向对象语言也会带来一些语法的复杂性。如果一门语言可以用很少的语法元素表达很多信息,那么我觉得这门语言就是门优秀的语言。

表达式/语句

erlang里没有语句,全部是表达式,意思是所有语法元素都是有返回值的。这实在太好了,全世界都有返回值可以让代码写起来简单多了:

    Flag = case func() of 1 -> true; 0 -> false end, 

命名

我之所以不想写一行python代码的很大一部分原因在于这门语言居然要求我必须使用代码缩进来编程,真是不敢相信。erlang里虽然没有此规定,却也有不同的语法元素有大小写的限定。变量首字母必须大写,atom必须以小写字母开头,更霸气的是模块命名必须和文件名相同。

变量

erlang里的变量是不可更改的。实际上给一个变量赋值,严格来说应该叫bound,即绑定。这个特性完全就是函数式语言里的特性。其带来的好处就像函数式语言宣扬的一样,这会使得代码没有副作用(side effect)。因为程序里的所有函数不论怎样调用,其程序状态都不会改变,因为变量无法被改变。

变量不可更改,直接意味着全局变量没有存在的意义,也就意味着不论你的系统是多么复杂地被构建出来,当系统崩溃时,其崩溃所在位置的上下文就足够找到问题。

但是变量不可改变也会带来一些代码编写上的不便。我想这大概是编程思维的转变问题。erlang的语法特性会强迫人编写非常短小的函数,你大概不愿意看到你的函数实现里出现Var1/Var2/Var3这样的变量,而实际上这样的命名在命令式语言里其实指的是同一个变量,只不过其值不同而已。

但是我们的程序总是应该有状态的。在erlang里我们通过不断创建新的变量来存储这个状态。我们需要通过将这个状态随着我们的程序流程不断地通过函数参数和返回值传递下去。

atom

atom这个语法特性本身没问题,它就同lisp里的atom一样,没什么意义,就是一个名字。它主要用在增加代码的可读性上。但是这个atom带来的好处,直接导致erlang不去内置诸如true/false这种关键字。erlang使用true/false这两个atom来作为boolean operator的返回值。但erlang里严格来说是没有布尔类型的。这其实没什么,糟糕的是,对于一些较常见的函数返回值,例如true/false,erlang程序员之间就得做约定。要表示一个函数执行失败了,我可以返回false、null、failed、error、nil,甚至what_the_fuck,这一度让我迷惘。

list/tuple

erlang里的list当然没有lisp里的list牛逼,别人整个世界就是由list构成的。在一段时间里,我一直以为list里只能保存相同类型的元素,而tuple才是用于保存不同类型元素的容器。直到有一天我发现tuple的操作不能满足我的需求了,我才发现list居然是可以保存不同类型的。

list相对于tuple而言,更厉害的地方就在于头匹配,意思是可以通过匹配来拆分list的头和剩余部分。

匹配(match)

erlang的匹配机制是个好东西。这个东西贯穿了整个语言。在我理解看来,匹配机制减少了很多判断代码。它试图用一个期望的类型去匹配另一个东西,如果这个东西出了错,它就无法完成这个匹配。无法完成匹配就导致程序断掉。

匹配还有个方便的地方在于可以很方便地取出record里的成员,或者tuple和list的某个部分,这其实增强了其他语法元素的能力。

循环

erlang里没有循环语法元素,这真是太好了。函数式语言里为什么要有循环语法呢?common lisp干毛要加上那些复杂的循环(宏),每次我遇到需要写循环的场景时,我都诚惶诚恐,最后还是用递归来解决。

同样,在erlang里我们也是用函数递归来解决循环问题。甚至,我们还有list comprehension。当我写C++代码时,我很不情愿用循环去写那些容器遍历代码,幸运的是在C++11里通过lambda和STL里那些算法我终于不用再写这样的循环代码了。

if/case/guard

erlang里有条件判定语法if,甚至还有类似C语言里的switch…case。这个我一时半会还不敢评价,好像haskell里也保留了if。erlang里同haskell一样有guard的概念,这其实是一种变相的条件判断,只不过其使用场景不一样。

进程

并发性支持属于erlang的最大亮点。erlang里的进程概念非常简单,基于消息机制,程序员从来不需要担心同步问题。每个进程都有一个mailbox,用于缓存发送到此进程的消息。erlang提供内置的语法元素来发送和接收消息。

erlang甚至提供分布式支持,更酷的是你往网络上的其他进程发送消息,其语法和往本地进程发送是一样的。

模块加载

如果我写了一个erlang库,该如何在另一个erlang程序里加载这个库?这个问题一度让我迷惘。erlang里貌似有对库打包的功能(.ez?),按理说应该提供一种整个库加载的方式,然后可以通过手动调用函数或者指定代码依赖项来加载。结果不是这样。

erlang不是按整个库来加载的,因为也没有方式去描述一个库(应该有第三方的)。当我们调用某个模块里的函数时,erlang会自动从某个目录列表里去搜索对应的beam文件。所以,可以通过在启动erlang添加这个模块文件所在目录来实现加载,这还是自动的。当然,也可以在erlang shell里通过函数添加这个目录。

OTP

使用erlang来编写程序,最大的优势可能就是其OTP了。OTP基本上就是一些随erlang一起发布的库。这些库中最重要的一个概念是behaviour。behaviour其实就是提供了一种编程框架,应用层提供各种回调函数给这个框架,从而获得一个健壮的并发程序。

application behaviour

application behaviour用于组织一个erlang程序,通过一个配置文件,和提供若干回调,就可以让我们编写的erlang程序以一种统一的方式启动。我之前写的都是erlang库,并不需要启动,而是提供给应用层使用,所以也没使用该behaviour。

gen_server behaviour

这个behaviour应该是使用频率很高的。它封装了进程使用的细节,本质上也就是将主动收取消息改成了自动收取,收取后再回调给你的模块。

supervisor behaviour

这个behaviour看起来很厉害,通过对它进行一些配置,你可以把你的并发程序里的所有进程建立成树状结构。这个结构的牛逼之处在于,当某个进程挂掉之后,通过supervisor可以自动重新启动这个挂掉的进程,当然重启没这么简单,它提供多种重启规则,以让整个系统确实通过重启变成正常状态。这实在太牛逼了,这意味着你的服务器可以7x24小时地运行了,就算有问题你也可以立刻获得一个重写工作的系统。

热更新

代码热更新对于一个动态语言而言其实根本算不上什么优点,基本上动态语言都能做到这一点。但是把热更新这个功能加到一个用于开发并发程序的语言里,那就很牛逼了。你再一次可以确保你的服务器7x24小时不停机维护。

gen_tcp

最开始我以为erlang将网络部分封装得已经认不出有socket这个概念了。至少,你也得有一个牛逼的网络库吧。结果发现依然还是socket那一套。然后我很失望。直到后来,发现使用一些behaviour,加上调整gen_tcp的一些option,居然可以以很少的代码写出一个维护大量连接的TCP服务器。是啊,erlang天生就是并发的,在传统的网络模型中,我们会觉得使用one-thread-per-connection虽然简单却不是可行的,因为thread是OS资源,太昂贵。但是在erlang里,one-process-per-connection却是再自然不过的事情。你要是写个erlang程序里面却只有一个process你都不好意思告诉别人你写的是erlang。process是高效的(对我们这种二流程序员而言),它就像C++里一个很普通的对象一样。

在使用gen_tcp的过程中我发现一个问题,不管我使用哪一种模型,我竟然找不到一种温柔的关闭方式。我查看了几个tutorial,这些混蛋竟然没有一个人提到如何去正常关闭一个erlang TCP服务器。后来,我没有办法,只好使用API强制关闭服务器进程。

Story

其实,我和erlang之间是有故事的。我并不是这个月开始才接触erlang。早在2009年夏天的时候我就学习过这门语言。那时候我还没接触过任何函数式语言,那时候lua里的闭包都让我觉得新奇。然后无意间,我莫名其妙地接触了haskell(<Real World Haskell>),在我决定开始写点什么haskell练习时,我发现我无从下手,最后,Monads把我吓哭了。haskell实在太可怕了。

紧接着我怀揣着对函数式语言的浓烈好奇心看到了erlang。当我看到了concurrent programming的章节时,在一个燥热难耐的下午我的领导找到了我,同我探讨起erlang对我们的网游服务器有什么好处。然后,我结束我了的erlang之旅。

时隔四年,这种小众语言,居然进入了中国程序员的视野,并被用于开发网页游戏服务器。时代在进步,我们总是被甩在后面。

posted @ 2013-05-09 21:24 Kevin Lynx 阅读(865) | 评论 (0)编辑 收藏

2013年4月12日 #

erlang和RabbitMQ学习总结

AMQP和RabbitMQ概述

AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而RabbitMQ则是AMQP的一种基于erlang的实现。

AMQP将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。

AMQP中有一些概念,用于定义与应用层的交互。这些概念包括:message、queue、exchange、channel, connection, broker、vhost。

注:到目前为止我并没有打算使用AMQP,所以没有做更深入的学习,仅为了找个机会写写erlang代码,以下信息仅供参考。

  • message,即消息,简单来说就是应用层需要发送的数据
  • queue,即队列,用于存储消息
  • exchange,有翻译为“路由”,它用于投递消息,应用程序在发送消息时并不是指定消息被发送到哪个队列,而是将消息投递给路由,由路由投递到队列
  • channel,几乎所有操作都在channel中进行,有点类似一个沟通通道
  • connection,应用程序与broker的网络连接
  • broker,可简单理解为实现AMQP的服务,例如RabbitMQ服务

关于AMQP可以通过一篇很有名的文章了解更多:RabbitMQ+Python入门经典 兔子和兔子窝

RabbitMQ的运行需要erlang的支持,erlang和RabbitMQ在windows下都可以直接使用安装程序,非常简单。RabbitMQ还支持网页端的管理,这需要开启一些RabbitMQ的插件,可以参考官方文档

RabbitMQ本质上其实是一个服务器,与这个服务器做交互则是通过AMQP定义的协议,应用可以使用一个实现了AMQP协议的库来与服务器交互。这里我使用erlang的一个客户端,对应着RabbitMQ的tutorial,使用erlang实现了一遍。基于这个过程我将一些关键实现罗列出来以供记忆:

主要功能使用

关于RabbitMQ erlang client的使用说明可以参考官方文档。这个client library下载下来后是两个ez文件,其实就是zip文件,本身是erlang支持的库打包格式,但据说这个feature还不成熟。总之我是直接解压,然后在环境变量中指定ERL_LIBS到解压目录。使用时使用include_lib包含库文件(类似C语言里的头文件):

    -include_lib("amqp_client/include/amqp_client.hrl").

Connection/Channel

对于连接到本地的RabbitMQ服务:

    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

创建Queue

每个Queue都有名字,这个名字可以人为指定,也可以由系统分配。Queue创建后如果不显示删除,断开网络连接是不会自动删除这个Queue的,这个可以在RabbitMQ的web管理端看到。

    #'queue.declare_ok'{queue = Q}
        = amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),

但也可以指定Queue会在程序退出后被自动删除,需要指定exclusive参数:

    QDecl = #'queue.declare'{queue = <<>>, exclusive = true},
    #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, QDecl),

上例中queue的名字未指定,由系统分配。

发送消息

一般情况下,消息其实是发送给exchange的:

    Payload = <<"hello">>
    Publish = #'basic.publish'{exchange = <<"log_exchange">>},
    amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),

exchange有一系列规则,决定某个消息将被投递到哪个队列。

发送消息时也可以不指定exchange,这个时候消息的投递将依赖于routing_keyrouting_key在这种场景下就对应着目标queue的名字:

    #'queue.declare_ok'{queue = Q}
        = amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),
    Payload = <<"hello">>,
    Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
    amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),

接收消息

可以通过注册一个消息consumer来完成消息的异步接收:

    Sub = #'basic.consume' {queue = Q},
    #'basic.consume_ok'{consumer_tag = Tag} = amqp_channel:subscribe(Channel, Sub, self()),

以上注册了了一个consumer,监听变量Q指定的队列。当有消息到达该队列时,系统就会向consumer进程对应的mailbox投递一个通知,我们可以使用receive来接收该通知:

    loop(Channel) ->
        receive 
            % This is the first message received (from RabbitMQ)
            #'basic.consume_ok'{} -> 
                loop(Channel);
            % a delivery
            {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Payload}} ->
                echo(Payload),
                % ack the message
                amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
                loop(Channel);
        ...

绑定exchange和queue

绑定(binding)其实也算AMQP里的一个关键概念,它用于建立exchange和queue之间的联系,以方便exchange在收到消息后将消息投递到队列。我们不一定需要将队列和exchange绑定起来。

    Binding = #'queue.bind'{queue = Queue, exchange = Exchange, routing_key = RoutingKey},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)

在绑定的时候需要填入一个routing_key的参数,不同类型的exchange对该值的处理方式不一样,例如后面提到fanout类型的exchange时,就不需要该值。

更多细节

通过阅读RabbitMQ tutorial,我们还会获得很多细节信息。例如exchange的种类、binding等。

exchange分类

exchange有四种类型,不同类型决定了其在收到消息后,该如何处理这条消息(投递规则),这四种类型为:

  • fanout
  • direct
  • topic
  • headers

fanout类型的exchange是一个广播exchange,它在收到消息后会将消息广播给所有绑定到它上面的队列。绑定(binding)用于将队列和exchange关联起来。我们可以在创建exchange的时候指定exchange的类型:

    Declare = #'exchange.declare'{exchange = <<"my_exchange">>, type = <<"fanout">>}
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)

direct类型的exchange在收到消息后,会将此消息投递到发送消息时指定的routing_key和绑定队列到exchange上时的routing_key相同的队列里。可以多次绑定一个队列到一个exchange上,每次指定不同的routing_key,就可以接收多种routing_key类型的消息。注意,绑定队列时我们可以填入一个routing_key,发送消息时也可以指定一个routing_key

topic类型的exchange相当于是direct exchange的扩展,direct exchange在投递消息到队列时,是单纯的对routing_key做相等判定,而topic exchange则是一个routing_key的字符串匹配,就像正则表达式一样。在routing_key中可以填入一种字符串匹配符号:

* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.

header exchange tutorial中未提到,我也不深究

消息投递及回应

每个消息都可以提供回应,以使RabbitMQ确定该消息确实被收到。RabbitMQ重新投递消息仅依靠与consumer的网络连接情况,所以只要网络连接正常,consumer卡死也不会导致RabbitMQ重投消息。如下回应消息:

    amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

其中Tag来源于接收到消息时里的Tag

如果有多个consumer监听了一个队列,RabbitMQ会依次把消息投递到这些consumer上。这里的投递原则使用了round robin方法,也就是轮流方式。如前所述,如果某个consumer的处理逻辑耗时严重,则将导致多个consumer出现负载不均衡的情况,而RabbitMQ并不关心consumer的负载。可以通过消息回应机制来避免RabbitMQ使用这种消息数平均的投递原则:

    Prefetch = 1,
    amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})

消息可靠性

RabbitMQ可以保证消息的可靠性,这需要设置消息和队列都为durable的:

    #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello_queue">>, durable = true}),

    Payload = <<"foobar">>,
    Publish = #'basic.publish'{exchange = "", routing_key = Queue},
    Props = #'P_basic'{delivery_mode = 2}, %% persistent message
    Msg = #amqp_msg{props = Props, payload = Payload},
    amqp_channel:cast(Channel, Publish, Msg)

参考

除了参考RabbitMQ tutorial外,还可以看看别人使用erlang是如何实现这些tutorial的,github上有一个这样的项目:rabbitmq-tutorials。我自己也实现了一份,包括rabbitmq-tutorials中没实现的RPC。后来我发现原来rabbitmq erlang client的实现里已经包含了一个RPC模块。

posted @ 2013-04-12 21:27 Kevin Lynx 阅读(3083) | 评论 (0)编辑 收藏

2013年3月21日 #

浅析软件工程开发方法学RUP

前言

因为之前一直处在游戏开发行业,由于种种原因一直对软件工程中的项目管理、项目开发方法缺乏体验。虽然项目中也曾倡导编写更多的文档,无论是模块说明文档还是设计文档,但效果一直不好。不甚理想的地方主要体现在文档的规范性欠缺、不统一、浮于表面没有实质内容。文档的编写缺乏详尽的方法指导,那么所谓的设计文档要么是用来敷衍上级要么就是随着开发人员的水平不一而千差万别。

当我开始目前这个非游戏项目时,我也曾想,前期做好结构设计,制定好关键问题的解决方案,那么要完成这个项目就不在话下了。但是我很快就面临了一个问题:需求不定。回想身处游戏公司的那些日子,程序员总是抱怨策划需求变更过快过多,在每一次策划提出一个需求变更时,谨慎的程序员都会再三让策划保证:放心,不会变了。而我面临的问题则更为严峻。我意识到,项目的需求,就连用户也无法一一罗列出来。我们需要的是需求调研。但就算你将客户的所有需求全部挖掘出来后(这几乎不可能,因为他们自己也不太清楚自己想要什么),当你交付了第一个软件版本,几乎可以肯定客户会提出一大堆的需求变更:我要的不是这个,我要的那个怎么没有,哦,我当初以为你说的是另一个意思。

当然,需求调研这种工作不是让程序员去做的(那会更悲剧,无论是对客户还是对程序员而言,他们都是在对牛弹琴)。但需求的不确定性也总是存在的。

事实上,需求变化本身就是一个很正常的现象。我一向愿意更悲观地处理软件开发方面的问题,所谓小心使得万年船。基于此,我决定摆好心态学学软件开发的方法学。

概要

本文简要描述、总结了RUP开发方法学的主要内容,结合我自己的感受阐述了一些RUP的核心原则。我相信我所理解的内容是肤浅的,对于非代码的表达我更相信其是存在歧义的。所以本文仅当是一种经验参考,不必当真。

RUP据传是用于指导大型甚至超大型项目开发的,我们做的不是这样规模的项目。但是我们需要记录下整个项目的开发过程,通过这个过程中产出的工件任何一个人可以看出这个项目是如何实现出来的,其目的在于规避唯有从海量代码中才能熟悉项目实现这种问题。这里出现了一个概念:工件,其指的是软件项目开发过程中任何留下记录的事物,例如文档、图、代码等。RUP的一个重要思想,在于其整个软件开发过程都是可推导的。例如我们通常说的软件架构,或小一点的模块结构,都是通过开发过程中前面阶段产出的工件推导得出,而不是凭借程序员的经验拍脑袋想出来的(经验不太可靠,并且千差万别,而推导意味着将每个环节变得可靠)。我们借助RUP的这个特性,创建这些工件,用以建立起软件实现的可靠知识库。

RUP概览

以下均摘自<Thinking in UML>中对RUP的描述:

统一过程归纳和集成了软件开发活动中的最佳实践,它定义了软件开发过程中最重要的阶段和工作(四个阶段和九个核心工作流),定义了参与软件开发过程的各种角色和他们的职责,还定义了软件生产过程中产生的工件,并提供了模板。最后,采用演进式软件生命周期(迭代)将工作、角色和工件串在一起,形成了统一过程。

统一过程是一种追求稳定的软件方法,它追求开发稳定架构,控制变更

统一过程集成了面向对象方法、UML语言、核心工作流、工件模板和过程指导等知识

简单来说,RUP作为一种软件项目开发方法学,它定义了软件开发的每一个过程,最重要的是它指导了在每一个过程需要产出什么,这些产出又是怎样得到。它试图规范化整个流程,以规避需求变更,项目参与者水平不一等带来的项目不可控等问题,以期一个软件产品稳定地开发出来。在一个项目开发过程中,最核心的资源是人,最不可控的亦是人。

RUP过程与实践

我觉得要快速学习一种知识,需要首先获得这门知识的总体框架。另一方面,在我们获得更多信息后,我们需要挖掘出这门知识的核心思想。学习RUP我觉得从这两方面入手是相对比较快速和有效的手段。

RUP框架

RUP定义了软件开发过程的四个阶段,以及9个工作流程。一张极为经典的RUP开发过程框架图如下:

rup

RUP将整个软件开发过程分为四个阶段:

  • 先启(Inception)、
  • 精化(Elaboration)
  • 构建(Construction)
  • 产品化(Transition)

每一个阶段的工作分为9个流程:

  • 业务建模
  • 需求
  • 分析设计
  • 实施
  • 测试
  • 部署
  • 配置与变更管理
  • 项目管理
  • 环境

其中,前6个流程被统称为”engineering disciplines”,后3个流程被称为”supporting disciplines”。当然,我们主要关注前6个流程。那么,这些工作流程和开发阶段又有什么关系呢?上图中其实已经阐明了这些关系。

RUP指导迭代开发。在软件开发的这4个阶段中,每一个阶段会被分为若干次迭代。而每一次迭代则涵盖了这9个工作流程。随着开发阶段向产品化靠近,自然而然地,需求的变更、增加自然会减少,所以从图中可以看出,开发过程越到后期,其工作流程中关于需求的工作则越少。同样,在先启阶段,其需求相关的工作则占据了该阶段的主要工作内容。

RUP中的迭代要求在每一次迭代中,都会完整地实施一遍整个工作流程。在软件实施阶段,甚至会在每一个迭代过程完后输出一个可运行的软件版本。这个版本可能会被交付给客户,以期进一步地在功能需求上取得与客户一致的意见。这倒是同敏捷开发有点类似。

既然制定了工作流程,那每一个工作流程该如何去实施呢?RUP制定了每个工作流程需要参与的角色,这些角色需要从事的活动,以及这些活动产生的工件。

这句话实际上反映了RUP的一个重要信息,摘自wiki:

RUP is based on a set of building blocks, or content elements, describing what is to be produced, the necessary skills required and the step-by-step explanation describing how specific development goals are to be achieved. The main building blocks, or content elements, are the following:

  • Roles (who) – A Role defines a set of related skills, competencies and responsibilities.
  • Work Products (what) – A Work Product represents something resulting from a task, including all the documents and models produced while working through the process.
  • Tasks (how) – A Task describes a unit of work assigned to a Role that provides a meaningful result.

RUP建模

在我看来,RUP每个工作流程所完成的工作,就是一个建模的过程。所谓建模,简单来说就是将需要描述的事物通过更系统的形式表达出来,以期获得对该事物更深入的理解。<Thinking in UML>中定义建模概念为:

建模(modeling),是指通过对客观事物建立一种抽象的方法用以表征事物并获得对事物本身的理解,同时把这种理解概念化,将这些逻辑概念组织起来,构成一种对所观察的对象的内部结构和工作原理的便于理解的表达。

在这里,建模的过程需要使用一些工具。在RUP中建模使用UML来完成。在<Thinking in UML>中讲述了UML的核心模型,包括:

  • 业务用例模型
  • 概念用例模型
  • 系统用例模型
  • 领域模型
  • 分析模型
  • 软件架构和框架
  • 设计模型
  • 组件模型
  • 实施模型

可能在大家的普遍认识中,UML无非就是几种图,并且粗看一眼理解起来也不困难,甚至还能用来画画类图做下代码结构设计。但UML的作用不仅仅如此。

以上所描述的UML核心模型中,每个模型并不单指的的是一种UML图。每个模型实际上都会包含几种UML图,会包含若干张UML图。这些模型基本上渗透于RUP的9个工作流程中,只不过不同的工作流程使用的模型比重不一而已。

例如在“分析设计”工作流程中,可能会使用到系统用例模型、分析模型、软件架构和框架、设计模型等,而业务用例模型可能在这个流程中根本不会用到;相反,业务用例模型则可能在“业务建模”流程中被广泛使用。

前已述及在RUP的每个工作流程中,RUP定义了该流程需要参与的角色,以及这些角色需要进行的活动,例如这里可以看看“分析设计”流程中的角色和活动集(摘自<Thinking in UML>):

analyse-action

相应的,在该工作流程中需要产出的工件集为(摘自<Thinking in UML>):

analyse-ar

既然使用了UML作为建模工具,所以可以简单地说这些工件主要就是UML图,当然也会有其他文档性质的事物,例如网络协议结构、数据库表等UML无法描述的东西则通过普通文字性文档描述。

RUP最佳实践

到目前为止我们已经了解到RUP定义了开发过程(phase),定义了每个过程包含的若干工作流程,还定义了每个工作流程需要哪些角色从事哪些活动来完成哪些工件。除此之外,RUP还提供了6条最佳实践用以指导软件开发:

  • 迭代开发
  • 管理需求
  • 使用基于组件的架构
  • 可视建模
  • 持续的质量验证
  • 控制变更

这些实践在我看来仅仅是一些项目开发的指导原则,它们渗透到每一个过程,每一个工作流程。在项目过程中实践这些原则,用以确保项目的成功。例如我们使用UML建模,以达到“可视建模”。我们通过建立需求用例,以“管理需求”。

RUP核心思想

似乎没有文档来专门阐述RUP的核心思想,但我觉得掌握其核心思想才是学习的要点所在。要理解一种软件开发方法学的核心思想,其实最好是将其与别的方法学做比较。这里先就我的一些感想做阐述。

用例驱动

用例驱动指的是整个软件项目的推进过程,是依靠“用例”来完成。<Thinking in UML>:

在实际的软件项目中,一个软件要实现的功能通过用例来捕获,接下来的所有分析、设计、实现、测试都由用例来取得,即以实现用例为目标。在统一过程中用例能够驱动的不仅仅是分析设计。

用例简单来说就是描述了一个系统功能。但必须注意的是,这仅仅是它定义的一小部分。用例主要分布在“业务建模”、“需求”、“分析设计”这些工作过程中。在不同的过程中用例的粒度和性质都不一样。例如对于一个借书系统而言,在业务建模阶段,我们可以获取出一个“借书”用例,其系统边界甚至不是系统而可能仅关注这个业务本身(因为这个阶段还没有考虑到计算机如何实现这个借书业务);在系统分析阶段,我们就可以将“借书”这个用例细化为用户和计算机软件系统的交互;进一步地,我们可能会进一步精化这个用例,例如用户通过网页终端“借书”。(这里描述了很多建模的细节,可不必深究,本文只给出一个概要性的介绍)

我们说用例驱动软件开发,但它如何驱动的呢?我在实际的建模过程中,最明显的感受就是用例驱动了整个建模过程。

  • 在需求分析阶段,我以系统使用者的角度绘制出了一份用例图,用于表达使用者对该系统的需求
  • 然后我绘制序列图(活动图等)来实现这些用例,也就是阐述使用者具体是如何与系统交互的
  • 从之前的建模过程中我获得对系统功能需求方面的认识
  • 基于前面的分析我可以绘制出系统用例图,以明确系统的各个功能需求
  • 同样针对用例绘制用例实现图
  • 用例本身应该包含更多的文档,因此我编写用例规约用以详细描述各个用例
  • 从用例规约、用例实现中我可以抽离出一些分析类(较设计类更高抽象的类),包含用例场景中涉及到的实体,控制逻辑
  • 细化这些分析类,将分析类组织起来形成系统,我会用界面类去衔接各个控制类
  • 将得到的分析类按模块来划分,从而可以得到一个初步的系统架构
  • 初步考虑系统实现,我甚至会得到一个初步是的系统部署图
  • 回过头不断审视系统用例,以确认我是否实现了所有用例,这可以保证我的分析实现了所有需求,我不用枚举所有系统特性是否被我考虑周全,我仅需在已有用例图中核实
  • 基于模块实现各个用例,或者基于分析类来实现系统用例
  • 通过重新绘制以及核实用例,可以进一步精化分析类,分析类在很大程度上会一一对应到设计类,而设计类则对应到实际的代码
  • 可以进入设计阶段,设计阶段会考虑到系统的实现细节,例如使用的语言,使用的框架等

进入设计阶段后,虽然可以进一步建模,得到会直接映射到代码的类图、序列图等,但这样的图在面临需求变更时,基本上会面临修改,这意味着维护这些文档需要耗费精力。所以,<Thinking in UML>中主张将精力放在维护分析类模型中,而通过其他约定实现从分析类到实际代码的转换。我觉得这个也在理。

规范化整个过程

我个人觉得RUP的一大特点在于规范化了整个软件开发过程,每一个步骤需要哪些角色参与,该干什么,怎么去干都有指导。加之这些活动的”可推导性“,这意味着不论参与角色属于什么水平,都可以稳固地推进项目进程。

此外,这种规范化也会给项目留下详细的演化过程。你可以明确地看到整个软件是如何演化出最终的产品代码,可以深入地理解项目代码中的设计。

总结

我只是一个RUP新手,即便如此,我依然不觉得RUP是软件开发的万能药。我相信任何软件开发方法都是有局限性的。我们在实际使用的时候也只是吸取其精华。不同的开发方法其适用范围也是不一样的。正如有人将RUP和XP做比较时说,如果你使用RUP去开一个杂货铺,在没开张之前你就已经破产了;同样如果你用XP去做飞机,飞机毁了十来次也许才能做出来(from <Thinking in UML> again)。

参考文档

posted @ 2013-03-21 21:41 Kevin Lynx 阅读(1302) | 评论 (0)编辑 收藏

仅列出标题  下一页