woaidongmao

文章均收录自他人博客,但不喜标题前加-[转贴],因其丑陋,见谅!~
随笔 - 1469, 文章 - 0, 评论 - 661, 引用 - 0
数据加载中……

MemoryPool的LockFree实现

MemoryPool已经是一个非常古老的话题了,关于此方面的文章比比皆是,在Addison-Wesley 1999年出版的<Efficient C++ Performance Programming Techniques > (下面简称Efficient)中,两位富有经验的作者Dov Bulka, David Mayhew 详细讲述了如何编写一个Multi-ThreadMemoryPool;而JAVA hotspot VMShare Source CLI源码中也可见这方面的实现。就我看来,他们的实现都大同小异。本文首先简单介绍Efficient文中Memory Pool的实现,然后在此基础上研究如何使用Lock-Free的机制去进一步改善性能。

 

 

<1>Multi-Thread MemoryPool的实现

 

Dov Bulka版本为例,首先简单看看一个最基本的基于对象的MemoryPool实现:

MemoryPool的基本思想是将在delete 的对象的时候并不真正delete,而是归还到Pool中,以某种数据形式管理起来,在下一次分配对象内存的时候,先到Pool中寻找,看看有没有空余的内存,如果没有,在调用OSmalloc来分配新的内存。

 

MemoryPool的声明:

 

template < class T >

class MemoryPool {

public:

    MemoryPool (size_t size = EXPANSION_SIZE);

    ~MemoryPool ();

 

    // Allocate a T element from the free list.

    inline void* alloc (size_t size);

 

    // Return a T element to the free list.

    inline void free (void *someElement);

private:

    // next element on the free list.

    MemoryPool<T> *next;

 

    // If the freeList is empty, expand it by this amount.

enum { EXPANSION_SIZE = 32};

 

    // Add free elements to the free list

    void expandTheFreeList(int howMany = EXPANSION_SIZE);

};

 

看看几个关键的函数

分配内存:

 

template < class T >

inline

void* MemoryPool < T > :: alloc (size_t)

{

    if (!next) {

        expandTheFreeList();

    }

 

    MemoryPool<T> *head = next;

    next = head->next;

 

    return head;

}

 

释放内存,原理是next 指针指向当前Pool的可用对象内存链表的头部,在释放对象内存的时候,就更新链表头部时期指向刚释放的对象内存块

 

template < class T >

inline

void MemoryPool < T > :: free (void *doomed)

{

    MemoryPool<T> *head = static_cast <MemoryPool<T> *> doomed;

 

    head->next = next;

    next  = head;

}

 

 

扩充内存池, 这个是把新分配的内存依次串接到next所在的linked list

template < class T >

void MemoryPool < T > :: expandTheFreeList(int howMany)

{

    // We must allocate an object large enough to contain the

    // next pointer.

    size_t size = (sizeof(T) > sizeof(MemoryPool<T> *)) ?

        sizeof(T) : sizeof(MemoryPool<T> *);

 

    MemoryPool<T> *runner = static_cast <MemoryPool<T> *> new char [size];

 

    next =  runner;

    for (int i = 0; i < howMany ; i++) {

        runner->next =

            static_cast <MemoryPool<T> *> new char [size];

        runner = runner->next;

    }

    runner->next = 0;

}

 

 

 

<2>MemoryPool的同步机制

从上面的代码可见,分配和释放都可能需要更新FreeList可用链表头部,因此,多线程环境下需要使用锁机制来保护对空闲链表头部的修改,很多Linux/GNU Multi-Thread Memory Pool实现都会使用pthread_mutex_lock,例如SSCLIJAVA hotspot VM中的实现。

 

除开pthread_mutex函数族外,还有pthread_spin系列也可用于线程同步。这两者的主要区别是,如果没有获得锁资源,pthread_mutex会使线程睡眠,而pthread_spin 顾名思义,则会反复尝试直到取得锁资源,这样会使CPU空转。

 

Efficient文中提到pthread_mutex是这样使用的:

class ABCLock {// Abstract base class

public:

    virtual ~ABCLock() {}

    virtual void lock() = 0;

    virtual void unlock() = 0;

};

class MutexLock : public ABCLock {

public:

    MutexLock() {pthread_mutex_init(&lock, NULL);}

    ~MutexLock() {pthread_mutex_destroy(&lock);}

 

    inline void lock() {pthread_mutex_lock(&lock);}

    inline void unlock() {pthread_mutex_unlock(&lock);}

private:

    pthread_mutex_t lock;

};

 

在需要保护临界信息的地方使用lock()函数

template <class M, class L>

inline

void* MTMemoryPool<M,L>::alloc (size_t size)

{

    void * mem;

 

    theLock.lock();

    mem = stPool.alloc(size);

    theLock.unlock();

 

    return mem;

}

 

pthread_spin的自旋锁特性导致其比较适用一些轻量级,极短时间的多核cpu系统上的锁保护,在单核cpu上,pthread_spin实质上会死循环直到线程时间片耗尽,从而性能比pthread_mutex大为缩水,一个测试中也证明了这一点。

 

测试环境,2个线程,1个用于operator new 分配内存(通过alloc调用),另一个用operator delete 释放内存(通过free调用)

 

表一,2 core 的测试结果:

 

environment

Intel Core 2 Duo CPU     E6850  @ 3.00GHz

average time

MutexLock

SpinLock

 

 

Lock()

0.00045

0.00024

 

 

Unlock()

0.000185

0.00009919

 

 

Alloc()

0.000857

0.0005728

 

 

Free()

0.0007549

0.0005512

 

 

 

 

表二,在single core上,Spinlock 的性能降低。

Environment

nosmp maxcpus=0 Intel Core 2 Duo CPU     E6850  @ 3.00GHz

Average time

MutexLock

SpinLock

 

 

Lock()

0.0005167

0.0014

 

 

Unlock()

0.000312

0.000246

 

 

Alloc()

0.001176

0.002443

 

 

Free()

0.001506

0.00396

 

 

 

 

 

从表一中可以看到,Lock ( pthread_mutex_lock或者pthread_spin_lock)占用了Alloc() 一半以上的cpu时间,Dov BulkaDavid Mayhew也觉得pthread_mutex太过overweight了,他作了如下假设,并且给出了一个AIX上快速锁的例子:“我们需要的是一个轻量级的锁,假设正在加锁的线程还没有持有锁,正在解锁的线程恰巧好是起初加锁的那个线程。”这样的锁在Linux上并没有对应的实现,我们只能采用pthread_spin_trylock 加上 pthread_spin_lock来模仿这样的过程。

 

 

 

<3>Lock-Free机制

 

pthread_spin 虽然足够轻量,但仍然是一种锁机制,线程的运行需要依赖于持有锁的那个线程。从上面的测试结果可以看到,减去Lock/Unlock消耗的时间,Alloc()真正的cpu时间大约在0.00083-0.00045-0.000185=0.00022左右,有没有一种同步方法可以使Alloc的消耗时间降低到这个数值?

 

2004Maged M. Michael那片开创性的文章“Scalable Lock-Free Dynamic Memory Allocation”,采用了Lock-Free机制的malloc实现击败了目前所有现存的malloc库,包括ptmalloc3,hoard等,Lock-Free一时间成为主流的研究重点。我们也打算采用这种方法来提高Alloc()的性能,不过由于实现的复杂性,本文所要介绍的Lock-Free实现,主要是用于避免在读写空闲链表头部的Lock/Unlock带来的性能损失,MemoryPool采用的lib_malloc仍然是Linux/GNU环境下的基于Lock/Unlockptmalloc实现。尽管如此,采用Lock-Free的实现比mutex/spin lock有明显的性能提高。

 

严格讲,Lock-Free是一套以原子操作为基础,采用事务->提交->提交失败->重试这样特定编程手法的机制,它使得正在访问共享资源的线程不依赖于任何其它线程的调度和执行,并且能够在有限的步骤内完成。

 

Lock-Free的实现往往都是通过CAS-Compare And Swap原子操作完成,声明如下:

 

CAS(addr,expval,newval) atomically do

if (*addr == expval) {

*addr = newval;

return true;

} else

return false;

 

Linux上一种典型的实现是:

inline int CAS(unsigned long *mem,unsigned long newval,unsigned long oldval)

{

  __typeof (*mem) ret;

  __asm __volatile ("lock; cmpxchgl %2, %1"

             : "=a" (ret), "=m" (*mem)

             : "r" (newval), "m" (*mem), "0" (oldval));

   return (int) ret;

 

}

 

MemoryPool < T > :: alloc (size_t)为例:

….

//更新next指针的内容

do {

oldheader = next;

if (oldheader==NULL)

    expandTheFreeList();

else

    target=oldheader->next;

 

}

while (CAS(&next,target,oldheader)!=oldheader)

 

实际执行中,如果next 的值为oldheader,那么就会更新next的值为target,并返回oldheader,这一过程是原子操作;如果在执行CAS指令的时候,next的内容已经发生变化,那么CAS的返回值就不会等于oldheader,while中的条件为false,程序会返回重新执行oldheader = next,直到next 的值被成功更新。同时,expandTheFreeList也要作类似的Lock-Free修改,下面的例子中把原先expandTheFreeList的代码放置到alloc中。

 

newbuf_head = NULL //note1

do

{

    oldheader =(MemoryPool*) next;            //note2

    if (!oldheader)

    {           

  if (!newbuf_head)

  {  

      size_t size = (m_sizeofClass > sizeof(MemoryPool *))?m_sizeofClass:sizeof(MemoryPool*);

            MemoryPool *runner = reinterpret_cast<MemoryPool*>(new char[size]);

      newbuf_head  = runner;

      for (int i = 0 ; i < EXPANSION_SIZE; i++)

            {            

      runner->next = reinterpret_cast<MemoryPool *>(new char [size]);

      runner =(MemoryPool*) runner->next;

            }

      runner->next = 0;

      newbuf_tail = runner;

  }

    }

    if (newbuf_head)  //note3

    {

        newbuf_tail->next = (MemoryPool*)next;

        target = newbuf_head;

    }

    else if (oldheader)

    {

        target = oldheader;//note4

}

//note5

}while ( CAS((unsigned long*)&next,/*m_freeheader,*/(unsigned long)target->next,(unsigned long)oldheader) !=(int) oldheader);

 

如果oldheader NULL,那么就会分配一段新的内存,头部为newbuf_head,尾部为newbuf_tail,并且把原先的FreeList头部挂接到新内存的尾部,以新内存的头部作为新的FreeList的头部去更新next的值。

 

如果分配内存后,CAS执行失败了,这没关系,此时newbuf_head已被置值,因此并不会再次分配内存,程序仍然试图执行上一次执行过的路径。

 

考虑一种极端情况,起初next指向一个有效地址(oldheader也指向一个有效值),因此程序跳过了分配新内存那段(斜体字),不幸在执行到note3处时,next却由于另一个线程分配了内存的关系置为NULL,此时怎么办?

 

答案,CAS操作时next的内容与oldheader不一致,因此CAS失败,程序再次返回note2处执行。

 

<4>Lock-Free中的ABA问题

承接上面的那一个极端问题,

线程A在执行note2时的空闲链表状态如下: A-B-C-D-E-NULL,在执行到note5时,期间发生了如下事情:

 

线程BFreeList中分配了一个AB两个内存块,FreeList变为C-D-E-NULL,但随后线程B归还了内存块A,从而FreeList再次变为A-C-D-E-NULL

 

线程A在执行到note5,进入CAS临界操作时,next的内容仍然和note2时的一致,均指向A,但CAS的第二个关键参数,target->next 的却已不是B了!(note4target被设置为oldheaderA,从而target->nextB),如果把next更新为B,那么就会造成程序错误。

 

这个就是Lock-Free与生俱来的ABA问题,CAS无法判断目标内容从A变为B,然后又变为A这种情况,解决的办法通常是使用一个额外的tag来记录这种情况,并且使用CAS2,来同时检查tag和目标内存两个值又没有发生变化,声明如下:

bool CAS2 (volatile void * ptr, uint32_t old1, uint32_t old2, uint32_t new1, uint32_t new2)

{

    bool ret;

    __asm__ __volatile__ ("lock cmpxchg8b (%1) \n\t sete %%al"

        :   "=a"(ret)

        :   "D"(ptr), "a"(old1), "d"(old2), "b"(new1), "c"(new2)

        :   "memory");

    return ret;

}

CAS2可以用来检查一个64 bit长度的内存指并原子交换他们的内容。

 

在某些情况下也可以采用thread specified Pool这种方案,就是每个线程有其专用的内存池,不会出现两个线程同时在一个Pool上分配内存的情况,这需要额外的空间在分配的对象中嵌入线程的信息。

 

 

<5>Lock-Free的性能

可见Lock-Free的实现并不算一件太新鲜复杂的事情,Windows平台上早就有了InterlockedCompareExchange之类的函数,Windows中提供的Slist也是一个Lock-Free linked-list。那么在何种情况下使用Lock-Free能达到最理想的效果?

 

Lock-Free机制在single core上会取得明显的性能优势。所有的CAS操作实质上在CPU微观世界里是串行执行的,由于没有别的线程干扰,一个处于运行状态的线程永远不会发生CAS操作失败。

 

2 cpu core的情况下,一个线程alloc对象另一个线程free对象内存,可能会导致一个线程里有少量的CAS失败,但是与pthread_spin_lock机制相比,耗时仍然非常少。究其原因,pthread_spin_lock会受到锁粒度的的影响,假如Alloc需要耗时0.00023秒的话,那么pthread_spin_lock就可能需要消耗同样的时间。而CAS是一种尽力处理的机制,即使在某个线程里发生了一个CAS失败,一方面程序会以一个很细的粒度马上重试,另一方面也表明与此同时另一个线程成功执行了一次CAS操作。

 

从下表看出,CAS的耗时仅仅是pthread_mutex_lock1/10。而整个alloc的耗时比较,Lock-Free也只有pthread_mutex版本的1/4

 

如果增加alloc/free的线程以及cpu 的数量,Lock-Free机制会由于CAS操作失败而不断重复执行do {}while(CAS) 中的代码,从而导致一定的性能下降。因此,应该尽量让do{}中的代码写得简短,以保证较高的CAS成功率,其次是针对特定场景开发Lock-Free应用,例如在多核程序中进行pipeline处理,在同一个地方分配内存,在同一个地方回收内存。

 

 

Environment

Intel Core 2 Duo CPU     E6850  @ 3.00GHz

average time

MutexLock

SpinLock

Lock-Free

 

Lock()

0.00045

0.00024

0.000059(CAS)

 

Unlock()

0.000185

0.00009919

0

 

Alloc()

0.000857

0.0005728

0.0002425

 

Free()

0.0007549

0.0005512

0.0002002

 

 

 

 

<6>参考资料

 

[1] Maged M. Michael, Scalable Lock-Free Dynamic Memory Allocation http://researchweb.watson.ibm.com/people/m/michael/pldi-2004.pdf

 

[2] Dov Bulka, David Mayhew, Efficient C++: Performance Programming Techniques

 

[3] 搜狗实验室技术交流文档 Vol4:2 C10K与高性能程序续篇

 

posted on 2009-05-02 01:40 肥仔 阅读(2052) 评论(0)  编辑 收藏 引用 所属分类: C++ 基础


只有注册用户登录后才能发表评论。
【推荐】超50万行VC++源码: 大型组态工控、电力仿真CAD与GIS源码库
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理