redis-2.6.16ソース分析のpub-subシステム
redisが実現する送信サブスクリプションシステム、すなわちpub-subは、この部分のコードが少なく、分析も便利である.これは、一般的なpub−sub(Pattern−matching subscriptionsを無視する)のみを分析し、pubsubシステムがどのように実現されるかを簡単に説明する.
redisには主にredisを紹介するpub-subがあり、開始前にredisのpubsubのいくつかのコマンドを知る必要があります.
SUBSCRIBE first second//firstとsecondの2つのchannelを購読
PUBLISH secondHello//送信方向channelはsecondの送信「hello」メッセージ
UNSUBSCRIBE//以前に購読したすべてのチャンネルをキャンセル
次にredisのPubsubを見てみましょう.cコードにおけるこれらのコマンドの実装.
まずsubscribeの実装を見てみましょう.
redisには主にredisを紹介するpub-subがあり、開始前にredisのpubsubのいくつかのコマンドを知る必要があります.
SUBSCRIBE first second//firstとsecondの2つのchannelを購読
PUBLISH secondHello//送信方向channelはsecondの送信「hello」メッセージ
UNSUBSCRIBE//以前に購読したすべてのチャンネルをキャンセル
次にredisのPubsubを見てみましょう.cコードにおけるこれらのコマンドの実装.
まずsubscribeの実装を見てみましょう.
voidsubscribeCommand(redisClient *c) {
int j;
for (j = 1; j < c->argc; j++)
// channel, channel
pubsubSubscribeChannel(c,c->argv[j]);
}
/* Subscribe aclient to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed tothat channel. */
intpubsubSubscribeChannel(redisClient *c, robj *channel) {
struct dictEntry *de;
list *clients = NULL;
int retval = 0;
//c->pubsub_channels redis hash , ,
//dictAdd hash key channel,value null
/* Add the channel to the client ->channels hash table */
if(dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
// channel
// , channel ,
// channel
// , ,channel
/* Add the client to the channel ->list of clients hash table */
de =dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// channel ,
// channel
listAddNodeTail(clients,c);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}
// pub :
voidpublishCommand(redisClient *c) {
// channel , PUBLISH second Hello
//c->args[1] channel second, Hello
int receivers =pubsubPublishMessage(c->argv[1],c->argv[2]);
addReplyLongLong(c,receivers);
}
/*Publish a message */
intpubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
struct dictEntry *de;
listNode *ln;
listIter li;
// channel
// ,
/* Send to clients listening for that channel */
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
...
return receivers;
}
// unsubscribe :
void unsubscribeCommand(redisClient *c) {
if (c->argc == 1) {
//
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
for (j = 1; j < c->argc; j++)
// channel
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
}
/* Unsubscribe from all the channels. Return the number of channels the
* client was subscribed from. */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
// channel
dictIterator *di =dictGetSafeIterator(c->pubsub_channels);
dictEntry *de;
int count = 0;
// channel ,
while((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de);
count +=pubsubUnsubscribeChannel(c,channel,notify);
}
/* We were subscribed to nothing? Still reply to the client.*/
if (notify && count == 0) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReply(c,shared.nullbulk);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
dictReleaseIterator(di);
return count;
}
* Unsubscribe a client from a channel. Returns 1 if the operationsucceeded, or
* 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
struct dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
// hash channel , channel
// channel ,
//
/* Remove the channel from the client -> channels hashtable */
incrRefCount(channel); /* channel may be just a pointer tothe same object
we have in the hash tables. Protect it... */
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel ->clients list hash table */
de = dictFind(server.pubsub_channels,channel);
redisAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
redisAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associatedhash entry at all if this was
* the latest client, sothat it will be possible to abuse
* Redis PUBSUB creatingmillions of channels. */
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
if (notify) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
decrRefCount(channel); /* it is finally safe to release it*/
return retval;
}