skynetソースコード学習 ― グローバルキューへのメッセージキューのプッシュ/ポップ処理


云風(cloudwu)氏のskynetソースコードを読み、学んだ内容を簡単に記録します。
/*
 * Publish a message queue into the global queue.
 *
 * A slot is reserved by atomically incrementing the tail counter and mapping
 * the previous value through GP(). If that slot still holds a non-NULL entry,
 * the queue is instead linked onto the front of the global overflow list
 * (q->list) with a CAS retry loop.
 *
 * queue: the message queue to publish; queue->next must be NULL on entry
 *        (asserted on the overflow path).
 */
void
skynet_globalmq_push(struct message_queue * queue) {
	struct global_queue *gq = Q;

	// Reserve a position: fetch the old tail value and bump the counter.
	uint32_t slot = GP(__sync_fetch_and_add(&gq->tail, 1));

	// Fast path: only one thread can flip the slot from NULL to queue.
	if (__sync_bool_compare_and_swap(&gq->queue[slot], NULL, queue)) {
		return;
	}

	// Slow path (expected to be rare): the reserved slot is still occupied,
	// so park the queue on the overflow list instead.
	assert(queue->next == NULL);
	for (;;) {
		struct message_queue *top = gq->list;
		queue->next = top;
		// Retry until the list head is swung from the value we observed.
		if (__sync_bool_compare_and_swap(&gq->list, top, queue)) {
			break;
		}
	}
}

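// The head and tail counters of the global_queue delimit the entries currently held in Q.
// GP maps a counter value to an index into the queue[] array (presumably a wrap-around/hash; confirm against its definition).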
/*
 * Try to take one message queue from the front of the global queue.
 *
 * Returns the message queue stored at the current head slot, or NULL when
 *   - head equals tail (the queue looks empty),
 *   - the head slot has not been filled in yet,
 *   - the CAS advancing head fails, or
 *   - the CAS clearing the slot fails.
 * A NULL result therefore does not always mean "empty"; presumably callers
 * simply try again later -- verify against the call sites.
 *
 * As a side effect, if the overflow list (q->list) is non-empty, one attempt
 * is made to detach its first node and push it back through
 * skynet_globalmq_push().
 */
struct message_queue * 
skynet_globalmq_pop() {
	struct global_queue *q = Q;
	uint32_t head =  q->head;

	if (head == q->tail) {
		// The queue is empty.
		return NULL;
	}

	// Map the head counter to its slot index in q->queue[].
	uint32_t head_ptr = GP(head);

	struct message_queue * list = q->list;
	// If the overflow list holds entries, move one of them back into queue[].
	if (list) {
		// If q->list is not empty, try to load it back to the queue
		// NOTE(review): list->next is read before the CAS below; confirm this
		// is safe against concurrent modification of the list (ABA).
		struct message_queue *newhead = list->next;
		if (__sync_bool_compare_and_swap(&q->list, list, newhead)) {
			// try load list only once, if success , push it back to the queue.
			list->next = NULL;
			skynet_globalmq_push(list);
		}
	}

	// Read the candidate entry stored at the head slot.
	struct message_queue * mq = q->queue[head_ptr];
	if (mq == NULL) {
		// globalmq push not complete
		return NULL;
	}
	// Claim the slot by advancing head; this fails if q->head no longer equals
	// the value read at the top of the function.
	if (!__sync_bool_compare_and_swap(&q->head, head, head+1)) {
		return NULL;
	}
	// only one thread can get the slot (change q->queue[head_ptr] to NULL)
	if (!__sync_bool_compare_and_swap(&q->queue[head_ptr], mq, NULL)) {
		return NULL;
	}

	return mq;
}