loop_in_codes

低调做技术__欢迎移步我的独立博客 codemaro.com

2014年6月1日 #

select真的有限制吗

在刚开始学习网络编程时,似乎莫名其妙地就会被某人/某资料告诉select函数是有fd(file descriptor)数量限制的。在最近的一次记忆里还有个人笑说select只支持64个fd。我甚至还写过一篇不负责任甚至错误的博客(突破select的FD_SETSIZE限制)。有人说,直接重新定义FD_SETSIZE就可以突破这个select的限制,也有人说除了重定义这个宏之外还的重新编译内核。

事实具体是怎样的?实际上,造成这些混乱的原因恰好是不同平台对select的实现不一样。

Windows的实现

MSDN.aspx)上对select的说明:

int select(
  _In_     int nfds,
  _Inout_  fd_set *readfds,
  _Inout_  fd_set *writefds,
  _Inout_  fd_set *exceptfds,
  _In_     const struct timeval *timeout
);

nfds [in] Ignored. The nfds parameter is included only for compatibility with Berkeley sockets.

第一个参数MSDN只说没有使用,其存在仅仅是为了保持与Berkeley Socket的兼容。

The variable FD_SETSIZE determines the maximum number of descriptors in a set. (The default value of FD_SETSIZE is 64, which can be modified by defining FD_SETSIZE to another value before including Winsock2.h.) Internally, socket handles in an fd_set structure are not represented as bit flags as in Berkeley Unix.

Windows上select的实现不同于Berkeley Unix,后者使用位标志来表示socket

在MSDN的评论中有人提到:

Unlike the Linux versions of these macros which use a single calculation to set/check the fd, the Winsock versions use a loop which goes through the entire set of fds each time you call FD_SET or FD_ISSET (check out winsock2.h and you’ll see). So you might want to consider an alternative if you have thousands of sockets!

不同于Linux下处理fd_set的那些宏(FD_CLR/FD_SET之类),Windows上这些宏的实现都使用了一个循环,看看这些宏的大致实现(Winsock2.h):

#define FD_SET(fd, set) do { \
    u_int __i; \
    for (__i = 0; __i < ((fd_set FAR *)(set))->fd_count; __i++) { \
        if (((fd_set FAR *)(set))->fd_array[__i] == (fd)) { \
            break; \
        } \
    } \
    if (__i == ((fd_set FAR *)(set))->fd_count) { \
        if (((fd_set FAR *)(set))->fd_count < FD_SETSIZE) { \
            ((fd_set FAR *)(set))->fd_array[__i] = (fd); \
            ((fd_set FAR *)(set))->fd_count++; \
        } \
    } \
} while(0)

看下Winsock2.h中关于fd_set的定义:

typedef struct fd_set {
    u_int fd_count;
    SOCKET fd_array[FD_SETSIZE];
} fd_set;

再看一篇更重要的MSDN Maximum Number of Sockets Supported.aspx):

The Microsoft Winsock provider limits the maximum number of sockets supported only by available memory on the local computer. The maximum number of sockets that a Windows Sockets application can use is not affected by the manifest constant FD_SETSIZE. If an application is designed to be capable of working with more than 64 sockets using the select and WSAPoll functions, the implementor should define the manifest FD_SETSIZE in every source file before including the Winsock2.h header file.

Windows上select支持的socket数量并不受宏FD_SETSIZE的影响,而仅仅受内存的影响。如果应用程序想使用超过FD_SETSIZE的socket,仅需要重新定义FD_SETSIZE即可。

实际上稍微想想就可以明白,既然fd_set里面已经有一个socket的数量计数,那么select的实现完全可以使用这个计数,而不是FD_SETSIZE这个宏。那么结论是,select至少在Windows上并没有socket支持数量的限制。当然效率问题这里不谈。

这看起来推翻了我们一直以来没有深究的一个事实。

Linux的实现

在上面提到的MSDN中,其实已经提到了Windows与Berkeley Unix实现的不同。在select的API文档中也看到了第一个参数并没有说明其作用。看下Linux的man

nfds is the highest-numbered file descriptor in any of the three sets, plus 1.

第一个参数简单来说就是最大描述符+1。

An fd_set is a fixed size buffer. Executing FD_CLR() or FD_SET() with a value of fd that is negative or is equal to or larger than FD_SETSIZE will result in undefined behavior.

明确说了,如果调用FD_SET之类的宏fd超过了FD_SETSIZE将导致undefined behavior。也有人专门做了测试:select system call limitation in Linux。也有现实遇到的问题:socket file descriptor (1063) is larger than FD_SETSIZE (1024), you probably need to rebuild Apache with a larger FD_SETSIZE

看起来在Linux上使用select确实有FD_SETSIZE的限制。有必要看下相关的实现 fd_set.h

typedef __uint32_t      __fd_mask;

/* 32 = 2 ^ 5 */
#define __NFDBITS       (32)
#define __NFDSHIFT      (5)
#define __NFDMASK       (__NFDBITS - 1)

/*
 * Select uses bit fields of file descriptors.  These macros manipulate
 * such bit fields.  Note: FD_SETSIZE may be defined by the user.
 */

#ifndef FD_SETSIZE
#define FD_SETSIZE      256
#endif

#define __NFD_SIZE      (((FD_SETSIZE) + (__NFDBITS - 1)) / __NFDBITS)

typedef struct fd_set {
    __fd_mask       fds_bits[__NFD_SIZE];
} fd_set;

在这份实现中不同于Windows实现,它使用了位来表示fd。看下FD_SET系列宏的大致实现:

#define FD_SET(n, p)    \
   ((p)->fds_bits[(unsigned)(n) >> __NFDSHIFT] |= (1 << ((n) & __NFDMASK)))

添加一个fd到fd_set中也不是Windows的遍历,而是直接位运算。这里也有人对另一份类似实现做了剖析:linux的I/O多路转接select的fd_set数据结构和相应FD_宏的实现分析。在APUE中也提到fd_set

这种数据类型(fd_set)为每一可能的描述符保持了一位。

既然fd_set中不包含其保存了多少个fd的计数,那么select的实现里要知道自己要处理多少个fd,那只能使用FD_SETSIZE宏去做判定,但Linux的实现选用了更好的方式,即通过第一个参数让应用层告诉select需要处理的最大fd(这里不是数量)。那么其实现大概为:

for (int i = 0; i < nfds; ++i) {
    if (FD_ISSET...
       ...
}

如此看来,Linux的select实现则是受限于FD_SETSIZE的大小。这里也看到,fd_set使用位数组来保存fd,那么fd本身作为一个int数,其值就不能超过FD_SETSIZE这不仅仅是数量的限制,还是其取值的限制。实际上,Linux上fd的取值是保证了小于FD_SETSIZE的(但不是不变的)Is the value of a Linux file descriptor always smaller than the open file limits?

Each process is further limited via the setrlimit(2) RLIMIT_NOFILE per-process limit on the number of open files. 1024 is a common RLIMIT_NOFILE limit. (It’s very easy to change this limit via /etc/security/limits.conf.)

fd的取值会小于RLIMIT_NOFILE,有很多方法可以改变这个值。这个值默认情况下和FD_SETSIZE应该是一样的。这个信息告诉我们,Linux下fd的取值应该是从0开始递增的(理论上,实际上还有stdin/stdout/stderr之类的fd)。这才能保证select的那些宏可以工作。

应用层使用

标准的select用法应该大致如下:

while (true) {
    ...
    select(...)
    for-each socket {
        if (FD_ISSET(fd, set))
            ...
    }

    ...
}

即遍历目前管理的fd,通过FD_ISSET去判定当前fd是否有IO事件。因为Windows的实现FD_ISSET都是一个循环,所以有了另一种不跨平台的用法:

while (true) {
    ...
    select(. &read_sockets, &write_sockets..)
    for-each read_socket {
        use fd.fd_array[i)
    }
    ...
}

总结

  • Windows上select没有fd数量的限制,但因为使用了循环来检查,所以效率相对较低
  • Linux上selectFD_SETSIZE的限制,但其相对效率较高

posted @ 2014-06-01 23:45 Kevin Lynx 阅读(1116) | 评论 (1)编辑 收藏

2014年5月4日 #

Muduo源码阅读

最近简单读了下muduo的源码,本文对其主要实现/结构简单总结下。

muduo的主要源码位于net文件夹下,base文件夹是一些基础代码,不影响理解网络部分的实现。muduo主要类包括:

  • EventLoop
  • Channel
  • Poller
  • TcpConnection
  • TcpClient
  • TcpServer
  • Connector
  • Acceptor
  • EventLoopThread
  • EventLoopThreadPool

其中,Poller(及其实现类)包装了Poll/EPoll,封装了OS针对设备(fd)的操作;Channel是设备fd的包装,在muduo中主要包装socket;TcpConnection抽象一个TCP连接,无论是客户端还是服务器只要建立了网络连接就会使用TcpConnection;TcpClient/TcpServer分别抽象TCP客户端和服务器;Connector/Acceptor分别包装TCP客户端和服务器的建立连接/接受连接;EventLoop是一个主控类,是一个事件发生器,它驱动Poller产生/发现事件,然后将事件派发到Channel处理;EventLoopThread是一个带有EventLoop的线程;EventLoopThreadPool自然是一个EventLoopThread的资源池,维护一堆EventLoopThread。

阅读库源码时可以从库的接口层着手,看看关键功能是如何实现的。对于muduo而言,可以从TcpServer/TcpClient/EventLoop/TcpConnection这几个类着手。接下来看看主要功能的实现:

建立连接

    TcpClient::connect 
        -> Connector::start 
            -> EventLoop::runInLoop(Connector::startInLoop...
            -> Connector::connect             

EventLoop::runInLoop接口用于在this所在的线程运行某个函数,这个后面看下EventLoop的实现就可以了解。 网络连接的最终建立是在Connector::connect中实现,建立连接之后会创建一个Channel来代表这个socket,并且绑定事件监听接口。最后最重要的是,调用Channel::enableWritingChannel有一系列的enableXX接口,这些接口用于标识自己关心某IO事件。后面会看到他们的实现。

Connector监听的主要事件无非就是连接已建立,用它监听读数据/写数据事件也不符合设计。TcpConnection才是做这种事的。

客户端收发数据

当Connector发现连接真正建立好后,会回调到TcpClient::newConnection,在TcpClient构造函数中:

    connector_->setNewConnectionCallback(
      boost::bind(&TcpClient::newConnection, this, _1));

TcpClient::newConnection中创建一个TcpConnection来代表这个连接:

    TcpConnectionPtr conn(new TcpConnection(loop_,
                                            connName,
                                            sockfd,
                                            localAddr,
                                            peerAddr));

    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    ...
    conn->connectEstablished();

并同时设置事件回调,以上设置的回调都是应用层(即库的使用者)的接口。每一个TcpConnection都有一个Channel,毕竟每一个网络连接都对应了一个socket fd。在TcpConnection构造函数中创建了一个Channel,并设置事件回调函数。

TcpConnection::connectEstablished函数最主要的是通知Channel自己开始关心IO读取事件:

    void TcpConnection::connectEstablished()
    {
        ...
        channel_->enableReading();

这是自此我们看到的第二个Channel::enableXXX接口,这些接口是如何实现关心IO事件的呢?这个后面讲到。

muduo的数据发送都是通过TcpConnection::send完成,这个就是一般网络库中在不使用OS的异步IO情况下的实现:缓存应用层传递过来的数据,在IO设备可写的情况下尽量写入数据。这个主要实现在TcpConnection::sendInLoop中。

    TcpConnection::sendInLoop(....) {
        ...
        // if no thing in output queue, try writing directly
        if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)  // 设备可写且没有缓存时立即写入
        { 
            nwrote = sockets::write(channel_->fd(), data, len);
        }
        ...
        // 否则加入数据到缓存,等待IO可写时再写
        outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
        if (!channel_->isWriting())
        {
            // 注册关心IO写事件,Poller就会对写做检测
            channel_->enableWriting();
        }
        ...     
    }

当IO可写时,Channel就会回调TcpConnection::handleWrite(构造函数中注册)

    void TcpConnection::handleWrite()
    {
        ...
        if (channel_->isWriting())
        {
            ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());

服务器端的数据收发同客户端机制一致,不同的是连接(TcpConnection)的建立方式不同。

服务器接收连接

服务器接收连接的实现在一个网络库中比较重要。muduo中通过Acceptor类来接收连接。在TcpClient中,其Connector通过一个关心Channel可写的事件来通过连接已建立;在Acceptor中则是通过一个Channel可读的事件来表示有新的连接到来:

    Acceptor::Acceptor(....) {
        ...
        acceptChannel_.setReadCallback(
            boost::bind(&Acceptor::handleRead, this));
        ... 
    }

    void Acceptor::handleRead()
    {
        ...
        int connfd = acceptSocket_.accept(&peerAddr); // 接收连接获得一个新的socket
        if (connfd >= 0)
        {
            ...
            newConnectionCallback_(connfd, peerAddr); // 回调到TcpServer::newConnection

TcpServer::newConnection中建立一个TcpConnection,并将其附加到一个EventLoopThread中,简单来说就是给其配置一个线程:

    void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
    {
        ...
        EventLoop* ioLoop = threadPool_->getNextLoop();
        TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                                connName,
                                                sockfd,
                                                localAddr,
                                                peerAddr));
        connections_[connName] = conn;
        ...
        ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));

IO的驱动

之前提到,一旦要关心某IO事件了,就调用Channel::enableXXX,这个如何实现的呢?

    class Channel {
        ...
        void enableReading() { events_ |= kReadEvent; update(); }
        void enableWriting() { events_ |= kWriteEvent; update(); }
       
    void Channel::update()
    {
        loop_->updateChannel(this);
    }

    void EventLoop::updateChannel(Channel* channel)
    {
        ...
        poller_->updateChannel(channel);
    }

最终调用到Poller::upateChannel。muduo中有两个Poller的实现,分别是Poll和EPoll,可以选择简单的Poll来看:

    void PollPoller::updateChannel(Channel* channel)
    {
      ...
      if (channel->index() < 0)
      {
        // a new one, add to pollfds_
        assert(channels_.find(channel->fd()) == channels_.end());
        struct pollfd pfd;
        pfd.fd = channel->fd();
        pfd.events = static_cast<short>(channel->events()); // 也就是Channel::enableXXX操作的那个events_
        pfd.revents = 0;
        pollfds_.push_back(pfd); // 加入一个新的pollfd
        int idx = static_cast<int>(pollfds_.size())-1;
        channel->set_index(idx);
        channels_[pfd.fd] = channel;

可见Poller就是把Channel关心的IO事件转换为OS提供的IO模型数据结构上。通过查看关键的pollfds_的使用,可以发现其主要是在Poller::poll接口里。这个接口会在EventLoop的主循环中不断调用:

    void EventLoop::loop()
    {
      ...
      while (!quit_)
      {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ...
        for (ChannelList::iterator it = activeChannels_.begin();
            it != activeChannels_.end(); ++it)
        {
          currentActiveChannel_ = *it;
          currentActiveChannel_->handleEvent(pollReturnTime_); // 获得IO事件,通知各注册回调
        }

整个流程可总结为:各Channel内部会把自己关心的事件告诉给Poller,Poller由EventLoop驱动检测IO,然后返回哪些Channel发生了事件,EventLoop再驱动这些Channel调用各注册回调。

从这个过程中可以看出,EventLoop就是一个事件产生器。

线程模型

在muduo的服务器中,muduo的线程模型是怎样的呢?它如何通过线程来支撑高并发呢?其实很简单,它为每一个线程配置了一个EventLoop,这个线程同时被附加了若干个网络连接,这个EventLoop服务于这些网络连接,为这些连接收集并派发IO事件。

回到TcpServer::newConnection中:

    void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
    {
      ...
      EventLoop* ioLoop = threadPool_->getNextLoop();
      ...
      TcpConnectionPtr conn(new TcpConnection(ioLoop, // 使用这个选择到的线程中的EventLoop
                                              connName,
                                              sockfd,
                                              localAddr,
                                              peerAddr));
      ...
      ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));

注意TcpConnection::connectEstablished是如何通过Channel注册关心的IO事件到ioLoop的。

极端来说,muduo的每一个连接线程可以只为一个网络连接服务,这就有点类似于thread per connection模型了。

网络模型

传说中的Reactor模式,以及one loop per thread,基于EventLoop的作用,以及线程池与TcpConnection的关系,可以醍醐灌顶般理解以下这张muduo的网络模型图了:


总结

本文主要对muduo的主要结构及主要机制的实现做了描述,其他如Buffer的实现、定时器的实现大家都可以自行研究。muduo的源码很清晰,通过源码及配合陈硕博客上的内容可以学到一些网络编程方面的经验。

posted @ 2014-05-04 18:22 Kevin Lynx 阅读(1125) | 评论 (3)编辑 收藏

2013年8月15日 #

记一次堆栈平衡错误

最近在一个使用Visual Studio开发的C++程序中,出现了如下错误:

Run-Time Check Failure #0 - The value of ESP was not properly saved across a function call. This is usually a result of calling a function declared with one calling convention with a function pointer declared with a different calling convention.

这个错误主要指的就是函数调用堆栈不平衡。在C/C++程序中,调用一个函数前会保存当前堆栈信息,目标函数返回后会把堆栈恢复到调用前的状态。函数的参数、局部变量会影响堆栈。而函数堆栈不平衡,一般是因为函数调用方式和目标函数定义方式不一致导致,例如:

void __stdcall func(int a) {
}

int main(int argc, char* argv[]) {
    typedef void (*funcptr)(int);
    funcptr ptr = (funcptr) func;
    ptr(1); // 返回后导致堆栈不平衡
    return 0;
}

__stdcall修饰的函数,其函数参数的出栈由被调用者自己完成,而__cdecl,也就是C/C++函数的默认调用约定,则是调用者完成参数出栈。

Visual Studio在debug模式下会在我们的代码中加入不少检查代码,例如以上代码对应的汇编中,就会增加一个检查堆栈是否平衡的函数调用,当出现问题时,就会出现提示Run-Time Check Failure...这样的错误对话框:

call dword ptr [ptr]  ; ptr(1)
add  esp,4  ; cdecl方式,调用者清除参数
cmp  esi,esp  
call @ILT+1345(__RTC_CheckEsp) (0B01546h) ; 检查堆栈是否平衡

但是我们的程序不是这种低级错误。我们调用的函数是放在dll中的,调用约定显示定义为__stdcall,函数声明和实现一致。大致的结构如下:

IParser *parser = CreateParser();
parser->Begin();
...
...
parser->End();
parser->Release(); // 返回后导致堆栈不平衡

IParser的实现在一个dll里,这反而是一个误导人的信息。parser->Release返回后,堆栈不平衡,并且仅仅少了一个字节。一个字节怎么来的?

解决这个问题主要的手段就是跟反汇编,在关键位置查看寄存器和堆栈的内容。编译器生成的代码是正确的,而我们自己的代码乍看上去也没问题。最后甚至使用最傻逼的调试手段–逐行语句注释查错。

具体查错过程就不细说了。解决问题往往需要更多的冷静,和清晰的思路。最终我使用的方法是,在进入Release之前记录堆栈指针的值,堆栈指针的值会被压入堆栈,以在函数返回后从堆栈弹出,恢复堆栈指针。Release的实现很简单,就是删除一个Parser这个对象,但这个对象的析构会导致很多其他对象被析构。我就逐层地检查,是在哪个函数里改变了堆栈里的内容。

理论上,函数本身是操作不到调用者的堆栈的。而现在看来,确实是被调用函数,也就是Release改写了调用者的堆栈内容。要改变堆栈的内容,只有通过局部变量的地址才能做到。

最终,我发现在调用完以下函数后,我跟踪的堆栈地址内容发生了改变:

call llvm::RefCountedBase<clang::TargetOptions>::Release (10331117h)

因为注意到TargetOptions这个字眼,想起了在parser->Begin里有涉及到这个类的使用,类似于:

TargetOptions TO;
...
TargetInfo *TI = TargetInfo::CreateTargetInfo(m_inst.getDiagnostics(), TO);

这部分初始化代码,是直接从网上复制的,因为并不影响主要逻辑,所以从来没对这块代码深究。查看CreateTargetInfo的源码,发现这个函数将TO这个局部变量的地址保存了下来

而在Release中,则会对这个保存的临时变量进行删除操作,形如:

void Delete() const {
  assert (ref_cnt > 0 && "Reference count is already zero.");
  if (--ref_cnt == 0) delete static_cast<const Derived*>(this);
}

但是,问题并不在于对一个局部变量地址进行deletedelete在调试模式下是做了内存检测的,那会导致一种断言。

TargetOptions包含了ref_cnt这个成员。当出了Begin作用域后,parser保存的TargetOptions的地址,指向的内容(堆栈)发生了改变,也就是ref_cnt这个成员变量的值不再正常。由于一些巧合,主要是代码中各个局部变量、函数调用顺序、函数参数个数(曾尝试去除Begin的参数,可以避免错误提示),导致在调用Release前堆栈指针恰好等于之前保存的TargetOptions的地址。注意,之前保存的TargetOptions的地址,和调用Release前的堆栈指针值相同了。

而在TargetOptionsDelete函数中,进行了--ref_cnt,这个变量是TargetOptions的第一个成员,它的减1,也就导致了堆栈内容的改变。

至此,整个来龙去脉算是摸清。

posted @ 2013-08-15 23:01 Kevin Lynx 阅读(1869) | 评论 (1)编辑 收藏

2013年8月8日 #

Dhtcrawler2换用sphinx搜索

dhtcrawler2最开始使用mongodb自带的全文搜索引擎搜索资源。搜索一些短关键字时很容易导致erlang进程call timeout,也就是查询时间太长。对于像avi这种关键字,搜索时间长达十几秒。搜索的资源数量200万左右。这其中大部分资源只是对root文件名进行了索引,即对于多文件资源而言没有索引单个文件名。索引方式有部分资源是按照字符串子串的形式,没有拆词,非常占用存储空间;有部分是使用了rmmseg(我编译了rmmseg-cpp作为erlang nif库调用 erl-rmmseg)进行了拆词,占用空间小了很多,但由于词库问题很多片里的词汇没拆出来。

很早以前我以为搜索耗时的原因是因为数据库太忙,想部署个mongodb集群出来。后来发现数据库没有任何读写的状态下,查询依然慢。终于只好放弃mongodb自带的文本搜索。于是我改用sphinx。简单起见,我直接下载了coreseek4.1(sphinx的一个支持中文拆词的包装)。

现在,已经导入了200多万的资源进sphinx,并且索引了所有文件名,索引文件达800M。对于avi关键字的搜索大概消耗0.2秒的时间。搜索试试

以下记录下sphinx在dhtcrawler的应用

sphinx简介

sphinx包含两个主要的程序:indexer和searchd。indexer用于建立文本内容的索引,然后searchd基于这些索引提供文本搜索功能,而要使用该功能,可以遵循searchd的网络协议连接searchd这个服务来使用。

indexer可以通过多种方式来获取这些文本内容,文本内容的来源称为数据源。sphinx内置mysql这种数据源,意思是可以直接从mysql数据库中取得数据。sphinx还支持xmlpipe2这种数据源,其数据以xml格式提供给indexer。要导入mongodb数据库里的内容,可以选择使用xmlpipe2这种方式。

sphinx document

xmlpipe2数据源需要按照以下格式提交:

<sphinx:docset>
    <sphinx:schema>
        <sphinx:field name="subject"/>
        <sphinx:field name="files"/>
        <sphinx:attr name="hash1" type="int" bits="32"/>
        <sphinx:attr name="hash2" type="int" bits="32"/>
    </sphinx:schema>
    <sphinx:document id="1">
        <subject>this is the subject</subject>
        <files>file content</files>
        <hash1>111</hash1>
    </sphinx:document>
</sphinx:docset>

该文件包含两大部分:schemadocuments,其中schema又包含两部分:fieldattr,其中由field标识的字段就会被indexer读取并全部作为输入文本建立索引,而attr则标识查询结果需要附带的信息;documents则是由一个个sphinx:document组成,即indexer真正要处理的数据。注意其中被schema引用的属性名。

document一个很重要的属性就是它的id。这个id对应于sphinx需要唯一,查询结果也会包含此id。一般情况下,此id可以直接是数据库主键,可用于查询到详细信息。searchd搜索关键字,其实可以看作为搜索这些document,搜索出来的结果也是这些document,搜索结果中主要包含schema中指定的attr。

增量索引

数据源的数据一般是变化的,新增的数据要加入到sphinx索引文件中,才能使得searchd搜索到新录入的数据。要不断地加入新数据,可以使用增量索引机制。增量索引机制中,需要一个主索引和一个次索引(delta index)。每次新增的数据都建立为次索引,然后一段时间后再合并进主索引。这个过程主要还是使用indexer和searchd程序。实际上,searchd是一个需要一直运行的服务,而indexer则是一个建立完索引就退出的工具程序。所以,这里的增量索引机制,其中涉及到的“每隔一定时间就合并”这种工作,需要自己写程序来协调(或通过其他工具)

sphinx与mongodb

上面提到,一般sphinx document的id都是使用的数据库主键,以方便查询。但mongodb中默认情况不使用数字作为主键。dhtcrawler的资源数据库使用的是资源info-hash作为主键,这无法作为sphinx document的id。一种解决办法是,将该hash按位拆分,拆分成若干个sphinx document attr支持位数的整数。例如,info-hash是一个160位的id,如果使用32位的attr(高版本的sphinx支持64位的整数),那么可以把该info-hash按位拆分成5个attr。而sphinx document id则可以使用任意数字,只要保证不冲突就行。当获得查询结果时,取得对应的attr,组合为info-hash即可。

mongodb默认的Object id也可以按这种方式拆分。

dhtcrawler2与sphinx

dhtcrawler2中我自己写了一个导入程序。该程序从mongodb中读出数据,数据到一定量时,就输出为xmlpipe2格式的xml文件,然后建立为次索引,最后合并进主索引。过程很简单,包含两次启动外部进程的工作,这个可以通过erlang中os:cmd完成。

值得注意的是,在从mongodb中读数据时,使用skip基本是不靠谱的,skip 100万个数据需要好几分钟,为了不增加额外的索引字段,我只好在created_at字段上加索引,然后按时间段来读取资源,这一切都是为了支持程序关闭重启后,可以继续上次工作,而不是重头再来。200万的数据,已经处理了好几天了。

后头数据建立好了,需要在前台展示出来。erlang中似乎只有一个sphinx客户端库:giza。这个库有点老,写成的时候貌似还在使用sphinx0.9版本。其中查询代码包含了版本判定,已经无法在我使用的sphinx2.x版本中使用。无奈之下我只好修改了这个库的源码,幸运的是查询功能居然是正常的,意味着sphinx若干个版本了也没改动通信协议?后来,我为了取得查询的统计信息,例如消耗时间以及总结果,我再一次修改了giza的源码。新的版本可以在我的github上找到:my giza,看起来我没侵犯版本协议吧?

目前dhtcrawler的搜索,先是基于sphinx搜索出hash列表,然后再去mongodb中搜索hash对应的资源。事实上,可以为sphinx的document直接附加这些资源的描述信息,就可以避免去数据库查询。但我想,这样会增加sphinx索引文件的大小,担心会影响搜索速度。实际测试时,发现数据库查询有时候还真的很消耗时间,尽管我做了分页,以使得单页仅对数据库进行少量查询。

xml unicode

在导入xml到sphinx的索引过程中,本身我输出的内容都是unicode的,但有很多资源会导致indexer解析xml出错。出错后indexer直接停止对当前xml的处理。后来查阅资料发现是因为这些无法被indexer处理的xml内容包含unicode里的控制字符,例如 ä (U+00E4)。我的解决办法是直接过滤掉这些控制字符。unicode的控制字符参看UTF-8 encoding table and Unicode characters。在erlang中干这个事居然不复杂:

strip_invalid_unicode(<<>>) ->
    <<>>;
strip_invalid_unicode(<<C/utf8, R/binary>>) ->
    case is_valid_unicode(C) of
        true ->
            RR = strip_invalid_unicode(R),
            <<C/utf8, RR/binary>>;
        false ->
            strip_invalid_unicode(R)
    end;
strip_invalid_unicode(<<_, R/binary>>) ->
    strip_invalid_unicode(R).
    
is_valid_unicode(C) when C < 16#20 ->
    false;
is_valid_unicode(C) when C >= 16#7f, C =< 16#ff ->
    false;
is_valid_unicode(_) ->
    true.

posted @ 2013-08-08 23:04 Kevin Lynx 阅读(1672) | 评论 (0)编辑 收藏

2013年7月20日 #

磁力搜索第二版-dhtcrawler2

上篇

下载使用

目前为止dhtcrawler2相对dhtcrawler而言,数据库部分调整很大,DHT部分基本沿用之前。但单纯作为一个爬资源的程序而言,DHT部分可以进行大幅削减,这个以后再说。这个版本更快、更稳定。为了方便,我将编译好的erlang二进制文件作为git的主分支,我还添加了一些Windows下的批处理脚本,总之基本上下载源码以后即可运行。

项目地址:https://github.com/kevinlynx/dhtcrawler2

使用方法

  • 下载erlang,我测试的是R16B版本,确保erl等程序被加入Path环境变量
  • 下载mongodb,解压即用:

      mongod --dbpath xxx --setParameter textSearchEnabled=true
    
  • 下载dhtcrawler2

      git clone https://github.com/kevinlynx/dhtcrawler2.git
    
  • 运行win_start_crawler.bat

  • 运行win_start_hash.bat
  • 运行win_start_http.bat
  • 打开localhost:8000查看stats

爬虫每次运行都会保存DHT节点状态,早期运行的时候收集速度会不够。dhtcrawler2将程序分为3部分:

  • crawler,即DHT爬虫部分,仅负责收集hash
  • hash,准确来讲叫hash reader,处理爬虫收集的hash,处理过程主要涉及到下载种子文件
  • http,使用hash处理出来的数据库,以作为Web端接口

我没有服务器,但程序有被部署在别人的服务器上:bt.cmhttp://222.175.114.126:8000/

其他工具

为了提高资源索引速度,我陆续写了一些工具,包括:

  • import_tors,用于导入本地种子文件到数据库
  • tor_cache,用于下载种子到本地,仅仅提供下载的功能,hash_reader在需要种子文件时,可以先从本地取
  • cache_indexer,目前hash_reader取种子都是从torrage.com之类的种子缓存站点取,这些站点提供了种子列表,cache_indexer将这些列表导入数据库,hash_reader在请求种子文件前可以通过该数据库检查torrage.com上有无此种子,从而减少多余的http请求

这些工具的代码都被放在dhtcrawler2中,可以查看对应的启动脚本来查看具体如何启动。

OS/Database

根据实际的测试效果来看,当收集的资源量过百万时(目前bt.cm录入近160万资源),4G内存的Windows平台,mongodb很容易就会挂掉。挂掉的原因全是1455,页面文件太小。有人建议不要在Windows下使用mongodb,Linux下我自己没做过测试。

mongodb可以部署为集群形式(replica-set),当初我想把http部分的查询放在一个只读的mongodb实例上,但因为建立集群时,要同步已有的10G数据库,而每次同步都以mongodb挂掉结束,遂放弃。在目前bt.cm的配置中,数据库torrent的锁比例(db lock)很容易上50%,这也让http在搜索时,经常出现搜索超时的情况。

技术信息

dhtcrawler最早的版本有很多问题,修复过的最大的一个问题是关于erlang定时器的,在DHT实现中,需要对每个节点每个peer做超时处理,在erlang中的做法直接是针对每个节点注册了一个定时器。这不是问题,问题在于定时器资源就像没有GC的内存资源一样,是会由于程序员的代码问题而出现资源泄漏。所以,dhtcrawler第一个版本在节点数配置在100以上的情况下,用不了多久就会内存耗尽,最终导致erlang虚拟机core dump。

除了这个问题以外,dhtcrawler的资源收录速度也不是很快。这当然跟数据库和获取种子的速度有直接关系。尤其是获取种子,使用的是一些提供info-hash到种子映射的网站,通过HTTP请求来下载种子文件。我以为通过BT协议直接下载种子会快些,并且实时性也要高很多,因为这个种子可能未被这些缓存网站收录,但却可以直接向对方请求得到。为此,我还特地翻阅了相关协议,并且用erlang实现了(以后的文章我会讲到具体实现这个协议)。

后来我怀疑get_peers的数量会不会比announce_peer多,但是理论上一般的客户端在get_peers之后都是announce_peer,但是如果get_peers查询的peers恰好不在线呢?这意味着很多资源虽然已经存在,只不过你恰好暂时请求不到。实际测试时,发现get_peers基本是announce_peer数量的10倍。

将hash的获取方式做了调整后,dhtcrawler在几分钟以内以几乎每秒上百个新增种子的速度工作。然后,程序挂掉。

从dhtcrawler到今天为止的dhtcrawler2,中间间隔了刚好1个月。我的所有业余时间全部扑在这个项目上,面临的问题一直都是程序的内存泄漏、资源收录的速度不够快,到后来又变为数据库压力过大。每一天我都以为我将会完成一个稳定版本,然后终于可以去干点别的事情,但总是干不完,目前完没完都还在观察。我始终明白在做优化前需要进行详尽的数据收集和分析,从而真正地优化到正确的点上,但也总是凭直觉和少量数据分析就开始尝试。

这里谈谈遇到的一些问题。

erlang call timeout

最开始遇到erlang中gen_server:call出现timeout错误时,我还一直以为是进程死锁了。相关代码读来读去,实在觉得不可能发生死锁。后来发现,当erlang虚拟机压力上去后,例如内存太大,但没大到耗尽系统所有内存(耗进所有内存基本就core dump了),进程间的调用就会出现timeout。

当然,内存占用过大可能只是表象。其进程过多,进程消息队列太长,也许才是导致出现timeout的根本原因。消息队列过长,也可能是由于发生了消息泄漏的缘故。消息泄漏我指的是这样一种情况,进程自己给自己发消息(当然是cast或info),这个消息被处理时又会发送相同的消息,正常情况下,gen_server处理了一个该消息,就会从消息队列里移除它,然后再发送相同的消息,这不会出问题。但是当程序逻辑出问题,每次处理该消息时,都会发生多余一个的同类消息,那消息队列自然就会一直增长。

保持进程逻辑简单,以避免这种逻辑错误。

erlang gb_trees

我在不少的地方使用了gb_trees,dht_crawler里就可能出现gb_trees:get(xxx, nil)这种错误。乍一看,我以为我真的传入了一个nil值进去。然后我苦看代码,以为在某个地方我会把这个gb_trees对象改成了nil。但事情不是这样的,gb_tress使用一个tuple作为tree的节点,当某个节点没有子节点时,就会以nil表示。

gb_trees:get(xxx, nil)类似的错误,实际指的是xxx没有在这个gb_trees中找到。

erlang httpc

dht_crawler通过http协议从torrage.com之类的缓存网站下载种子。最开始我为了尽量少依赖第三方库,使用的是erlang自带的httpc。后来发现程序有内存泄漏,google发现erlang自带的httpc早为人诟病,当然也有大神说在某个版本之后这个httpc已经很不错。为了省事,我直接换了ibrowse,替换之后正常很多。但是由于没有具体分析测试过,加之时间有点远了,我也记不太清细节。因为早期的http请求部分,没有做数量限制,也可能是由于我的使用导致的问题。

某个版本后,我才将http部分严格地与hash处理部分区分开来。相较数据库操作而言,http请求部分慢了若干数量级。在hash_reader中将这两块分开,严格限制了提交给httpc的请求数,以获得稳定性。

对于一个复杂的网络系统而言,分清哪些是耗时的哪些是不大耗时的,才可能获得性能的提升。对于hash_reader而言,处理一个hash的速度,虽然很大程度取决于数据库,但相较http请求,已经快很多。它在处理这些hash时,会将数据库已收录的资源和待下载的资源分离开,以尽快的速度处理已存在的,而将待下载的处理速度交给httpc的响应速度。

erlang httpc ssl

ibrowse处理https请求时,默认和erlang自带的httpc使用相同的SSL实现。这经常导致出现tls_connection进程挂掉的错误,具体原因不明。

erlang调试

首先合理的日志是任何系统调试的必备。

我面临的大部分问题都是内存泄漏相关,所以依赖的erlang工具也是和内存相关的:

  • 使用etop,可以检查内存占用多的进程、消息队列大的进程、CPU消耗多的进程等等:

      spawn(fun() -> etop:start([{output, text}, {interval, 10}, {lines, 20}, {sort, msg_q }]) end).
    
  • 使用erlang:system_info(allocated_areas).检查内存使用情况,其中会输出系统timer数量

  • 使用erlang:process_info查看某个具体的进程,这个甚至会输出消息队列里的消息

hash_writer/crawler

crawler本身仅收集hash,然后写入数据库,所以可以称crawler为hash_writer。这些hash里存在大量的重复。hash_reader从数据库里取出这些hash然后做处理。处理过程会首先判定该hash对应的资源是否被收录,没有收录就先通过http获取种子。

在某个版本之后,crawler会简单地预先处理这些hash。它缓存一定数量的hash,接收到新hash时,就合并到hash缓存里,以保证缓存里没有重复的hash。这个重复率经过实际数据分析,大概是50%左右,即收到的100个请求里,有50个是重复的。这样的优化,不仅会降低hash数据库的压力,hash_reader处理的hash数量少了,也会对torrent数据库有很大提升。

当然进一步的方案可以将crawler和hash_reader之间交互的这些hash直接放在内存中处理,省去中间数据库。但是由于mongodb大量使用虚拟内存的缘故(内存映射文件),经常导致服务器内存不够(4G),内存也就成了珍稀资源。当然这个方案还有个弊端是难以权衡hash缓存的管理。crawler收到hash是一个不稳定的过程,在某些时间点这些hash可能爆多,而hash_reader处理hash的速度也会不太稳定,受限于收到的hash类别(是新增资源还是已存在资源)、种子请求速度、是否有效等。

当然,也可以限制缓存大小,以及对hash_reader/crawler处理速度建立关系来解决这些问题。但另一方面,这里的优化是否对目前的系统有提升,是否是目前系统面临的最大问题,却是需要考究的事情。

cache indexer

dht_crawler是从torrage.com等网站获取种子文件,这些网站看起来都是使用了相同的接口,其都有一个sync目录,里面存放了每天每个月索引的种子hash,例如 http://torrage.com/sync/。这个网站上是否有某个hash对应的种子,就可以从这些索引中检查。

hash_reader在处理新资源时,请求种子的过程中发现大部分在这些服务器上都没有找到,也就是发起的很多http请求都是404回应,这不但降低了系统的处理能力、带宽,也降低了索引速度。所以我写了一个工具,先手工将sync目录下的所有文件下载到本地,然后通过这个工具 (cache indexer) 将这些索引文件里的hash全部导入数据库。在以后的运行过程中,该工具仅下载当天的索引文件,以更新数据库。 hash_reader 根据配置,会首先检查某个hash是否存在该数据库中,存在的hash才可能在torrage.com上下载得到。

种子缓存

hash_reader可以通过配置,将下载得到的种子保存在本地文件系统或数据库中。这可以建立自己的种子缓存,但保存在数据库中会对数据库造成压力,尤其在当前测试服务器硬件环境下;而保存为本地文件,又特别占用硬盘空间。

基于BT协议的种子下载

通过http从种子缓存里取种子文件,可能会没有直接从P2P网络里取更实时。目前还没来得及查看这些种子缓存网站的实现原理。但是通过BT协议获取种子会有点麻烦,因为dht_crawler是根据get_peer请求索引资源的,所以如果要通过BT协议取种子,那么这里还得去DHT网络里查询该种子,这个查询过程可能会较长,相比之下会没有http下载快。而如果通过announce_peer来索引新资源的话,其索引速度会大大降低,因为announce_peer请求比get_peer请求少很多,几乎10倍。

所以,这里的方案可能会结合两者,新开一个服务,建立自己的种子缓存。

中文分词

mongodb的全文索引是不支持中文的。我在之前提到,为了支持搜索中文,我将字符串拆成了若干子串。这样的后果就是字符串索引会稍稍偏大,而且目前这一块的代码还特别简单,会将很多非文字字符也算在内。后来我加了个中文分词库,使用的是rmmseg-cpp。我将其C++部分抽离出来编译成erlang nif,这可以在我的github上找到。

但是这个库拆分中文句子依赖于词库,而这个词库不太新,dhtcrawler爬到的大部分资源类型你们也懂,那些词汇拆出来的比率不太高,这会导致搜索出来的结果没你想的那么直白。当然更新词库应该是可以解决这个问题的,目前还没有时间顾这一块。

总结

一个老外对我说过,”i have 2 children to feed, so i will not do this only for fun”。

你的大部分编程知识来源于网络,所以稍稍回馈一下不会让你丢了饭碗。

我很穷,如果你能让我收获金钱和编程成就,还不会嫌我穿得太邋遢,that’s really kind of you。

posted @ 2013-07-20 16:37 Kevin Lynx 阅读(2744) | 评论 (1)编辑 收藏

2013年6月20日 #

使用erlang实现P2P磁力搜索-实现

上篇,本篇谈谈一些实现细节。

这个爬虫程序主要的问题在于如何获取P2P网络中分享的资源,获取到资源后索引到数据库中,搜索就是自然而然的事情。

DHT

DHT网络本质上是一个用于查询的网络,其用于查询一个资源有哪些计算机正在下载。每个资源都有一个20字节长度的ID用于标示,称为infohash。当一个程序作为DHT节点加入这个网络时,就会有其他节点来向你查询,当你做出回应后,对方就会记录下你。对方还会询问其他节点,当对方开始下载这个infohash对应的资源时,他就会告诉所有曾经询问过的节点,包括你。这个时候就可以确定,这个infohash对应的资源在这个网络中是有效的。

关于这个网络的工作原理,参看:P2P中DHT网络爬虫以及写了个磁力搜索的网页

获取到infohash后能做什么?关键点在于,我们现在使用的磁力链接(magnet url),是和infohash对应起来的。也就是拿到infohash,就等于拿到一个磁力链接。但是这个爬虫还需要建立资源的信息,这些信息来源于种子文件。种子文件其实也是对应到一个资源,种子文件包含资源名、描述、文件列表、文件大小等信息。获取到infohash时,其实也获取到了对应的计算机地址,我们可以在这些计算机上下载到对应的种子文件。

但是我为了简单,在获取到infohash后,从一些提供映射磁力链到种子文件服务的网站上直接下载了对应的种子。dhtcrawler里使用了以下网站:

http://torrage.com
https://zoink.it
http://bt.box.n0808.com

使用这些网站时,需提供磁力哈希(infohash可直接转换),构建特定的URL,发出HTTP请求即可。

   U1 = "http://torrage.com/torrent/" ++ MagHash ++ ".torrent",
    U2 = "https://zoink.it/torrent/" ++ MagHash ++ ".torrent",
    U3 = format_btbox_url(MagHash),

format_btbox_url(MagHash) ->
    H = lists:sublist(MagHash, 2),
    T = lists:nthtail(38, MagHash),
    "http://bt.box.n0808.com/" ++ H ++ "/" ++ T ++ "/" ++ MagHash ++ ".torrent".

但是,以一个节点的身份加入DHT网络,是无法获取大量查询的。在DHT网络中,每个节点都有一个ID。每个节点在查询信息时,仅询问离信息较近的节点。这里的信息除了infohash外还包含节点,即节点询问一个节点,这个节点在哪里。DHT的典型实现中(Kademlia),使用两个ID的xor操作来确定距离。既然距离的计算是基于ID的,为了尽可能获取整个DHT网络交换的信息,爬虫程序就可以建立尽可能多的DHT节点,让这些节点的ID均匀地分布在ID取值区间内,以这样的方式加入网络。

在dhtcrawler中,我使用以下方式产生了N个大致均匀分布的ID:

create_discrete_ids(1) ->
    [dht_id:random()];
create_discrete_ids(Count) ->
    Max = dht_id:max(),
    Piece = Max div Count,
    [random:uniform(Piece) + Index * Piece || Index <- lists:seq(0, Count - 1)].

除了尽可能多地往DHT网络里部署节点之外,对单个节点而言,也有些注意事项。例如应尽可能快地将自己告诉尽可能多的节点,这可以在启动时进行大量的随机infohash的查询。随着查询过程的深入,该节点会与更多的节点打交道。因为DHT网络里的节点实际上是不稳定的,它今天在线,明天后天可能不在线,所以计算你的ID固定,哪些节点与你较近,本身就是个相对概念。节点在程序退出时,也最好将自己的路由信息(与自己交互的节点列表)保存起来,这样下次启动时就可以更快地加入网络。

在dhtcrawler的实现中,每个节点每个一定时间,都会向网络中随机查询一个infohash,这个infohash是随机产生的。其查询目的不在于infohash,而在于告诉更多的节点,以及在其他节点上保持自己的活跃。

handle_event(startup, {MyID}) ->
    timer:apply_interval(?QUERY_INTERVAL, ?MODULE, start_tell_more_nodes, [MyID]).

start_tell_more_nodes(MyID) ->
    spawn(?MODULE, tell_more_nodes, [MyID]).

tell_more_nodes(MyID) ->
    [search:get_peers(MyID, dht_id:random()) || _ <- lists:seq(1, 3)].

DHT节点的完整实现是比较繁琐的,涉及到查询以及繁杂的各种对象的超时(节点、桶、infohash),而超时的处理并不是粗暴地做删除操作。因为本身是基于UDP协议,你得对这些超时对象做进一步的查询才能正确地进一步做其他事情。而搜索也是个繁杂的事情,递归地查询节点,感觉上,你不一定离目标越来越近,由于被查询节点的不确定性(无法确定对方是否在玩弄你,或者本身对方就是个傻逼),你很可能接下来要查询的节点反而离目标变远了。

在我第一次的DHT实现中,我使用了类似transmission里DHT实现的方法,不断无脑递归,当搜索有太久时间没得到响应后终止搜索。第二次实现时,我就使用了etorrent里的实现。这个搜索更聪明,它记录搜索过的节点,并且检查是否离目标越来越远。当远离目标时,就认为搜索是不太有效的,不太有效的搜索尝试几次就可以放弃。

实际上,爬虫的实现并不需要完整地实现DHT节点的正常功能。爬虫作为一个DHT节点的唯一动机仅是获取网络里其他节点的查询。而要完成这个功能,你只需要装得像个正常人就行。这里不需要保存infohash对应的peer列表,面临每一次查询,你随便回复几个节点地址就可以。但是这里有个责任问题,如果整个DHT网络有2000个节点,而你这个爬虫就有1000个节点,那么你的随意回复,就可能导致对方根本找不到正确的信息,这样你依然得不到有效的资源。(可以利用这一点破坏DHT网络)

DHT的实现没有使用第三方库。

种子

种子文件的格式同DHT网络消息格式一样,使用一种称为bencode的文本格式来编码。种子文件分为两类:单个文件和多个文件。

文件的信息无非就是文件名、大小。文件名可能包含utf8编码的名字,为了后面处理的方便,dhtcrawler都会优先使用utf8编码。

   {ok, {dict, Info}} = dict:find(<<"info">>, TD),
    case type(Info) of
        single -> {single, parse_single(Info)};
        multi -> {multi, parse_multi(Info)}
    end.
parse_single(Info) ->
    Name = read_string("name", Info),
    {ok, Length} = dict:find(<<"length">>, Info),
    {Name, Length}.

parse_multi(Info) ->
    Root = read_string("name", Info),
    {ok, {list, Files}} = dict:find(<<"files">>, Info),
    FileInfo = [parse_file_item(Item) || {dict, Item} <- Files],
    {Root, FileInfo}.

数据库

我最开始在选用数据库时,为了不使用第三方库,打算使用erlang自带的mnesia。但是因为涉及到字符串匹配搜索,mnesia的查询语句在我看来太不友好,在经过一些资料查阅后就直接放弃了。

然后我打算使用couchdb,因为它是erlang写的,而我正在用erlang写程序。第一次接触非关系型数据库,发现NoSQL数据库使用起来比SQL类的简单多了。但是在erlang里要使用couchdb实在太折腾了。我使用的客户端库是couchbeam。

因为couchdb暴露的API都是基于HTTP协议的,其数据格式使用了json,所以couchbeam实际上就是对各种HTTP请求、回应和json的包装。但是它竟然使用了ibrowse这个第三方HTTP客户端库,而不是erlang自带的。ibrowse又使用了jiffy这个解析json的库。这个库更惨烈的是它的解析工作都是交给C语言写的动态库来完成,我还得编译那个C库。

couchdb看起来不支持字符串查询,我得自己创建一个view,这个view里我通过翻阅了一些资料写了一个将每个doc的name拆分成若干次查询结果的map。这个map在处理每一次查询时,我都得动态更新之。couchdb是不支持局部更新的,这还不算大问题。然后很高兴,终于支持字符串查询了。这里的字符串查询都是基于字符串的子串查询。但是问题在于,太慢了。每一次在WEB端的查询,都直接导致erlang进程的call超时。

要让couchdb支持字符串查询,要快速,当然是有解决方案的。但是这个时候我已经没有心思继续折腾,任何一个库、程序如果接口设计得如此不方便,那就可以考虑换一个其他的。

我选择了mongodb。同样的基于文档的数据库。2.4版本还支持全文搜索。什么是全文搜索呢,这是一种基于单词的全文搜索方式。hello world我可以搜索hello,基于单词。mongodb会自动拆词。更关键更让人爽的是,要开启这个功能非常简单:设置启动参数、建立索引。没了。mongodb的erlang客户端库mongodb-erlang也只依赖一个bson-erlang库。然后我又埋头苦干,几个小时候我的这个爬虫程序就可以在浏览器端搜索关键字了。

后来我发现,mongodb的全文搜索是不支持中文的。因为它还不知道中文该怎么拆词。恰好我有个同事做过中文拆词的研究,看起来涉及到很复杂的算法。直到这个时候,我他妈才醒悟,我为什么需要基于单词的搜索。我们大部分的搜索其实都是基于子字符串的搜索。

于是,我将种子文件的名字拆分成了若干个子字符串,将这些子字符串以数组的形式作为种子文档的一个键值存储,而我依然还可以使用全文索引,因为全文索引会将整个字符串作为单词比较。实际上,基于一般的查询方式也是可以的。当然,索引还是得建立。

使用mongodb时唯一让我很不爽的是mongodb-erlang这个客户端库的文档太欠缺。这还不算大问题,因为看看源码参数还是可以大概猜到用法。真正悲剧的是mongodb的有些查询功能它是不支持的。例如通过cursor来排序来限制数量。在cursor模块并没有对应的mongodb接口。最终我只好通过以下方式查询,我不明白batchsize,但它可以工作:

search_announce_top(Conn, Count) ->
    Sel = {'$query', {}, '$orderby', {announce, -1}},
    List = mongo_do(Conn, fun() ->
        Cursor = mongo:find(?COLLNAME, Sel, [], 0, Count), 
        mongo_cursor:rest(Cursor)
    end),
    [decode_torrent_item(Item) || Item <- List].

另一个悲剧的是,mongodb-erlang还不支持文档的局部更新,它的update接口直接要求传入整个文档。几经折腾,我可以通过runCommand来完成:

inc_announce(Conn, Hash) when is_list(Hash) ->
    Cmd = {findAndModify, ?COLLNAME, query, {'_id', list_to_binary(Hash)}, 
        update, {'$inc', {announce, 1}},
        new, true},
    Ret = mongo_do(Conn, fun() ->
        mongo:command(Cmd)
    end).

Unicode

不知道在哪里我看到过erlang说自己其实是不需要支持unicode的,因为这门语言本身是通过list来模拟字符串。对于unicode而言,对应的list保存的本身就是整数值。但是为了方便处理,erlang还是提供了一些unicode操作的接口。

因为我需要将种子的名字按字拆分,对于a中文这样的字符串而言,我需要拆分成以下结果:

a
a中
a中文
中
中文
文

那么,在erlang中当我获取到一个字符串list时,我就需要知道哪几个整数合起来实际上对应着一个汉字。erlang里unicode模块里有几个函数可以将unicode字符串list对应的整数合起来,例如:[111, 222, 333]可能表示的是一个汉字,将其转换以下可得到[111222333]这样的形式。

split(Str) when is_list(Str) ->
    B = list_to_binary(Str), % 必须转换为binary
    case unicode:characters_to_list(B) of
        {error, L, D} ->
            {error, L, D};
        {incomplete, L, D} ->
            {incomplete, L, D};
        UL ->
        {ok, subsplit(UL)}
    end.

subsplit([]) ->
    [];

subsplit(L) ->
    [_|R] = L,
    {PreL, _} = lists:splitwith(fun(Ch) -> not is_spliter(Ch) end, L),
    [unicode:characters_to_binary(lists:sublist(PreL, Len)) 
        || Len <- lists:seq(1, length(PreL))] ++ subsplit(R).

除了这里的拆字之外,URL的编码、数据库的存储都还好,没遇到问题。

注意,以上针对数据库本身的吐槽,完全基于我不熟悉该数据库的情况下,不建议作为你工具选择的参考。

erlang的稳定性

都说可以用erlang来编写高容错的服务器程序。看看它的supervisor,监视子进程,自动重启子进程。天生的容错功能,就算你宕个几次,单个进程自动重启,整个程序看起来还稳健地在运行,多牛逼啊。再看看erlang的进程,轻量级的语言特性,就像OOP语言里的一个对象一样轻量。如果说使用OOP语言写程序得think in object,那用erlang你就得think in process,多牛逼多骇人啊。

实际上,以我的经验来看,你还得以传统的思维去看待erlang的进程。一些多线程程序里的问题,在erlang的进程环境中依然存在,例如死锁。

在erlang中,对于一些异步操作,你可以通过进程间的交互将这个操作包装成同步接口,例如ping的实现,可以等到对方回应之后再返回。被阻塞的进程反正很轻量,其包含的逻辑很单一。这不但是一种良好的包装,甚至可以说是一种erlang-style。但这很容易带来死锁。在最开始的时候我没有注意这个问题,当爬虫节点数上升的时候,网络数据复杂的时候,似乎就出现了死锁型宕机(进程互相等待太久,直接timeout)。

另一个容易在多进程环境下出现的问题就是消息依赖的上下文改变问题。当投递一个消息到某个进程,到这个消息被处理之前,这段时间这个消息关联的逻辑运算所依赖的上下文环境改变了,例如某个ets元素不见了,在处理这个消息时,你还得以多线程编程的思维来编写代码。

至于supervisor,这玩意你得端正态度。它不是用来包容你的傻逼错误的。当你写下傻逼代码导致进程频繁崩溃的时候,supervisor屁用没有。supervisor的唯一作用,仅仅是在一个确实本身可靠的系统,确实人品问题万分之一崩溃了,重启它。毕竟,一个重启频率的推荐值,是一个小时4次。

posted @ 2013-06-20 20:40 Kevin Lynx 阅读(2624) | 评论 (1)编辑 收藏

使用erlang实现P2P磁力搜索(开源)

接上回对DHT网络的研究,我用erlang克隆了一个磁力搜索引擎。我这个实现包含了完整的功能,DHT网络的加入、infohash的接收、种子的获取、资源信息的索引、搜索。

如下图:

screenshot

在我的笔记本上,我开启了100个DHT节点,大致均匀地分布在DHT网络里,资源索引速度大概在1小时一万个左右(包含重复资源)。

这个程序包含三大部分:

这两个项目总共包含大概2500行的erlang代码。其中,DHT实现部分将DHT网络的加入包装成一个库,爬虫部分在搜索种子时,暂时没有使用P2P里的种子下载方式,而是使用现成的磁力链转种子的网站服务,这样我只需要使用erlang自带的HTTP客户端就可以获取种子信息。爬虫在获取到种子信息后,将数据存储到mongodb里。WEB端我为了尽量少用第三方库,我只好使用erlang自带的HTTP服务器,因此网页内容的创建没有模板系统可用,只好通过字符串构建,编写起来不太方便。

使用

整个程序依赖了两个库:bson-erlang和mongodb-erlang,但下载依赖库的事都可以通过rebar解决,项目文件里我已经包含了rebar的执行程序。我仅在Windows7上测试过,但理论上在所有erlang支持的系统上都可以。

  • 下载安装mongodb
  • 进入mongodb bin目录启动mongodb,数据库目录保存在db下,需手动建立该目录

      mongod --dbpath db --setParameter textSearchEnabled=true
    
  • 下载erlang,我使用的是R16B版本

  • 下载dhtcrawler,不需要单独下载kdht,待会下载依赖项的时候会自动下载

      git clone git@github.com:kevinlynx/dhtcrawler.git
    
  • cmd进入dhtcrawler目录,下载依赖项前需保证环境变量里有git,例如D:\Program Files (x86)\Git\cmd,需注意不要将bash的目录加入进来,使用以下命令下载依赖项

      rebar get-deps
    
  • 编译

      rebar compile
    
  • 在dhtcrawler目录下,启动erlang

      erl -pa ebin
    
  • 在erlang shell里运行爬虫,erlang语句以点号(.)作为结束

      crawler_app:start().
    
  • erlang shell里运行HTTP服务器

      crawler_http:start().
    
  • 浏览器里输入localhost:8000/index.html,这个时候还没有索引到资源,建议监视网络流量以观察爬虫程序是否正确工作

爬虫程序启动时会读取priv/dhtcrawler.config配置文件,该文件里配置了DHT节点的UDP监听端口、节点数量、数据库地址等,可自行配置。

接下来我会谈谈各部分的实现方法。

posted @ 2013-06-20 14:44 Kevin Lynx 阅读(3277) | 评论 (2)编辑 收藏

2013年6月10日 #

使用ActionScript开发Ice Web客户端

我们目前的项目服务器端使用了Ice来构建。Ice有一套自己的网络协议,客户端和服务器端可以基于此协议来交互。由于Ice使用Slice这种中间语言来描述服务器和客户端的交互接口,所以它可以做到极大限度地屏蔽网络协议这个细节。也就是说,我们只要借助Ice和Slice,我们可以轻松地编写网络程序。

然后,我们的后端现在需要一个运行在Web浏览器上的客户端。要与Ice做交互,如果使用TCP协议的话,得保证是长连接的。但HTTP是短连接的。而另一方面,我们还需要选择一个Ice支持的和Web相关的语言来做这件事情。如果要在浏览器端直接与Ice服务建立连接,可供选择的语言/平台包括:

  • Flash
  • Silverlight

因为我之前用erlang简单写了个Ice的客户端库,所以我对Ice底层协议有一定了解,可以不一定使用Ice支持的语言,所以HTML5其实也是个选择。此外,如果在浏览器端使用Applet,Java可能也是个选择。

其实几个月前在这块的技术选择问题上我就做过简单的研究,当时确定的方案是使用Flash。但是,后来人员招聘上遇到了问题,看起来要招一个会ActionScript和前端页面技术的程序员来做我们这种项目,似乎大材小用,成本显高了。

那么,考虑到团队里有现成的Java程序员,而且看起来招一个会用Java写网站的程序员简单又便宜,似乎是排除技术原因的最好选择。但是,如果不在浏览器端直接连接服务器来做交互,而是让Web服务器端来做中转的话,会面临不少问题:

  • 浏览器端操作结果的获取问题,说白了就是非实时了,得用Ajax等等技术去模拟实时,代价就是不断轮训,也就是通常说的poll
  • Web服务器端需要编写大量代码:对用户操作的映射,结果缓存等等

如果能用Flash包装与服务器交互的部分,而把UI相关的东西留给HTML/JS/CSS去做,那是不是可行一点?如果只是用ActionScript编写与服务器端的交互逻辑代码,我就不需要花时间去系统学习ActionScript,甚至如何用Flash做界面,我甚至不用搞懂这些技术之间的关系。基本上看些Ice for ActionScript的例子代码,就可以完成这件事情。

以下记录一些主要的过程/方法:

ActionScript程序的开发

开发一个嵌入到网页中的FLASH,只需要Flex SDK。SDK里自带了一些编译器相关工具。我不打算使用IDE,因为看起来IDE更复杂。简单的google之后,基本就可以构建出一个Flash文件:

  • 构建基本的程序需要一个mxml文件,这个文件里主要用来捕获Flash在页面上初始化完成这个事件,以初始化内部逻辑
  • 编写ActionScript源码,看起来其文件、类的组织方式很像Java
  • 使用Flex SDK中的mxmlc程序来编译,生成swf文件,例如:

    mxmlc myflexapp.mxml -library-path+=xxx.swc

  • 嵌入到网页中,简单的做法可以借助swfobject.js这个库,嵌入的方式:

     <script type="text/javascript" src="swfobject.js"></script>
    <script type="text/javascript">
        var flashvars = {};
        var params = {};
      params.play = "true";
        params.quality = "high";
        params.bgcolor = "white";
        params.allowscriptaccess = "always";
        params.allowfullscreen = "true";
        var attributes = {};
        attributes.id = "asclient";
        attributes.name = "asclient";
        attributes.align = "middle";
        swfobject.embedSWF("asclient.swf", "flashContent", "1", "1",
            "0", "", 
            flashvars, params, attributes);
        swfobject.createCSS("#flashContent", "display:none;");
    </script>

自然,页面中需加入flashContent这个div:

     <div id="flashContent">
        <p>no flash</p>
    </div>

我的mxml文件也很简单:

<?xml version="1.0" encoding="utf-8"?>
<s:Application 
    xmlns:fx="http://ns.adobe.com/mxml/2009" 
    xmlns:s="library://ns.adobe.com/flex/spark" 
    xmlns:mx="library://ns.adobe.com/flex/mx"
    applicationComplete="doApplicationComplete()" >
    <fx:Script>
    <![CDATA[
       import ASClient.Coordinator;
       import flash.external.ExternalInterface;

       private var _coordinator:Coordinator;

       public function doApplicationComplete():void
       {
            trace("doApplicationComplete");
            _coordinator = new Coordinator();
            _coordinator.reg_methods();
            ExternalInterface.call("as_ready"); 
       } 
     ]]>
    </fx:Script>
</s:Application>

ActionScript日志

我通过日志来调试ActionScript代码。最简单的方式就是通过trace函数来输出日志。要成功输出日志包含以下步骤:

  • 给浏览器安装调试版本的Flash Player
  • 日志是输出到用户目录下的,并且需要手动创建日志目录(Administrator替换为用户名):

      C:\Users\Administrator\AppData\Roaming\Macromedia\Flash Player\Logs
    
  • 用户目录下新建配置文件mm.cfg:

      AS3StaticProfile=0
      AS3Verbose=0
      TraceOutputFileEnable=1 
      TraceOutputBuffered=0
      ErrorReportingEnable=1  
      AS3Trace=0
    
  • 编译DEBUG版本的Flash文件,可以修改flex sdk下的flex-config.xml文件,里面增加debug=true配置即可

在开发过程中需要注意浏览器缓存问题,当编译出新的Flash文件后,浏览器即使页面刷新也可能使用的是缓存里的Flash。当然,最重要的,是通过浏览器来访问这个包含了Flash的网页,Web服务器随意。

Flash Policy文件

在Flash的某个版本后,Flash中如果要向外建立socket连接,是首先要取得目标主机返回的policy文件的。也就是在建立连接前,Flash底层会先向目标主机询问得到一个描述访问权限的文件。

简单来说,目标主机需要在843端口上建立TCP监听,一旦有网络连接,就发送以下内容,内容后需添加0x00用于标示结束。(当然,具体细节还挺多,自行google)

<cross-domain-policy>
     <allow-access-from domain="*" to-ports="*" />
</cross-domain-policy>

最开始我使用的是朋友给的现成的Policy服务,虽然我写的Flash可以成功连接我的Ice服务,但始终要等待2秒以上的时间。google Flash Policy相关内容,可以发现确实存在一个延时,但那是因为目标主机没有在843端口服务。后来我自己用erlang写了个Policy服务,延时就没有了。猜测可能是他的Policy服务没有添加0x00作为结束导致。

ActionScript与JavaScript的交互

既然我使用ActionScript来包装与服务器的交互,那么JavaScript就必然需要和ActionScript通信。这个通信过程也就是在JavaScript中调用ActionScript中的函数,反过来亦然。这个过程很简单:

在JavaScript中调用ActionScript函数:

首先是ActionScript需要注册哪些函数可以被调用:

ExternalInterface.addCallback("service_loadall", loadAll);

通过ExternalInterface.addCallback注册的函数,其实是个closure,所以在类中注册自己的成员函数都可以(因为成员函数会使用this,形成了一个closure)。

然后在JavaScript中调用:

    function asObject() {
        // asclient是嵌入Flash时填入的name和(或?)id
        return window.document.asclient;
    }
    var as = asObject();
    as.service_loadall();

在ActionScript中调用JavaScript中调用则更简单,一句话:

ExternalInterface.call("service_load_done", args);

至于在两者之间的函数参数传递,其类型都可以自动映射。但因为我的应用里数据较为复杂,我就将数据转换为JSON格式,在JavaScript这边操作较为简单。

页面切换

这里我们需要的Web前端页面,更像是一个管理系统,所以页面切换是很有可能的。问题在于,当出现页面跳转时,Flash对象会重新初始化,新的页面无法使用前一个页面建立好的网络连接(或者能?)。为了解决这个问题,服务器当然可以设计一种重登录机制,方便客户端以一种特殊的方式进入系统,绕过正常的登录环节。但是我们使用了Glacier2这个网关,在这个网关上有针对连接的超时管理,这样反复建立新的连接对资源太浪费了。

综上,我想只能通过前端去规避这个问题。例如,前端开发人员依然可以分开设计很多页面,页面里也可以使用正常的链接。我们编写一些JavaScript代码,将页面里的链接替换成对应的JavaScript代码,动态载入新的页面内容,然后对页面内的部分内容进行替换,从而尽可能让页面设计人员编写正常的网页,同时也解决页面切换问题。

这是个蹩脚的方法,但在我有限的前端知识体系下,似乎也只能这样干了。

posted @ 2013-06-10 21:30 Kevin Lynx 阅读(1409) | 评论 (0)编辑 收藏

2013年5月19日 #

P2P中DHT网络爬虫

DHT网络爬虫基于DHT网络构建了一个P2P资源搜索引擎。这个搜索引擎不但可以用于构建DHT网络中活跃的资源索引(活跃的资源意味着该网络中肯定有人至少持有该资源的部分数据),还可以分析出该网络中的热门分享资源。小虾不久前发布了一个这样的搜索引擎:磁力搜索。他也写博客对此稍作了介绍:写了个磁力搜索的网页 - 收录最近热门分享的资源。网络上其实也有其他人做了类似的应用:DHT monitoringCrawling Bittorrent DHT

但是他的这篇文章仅介绍了DHT网络的大致工作原理,并且这个爬虫的具体工作原理也没有提到。对此我查阅了些文章及代码,虽然从原理上梳理出了整个实现方案,但很多细节还是不甚清楚。所以本文仅作概要介绍。

DHT/Magnet/Torrent

在P2P网络中,要通过种子文件下载一个资源,需要知道整个P2P网络中有哪些计算机正在下载/上传该资源。这里将这些提供某个资源下载的计算机定义为peer。传统的P2P网络中,存在一些tracker服务器,这些服务器的作用主要用于跟踪某个资源有哪些关联的peer。下载这个资源当然得首先取得这些peer。

DHT的出现用于解决当tracker服务器不可用时,P2P客户端依然可以取得某个资源的peer。DHT解决这个问题,是因为它将原来tracker上的资源peer信息分散到了整个网络中。这里将实现了DHT协议的计算机定义为节点(node)。通常一个P2P客户端程序既是peer也是节点。DHT网络有多种实现算法,例如Kademlia。

当某个P2P客户端通过种子文件下载资源时,如果没有tracker服务器,它就会向DHT网络查询这个资源对应的peer列表。资源的标识在DHT网络中称为infohash,是一个20字节长的字符串,一般通过sha1算法获得,也就是一个类似UUID的东西。

实际上,种子文件本身就对应着一个infohash,这个infohash是通过种子文件的文件描述信息动态计算得到。一个种子文件包含了对应资源的描述信息,例如文件名、文件大小等。Magnet,这里指的是磁力链接,它是一个类似URL的字符串地址。P2P软件通过磁力链接,会下载到一个种子文件,然后根据该种子文件继续真实资源的下载。

磁力链接中包含的最重要的信息就是infohash。这个infohash一般为40字节或32字节,它其实只是资源infohash(20字节)的一种编码形式。

Kademlia

Kademlia是DHT网络的一种实现。网络上关于这个算法的文章,主要是围绕整个DHT网络的实现原理进行论述。个人觉得这些文章很蛋疼,基本上读了之后对于要如何去实现一个DHT客户端还是没有概念。这里主要可参考P2P中DHT网络介绍,以及BitTorrent网站上的DHT协议描述

Kad的主要目的是用于查询某个资源对应的peer列表,而这个peer列表实际上是分散在整个网络中。网络中节点数量很大,如果要获得peer列表,最简单的做法无非就是依次询问网络中的每个节点。这当然不可行。所以在Kad算法中,设立了一个路由表。每一个节点都有一份路由表。这个是按照节点之间的距离关系构建出来的。节点之间的距离当然也有特定的算法定义,在Kad中通过对两个节点的ID进行异或操作得到。节点的ID和infohash通过相同算法构建,都是20字节长度。节点和infohash之间也有距离关系,实际上表示的是节点和资源的距离关系。

有了这个路由表之后,再通过一个基于距离关系的查找算法,就可以实现不用挨个遍历就找到特定的节点。而查找资源peer这个操作,正是基于节点查找这个过程。

路由表的实现,按我的理解,有点类似一般的hash表结构。在这个表中有160个桶,称为K桶,这个桶的数量在实现上可以动态增长。每个桶保存有限个元素,例如K取值为8,指的就是这个桶最多保存8个元素。每个元素就是一个节点,节点包含节点ID、地址信息以及peer信息。这些桶可以通过距离值索引得到,即距离值会经过一个hash算法,使其值落到桶的索引范围内。

要加入一个DHT网络,需要首先知道这个网络中的任意一个节点。如何获得这个节点?在一些开源的P2P软件中,会提供一些节点地址,例如transmission中提供的dht.transmissionbt.com:6881。

协议

Kad定义了节点之间的交互协议。这些协议支撑了整个DHT网络里信息分布式存储的实现。这些协议都是使用UDP来传送。其协议格式使用一种称为bencode的编码方式来编码协议数据。bencode是一种文本格式的编码,它还用于种子文件内的信息编码。

Kad协议具体格式可参考BitTorrent的定义:DHT Protocol。这些协议包括4种请求:ping,find_node,get_peer,announce_peer。在有些文档中这些请求的名字会有不同,例如announce_peer又被称为store,get_peer被称为find_value。这4种请求中,都会有对应的回应消息。其中最重要的消息是get_peer,其目的在于在网络中查找某个资源对应的peer列表。

值得一提的是,所有这些请求,包括各种回应,都可以用于处理该消息的节点构建路由表。因为路由表本质就是存储网络中的节点信息。

ping

用于确定某个节点是否在线。这个请求主要用于辅助路由表的更新。

find_node

用于查找某个节点,以获得其地址信息。当某个节点接收到该请求后,如果目标节点不在自己的路由表里,那么就返回离目标节点较近的K个节点。这个消息可用于节点启动时构建路由表。通过find_node方式构建路由表,其实现方式为向DHT网络查询自己。那么,接收该查询的节点就会一直返回其他节点了列表,查询者递归查询,直到无法查询为止。那么,什么时候无法继续查询呢?这一点我也不太清楚。每一次查询得到的都是离目标节点更接近的节点集,那么理论上经过若干次递归查询后,就无法找到离目标节点更近的节点了,因为最近的节点是自己,但自己还未完全加入网络。这意味着最后所有节点都会返回空的节点集合,这样就算查询结束?

实际上,通过find_node来构建路由表,以及顺带加入DHT网络,这种方式什么时候停止在我看来并不重要。路由表的构建并不需要在启动时构建完成,在以后与其他节点的交互过程中,路由表本身就会慢慢地得到构建。在初始阶段尽可能地通过find_node去与其他节点交互,最大的好处无非就是尽早地让网络中的其他节点认识自己。

get_peer

通过资源的infohash获得资源对应的peer列表。当查询者获得资源的peer列表后,它就可以通过这些peer进行资源下载了。收到该请求的节点会在自己的路由表中查找该infohash,如果有收录,就返回对应的peer列表。如果没有,则返回离该infohash较近的若干个节点。查询者若收到的是节点列表,那么就会递归查找。这个过程同find_node一样。

值得注意的是,get_peer的回应消息里会携带一个token,该token会用于稍后的announce_peer请求。

announce_peer

该请求主要目的在于通知,通知其他节点自己开始下载某个资源。这个消息用于构建网络中资源的peer列表。当一个已经加入DHT网络的P2P客户端通过种子文件开始下载资源时,首先在网络中查询该资源的peer列表,这个过程通过get_peer完成。当某个节点从get_peer返回peer时,查询者开始下载,然后通过announce_peer告诉返回这个peer的节点。

announce_peer中会携带get_peer回应消息里的token。关于这一点,我有一个疑问是,在P2P中DHT网络介绍文档中提到:

(announce_peer)同时会把自己的peer信息发送给先前的告诉者和自己K桶中的k个最近的节点存储该peer-list信息

不管这里提到的K的最近的节点是离自己最近,还是离资源infohash最近的节点,因为处理announce_peer消息时,有一个token的验证过程。但是这K个节点中,并没有在之前创建对应的token。我通过transmission中的DHT实现做了个数据收集,可以证明的是,announce_peer消息是不仅仅会发给get_peer的回应者的。

DHT爬虫

DHT爬虫是一个遵循Kad协议的假节点程序。具体可以参考小虾发布的那个网站应用:磁力搜索

这个爬虫的实现方式,主要包含以下内容:

  • 通过其他节点的announce_peer发来的infohash确认网络中有某个资源可被下载
  • 通过从网络中获取这个资源的种子文件,来获得该资源的描述

通过累计收集得到的资源信息,就可以提供一个资源搜索引擎,或者构建资源统计信息。以下进一步描述实现细节。整个爬虫的实现依赖了一个很重要的信息,那就是资源的infohash实际上就是一个磁力链接(当然需要包装一下数据)。这意味着一旦我们获得了一个infohash,我们就等于获得了一个种子。

获得资源通知

当爬虫程序加入DHT网络后,它总会收到其他节点发来的announce_peer消息。announce_peer消息与get_peer消息里都带了资源的infohash,但是get_peer里的infohash对应的资源在该网络中不一定存在,即该资源没有任何可用peer。而announce_peer则表示已经确认了该网络中有节点正在下载该资源,也即该资源的数据确实存在该网络中。

所以,爬虫程序需要尽最大努力地获取其他节点发来的announce_peer消息。如果announce_peer消息会发送给离消息发送节点较近的节点,那么,一方面,爬虫程序应该将自己告诉网络中尽可能多的节点。这可以通过一次完整的find_node操作实现。另一方面,爬虫程序内部实现可以部署多个DHT节点,总之目的在于尽可能地让爬虫程序称为其他节点的较近者。

当收集到infohash之后,爬虫程序还需要通过该infohash获得对应资源的描述信息。

获取资源信息

获得资源描述信息,其实就是通过infohash获得对应的种子文件。这需要实现P2P协议里的文件分享协议。种子文件的获取其实就是一个文件下载过程,下载到种子文件之后,就可以获取到资源描述。这个过程一种简单的方法,就是从infohash构建出一个磁力链接,然后交给一个支持磁力下载的程序下载种子。

从infohash构建出磁力链接非常简单,只需要将infohash编码成磁力链接的xt字段即可,构建实现可以从transmission源码里找到:

/* 这个算法其实和printf("%02x", sha1[i])一样 */
void tr_sha1_to_hex (char *out, const unsigned char *sha1)
{
int i;
static const char hex[] = "0123456789abcdef";
for (i=0; i<20; ++i) {
const unsigned int val = *sha1++;
*out++ = hex[val >> 4];
*out++ = hex[val & 0xf];
}
*out = '\0';
}
void appendMagnet(FILE *fp, const unsigned char *info_hash) {
char out[48];
tr_sha1_to_hex(out, info_hash);
fprintf(fp, "magnet:?xt=urn:btih:%s", out);
}

现在你就可以做一个实验,在transmission的DHT实现中,在announce_peer消息的处理代码中,将收到的infohash通过上面的appendMagnet转换为磁力链接输出到日志文件里。然后,可以通过支持磁力链接的程序(例如QQ旋风)直接下载。有趣的是,当QQ旋风开始下载该磁力链接对应的种子文件时,你自己的测试程序能收到QQ旋风程序发出的announce_peer消息。当然,你得想办法让这个测试程序尽可能地让其他节点知道你,这可以通过很多方式实现。

总结

最终的DHT爬虫除了需要实现DHT协议之外,还需要实现P2P文件下载协议,甚至包括一个种子文件解析模块。看起来包含不小的工作量。而如果要包装为一个资源搜索引擎,还会涉及到资源存储以及搜索,更别说前端呈现了。这个时候,如果你使用的语言不包含这些工具库,那实在是太悲剧了。没错,我就没找到一个erlang DHT库(倒是有erlang实现的BT客户端,懒得去剥了)。

UPDATE

通过详细阅读transmission里的DHT实现,一些之前的疑惑随之解开。

announce_peer会发给哪些节点

在一次对infohash的查询过程中,所有对本节点发出的get_peer作出回应的节点(不论这个回应节点回应的是nodes还是peers),当本节点取得peer信息时,就会对所有这些节点发出announce_peer。get_peer的回应消息里,不论是peer还是node,都会携带一个token,这样在将来收到对方的announce_peer时,就可以验证该token。

节点和bucket状态

在本地的路由表中,保存的node是有状态之分的。状态分为三种:good/dubious/bad。good节点基本可以断定该节点是一个在线的并且可以正常回应消息的节点;而bad节点则是可确定的无效节点,通常会尽快从路由表中移除;而dubious则是介于good和bad节点之间,表示可能有问题的节点,需要进一步发送例如ping消息来确认其状态。路由表中应该尽可能保证保存的是good节点,对查询消息的回应里也需携带好的节点。

bucket也是有状态的,当一个bucket中的所有节点在一定时间之内都没有任何活动的时候,该bucket则应该考虑进行状态的确认,确认方式可以随机选择该bucket中的节点进行find_node操作(这也是find_node除了用于启动之外的唯一作用,但具体实现不见得使用这种方式)。没有消息来往的bucket则应该考虑移除。DHT中几乎所有操作都会涉及到bucket的索引,如果索引到一个所有节点都有问题的bucket,那么该操作可能就无法完成。

search在何时停止

首先,某次发起的search,无论是对node还是对peer,都可能导致进一步产生若干个search。这些search都是基于transaction id来标识的。由一次search导致产生的所有子search都拥有相同的transaction id,以使得在该search成功或失败时可以通过该transaction id来删除对应的所有search。transaction id也就是DHT中每个消息消息头”t”的值。

但是search何时停止?transmission中是通过超时机制来停止。在search过程中,如果长时间没有收到跟该search关联的节点发来的回应消息,那么就撤销该search,表示搜索失败。

参考资料

posted @ 2013-05-19 21:51 Kevin Lynx 阅读(4828) | 评论 (0)编辑 收藏

2013年5月9日 #

Erlang使用感受

用erlang也算写了些代码了,主要包括使用RabbitMQ的练习,以及最近写的kl_tservericerl。其中icerl是一个实现了Ice的erlang库。

erlang的书较少,我主要读过<Programming Erlang>和<Erlang/OTP in Action>。其实erlang本身就语言来说的话比较简单,同ruby一样,类似这种本身目标是应用于实际软件项目的语言都比较简单,对应的语法书很快可以翻完。

这里我仅谈谈自己在编写erlang代码过程中的一些感受。

语法

erlang语法很简单,接触过函数式语言的程序员上手会很快。它没有类似common lisp里宏这种较复杂的语言特性。其语法元素很紧凑,不存在一些用处不大的特性。在这之前,我学习过ruby和common lisp。ruby代码写的比common lisp多。但是在学习erlang的过程中我的脑海里却不断出现common lisp里的语法特性。这大概是因为common lisp的语法相对ruby来说,更接近erlang。

编程模式

erlang不是一个面向对象的语言,它也不同common lisp提供多种编程模式。它的代码就是靠一个个函数组织出来的。面向对象语言在语法上有一点让我很爽的是,其函数调用更自然。erlang的接口调用就像C语言里接口的调用一样:

func(Obj, args)
Obj->func(args)

即需要在函数第一个参数传递操作对象。但是面向对象语言也会带来一些语法的复杂性。如果一门语言可以用很少的语法元素表达很多信息,那么我觉得这门语言就是门优秀的语言。

表达式/语句

erlang里没有语句,全部是表达式,意思是所有语法元素都是有返回值的。这实在太好了,全世界都有返回值可以让代码写起来简单多了:

    Flag = case func() of 1 -> true; 0 -> false end, 

命名

我之所以不想写一行python代码的很大一部分原因在于这门语言居然要求我必须使用代码缩进来编程,真是不敢相信。erlang里虽然没有此规定,却也有不同的语法元素有大小写的限定。变量首字母必须大写,atom必须以小写字母开头,更霸气的是模块命名必须和文件名相同。

变量

erlang里的变量是不可更改的。实际上给一个变量赋值,严格来说应该叫bound,即绑定。这个特性完全就是函数式语言里的特性。其带来的好处就像函数式语言宣扬的一样,这会使得代码没有副作用(side effect)。因为程序里的所有函数不论怎样调用,其程序状态都不会改变,因为变量无法被改变。

变量不可更改,直接意味着全局变量没有存在的意义,也就意味着不论你的系统是多么复杂地被构建出来,当系统崩溃时,其崩溃所在位置的上下文就足够找到问题。

但是变量不可改变也会带来一些代码编写上的不便。我想这大概是编程思维的转变问题。erlang的语法特性会强迫人编写非常短小的函数,你大概不愿意看到你的函数实现里出现Var1/Var2/Var3这样的变量,而实际上这样的命名在命令式语言里其实指的是同一个变量,只不过其值不同而已。

但是我们的程序总是应该有状态的。在erlang里我们通过不断创建新的变量来存储这个状态。我们需要通过将这个状态随着我们的程序流程不断地通过函数参数和返回值传递下去。

atom

atom这个语法特性本身没问题,它就同lisp里的atom一样,没什么意义,就是一个名字。它主要用在增加代码的可读性上。但是这个atom带来的好处,直接导致erlang不去内置诸如true/false这种关键字。erlang使用true/false这两个atom来作为boolean operator的返回值。但erlang里严格来说是没有布尔类型的。这其实没什么,糟糕的是,对于一些较常见的函数返回值,例如true/false,erlang程序员之间就得做约定。要表示一个函数执行失败了,我可以返回false、null、failed、error、nil,甚至what_the_fuck,这一度让我迷惘。

list/tuple

erlang里的list当然没有lisp里的list牛逼,别人整个世界就是由list构成的。在一段时间里,我一直以为list里只能保存相同类型的元素,而tuple才是用于保存不同类型元素的容器。直到有一天我发现tuple的操作不能满足我的需求了,我才发现list居然是可以保存不同类型的。

list相对于tuple而言,更厉害的地方就在于头匹配,意思是可以通过匹配来拆分list的头和剩余部分。

匹配(match)

erlang的匹配机制是个好东西。这个东西贯穿了整个语言。在我理解看来,匹配机制减少了很多判断代码。它试图用一个期望的类型去匹配另一个东西,如果这个东西出了错,它就无法完成这个匹配。无法完成匹配就导致程序断掉。

匹配还有个方便的地方在于可以很方便地取出record里的成员,或者tuple和list的某个部分,这其实增强了其他语法元素的能力。

循环

erlang里没有循环语法元素,这真是太好了。函数式语言里为什么要有循环语法呢?common lisp干毛要加上那些复杂的循环(宏),每次我遇到需要写循环的场景时,我都诚惶诚恐,最后还是用递归来解决。

同样,在erlang里我们也是用函数递归来解决循环问题。甚至,我们还有list comprehension。当我写C++代码时,我很不情愿用循环去写那些容器遍历代码,幸运的是在C++11里通过lambda和STL里那些算法我终于不用再写这样的循环代码了。

if/case/guard

erlang里有条件判定语法if,甚至还有类似C语言里的switch…case。这个我一时半会还不敢评价,好像haskell里也保留了if。erlang里同haskell一样有guard的概念,这其实是一种变相的条件判断,只不过其使用场景不一样。

进程

并发性支持属于erlang的最大亮点。erlang里的进程概念非常简单,基于消息机制,程序员从来不需要担心同步问题。每个进程都有一个mailbox,用于缓存发送到此进程的消息。erlang提供内置的语法元素来发送和接收消息。

erlang甚至提供分布式支持,更酷的是你往网络上的其他进程发送消息,其语法和往本地进程发送是一样的。

模块加载

如果我写了一个erlang库,该如何在另一个erlang程序里加载这个库?这个问题一度让我迷惘。erlang里貌似有对库打包的功能(.ez?),按理说应该提供一种整个库加载的方式,然后可以通过手动调用函数或者指定代码依赖项来加载。结果不是这样。

erlang不是按整个库来加载的,因为也没有方式去描述一个库(应该有第三方的)。当我们调用某个模块里的函数时,erlang会自动从某个目录列表里去搜索对应的beam文件。所以,可以通过在启动erlang添加这个模块文件所在目录来实现加载,这还是自动的。当然,也可以在erlang shell里通过函数添加这个目录。

OTP

使用erlang来编写程序,最大的优势可能就是其OTP了。OTP基本上就是一些随erlang一起发布的库。这些库中最重要的一个概念是behaviour。behaviour其实就是提供了一种编程框架,应用层提供各种回调函数给这个框架,从而获得一个健壮的并发程序。

application behaviour

application behaviour用于组织一个erlang程序,通过一个配置文件,和提供若干回调,就可以让我们编写的erlang程序以一种统一的方式启动。我之前写的都是erlang库,并不需要启动,而是提供给应用层使用,所以也没使用该behaviour。

gen_server behaviour

这个behaviour应该是使用频率很高的。它封装了进程使用的细节,本质上也就是将主动收取消息改成了自动收取,收取后再回调给你的模块。

supervisor behaviour

这个behaviour看起来很厉害,通过对它进行一些配置,你可以把你的并发程序里的所有进程建立成树状结构。这个结构的牛逼之处在于,当某个进程挂掉之后,通过supervisor可以自动重新启动这个挂掉的进程,当然重启没这么简单,它提供多种重启规则,以让整个系统确实通过重启变成正常状态。这实在太牛逼了,这意味着你的服务器可以7x24小时地运行了,就算有问题你也可以立刻获得一个重写工作的系统。

热更新

代码热更新对于一个动态语言而言其实根本算不上什么优点,基本上动态语言都能做到这一点。但是把热更新这个功能加到一个用于开发并发程序的语言里,那就很牛逼了。你再一次可以确保你的服务器7x24小时不停机维护。

gen_tcp

最开始我以为erlang将网络部分封装得已经认不出有socket这个概念了。至少,你也得有一个牛逼的网络库吧。结果发现依然还是socket那一套。然后我很失望。直到后来,发现使用一些behaviour,加上调整gen_tcp的一些option,居然可以以很少的代码写出一个维护大量连接的TCP服务器。是啊,erlang天生就是并发的,在传统的网络模型中,我们会觉得使用one-thread-per-connection虽然简单却不是可行的,因为thread是OS资源,太昂贵。但是在erlang里,one-process-per-connection却是再自然不过的事情。你要是写个erlang程序里面却只有一个process你都不好意思告诉别人你写的是erlang。process是高效的(对我们这种二流程序员而言),它就像C++里一个很普通的对象一样。

在使用gen_tcp的过程中我发现一个问题,不管我使用哪一种模型,我竟然找不到一种温柔的关闭方式。我查看了几个tutorial,这些混蛋竟然没有一个人提到如何去正常关闭一个erlang TCP服务器。后来,我没有办法,只好使用API强制关闭服务器进程。

Story

其实,我和erlang之间是有故事的。我并不是这个月开始才接触erlang。早在2009年夏天的时候我就学习过这门语言。那时候我还没接触过任何函数式语言,那时候lua里的闭包都让我觉得新奇。然后无意间,我莫名其妙地接触了haskell(<Real World Haskell>),在我决定开始写点什么haskell练习时,我发现我无从下手,最后,Monads把我吓哭了。haskell实在太可怕了。

紧接着我怀揣着对函数式语言的浓烈好奇心看到了erlang。当我看到了concurrent programming的章节时,在一个燥热难耐的下午我的领导找到了我,同我探讨起erlang对我们的网游服务器有什么好处。然后,我结束我了的erlang之旅。

时隔四年,这种小众语言,居然进入了中国程序员的视野,并被用于开发网页游戏服务器。时代在进步,我们总是被甩在后面。

posted @ 2013-05-09 21:24 Kevin Lynx 阅读(1153) | 评论 (0)编辑 收藏

仅列出标题  下一页