c++11ロックなしキュー

4976 ワード

/*************************************************************************
	> File Name: lockfreequeue.cpp
	> Author: 
	> Mail: 
	> Created Time: Wed 08 Aug 2018 11:38:08 AM CST
 ************************************************************************/

#include
#include 
#include 
#include 
using namespace std;

template 
class LockFreeQueue{
    private:
    struct CounterNode;
    public:
    LockFreeQueue(){
        CounterNode node;
        node.extern_counter_ = 1;
        node.next_ = new Node();
        tail_.store( node );
        head_.store(node);
    }
    virtual ~LockFreeQueue(){}

    void Push( T dta )
    {
        std::unique_ptr dta_ptr(new T(dta));
        CounterNode node;
        node.extern_counter_ = 1;
        node.next_= new Node();

        CounterNode tail = tail_.load();
        while(1)
        {
            IncreaseOutCount(tail_, tail);
            T* dta = NULL;
            if( tail.next_->dta.compare_exchange_strong( dta, dta_ptr.get()) )
            {
                tail.next_->next_ = node;
                tail = tail_.exchange(node);

                FreeOutCount( tail );
                dta_ptr.release();
                break;
            }

            tail.next_->ReleaseRef();
        }
    }

    std::unique_ptr Pop()
    {
        CounterNode head = head_.load(std::memory_order_relaxed);
        while(1)
        {
            IncreaseOutCount( head_, head );
            Node* ptr = head.next_;
            if( ptr == tail_.load().next_)
            {
                ptr->ReleaseRef();
                return NULL;
            }

            if( head_.compare_exchange_strong(head, ptr->next_ ) )
            {
                T* dta = ptr->dta.exchange(NULL);
                FreeOutCount( head );
                return std::unique_ptr(dta);
            }

            ptr->ReleaseRef();
        }
        
    }

    private:
    void IncreaseOutCount( std::atomic& head, CounterNode& node )
    {
        CounterNode new_node;
        do{
            new_node = node;
            ++new_node.extern_counter_;
        }
        while( !head.compare_exchange_strong(node, new_node, std::memory_order_acquire, std::memory_order_relaxed) );
        node = new_node;
    }

    void FreeOutCount( CounterNode& node )
    {
        Node* ptr = node.next_;
        int cout_realease = node.extern_counter_ - 2;
        InterCount count = ptr->counter_.load(std::memory_order_relaxed);
        InterCount new_count;
        do{
            new_count = count;
            new_count.inter_counter_ += cout_realease;
            --new_count.outer_counter_;
        }
        while( !ptr->counter_.compare_exchange_strong( count, new_count, std::memory_order_acquire, std::memory_order_relaxed ) );

        if( !new_count.inter_counter_ && !new_count.outer_counter_ )
        {
            delete ptr;
        }
    }

    private:
    struct Node;
    struct InterCount{
        unsigned inter_counter_ : 30;
        unsigned outer_counter_ : 2;
    };

    struct CounterNode{
        int extern_counter_;
        Node* next_;
    };

    struct Node{
        std::atomic dta;
        std::atomic counter_;
        CounterNode next_;

        Node()
        {
            InterCount counter;
            counter.inter_counter_ = 0;
            counter.outer_counter_ = 2;
            counter_.store(counter);
            next_.extern_counter_ = 0;
            next_.next_ = NULL;
            dta = NULL;
        }

        void ReleaseRef()
        {
            InterCount old_counter = counter_.load();
            InterCount new_counter;
            do
            {
                new_counter = old_counter;
                --new_counter.inter_counter_;
            }
            while( !counter_.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed) );

            if( !new_counter.inter_counter_ && !new_counter.outer_counter_ )
            {
                delete this;
            }
        }
    };

    std::atomic head_;
    std::atomic tail_;
};


LockFreeQueue queue;

void PushQueueI()
{
    for(  int i = 0; i < 10; ++i )
    {
        queue.Push( i );
    }
}

void PushQueueJ()
{
    for(  int i = 20; i < 30; ++i )
    {
        queue.Push( i );
    }
}

void PopQueue()
{
    while( 1 )
    {
        std::shared_ptr ptr = queue.Pop();
        if( ptr )
            std::cout << *ptr << std::endl;
    }
}

int main()
{
    std::thread t1( PushQueueI);
    std::thread t2( PushQueueJ);
    std::thread t3( PopQueue);

    t1.join();
    t2.join();
    t3.join();
    return 0;
}