Redisソースプロファイル集合オブジェクトt_set実装


前の章「Redisソースプロファイリング-オブジェクトObject」から分かるように、redisのSET(集合)には2つの可能なデータ格納方式がある.それぞれ整数集合REDIS_ENCODING_INTSETとハッシュ表REDIS_ENCODING_HT.
robj *setTypeCreate(sds value) {
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
        return createIntsetObject();
    return createSetObject();
}

最初にセットに追加された要素は初期の記憶方式を決定し、最初の元が整数である場合、初期の符号化はREDIS_である.ENCODING_INTSET、そうでなければREDIS_ENCODING_HTなので,集合中のデータ操作の際には基本的に2つの記憶方式の判断に関わる.

SETの構造

typedef struct redisObject {
    unsigned type:4;  //     OBJ_SET
    unsigned encoding:4;  //    set  ,   OBJ_ENCODING_INTSET OBJ_ENCODING_HT
    unsigned lru:LRU_BITS; 
    int refcount;
    void *ptr;
} robj;

他のオブジェクトと同様にset構造もredisObject構造体に格納され、type=OBJ_を指定することでSETはこれが集合オブジェクトであることを決定し,集合オブジェクトである場合,セットのendodingは対応する2つの値しか取れない.

SETコマンド


コマンド#コマンド#
説明
sadd
1つ以上のメンバー要素を集合keyに追加すると、すでに集合に存在するメンバー要素は無視されます.
scard
コレクションkeyの基数(コレクション内の要素の数)を返します.
spop
コレクション内のランダム要素を削除して返します
smembers
コレクションkeyのすべてのメンバーを返します
sismember
メンバー要素がkeyのメンバーを集合するかどうかを判断する
sinter
複数のコレクションの交差を返します.複数のコレクションはkeysによって指定されます.
srandmember
コレクション内のランダム要素を返します
srandmember
コレクション内のcount個のランダム要素を返します
srem
コレクションkeyの1つ以上のメンバー要素を削除すると、存在しないメンバー要素は無視されます.
sunion
複数のコレクションの並列セットを返します.複数のコレクションはkeysによって指定されます.
sdiff
指定されたすべての集合間の差分集合である集合のすべてのメンバーを返します.

SETコマンド実装


SETコード変換


コレクションがREDIS_を使用する場合ENCODING_INTSET符号化は、以下のいずれかの条件が満たされると、このセットはREDIS_に変換されるENCODING_HTコード:
  • intset保存整数値個数が
  • を超える
  • server.set_max_intset_entries(デフォルトは512).集合に新しい要素を追加しようとします.この要素はlong longタイプ(すなわち、整数ではありません)として表すことはできません.
  • /* Convert the set to specified encoding. The resulting dict (when converting
     * to a hash table) is presized to hold the number of elements in the original
     * set. */
    void setTypeConvert(robj *setobj, int enc) {
        setTypeIterator *si;
        serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&
                                 setobj->encoding == OBJ_ENCODING_INTSET);
    
        if (enc == OBJ_ENCODING_HT) {
            int64_t intele;
            dict *d = dictCreate(&setDictType,NULL);
            sds element;
    
            /*             ,    rehash   */
            dictExpand(d,intsetLen(setobj->ptr));
    
            /* To add the elements we extract integers and create redis objects */
            si = setTypeInitIterator(setobj); //       
            //     intset,     dict 
            while (setTypeNext(si,&element,&intele) != -1) {
                element = sdsfromlonglong(intele);
                serverAssert(dictAdd(d,element,NULL) == DICT_OK);
            }
            setTypeReleaseIterator(si); //      
    
            //           
            setobj->encoding = OBJ_ENCODING_HT;
            zfree(setobj->ptr);
            setobj->ptr = d; //         dict
        } else {
            //           OBJ_ENCODING_HT,  
            serverPanic("Unsupported set conversion");
        }
    }
    

    SETがHTで符号化されると、すべての要素が辞書のキーに保存され、辞書の値がNULLに設定され、セット内のすべての要素の一意性が保証されます.

    SADDインタフェース実装

    /* Add the specified value into a set.
     *
     * If the value was already member of the set, nothing is done and 0 is
     * returned, otherwise the new element is added and 1 is returned. */
    //        ,       ,  0,       1
    int setTypeAdd(robj *subject, sds value) {
        long long llval;
        //    HT    , dict     
        if (subject->encoding == OBJ_ENCODING_HT) {
            dict *ht = subject->ptr;
            dictEntry *de = dictAddRaw(ht,value,NULL);
            if (de) {
                dictSetKey(ht,de,sdsdup(value));
                dictSetVal(ht,de,NULL);
                return 1;
            }
        } else if (subject->encoding == OBJ_ENCODING_INTSET) {
            //            ,            long long  
            if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
                //        , intset     
                uint8_t success = 0;
                subject->ptr = intsetAdd(subject->ptr,llval,&success);
                if (success) {
                    /* Convert to regular set when the intset contains
                     * too many entries. */
                    //       ,        ,     set_max_intset_entries,       
                    if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                        setTypeConvert(subject,OBJ_ENCODING_HT);
                    return 1;
                }
            } else {
                //           ,     intset     ,      
                /* Failed to get integer from object, convert to regular set. */
                setTypeConvert(subject,OBJ_ENCODING_HT);
    
                /* The set *was* an intset and this value is not integer
                 * encodable, so dictAdd should always work. */
                serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
                return 1;
            }
        } else {
            serverPanic("Unknown set encoding");
        }
        return 0;
    }
    

    SETに挿入する場合は、現在のSETの下位構造を判断し、HT形式であればdictの挿入コマンドを直接呼び出す必要があります.intset構造であれば、挿入する新しい要素も整数かどうかを判断し、整数でなければSETの符号化フォーマットをHT構造体に変換して挿入を実行し、整数であれば正常に挿入した後もintsetの最大設定を超えているかどうかをチェックする必要があります.
    他の要素の除去などの動作も同様であり、異なる符号化フォーマットでの処理スキームを考慮する必要がある.

    集合コマンドの実装


    SETは他のフォーマットの構造と比較して,集合コマンドの実装が多くなり,我々がよく知っている集合の交差並列集合である.交差を求めるアルゴリズムについて説明します.

    交差アルゴリズムを求める

    /* Iterate all the elements of the first (smallest) set, and test
         * the element against all the other sets, if at least one set does
         * not include the element it is discarded */
        //      (     )  ,    
        si = setTypeInitIterator(sets[0]);
        while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
            for (j = 1; j < setnum; j++) {
                if (sets[j] == sets[0]) continue;
                //        INTSET
                if (encoding == OBJ_ENCODING_INTSET) {
                    /* intset with intset is simple... and fast */
                    if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
                        !intsetFind((intset*)sets[j]->ptr,intobj))
                    {
                        break;
                    /* in order to compare an integer with an object we
                     * have to use the generic function, creating an object
                     * for this */
                    } else if (sets[j]->encoding == OBJ_ENCODING_HT) {
                        elesds = sdsfromlonglong(intobj);
                        if (!setTypeIsMember(sets[j],elesds)) {
                            sdsfree(elesds);
                            break;
                        }
                        sdsfree(elesds);
                    }
                } else if (encoding == OBJ_ENCODING_HT) {
                    //       HT
                    if (!setTypeIsMember(sets[j],elesds)) {
                        break;
                    }
                }
            }
    
            /* Only take action when all sets contain the member */
            //   j==setNum,          ,   set      
            if (j == setnum) {
                if (!dstkey) {
                    if (encoding == OBJ_ENCODING_HT)
                        addReplyBulkCBuffer(c,elesds,sdslen(elesds));
                    else
                        addReplyBulkLongLong(c,intobj);
                    cardinality++;
                } else {
                    if (encoding == OBJ_ENCODING_INTSET) {
                        elesds = sdsfromlonglong(intobj);
                        setTypeAdd(dstset,elesds);
                        sdsfree(elesds);
                    } else {
                        setTypeAdd(dstset,elesds);
                    }
                }
            }
        }
        setTypeReleaseIterator(si);
    
        //     key  ,    
        if (dstkey) {
            /* Store the resulting set into the target, if the intersection
             * is not an empty set. */
            int deleted = dbDelete(c->db,dstkey);
            //        set  ,    set    dstkey 
            if (setTypeSize(dstset) > 0) {
                dbAdd(c->db,dstkey,dstset);
                addReplyLongLong(c,setTypeSize(dstset));
                notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",
                    dstkey,c->db->id);
            } else {
                //         ,  
                decrRefCount(dstset);
                addReply(c,shared.czero);
                if (deleted)
                    notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                        dstkey,c->db->id);
            }
            signalModifiedKey(c->db,dstkey);
            server.dirty++;
        } else {
            setDeferredSetLen(c,replylen,cardinality);
        }
        zfree(sets);
    }
    

    通俗的に言えば、交差を求めるには主に以下のいくつかのステップに分けられます.
  • すべての集合を基数で並べ替える
  • は、基数の最小の集合を結果セット
  • として用いる.
  • この基数最小集合の各要素を巡回し、残りの他の集合に
  • が存在するかどうかを確認する.
  • が存在する場合、この要素は最終結果セット
  • に加えられる.
    アルゴリズムの複雑さはO(N 2)である.