RedisのHash衝突と漸進的なReHashソース分析


RedisデータDB


Redisは1つ1つのDBですが、このDBはいったいどのような構造のデータなのでしょうか.
以下はRedis公式のソースコード(5.0)です.
/* Redis     。         0(     )             。         “id”  */
typedef struct redisDb {
    dict *dict;                 /*          (    ) */
    dict *expires;              /*           */
    dict *blocking_keys;        /*           (BLPOP) */
    dict *ready_keys;           /*           */
    dict *watched_keys;         /* EXEC CAS     */
    int id;                     /*    ID */
    long long avg_ttl;          /*   TTL,      */
    list *defrag_later;         /*                */
} redisDb;

Redisのデータベースの主なデータは辞書に保存されていることがわかります.

RedisデータDict辞書


公式サイトのソースアドレス:https://github.com/redis/redi...
dict辞書の定義を見つけました.
//         
typedef struct dict {
    dictType *type; /*        */
    void *privdata; /*      */
    dictht ht[2]; /*   Hash    */
    long rehashidx; /*    rehashidx == -1,      Rehash,        ReHash   */
    unsigned long iterators; /*             */
} dict;

主なデータは私たちの辞書Hashテーブルの配列に格納されています.このdictht、辞書Hashテーブルを見てみましょう.
//   Hash       
typedef struct dictht {
    dictEntry **table; /* Hash ,            ,         */
    unsigned long size; /*      ,         */
    unsigned long sizemask; /*        ,    size-1,         */
    unsigned long used; /*       ,         */
} dictht;

さらに重要なのは、私たちが保存している要素データを表す辞書の要素です.
//           
typedef struct dictEntry {
      //      ,Key  Val 
    void *key;
    //  ,      ,         ,    64    ,  64 int,   
    union {
           //    
        void *val;
          // 64    
        uint64_t u64;
          // 64 int
        int64_t s64;
          //    
        double d;
    } v;
      // next  ,   dictEntry      ,    Hash  
    struct dictEntry *next;
} dictEntry;

Redis Hash競合の処理


上のdictにはHashテーブルが2つあることを知っていますが、なぜHashテーブルを2つ置くのでしょうか.
答えは私たちRedisのHashテーブルが拡張を行う際に使う必要があるので、ソースコードの説明を見てみましょう.
      int dictRehash(dict *d, int n);
ソース位置:https://github.com/redis/redi...
まず、私たちがどのステップで拡張を行ったのかを知る必要があります.Add操作が発生したときに、私たちがAddに位置する方法に違いありません.
/*              */
int dictAdd(dict *d, void *key, void *val)
{
      //       key
    dictEntry *entry = dictAddRaw(d,key,NULL);

    if (!entry) return DICT_ERR;
      //         
    dictSetVal(d, entry, val);
    return DICT_OK;
}

次に、チェーンテーブルを使用してHash競合を解決するdictAddRawにナビゲートします.
/*        :
 *         ,         dictEntry       ,           .
 *
 *                API                ,  :
 * entry = dictAddRaw(dict,mykey,NULL);
 * if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
 * 
 *    :
 *
 *        ,   NULL,     ,         “ * existing”.
 *       ,                 。
 */
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;
    //       ReHash,       _dictRehashStep(  ReHash    ),  ReHash    ,      ReHash
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /*         ,  Key    ,            ReHash(!!!  )(   ReHash  ) */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

      /*   Hash  ,  ReHash      */
    /*           。                          ,         */
      //       ReHash,        HashTable        ,              
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    //     ,    
    entry = zmalloc(sizeof(*entry));
      //         ,          Hash       ,                    
    entry->next = ht->table[index];
      // Hash          ,       
    ht->table[index] = entry;
    ht->used++;

    /*     Hash   Key. */
    dictSetKey(d, entry, key);
    return entry;
}

Hash衝突の解決策を見てから拡張を見てみましょう.まずdictIsRehashingを見て、どのようにReHashを行う必要があるかを判断します.

ReHash処理拡張ReHashプロセス

 //      rehashidx  -1,        Hash  
 dictIsRehashing(d) ((d)->rehashidx != -1)

では、どこでrehashidxを修正したのでしょうか.Indexを計算するときです.
/*            ,  “Key”     ,  Key    ,   -1
 *    ,           ,        ( )          ,   ht[1] */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    /*     ,     ,      -1(ReHash    ) */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
      /*    Hash     ,    Key          */
    for (table = 0; table <= 1; table++) {
              /*        - 1          */
        idx = hash & d->ht[table].sizemask;
        /*   Hash     ,      Key    Hash   ,      -1 */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        //     ReHash,       Hash  index  ,   ReHash    idx     Hash 
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}

ここで拡容が必要か否かの判断を開始する
/*     ,  Hash  */
static int _dictExpandIfNeeded(dict *d)
{
    /*      ReHash  ,     */
    if (dictIsRehashing(d)) return DICT_OK;

    /*        ,         。    4 */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /*                Hash        1:1   ,          (    )         ,         /     "  "  ,               */
      /*                            Hash ,       */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

2枚目のHashテーブルを再初期化すると、後続のReHashの要素が2枚目のHashテーブルに入ります
/*       Hash  */
int dictExpand(dict *d, unsigned long size)
{
    /*     ReHash,         size * 2,  -1 */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    dictht n; /*       */
    unsigned long realsize = _dictNextPower(size);

    /*                 ,  -1 */
    if (realsize == d->ht[0].size) return DICT_ERR;

    /*                 NULL     */
    n.size = realsize;
    n.sizemask = realsize-1;
      /*          */
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /*          ?     ,        ,            ,        。 */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /*                 ,         Hash      ,  ReHash   */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}

ReHashプロセスとは、状態をReHashに設定し、新しい要素を2番目のHashテーブルに書き込むことです.この場合、2番目のHashテーブルと1番目のHashテーブルを
/*   ReHash  ,           ,         ReHash   ,  100     ,    Hash  ,      100    ReHash */
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /*           */
      /*   ReHash       */
    if (!dictIsRehashing(d)) return 0;
        /* ReHash         */
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /*    ,rehashidx    ,            ,  ht [0] .used!= 0*/
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
          //           ReHashIndex    ,      ,rehashidx        0,        Hash   ReHash,    0     Hash 
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
              //      n * 10               1,     
            if (--empty_visits == 0) return 1;
        }
          //      ht[0]      Hash 
        de = d->ht[0].table[d->rehashidx];
        /*        Key Value     ht[1]       ht[0]     ht[1]*/
        while(de) {
            uint64_t h;

            nextde = de->next;
            /*            */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
          //         +1,  ht[0]      
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

      //   ReHash,   ht[0]         ht[1],   ht[1]    ht[0],    ht[1],  ReHash    
    /*           ReHash    Hash .. */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /*     ,       ReHash     , ReHash    (   ReHash) */
      //   1           ,while  ,    ReHash  
    return 1;
}

そしてタスクスケジューリングReHash
サーバ内のhttps://github.com/redis/redi...
/*         */
void databasesCron(void) {
/* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                  //    ReHash
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    break;
                } else {
                    /* If this db didn't need rehash, we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }


//          1   ReHash
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /*              。... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /*              。... */
    }
    return 0;
}
  
/* Rehash ms+"delta"  。delta   ,  0,        1。       dictRehash(d,100)      */
int dictRehashMilliseconds(dict *d, int ms) {
    if (d->iterators > 0) return 0;
        //       
    long long start = timeInMilliseconds();
      //   ReHash   
    int rehashes = 0;
        //   ReHash100   
    while(dictRehash(d,100)) {
        rehashes += 100;
          //                  ,     -      > 1  ,   Break
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}