BlockingPopはRedis 2にある.バージョン6の変更点

3684 ワード

Blocking popとは
Redisでlistのデータのpopに対して2つのコマンドがあります
  • rpopは、Listの右側からデータ
  • をポップアップする.
  • lpop、listの左側からデータ
  • がポップアップ
    blocking popとは、popの場合、listが存在しない場合、クライアントがブロックして待っていて、他のクライアントがpushを行うまで、このブロックしたクライアントが返答を受けるという意味で、もちろんブロックのタイムアウト時間を設定することもできます
  • blpop list1[list2...] timeout
  • rlpop list1[list2...] timeout

  • Redis2.バージョン6の変更点
    まずシーンを仮定します
       1:blpop list1 0
       2:lpush list1 v1 v2 v3
    
  • Redis2.6バージョン前に、クライアント1は、v 1を取得し、v 1のpushおよびpop操作はaof
  • を記録しない.
  • Redis2.6バージョン以降、クライアント1は、v 3、v 1のpushおよび姑操作がaofに記録される
  • を得る.
    実装方法
    コードを読み終えた後、コード実装には以下のような変化があることがわかりました.
    データ構造の変化
    2.6の後、redisDb構造体にready_が増加したことがわかる.keys辞書
    /* 2.4 */
    typedef struct redisDb {
        dict *dict;
        dict *expires;
        dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */                                                                 
        dict *io_keys;              /* Keys with clients waiting for VM I/O */
        dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
        int id;
    } redisDb;
    
    /* 2.6 */
    typedef struct redisDb {
        dict *dict;                 /* The keyspace for this DB */
        dict *expires;              /* Timeout of keys with a timeout set */
        dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
        dict *ready_keys;           /* Blocked keys that received a PUSH */
        dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
        int id;
        long long avg_ttl;          /* Average TTL, just for stats */                                                                                
    } redisDb;
    

    lpush/rpush論理の変化
    Redisは2.6リリース前に直接pushでblocking clientを処理するのでblpop/brpopは、pushの最初の数のvalueをblocking clientに返します.これらのvalueはdbを操作していません.aofにも入りません.
    2.6バージョン以降は、まずdbのlistにデータを挿入し、processCommandで処理するので、blpop/brpopは意味と一致します.
    /* 2.6   */
    /*    push   block client */
    void pushGenericCommand(redisClient *c, int where) {                                                                                             
        int j, addlen = 0, pushed = 0; 
        robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
        int may_have_waiting_clients = (lobj == NULL);
    
        if (lobj && lobj->type != REDIS_LIST) {
            addReply(c,shared.wrongtypeerr);
            return;
        }    
    
        for (j = 2; j < c->argc; j++) {
            c->argv[j] = tryObjectEncoding(c->argv[j]);
            if (may_have_waiting_clients) {
                if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
                    addlen++;
                    continue;
                } else {
                    may_have_waiting_clients = 0; 
                }    
            }    
            if (!lobj) {
                lobj = createZiplistObject();
                dbAdd(c->db,c->argv[1],lobj);
            }    
            listTypePush(lobj,c->argv[j],where);
            pushed++;
        }    
        addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0)); 
        if (pushed) signalModifiedKey(c->db,c->argv[1]);
        server.dirty += pushed;
    }
    
    /* 2.6 */
    /*   list push        ,   blocking keys      , 
     *      ,    ready_keys     */
    void dbAdd(redisDb *db, robj *key, robj *val) {
        sds copy = sdsdup(key->ptr);
        int retval = dictAdd(db->dict, copy, val);
    
        redisAssertWithInfo(NULL,key,retval == REDIS_OK);
        if (val->type == REDIS_LIST) signalListAsReady(db, key);                                                                                     
    }
    
    /*  processCommand        */
        call(c,REDIS_CALL_FULL);
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();