効率的な読み書きメッセージキューSafeQueue

2759 ワード

/******************               ********************/

class SpinLock
{
public:
	SpinLock() { pthread_spin_init(&m_spinlock, 0); }
	~SpinLock() { pthread_spin_destroy(&m_spinlock); }
private:
	DISALLOW_COPY_AND_ASSIGN(SpinLock);
public:
	void lock() { pthread_spin_lock(&m_spinlock); }
	void unlock() { pthread_spin_unlock(&m_spinlock); }
private:
	pthread_spinlock_t m_spinlock;
};
/*	
	                   CPU              
	                  
	              ,         ,
	                        ,          ,
	      cpu,           ,        ,          
	sleep-waiting busy-waiting
*/
template 
class SafeQueue
{
public:
	SafeQueue()
		:m_pushqueue(NULL),m_popqueue(NULL)
		,m_pushqueueBegin(0),m_popqueueBegin(0)
		,m_pushqueueEnd(0),m_popqueueEnd(0)
		,m_pushqueueMaxSize(1),m_popqueueMaxSize(1)
	{
		m_pushqueue = new T[1];
		m_popqueue = new T[1];
	}
	~SafeQueue()
	{
		delete[] m_pushqueue;
		delete[] m_popqueue;
		m_pushqueue = NULL;
		m_popqueue = NULL;
	}
public:
	T pop()
	{
		m_poplock.lock();
		if (m_popqueueBegin >= m_popqueueEnd)
		{
			//     ,              
			m_pushlock.lock();
			T* oldpush = m_pushqueue;
			int32_t maxSize = m_pushqueueMaxSize;
			int32_t pushend = m_pushqueueEnd;

			m_pushqueue = m_popqueue;
			m_pushqueueBegin = 0;			
			m_pushqueueEnd = 0;
			m_pushqueueMaxSize = m_popqueueMaxSize;			
			m_pushlock.unlock();
			//   ,      pop  ,      ,         ,    
			//       pop     ,
			//                    maxsize


			//                     pushend                         
			m_popqueue = oldpush;
			m_popqueueBegin = 0;
			m_popqueueEnd = pushend;
			m_popqueueMaxSize = maxSize;
			
		}
		//     
		if (m_popqueueBegin < m_popqueueEnd)
		{
			//  
			T ret = m_popqueue[m_popqueueBegin++];
			m_poplock.unlock();
			return ret;
		}
		else
		{
			//   
			m_poplock.unlock();
			return T(0);
		}
	}
	void push(T val)
	{
		T* oldpos = NULL;
		m_pushlock.lock();
		if (m_pushqueueEnd >= m_pushqueueMaxSize)
		{
			oldpos = m_pushqueue;
			m_pushqueue = new T[m_pushqueueMaxSize * 2];
			memcpy(m_pushqueue, oldpos, sizeof(T)*m_pushqueueEnd);
			m_pushqueueMaxSize *= 2;
		}
		m_pushqueue[m_pushqueueEnd++] = val;
		m_pushlock.unlock();

		SAFE_DELETE(oldpos);//                  
	}
private:
	DISALLOW_COPY_AND_ASSIGN(SafeQueue);
private:
	//    ,    push     pop              
	//      ,   ,
	//     push   ,     , 
	//     pop                       
	T* m_pushqueue;
	T* m_popqueue;
	//               ,         
	int32_t m_pushqueueEnd;
	int32_t m_popqueueEnd;
	
	int32_t m_pushqueueBegin;
	int32_t m_popqueueBegin;

	int32_t m_pushqueueMaxSize;
	int32_t m_popqueueMaxSize;

	//                
	SpinLock m_pushlock;
	SpinLock m_poplock;
};