在实际工作过程中我们需要对共享的缓冲区进行操作.有插入的线程也有读取的线程.
这就是生产者消费者问题.
生产者要不断将数据放入共享的缓冲,消费者要不断从缓冲取出数据,消费者必须等生产者
#ifndef __QUEUE_H__
#define __QUEUE_H__
#include "Condition.h"
#include "Mutex.h"
template<class T>
class FQueue
{
public:
volatile unsigned int size;
inline FQueue() : cond(&lock)
{
first = last = NULL;
size = 0;
}
uint32 get_size()
{
uint32 ret;
cond.BeginSynchronized();
ret = size;
cond.EndSynchronized();
return ret;
}
void push(T &item)
{
h* p = new h;
p->value = item;
p->pNext = NULL;
cond.BeginSynchronized();//上锁,锁数量++
if(last)//队列里还有其他的结点
{
last->pNext = p;//新元素加到末结点
last = p;//新元素设为末结点
size++;//队列大小++
}
else//是第一个结点
{
last = first = p;
size = 1;
cond.Signal();
}
cond.EndSynchronized();//开锁,锁数量--
}
T pop_nowait()
{
cond.BeginSynchronized();
if(size==0)
{
cond.EndSynchronized();
return NULL;
}
h*tmp=first;
if(tmp == NULL)
{
cond.EndSynchronized();
return NULL;
}
if(--size)
{
first=(h*)first->pNext;
}
else
{
first=last=NULL;
}
cond.EndSynchronized();
T returnVal = tmp->value;
delete tmp;
return returnVal;
}
T pop()
{
cond.BeginSynchronized();
if(size==0)//消费者速度太快了
cond.Wait();//消费者必须等生产者
h*tmp=first;//得到队列第一个结点
if(tmp == NULL)
{
cond.EndSynchronized();
return NULL;
}
if(--size)//队列大小--
{
first=(h*)first->pNext;//下个结点设为第一个结点
}
else//队列为空
{
first=last=NULL;
}
cond.EndSynchronized();
T returnVal = tmp->value;//从队列中delete
delete tmp;
return returnVal;
}
ASCENT_INLINE Condition& GetCond() { return cond; }
private:
struct h
{
T value;
void *pNext;
};
h* first;
h* last;
Mutex lock;
Condition cond;
};
#endif
Condition.h
#ifndef __CONDITION_H__
#define __CONDITION_H__
#include "Mutex.h"
#include <queue>
#include <windows.h>
#define MAX_AWAITING_THREADS 10
class Condition
{
public:
inline Condition(Mutex * mutex) : m_nLockCount(0), m_externalMutex(mutex)
{
::InitializeCriticalSection(&m_critsecWaitSetProtection);
}
~Condition()
{
::DeleteCriticalSection(&m_critsecWaitSetProtection);
}
inline void BeginSynchronized()
{
m_externalMutex->Acquire();
++m_nLockCount;
}
inline void EndSynchronized()
{
--m_nLockCount;
m_externalMutex->Release();
}
DWORD Wait(time_t timeout)
{
DWORD dwMillisecondsTimeout = (DWORD)timeout * 1000;
BOOL bAlertable = FALSE;
HANDLE hWaitEvent = Push();
if( NULL == hWaitEvent )
return WAIT_FAILED;
int nThisThreadsLockCount = m_nLockCount;
m_nLockCount = 0;
for( int i=0; i<nThisThreadsLockCount; ++i)
{
m_externalMutex->Release();
}
DWORD dwWaitResult = ::WaitForSingleObjectEx(
hWaitEvent,
dwMillisecondsTimeout,
bAlertable
);
DWORD dwLastError = 0;
if( WAIT_FAILED == dwWaitResult )
dwLastError = ::GetLastError();
for( int j=0; j<nThisThreadsLockCount; ++j)
{
m_externalMutex->Acquire();
}
m_nLockCount = nThisThreadsLockCount;
if( ! CloseHandle(hWaitEvent) )
return WAIT_FAILED;
if( WAIT_FAILED == dwWaitResult )
::SetLastError(dwLastError);
return dwWaitResult;
}
DWORD Wait()
{
DWORD dwMillisecondsTimeout = INFINITE;//无穷等待
BOOL bAlertable = FALSE;
//消费者速度太快了必须等待生产者
HANDLE hWaitEvent = Push();//一个新的event对象到队列
if( NULL == hWaitEvent )
return WAIT_FAILED;
int nThisThreadsLockCount = m_nLockCount;
m_nLockCount = 0;
for( int i=0; i<nThisThreadsLockCount; ++i)
{
m_externalMutex->Release();
}
DWORD dwWaitResult = ::WaitForSingleObjectEx(
hWaitEvent,
dwMillisecondsTimeout,
bAlertable
);
DWORD dwLastError = 0;
if( WAIT_FAILED == dwWaitResult )
dwLastError = ::GetLastError();
for( int j=0; j<nThisThreadsLockCount; ++j)
{
m_externalMutex->Acquire();
}
//恢复锁的数量
m_nLockCount = nThisThreadsLockCount;
if( ! CloseHandle(hWaitEvent) )
return WAIT_FAILED;
if( WAIT_FAILED == dwWaitResult )
::SetLastError(dwLastError);
return dwWaitResult;
}
void Signal()
{
HANDLE hWaitEvent = Pop();
if(NULL == hWaitEvent)//当前没有线程在等待
return;
//把event设激发状态
SetEvent(hWaitEvent);
}
void Broadcast()
{
::EnterCriticalSection(&m_critsecWaitSetProtection);
std::deque<HANDLE>::const_iterator it_run = m_deqWaitSet.begin();
std::deque<HANDLE>::const_iterator it_end = m_deqWaitSet.end();
for( ; it_run < it_end; ++it_run )
{
if( ! SetEvent(*it_run) )
return;
}
m_deqWaitSet.clear();
::LeaveCriticalSection(&m_critsecWaitSetProtection);
}
private:
HANDLE Push()
{
//产生一个event对象
HANDLE hWaitEvent = ::CreateEvent(
NULL, //
FALSE, //FALSE表示这个event对象变成激发状态(应而换醒一个线程)之后自动重设为非激发状态
FALSE, //FALSE表示这个event对象一开始为非激发状态
NULL //
);
//
if( NULL == hWaitEvent ) {
return NULL;
}
::EnterCriticalSection(&m_critsecWaitSetProtection);
m_deqWaitSet.push_back(hWaitEvent);
::LeaveCriticalSection(&m_critsecWaitSetProtection);
return hWaitEvent;
}
HANDLE Pop()
{
::EnterCriticalSection(&m_critsecWaitSetProtection);
HANDLE hWaitEvent = NULL;
if( 0 != m_deqWaitSet.size() )
{
hWaitEvent = m_deqWaitSet.front();
m_deqWaitSet.pop_front();
}
::LeaveCriticalSection(&m_critsecWaitSetProtection);
return hWaitEvent;
}
BOOL LockHeldByCallingThread()
{
BOOL bTryLockResult = m_externalMutex->AttemptAcquire();
if( ! bTryLockResult )
{
return FALSE;
}
if( 0 == m_nLockCount )
{
m_externalMutex->Release();
return FALSE;
}
m_externalMutex->Release();
return TRUE;
}
std::deque<HANDLE> m_deqWaitSet;
CRITICAL_SECTION m_critsecWaitSetProtection;
Mutex* m_externalMutex;
int m_nLockCount;
};
#endif
Mutex.h
#ifndef __MUTEX_H__
#define __MUTEX_H__
#include <windows.h>
//多个线程操作相同的数据时,一般是需要按顺序访问的,否则会引导数据错乱
//为解决这个问题,就需要引入互斥变量,让每个线程都按顺序地访问变量。
class Mutex
{
public:
Mutex();
~Mutex();
__forceinline void Acquire()
{
EnterCriticalSection(&cs);
}
__forceinline void Release()
{
LeaveCriticalSection(&cs);
}
/*
例如:
线程操作函数。
int AddCount(void)
{
EnterCriticalSection(&cs);
int nRet = m_nCount++;
LeaveCriticalSection(&cs);
return nRet;
}
在函数AddCount里调用EnterCriticalSection和LeaveCriticalSection来互斥访问变量m_nCount。
通过上面这种方法,就可以实现多线程按顺序地访问相同的变量
*/
__forceinline bool AttemptAcquire()
{
//一个线程也可以调用TryEnterCriticalSection函数来请求某个临界区的所有权,此时即
//使请求失败也不会被阻塞
return 0;//(TryEnterCriticalSection(&cs) == TRUE ? true : false);
}
protected:
CRITICAL_SECTION cs;//临界区是一种防止多个线程同时执行一个特定代码节的机制
};
#endif
Mutex.cpp
#include "stdafx.h"
#include "Mutex.h"
Mutex::Mutex()
{
//创建临界区对象
InitializeCriticalSection(&cs);
}
Mutex::~Mutex()
{
//删除临界区对象
DeleteCriticalSection(&cs);
}
测试代码过2天就发出来...