Redis 2.6.16 ソースコード分析:pub-subシステム


Redisが実装する発行/購読(publish/subscribe)システム、すなわちpub-subは、コード量が少なく分析しやすい部分です。本稿では通常のpub-subのみを対象とし(パターンマッチ購読 Pattern-matching subscriptions は扱いません)、pub-subシステムがどのように実装されているかを簡単に説明します。
Redisのpub-subについては紹介資料がありますが、本題に入る前に、pub-subで使ういくつかのコマンドを知っておく必要があります。
SUBSCRIBE first second//firstとsecondの2つのchannelを購読
PUBLISH second Hello // channel「second」へメッセージ「Hello」を送信
UNSUBSCRIBE//以前に購読したすべてのチャンネルをキャンセル
 
次に、Redisのpubsub.cにおけるこれらのコマンドの実装を見ていきましょう。
まずsubscribeの実装を見てみましょう.
/* SUBSCRIBE command handler.
 *
 * Walks the client's arguments from index 1 up to argc-1 and subscribes
 * the client to each one, treating every argument as a channel. */
void subscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        /* Each argument is one channel to subscribe this client to. */
        pubsubSubscribeChannel(c,c->argv[j]);
}
 
 
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel.
 *
 * Two structures are updated on success:
 *   - c->pubsub_channels: per-client dict, gets `channel` as a key with a
 *     NULL value.
 *   - server.pubsub_channels: dict mapping a channel to a list of clients;
 *     the client is appended to that channel's list.
 *
 * A reply is queued for the client whether or not the subscription is new. */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    struct dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table.
     * dictAdd() only returns DICT_OK here for a channel the client did not
     * already have, so a repeated subscription skips the whole branch. */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        /* The client's dict now holds the channel object as a key. */
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table.
         * If nobody has subscribed to this channel yet, create an empty
         * client list and register it under the channel first. */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            /* The server-wide dict holds the channel as a key too. */
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        /* Append this client to the list of subscribers of the channel. */
        listAddNodeTail(clients,c);
    }
    /* Notify the client: the header is followed by three elements - the
     * shared "subscribe" bulk, the channel, and the number of channels
     * plus patterns the client currently has. */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
    return retval;
}
 
/* PUBLISH command handler.
 *
 * For a command such as "PUBLISH second Hello", c->argv[1] is the channel
 * ("second") and c->argv[2] is the message ("Hello"). The value returned
 * by pubsubPublishMessage() is sent back to the publishing client. */
void publishCommand(redisClient *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    addReplyLongLong(c,receivers);
}
 
/* Publish a message.
 *
 * Looks up `channel` in server.pubsub_channels and, if an entry exists,
 * queues a reply carrying the channel and `message` for every client in
 * that entry's list. Returns the number of clients counted.
 *
 * NOTE: the pattern-matching delivery step is omitted from this excerpt. */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    /* Declared once at function scope; the per-channel loop below uses
     * them directly instead of redeclaring shadowing copies. */
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *clients = dictGetVal(de);

        listRewind(clients,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

            /* Header followed by three elements: the shared "message"
             * bulk, the channel, and the payload. */
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    /* ... (pattern-matching delivery omitted in this excerpt) ... */

    return receivers;
}
 
/* UNSUBSCRIBE command handler.
 *
 * With no arguments after the command (argc == 1) the client is
 * unsubscribed from all of its channels; otherwise each argument from
 * index 1 onward is unsubscribed individually. Both paths pass 1 as the
 * notify flag. */
void unsubscribeCommand(redisClient *c) {
    int idx;

    /* No channel given: drop every subscription of this client. */
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
        return;
    }

    /* Otherwise unsubscribe only from the channels that were listed. */
    for (idx = 1; idx < c->argc; idx++) {
        pubsubUnsubscribeChannel(c,c->argv[idx],1);
    }
}
/* Unsubscribe the client from every channel in c->pubsub_channels.
 *
 * `notify` is forwarded to pubsubUnsubscribeChannel() for each channel.
 * Returns the sum of the values returned by those calls, i.e. the number
 * of channels the client was actually unsubscribed from. */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
    int removed = 0;
    dictEntry *entry;
    /* A "safe" iterator is requested because the loop body removes
     * entries from the dict being walked. */
    dictIterator *iter = dictGetSafeIterator(c->pubsub_channels);

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        robj *chan = dictGetKey(entry);

        removed += pubsubUnsubscribeChannel(c,chan,notify);
    }

    /* Nothing was unsubscribed, so no per-channel reply was sent above:
     * still answer the client, with a null bulk in the channel slot. */
    if (notify && !removed) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,
            dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
    }

    dictReleaseIterator(iter);
    return removed;
}
 
* Unsubscribe a client from a channel. Returns 1 if the operationsucceeded, or
 * 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
    struct dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;
 
    //       hash      channel  ,          channel   
    //      channel        ,                
    //          
    /* Remove the channel from the client -> channels hashtable */
    incrRefCount(channel); /* channel may be just a pointer tothe same object
                           we have in the hash tables. Protect it... */
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        /* Remove the client from the channel ->clients list hash table */
        de = dictFind(server.pubsub_channels,channel);
        redisAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        redisAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            /* Free the list and associatedhash entry at all if this was
             * the latest client, sothat it will be possible to abuse
             * Redis PUBSUB creatingmillions of channels. */
           dictDelete(server.pubsub_channels,channel);
        }
    }
    /* Notify the client */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
       addReplyLongLong(c,dictSize(c->pubsub_channels)+
                      listLength(c->pubsub_patterns));
 
    }
    decrRefCount(channel); /* it is finally safe to release it*/
    return retval;
}