随笔-369  评论-37  文章-0  trackbacks-0

  动态任务分解方式分类:非嵌套型动态任务,嵌套型动态任务。

1、非嵌套型动态任务
      使用分布式队列

2、嵌套型动态任务
      嵌套型任务通常有一个或多个开始任务,其他任务的产生均源自于开始任务。
      调度方法:每个线程都有一个本地队列,另外所有线程拥有一个共享队列。当每个线程产生N个任务时,首先选择一个任务送入本地队列,然后将其他任务送入共享队列。

      对应的CAPI代码如下:


////////////////////////////////////////////////////////////////////////

/*

 * Copyright (c) 2006-2008

 * Author: Weiming Zhou

 *

 * Permission to use, copy, modify, distribute and sell this software

 * and its documentation for any purpose is hereby granted without fee,

 * provided that the above copyright notice appear in all copies and

 * that both that copyright notice and this permission notice appear

 * in supporting documentation. 

 */

 

/*

 *   CNestTaskScheduler.h

 *

 *   DESCRIPTION

 *       Module for Task Scheduler class

 *

 *   HISTORY

 *       06-08-2008    create by zhouweiming.

 *

 */

#ifndef __CNESTTASKSCHEDULER_H__

#define __CNESTTASKSCHEDULER_H__

 

#include "CLocalQueue.h"

#include "CStealQueue.h"

#include "CDistributedQueue.h"

 

class CNestTaskScheduler {

private :

    CThreadPool     m_ThreadPool; //(TaskScheduler_StartFunc, NULL, 0);

    CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;

    THREADFUNC      m_StartFunc;  // 为线程池使用的线程入口函数指针

    LONG  volatile   m_lTaskId;    //Task Id, 用于判断是否唤醒对应的线程

 

public :

    CNestTaskScheduler();

    virtual ~CNestTaskScheduler(){};

 

 

    // 下面两个函数为调度器本身直接使用

    void SetStartFunc(THREADFUNC StartFunc);

    int GetTask(TASK &Task);

    CThreadPool & GetThreadPool();

    LONG AtomicIncrementTaskId();

   

    // 下面三个函数为调度器的使用者使用

    void SpawnLocalTask(TASK &Task);

    void SpawnTask(TASK &Task);

    void BeginRootThread(TASK &Task);

};

 

#endif //__CNESTTASKSCHEDULER_H__

 

////////////////////////////////////////////////////////////////////////

/*

 * Copyright (c) 2006-2008

 * Author: Weiming Zhou

 *

 * Permission to use, copy, modify, distribute and sell this software

 * and its documentation for any purpose is hereby granted without fee,

 * provided that the above copyright notice appear in all copies and

 * that both that copyright notice and this permission notice appear

 * in supporting documentation. 

 */

 

#include "CapiGlobal.h"

#include "CThreadPool.h"

#include "CLocalQueue.h"

#include "CStealQueue.h"

#include "CDistributedQueue.h"

#include "CNestTaskScheduler.h"

 

static unsigned int WINAPI NestTaskScheduler_StartFunc( void *pArgs);

 

#define      NESTTASK_QUEUE_SIZE     128

 

/**  嵌套任务调度的构造函数

 

     @return  constructor -   

*/

CNestTaskScheduler::CNestTaskScheduler()

{

    m_StartFunc = NestTaskScheduler_StartFunc;

 

    int n = m_ThreadPool.GetThreadCount();

    m_DQueue.Create(NESTTASK_QUEUE_SIZE, n, NESTTASK_QUEUE_SIZE, 0,

        ThreadPool_GetThreadId, &m_ThreadPool);

}

 

/**  嵌套任务调度的设置线程池的入口函数

 

     @param   THREADFUNC StartFunc - 线程池的入口函数  

     @return  void -

*/

void CNestTaskScheduler::SetStartFunc(THREADFUNC StartFunc)

{

    m_StartFunc = StartFunc;

}

 

/**  嵌套任务调度的获取任务函数

 

     @param   TASK &Task - 接收从分布式队列中获取的任务

     @return  int - 成功返回CAPI_SUCCESS, 失败返回CAPI_FAILED.  

*/

int CNestTaskScheduler::GetTask(TASK &Task)

{

    // 先从本地队列获取任务

    // 本地获取任务失败后从共享队列获取任务

    return m_DQueue.DeQueue(Task);

};

 

 

/**  嵌套任务调度的获取线程池函数

 

    @return   CThreadPool & - 返回线程池对象  

*/

CThreadPool & CNestTaskScheduler::GetThreadPool()

{

    return m_ThreadPool;

}

 

/**  嵌套任务调度的原子增加任务Id函数

 

    @return   int - 返回原子加后的任务Id.

*/

LONG CNestTaskScheduler::AtomicIncrementTaskId()

{

    LONG Id = AtomicIncrement(&m_lTaskId);

    return Id;

}

 

 

 

/**  嵌套任务调度的生成当前线程的本地任务

    任务被放入当前线程的本地队列中

 

     @param   TASK &Task - 待执行的任务  

     @return  void -

*/

void CNestTaskScheduler::SpawnLocalTask(TASK &Task)

{

    // 将任务放入本地队列中

    m_DQueue.PushToLocalQueue(Task);

};

 

 

/**  嵌套任务调度的生成任务函数

    生成的任务被放入分布式队列中

 

     @param   TASK &Task - 待执行的任务  

     @return  void -

*/

void CNestTaskScheduler::SpawnTask(TASK &Task)

{

    if ( m_lTaskId < m_ThreadPool.GetThreadCount() )

    {

        // 依次唤醒各个挂起的线程

        LONG Id = AtomicIncrement(&m_lTaskId);

        if ( Id < m_ThreadPool.GetThreadCount() )

        {

            // 下面之所以可以对其他线程的本地队列进行无同步的操作,是因为

            // 访问这些队列的线程在进队操作之后才开始运行

            m_DQueue.PushToLocalQueue(Task, Id);

            m_ThreadPool.ExecThread(Id);

        }

        else

        {

            m_DQueue.EnQueue(Task);

        }

    }

    else

    {

        // 先判断共享队列是否满,如果未满则放入共享队列中

        // 如果满了则放入本地队列中

        m_DQueue.EnQueue(Task);

    }

};

 

 

/**  嵌套任务调度的启动根线程函数

 

     @param   TASK &Task - 要执行的最初任务   

     @return  void -

*/

void CNestTaskScheduler::BeginRootThread(TASK &Task)

{

    m_lTaskId = 0;

 

    m_ThreadPool.CreateThreadPool(m_StartFunc, this , 0);

 

    m_DQueue.PushToLocalQueue(Task, 0);

 

    m_ThreadPool.ExecThread( 0 ); 

 

    m_ThreadPool.WaitAllThread();

}

 

/**  嵌套任务调度的线程池入口函数

 

     @param   void *pArgs - CNestTaskScheduler 类型的参数    

     @return  unsigned int WINAPI - 返回 

*/

unsigned int WINAPI NestTaskScheduler_StartFunc( void *pArgs)

{

    CNestTaskScheduler  *pSched = (CNestTaskScheduler *)pArgs;

 

    TASK    Task;

    int      nRet;

 

    for ( ;; )

    {

        nRet = pSched->GetTask(Task);

 

        if ( nRet == CAPI_FAILED )

        {

            CThreadPool &ThreadPool = pSched->GetThreadPool();

           

            // 唤醒一个挂起的线程,防止任务数量小于CPU核数时,

            // 仍然有任务处于挂起状态,从而导致WaitAllThread()处于死等状态

            // 这个唤醒过程是一个串行的过程,被唤醒的任务会继续唤醒一个挂起线程

            LONG Id = pSched->AtomicIncrementTaskId();

            if ( Id < ThreadPool.GetThreadCount() )

            {

                ThreadPool.ExecThread(Id);

            }

            break ;

        }

 

        (*(Task.func))(Task.pArg);

    }

 

    return 0;

}

 


      

posted on 2010-01-17 17:57 小王 阅读(659) 评论(0)  编辑 收藏 引用 所属分类: 多核编程

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理