D3D

不知何年何月得常所愿, 得,得,得,得常所愿

  C++博客 :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理 ::
  52 随笔 :: 0 文章 :: 208 评论 :: 0 Trackbacks


在实际工作过程中我们需要对共享的缓冲区进行操作.有插入的线程也有读取的线程.
这就是生产者消费者问题.
生产者要不断将数据放入共享的缓冲,消费者要不断从缓冲取出数据,消费者必须等生产者


#ifndef __QUEUE_H__
#define __QUEUE_H__

#include 
"Condition.h"
#include 
"Mutex.h"
 
template
<class T> 
class FQueue 
{
public:
    
volatile unsigned int size;

    inline FQueue() : cond(
&lock)
    {
        first 
= last = NULL;
        size 
= 0;
    }
    
    uint32 get_size()
    {
        uint32 ret;
        cond.BeginSynchronized();
        ret 
= size;
        cond.EndSynchronized();
        
return ret;
    }
    

    
void push(T &item)
    {
        h
* p = new h;
        p
->value = item;
        p
->pNext = NULL;
        

        cond.BeginSynchronized();
//上锁,锁数量++
        if(last)//队列里还有其他的结点
        {
            last
->pNext = p;//新元素加到末结点
            last = p;//新元素设为末结点
            size++;//队列大小++
        }
        
else//是第一个结点
        {
            last 
= first = p;
            size 
= 1;
            cond.Signal();
        }

        cond.EndSynchronized();
//开锁,锁数量--
    }

    T pop_nowait()
    {
    
        cond.BeginSynchronized();
        
if(size==0)
        {
            cond.EndSynchronized();
            
return NULL;
        }

        h
*tmp=first;
        
if(tmp == NULL)
        {
            cond.EndSynchronized();
            
return NULL;
        }

        
if(--size)
        {
            first
=(h*)first->pNext;
        }
        
else
        {
            first
=last=NULL;
        }

        cond.EndSynchronized();

        T returnVal 
= tmp->value;
        delete tmp;

        
return returnVal;
    }
    
    T pop()
    {

        cond.BeginSynchronized();
        
if(size==0)//消费者速度太快了
            cond.Wait();//消费者必须等生产者

        h
*tmp=first;//得到队列第一个结点
        if(tmp == NULL)
        {
            cond.EndSynchronized();
            
return NULL;
        }

        
if(--size)//队列大小--
        {
            first
=(h*)first->pNext;//下个结点设为第一个结点
        }
        
else//队列为空
        {
            first
=last=NULL;
        }

        cond.EndSynchronized();

        T returnVal 
= tmp->value;//从队列中delete
        delete tmp;
        
        
return returnVal;
    }    

    ASCENT_INLINE Condition
& GetCond() { return cond; }
    
private:
    
struct h
    {
        T value;
        
void *pNext;
    };

    h
* first;
    h
* last;
    
    Mutex 
lock;
    Condition cond;
};

#endif 


Condition.h


#ifndef __CONDITION_H__
#define __CONDITION_H__

#include 
"Mutex.h"
#include 
<queue>
#include 
<windows.h>

#define MAX_AWAITING_THREADS 10

class Condition
{
public:
    inline Condition(Mutex 
* mutex) : m_nLockCount(0), m_externalMutex(mutex)
    {
        ::InitializeCriticalSection(
&m_critsecWaitSetProtection);
    }
 
    
~Condition()
    {    
        ::DeleteCriticalSection(
&m_critsecWaitSetProtection);
    }

    inline 
void BeginSynchronized()
    {
        m_externalMutex
->Acquire();
        
++m_nLockCount;
    }

    inline 
void EndSynchronized()
    {
        
--m_nLockCount;
        m_externalMutex
->Release();
    }

    DWORD Wait(time_t timeout)
    {
        DWORD dwMillisecondsTimeout 
= (DWORD)timeout * 1000;
        BOOL bAlertable 
= FALSE;

        HANDLE hWaitEvent 
= Push();
        
if( NULL == hWaitEvent )
            
return WAIT_FAILED;

        
int nThisThreadsLockCount = m_nLockCount;
        m_nLockCount 
= 0;

        
forint i=0; i<nThisThreadsLockCount; ++i)
        {
            m_externalMutex
->Release();
        }

        DWORD dwWaitResult 
= ::WaitForSingleObjectEx(
            hWaitEvent,
            dwMillisecondsTimeout,
            bAlertable
            );

        DWORD dwLastError 
= 0;
        
if( WAIT_FAILED == dwWaitResult )
            dwLastError 
= ::GetLastError();

        
forint j=0; j<nThisThreadsLockCount; ++j)
        {
            m_externalMutex
->Acquire();
        }

        m_nLockCount 
= nThisThreadsLockCount;

        
if! CloseHandle(hWaitEvent) )
            
return WAIT_FAILED;

        
if( WAIT_FAILED == dwWaitResult )
            ::SetLastError(dwLastError);

        
return dwWaitResult;
    }

    DWORD Wait()
    {
        DWORD dwMillisecondsTimeout 
= INFINITE;//无穷等待
        BOOL bAlertable = FALSE;
        
        
//消费者速度太快了必须等待生产者
        HANDLE hWaitEvent = Push();//一个新的event对象到队列
        if( NULL == hWaitEvent )
            
return WAIT_FAILED;

        
int nThisThreadsLockCount = m_nLockCount;
        m_nLockCount 
= 0;

        
forint i=0; i<nThisThreadsLockCount; ++i)
        {
            m_externalMutex
->Release();
        }

        DWORD dwWaitResult 
= ::WaitForSingleObjectEx(
            hWaitEvent,
            dwMillisecondsTimeout,
            bAlertable
            );

        DWORD dwLastError 
= 0;
        
if( WAIT_FAILED == dwWaitResult )
            dwLastError 
= ::GetLastError();

        
forint j=0; j<nThisThreadsLockCount; ++j)
        {
            m_externalMutex
->Acquire();
        }

        
//恢复锁的数量
        m_nLockCount = nThisThreadsLockCount;

        
if! CloseHandle(hWaitEvent) )
            
return WAIT_FAILED;

        
if( WAIT_FAILED == dwWaitResult )
            ::SetLastError(dwLastError);

        
return dwWaitResult;

    }

    
void Signal()
    {
        HANDLE hWaitEvent 
= Pop();

        
if(NULL == hWaitEvent)//当前没有线程在等待
            return;

        
//把event设激发状态
        SetEvent(hWaitEvent);
    }

    
void Broadcast()
    {

        ::EnterCriticalSection(
&m_critsecWaitSetProtection);
        std::deque
<HANDLE>::const_iterator it_run = m_deqWaitSet.begin();
        std::deque
<HANDLE>::const_iterator it_end = m_deqWaitSet.end();
        
for( ; it_run < it_end; ++it_run )
        {
            
if! SetEvent(*it_run) )
                
return;
        }
        m_deqWaitSet.clear();
        ::LeaveCriticalSection(
&m_critsecWaitSetProtection);
    }

private:

    HANDLE Push()
    {
        
//产生一个event对象
        HANDLE hWaitEvent = ::CreateEvent(
            NULL, 
//
            FALSE, //FALSE表示这个event对象变成激发状态(应而换醒一个线程)之后自动重设为非激发状态
            FALSE, //FALSE表示这个event对象一开始为非激发状态
            NULL //
            );
        
//
        if( NULL == hWaitEvent ) {
            
return NULL;
        }

        ::EnterCriticalSection(
&m_critsecWaitSetProtection);
        m_deqWaitSet.push_back(hWaitEvent);
        ::LeaveCriticalSection(
&m_critsecWaitSetProtection);

        
return hWaitEvent;
    }

    HANDLE Pop()
    {
        ::EnterCriticalSection(
&m_critsecWaitSetProtection);
        HANDLE hWaitEvent 
= NULL; 
        
if0 != m_deqWaitSet.size() )
        {
            hWaitEvent 
= m_deqWaitSet.front();
            m_deqWaitSet.pop_front();
        }
        ::LeaveCriticalSection(
&m_critsecWaitSetProtection);

        
return hWaitEvent;
    }

    BOOL LockHeldByCallingThread()
    {
        BOOL bTryLockResult 
= m_externalMutex->AttemptAcquire();

        
if! bTryLockResult )
        {
            
return FALSE;
        }


        
if0 == m_nLockCount )
        {
            m_externalMutex
->Release();
            
return FALSE;
        }

        m_externalMutex
->Release();

        
return TRUE;
    }

    std::deque
<HANDLE> m_deqWaitSet;
    CRITICAL_SECTION m_critsecWaitSetProtection;
    Mutex
* m_externalMutex;
    
int m_nLockCount;
};



#endif

Mutex.h


#ifndef __MUTEX_H__
#define __MUTEX_H__

#include 
<windows.h>

//多个线程操作相同的数据时,一般是需要按顺序访问的,否则会引导数据错乱
//为解决这个问题,就需要引入互斥变量,让每个线程都按顺序地访问变量。
class Mutex
{
public:
    Mutex();
    
~Mutex();

    __forceinline 
void Acquire()
    {
        EnterCriticalSection(
&cs);
    }

    __forceinline 
void Release()
    {
        LeaveCriticalSection(
&cs);
    }
    
/*
    例如:
    线程操作函数。
    int AddCount(void)
    {
        EnterCriticalSection(&cs);
        int nRet = m_nCount++;
        LeaveCriticalSection(&cs);
        return nRet;
    }
    在函数AddCount里调用EnterCriticalSection和LeaveCriticalSection来互斥访问变量m_nCount。
    通过上面这种方法,就可以实现多线程按顺序地访问相同的变量
    
*/
    __forceinline 
bool AttemptAcquire()
    {
        
//一个线程也可以调用TryEnterCriticalSection函数来请求某个临界区的所有权,此时即
        
//使请求失败也不会被阻塞
        return 0;//(TryEnterCriticalSection(&cs) == TRUE ? true : false);
    }

protected:
    CRITICAL_SECTION cs;
//临界区是一种防止多个线程同时执行一个特定代码节的机制

};

#endif

Mutex.cpp


#include 
"stdafx.h"
#include 
"Mutex.h"
 
Mutex::Mutex() 
{
    
//创建临界区对象
    InitializeCriticalSection(&cs);
}
Mutex::
~Mutex()
{
    
//删除临界区对象
    DeleteCriticalSection(&cs);
}


测试代码过2天就发出来...
 

posted on 2008-11-14 20:15 王博炜 阅读(2170) 评论(3)  编辑 收藏 引用 所属分类: 多线程

评论

# re: 生产者--消费者[未登录] 2008-11-23 20:43 ngaut
不错,里面可能还有点小问题,
T pop_nowait()
如果是整数或者long之类的返回NULL则无法区分是否冲队列取到资源了,考虑
bool pop_nowait(T&)
的形式
if(size==0)//消费者速度太快了
cond.Wait();//消费者必须等生产者
if可以考虑这种形式,貌似某些系统上可能会有问题
while( 0 == size)


  回复  更多评论
  

# re: 生产者--消费者[未登录] 2008-11-24 19:05 王博炜
@ngaut
如果是整数或者long之类的返回NULL则无法区分是否冲队列取到资源了
是的你说的对,但是这样的用法少。


Win32下是没问题的  回复  更多评论
  

# re: 生产者--消费者 2009-02-14 11:26 wangqis
等待测试代码  回复  更多评论
  


专题:Android  iPad jQuery Chrome OS

博客园首页  IT新闻  知识库  学英语  C++程序员招聘
标题  
姓名  
主页
验证码 *
内容(提交失败后,可以通过“恢复上次提交”恢复刚刚提交的内容)  
  登录  使用高级评论  新用户注册  返回页首  恢复上次提交      
[使用Ctrl+Enter键可以直接提交]
每天10分钟,轻松学英语
网站导航: