skynet ソースコード学習 ― グローバルキューへのメッセージキューのプッシュ/ポップ処理
1892 ワード
云風（cloudwu）氏の skynet のソースコードを読み、簡単にメモを残します。
/*
 * Publish a message queue into the global queue Q.
 *
 * A position is reserved by atomically incrementing q->tail; GP() turns the
 * previous tail value into an index into q->queue[]. If that slot is still
 * NULL the queue pointer is stored there. Otherwise (the original note says
 * the array "may be full seldom"; it also mentions 64K slots, but the array
 * size is defined elsewhere) the queue is prepended to the q->list overflow
 * list instead.
 *
 * queue->next must be NULL on entry (asserted on the overflow path).
 */
void
skynet_globalmq_push(struct message_queue * queue) {
    struct global_queue *gq = Q;
    uint32_t slot = GP(__sync_fetch_and_add(&gq->tail, 1));

    // Fast path: only one thread can flip the slot from NULL to queue.
    if (__sync_bool_compare_and_swap(&gq->queue[slot], NULL, queue)) {
        return;
    }

    // Slow path: the slot was occupied, so chain the queue onto the
    // overflow list, retrying until the list head CAS succeeds.
    assert(queue->next == NULL);
    for (;;) {
        struct message_queue *top = gq->list;
        queue->next = top;
        if (__sync_bool_compare_and_swap(&gq->list, top, queue)) {
            return;
        }
    }
}
// global_queue keeps the head and tail counters of Q.
// GP() maps such a counter to an index into the queue[] array.
/*
 * Try to take one message queue out of the global queue Q.
 *
 * Returns the message_queue found in the slot at the current head, or NULL
 * when:
 *   - head equals tail,
 *   - the head slot is still NULL,
 *   - the compare-and-swap that advances head fails, or
 *   - the compare-and-swap that clears the slot fails.
 * A NULL return therefore does not necessarily mean the queue is empty;
 * it can also mean this attempt lost a race.
 *
 * Before the slot is read, at most one entry is moved from the q->list
 * overflow list back into the array through skynet_globalmq_push().
 */
struct message_queue *
skynet_globalmq_pop() {
struct global_queue *q = Q;
// Snapshot of head; it is the expected value for the CAS on q->head below.
uint32_t head = q->head;
if (head == q->tail) {
// The queue is empty.
return NULL;
}
// Index of the head slot inside q->queue[].
uint32_t head_ptr = GP(head);
struct message_queue * list = q->list;
// q->list holds queues that skynet_globalmq_push() could not store in
// queue[]; move one of them back into the array if there is any.
if (list) {
// If q->list is not empty, try to load it back to the queue
// NOTE(review): list->next is read before the CAS on q->list. If other
// threads could remove and re-add this node in between, the CAS might
// succeed with a stale newhead (ABA) - confirm whether that can happen.
struct message_queue *newhead = list->next;
if (__sync_bool_compare_and_swap(&q->list, list, newhead)) {
// try load list only once, if success , push it back to the queue.
// next is cleared first because skynet_globalmq_push() asserts
// queue->next == NULL on its overflow path.
list->next = NULL;
skynet_globalmq_push(list);
}
}
// Read the candidate from the head slot.
struct message_queue * mq = q->queue[head_ptr];
if (mq == NULL) {
// globalmq push not complete: push advances tail before it stores the
// pointer into the slot, so the slot can still be NULL here.
return NULL;
}
// Claim this position by advancing head; this fails if q->head no longer
// equals the snapshot taken at the top of the function.
if (!__sync_bool_compare_and_swap(&q->head, head, head+1)) {
return NULL;
}
// only one thread can get the slot (change q->queue[head_ptr] to NULL)
// NOTE(review): head has already been advanced when this CAS fails -
// confirm that the entry left in the slot is still handled correctly.
if (!__sync_bool_compare_and_swap(&q->queue[head_ptr], mq, NULL)) {
return NULL;
}
return mq;
}