c++11ロックなしキュー
4976 ワード
/*************************************************************************
> File Name: lockfreequeue.cpp
> Author:
> Mail:
> Created Time: Wed 08 Aug 2018 11:38:08 AM CST
************************************************************************/
#include
#include
#include
#include
using namespace std;
template
class LockFreeQueue{
private:
struct CounterNode;
public:
LockFreeQueue(){
CounterNode node;
node.extern_counter_ = 1;
node.next_ = new Node();
tail_.store( node );
head_.store(node);
}
virtual ~LockFreeQueue(){}
void Push( T dta )
{
std::unique_ptr dta_ptr(new T(dta));
CounterNode node;
node.extern_counter_ = 1;
node.next_= new Node();
CounterNode tail = tail_.load();
while(1)
{
IncreaseOutCount(tail_, tail);
T* dta = NULL;
if( tail.next_->dta.compare_exchange_strong( dta, dta_ptr.get()) )
{
tail.next_->next_ = node;
tail = tail_.exchange(node);
FreeOutCount( tail );
dta_ptr.release();
break;
}
tail.next_->ReleaseRef();
}
}
std::unique_ptr Pop()
{
CounterNode head = head_.load(std::memory_order_relaxed);
while(1)
{
IncreaseOutCount( head_, head );
Node* ptr = head.next_;
if( ptr == tail_.load().next_)
{
ptr->ReleaseRef();
return NULL;
}
if( head_.compare_exchange_strong(head, ptr->next_ ) )
{
T* dta = ptr->dta.exchange(NULL);
FreeOutCount( head );
return std::unique_ptr(dta);
}
ptr->ReleaseRef();
}
}
private:
void IncreaseOutCount( std::atomic& head, CounterNode& node )
{
CounterNode new_node;
do{
new_node = node;
++new_node.extern_counter_;
}
while( !head.compare_exchange_strong(node, new_node, std::memory_order_acquire, std::memory_order_relaxed) );
node = new_node;
}
void FreeOutCount( CounterNode& node )
{
Node* ptr = node.next_;
int cout_realease = node.extern_counter_ - 2;
InterCount count = ptr->counter_.load(std::memory_order_relaxed);
InterCount new_count;
do{
new_count = count;
new_count.inter_counter_ += cout_realease;
--new_count.outer_counter_;
}
while( !ptr->counter_.compare_exchange_strong( count, new_count, std::memory_order_acquire, std::memory_order_relaxed ) );
if( !new_count.inter_counter_ && !new_count.outer_counter_ )
{
delete ptr;
}
}
private:
struct Node;
struct InterCount{
unsigned inter_counter_ : 30;
unsigned outer_counter_ : 2;
};
struct CounterNode{
int extern_counter_;
Node* next_;
};
struct Node{
std::atomic dta;
std::atomic counter_;
CounterNode next_;
Node()
{
InterCount counter;
counter.inter_counter_ = 0;
counter.outer_counter_ = 2;
counter_.store(counter);
next_.extern_counter_ = 0;
next_.next_ = NULL;
dta = NULL;
}
void ReleaseRef()
{
InterCount old_counter = counter_.load();
InterCount new_counter;
do
{
new_counter = old_counter;
--new_counter.inter_counter_;
}
while( !counter_.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed) );
if( !new_counter.inter_counter_ && !new_counter.outer_counter_ )
{
delete this;
}
}
};
std::atomic head_;
std::atomic tail_;
};
LockFreeQueue queue;
void PushQueueI()
{
for( int i = 0; i < 10; ++i )
{
queue.Push( i );
}
}
void PushQueueJ()
{
for( int i = 20; i < 30; ++i )
{
queue.Push( i );
}
}
void PopQueue()
{
while( 1 )
{
std::shared_ptr ptr = queue.Pop();
if( ptr )
std::cout << *ptr << std::endl;
}
}
int main()
{
std::thread t1( PushQueueI);
std::thread t2( PushQueueJ);
std::thread t3( PopQueue);
t1.join();
t2.join();
t3.join();
return 0;
}