redisのメッセージパブリケーション/サブスクリプション(subscribe)

14251 ワード

最近、プロジェクトにはパブリケーション/サブスクリプションメカニズムを実現できるメッセージキューが必要です.まず、Kafka、RabbitMQなどのメッセージキューコンポーネントが考えられますが、私たちのプロジェクトにはそんなに複雑なコンポーネントを導入する必要はないかもしれません.Redisにも比較的軽量なサブスクリプションメカニズムがあります.比較的良好な問題解決策を得るためにredisのPublish/subscribe機構を参照することができる.
publish/subscribeの使い方
redisは、この機能をサポートする6つのコマンドを提供します.
シーケンス番号
コマンド#コマンド#
説明
1
PSUBSCRIBE pattern [pattern …]
指定されたモードに一致する1つ以上のチャネルを購読する
2
PUBSUB subcommand [argument [argument …]]
サブスクリプションとパブリッシュシステムのステータスの表示
3
PUBLISH channel message
指定したチャンネルにメッセージを送信
4
PUNSUBSCRIBE [pattern [pattern …]]
指定されたすべてのモードのチャンネルをキャンセル
5
SUBSCRIBE channel [channel …]
指定された1つ以上のチャンネルの情報を購読する
6
UNSUBSCRIBE [channel [channel …]]
所定のチャンネルをキャンセルすることを指す
  • クライアントは、1つまたは複数のチャネル、SUBSCRIBEチャネル1チャネル2チャネル3を一度に購読することができる.
  • PUBSUBは、現在のpublish/subscribeシステムの内部コマンドのアクティブな状態を返します.内部コマンドには、channels(現在アクティブなchannelをリスト)、NUMSUB(指定したchannelのサブスクリプション数を返す)、NUMPAT(patternのサブスクリプション数を返す)、
  • は複数のチャネルを購読し、ワイルドカード*は上のすべてのチャネル、PSUBScriBE chan*に一致することができる.
  • メッセージリリース、PUBLISH channel 2 hello-test;
  • あるchannelメッセージの購読をキャンセルし、UNSUBSCRIBE channel 1;
  • patternのメッセージ購読をキャンセルし、PUNSUBScriBE chan*;

  • publish/subscribeの実装コード分析
    基本的にすべてのコードはpubsubにあります.cの中には、郭の辞書とチェーンテーブルを通じて実現されています.辞書には、channelの名前からchannelに関連する購読clientsが含まれています.patternモードの購読では、チェーンテーブルを使用してすべてのpattern、およびpatternに対応する購読者を保存します.
    struct redisClient {
        dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
        list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    }
    
    struct redisServer {
        /* Pubsub */
        dict *pubsub_channels;  /* Map channels to list of subscribed clients */                                                                                                                 
        list *pubsub_patterns;  /* A list of pubsub_patterns */
        int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
                                       xor of REDIS_NOTIFY... flags. */
    }

    subscribe実装
    チャンネルを追加します.pubsub_channels辞書で、RedisServerに行きます.pubsub_channels辞書内駆クエリーは、そのchannelがなければ辞書に追加され、すでに存在する場合は現在の値を返します.最後にクライアントに戻る.
    void subscribeCommand(redisClient *c) {
        int j;
    
        for (j = 1; j < c->argc; j++)
            pubsubSubscribeChannel(c,c->argv[j]);
        c->flags |= REDIS_PUBSUB;
    }
    
    int pubsubSubscribeChannel(redisClient *c, robj *channel) {
        struct dictEntry *de;
        list *clients = NULL;
        int retval = 0;
    
        /* Add the channel to the client -> channels hash table */
        if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
            retval = 1;
            incrRefCount(channel);
            /* Add the client to the channel -> list of clients hash table */
            de = dictFind(server.pubsub_channels,channel);
            if (de == NULL) {
                clients = listCreate();
                dictAdd(server.pubsub_channels,channel,clients);
                incrRefCount(channel);
            } else {
                clients = dictGetVal(de);
            }
            listAddNodeTail(clients,c);
        }
        /* Notify the client */
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.subscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,clientSubscriptionsCount(c));
        return retval;
    }

    unsubscribe
    チャンネルをRedisClientにpubsub_channels辞書内のKVペアを削除し、channelでredisServerに行きます.pubsub_channels辞書内でそのchannelに対応するclientsのチェーンテーブルlnを検索し、指定したclientsを削除し、ln内のすべての要素が削除された場合、その辞書内のKVペアを削除します.
    void unsubscribeCommand(redisClient *c) {
        if (c->argc == 1) {
            pubsubUnsubscribeAllChannels(c,1);
        } else {
            int j;
    
            for (j = 1; j < c->argc; j++)
                pubsubUnsubscribeChannel(c,c->argv[j],1);
        }
        if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB;
    }
    int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
        struct dictEntry *de;
        list *clients;
        listNode *ln;
        int retval = 0;
    
        /* Remove the channel from the client -> channels hash table */
        incrRefCount(channel); /* channel may be just a pointer to the same object
                                we have in the hash tables. Protect it... */
        if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
            retval = 1;
            /* Remove the client from the channel -> clients list hash table */
            de = dictFind(server.pubsub_channels,channel);
            redisAssertWithInfo(c,NULL,de != NULL);
            clients = dictGetVal(de);
            ln = listSearchKey(clients,c);
            redisAssertWithInfo(c,NULL,ln != NULL);
            listDelNode(clients,ln);
            if (listLength(clients) == 0) {
                /* Free the list and associated hash entry at all if this was
                 * the latest client, so that it will be possible to abuse
                 * Redis PUBSUB creating millions of channels. */
                dictDelete(server.pubsub_channels,channel);
            }
        }
        /* Notify the client */
        if (notify) {
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.unsubscribebulk);
            addReplyBulk(c,channel);
            addReplyLongLong(c,dictSize(c->pubsub_channels)+
                           listLength(c->pubsub_patterns));
    
        }
        decrRefCount(channel); /* it is finally safe to release it */
        return retval;
    }
    

    psubscribeとpunsubscribe
    この2つはsubscribe,unsubscribeの実装とほぼ同じであり,異なる点では主に対応するKV対のpubsub_の存在である.patternsチェーンテーブルの中;
    publish
    publishの実装はserver側を巡るpubsub_である.channels辞書およびpubsub_patternsチェーンテーブルは、messageを対応するclientに送信します.
    int pubsubPublishMessage(robj *channel, robj *message) {
        int receivers = 0;
        struct dictEntry *de;
        listNode *ln;
        listIter li;
    
        /* Send to clients listening for that channel */
        de = dictFind(server.pubsub_channels,channel);
        if (de) {
            list *list = dictGetVal(de);
            listNode *ln;
            listIter li;
    
            listRewind(list,&li);
            while ((ln = listNext(&li)) != NULL) {
                redisClient *c = ln->value;
    
                addReply(c,shared.mbulkhdr[3]);
                addReply(c,shared.messagebulk);
                addReplyBulk(c,channel);
                addReplyBulk(c,message);
                receivers++;
            }
        }
        /* Send to clients listening to matching channels */
        if (listLength(server.pubsub_patterns)) {
            listRewind(server.pubsub_patterns,&li);
            channel = getDecodedObject(channel);
            while ((ln = listNext(&li)) != NULL) {
                pubsubPattern *pat = ln->value;
    
                if (stringmatchlen((char*)pat->pattern->ptr,
                                    sdslen(pat->pattern->ptr),
                                    (char*)channel->ptr,
                                    sdslen(channel->ptr),0)) {
                    addReply(pat->client,shared.mbulkhdr[4]);
                    addReply(pat->client,shared.pmessagebulk);
                    addReplyBulk(pat->client,pat->pattern);
                    addReplyBulk(pat->client,channel);
                    addReplyBulk(pat->client,message);
                    receivers++;
                }
            }
            decrRefCount(channel);
        }
        return receivers;
    }
    

    pubsub
    pubsubは主にサブスクリプション-パブリケーションシステムの内部アクティビティの状態を表示し、システムの統計コマンドに相当し、それを通じて、ユーザーは現在のシステムのパブリケーションサブスクリプションの状況を検査することができる.
    void pubsubCommand(redisClient *c) {
        if (!strcasecmp(c->argv[1]->ptr,"channels") &&
            (c->argc == 2 || c->argc ==3))
        {
            /* PUBSUB CHANNELS [] */
            sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
            dictIterator *di = dictGetIterator(server.pubsub_channels);
            dictEntry *de;
            long mblen = 0;
            void *replylen;
    
            replylen = addDeferredMultiBulkLength(c);
            while((de = dictNext(di)) != NULL) {
                robj *cobj = dictGetKey(de);
                sds channel = cobj->ptr;
    
                if (!pat || stringmatchlen(pat, sdslen(pat),
                                           channel, sdslen(channel),0))
                {
                    addReplyBulk(c,cobj);
                    mblen++;
                }
            }
            dictReleaseIterator(di);
            setDeferredMultiBulkLength(c,replylen,mblen);
        } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
            /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
            int j;
    
            addReplyMultiBulkLen(c,(c->argc-2)*2);
            for (j = 2; j < c->argc; j++) {
                list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
    
                addReplyBulk(c,c->argv[j]);
                addReplyLongLong(c,l ? listLength(l) : 0);
            }
        } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
            /* PUBSUB NUMPAT */
            addReplyLongLong(c,listLength(server.pubsub_patterns));
        } else {
            addReplyErrorFormat(c,
                "Unknown PUBSUB subcommand or wrong number of arguments for '%s'",
                (char*)c->argv[1]->ptr);
        }
    }        

    まとめ
    redisのPublish/subscribeメカニズムは、メッセージを1つのクライアントから1つまたは複数のクライアントに渡す基本的な機能を比較的容易に実現することができますが、上のコードからもいくつかの問題があることがわかります.
  • は、発行されたメッセージが必ず購読者に受信されることを保証できない.
  • 再起動後、再起動中のメッセージが失われる.