RedisのHash衝突と漸進的なReHashソース分析
10525 ワード
RedisデータDB
Redisは1つ1つのDBですが、このDBはいったいどのような構造のデータなのでしょうか.
以下はRedis公式のソースコード(5.0)です.
/* Redis 。 0( ) 。 “id” */
typedef struct redisDb {
dict *dict; /* ( ) */
dict *expires; /* */
dict *blocking_keys; /* (BLPOP) */
dict *ready_keys; /* */
dict *watched_keys; /* EXEC CAS */
int id; /* ID */
long long avg_ttl; /* TTL, */
list *defrag_later; /* */
} redisDb;
Redisのデータベースの主なデータは辞書に保存されていることがわかります.
RedisデータDict辞書
公式サイトのソースアドレス:https://github.com/redis/redi...
dict辞書の定義を見つけました.
//
typedef struct dict {
dictType *type; /* */
void *privdata; /* */
dictht ht[2]; /* Hash */
long rehashidx; /* rehashidx == -1, Rehash, ReHash */
unsigned long iterators; /* */
} dict;
主なデータは私たちの辞書Hashテーブルの配列に格納されています.このdictht、辞書Hashテーブルを見てみましょう.
// Hash
typedef struct dictht {
dictEntry **table; /* Hash , , */
unsigned long size; /* , */
unsigned long sizemask; /* , size-1, */
unsigned long used; /* , */
} dictht;
さらに重要なのは、私たちが保存している要素データを表す辞書の要素です.
//
typedef struct dictEntry {
// ,Key Val
void *key;
// , , , 64 , 64 int,
union {
//
void *val;
// 64
uint64_t u64;
// 64 int
int64_t s64;
//
double d;
} v;
// next , dictEntry , Hash
struct dictEntry *next;
} dictEntry;
Redis Hash競合の処理
上のdictにはHashテーブルが2つあることを知っていますが、なぜHashテーブルを2つ置くのでしょうか.
答えは私たちRedisのHashテーブルが拡張を行う際に使う必要があるので、ソースコードの説明を見てみましょう.
int dictRehash(dict *d, int n);
ソース位置:https://github.com/redis/redi...
まず、私たちがどのステップで拡張を行ったのかを知る必要があります.Add操作が発生したときに、私たちがAddに位置する方法に違いありません.
/* */
int dictAdd(dict *d, void *key, void *val)
{
// key
dictEntry *entry = dictAddRaw(d,key,NULL);
if (!entry) return DICT_ERR;
//
dictSetVal(d, entry, val);
return DICT_OK;
}
次に、チェーンテーブルを使用してHash競合を解決するdictAddRawにナビゲートします.
/* :
* , dictEntry , .
*
* API , :
* entry = dictAddRaw(dict,mykey,NULL);
* if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
*
* :
*
* , NULL, , “ * existing”.
* , 。
*/
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
// ReHash, _dictRehashStep( ReHash ), ReHash , ReHash
if (dictIsRehashing(d)) _dictRehashStep(d);
/* , Key , ReHash(!!! )( ReHash ) */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Hash , ReHash */
/* 。 , */
// ReHash, HashTable ,
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
// ,
entry = zmalloc(sizeof(*entry));
// , Hash ,
entry->next = ht->table[index];
// Hash ,
ht->table[index] = entry;
ht->used++;
/* Hash Key. */
dictSetKey(d, entry, key);
return entry;
}
Hash衝突の解決策を見てから拡張を見てみましょう.まずdictIsRehashingを見て、どのようにReHashを行う必要があるかを判断します.
ReHash処理拡張ReHashプロセス
// rehashidx -1, Hash
dictIsRehashing(d) ((d)->rehashidx != -1)
では、どこでrehashidxを修正したのでしょうか.Indexを計算するときです.
/* , “Key” , Key , -1
* , , ( ) , ht[1] */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* , , -1(ReHash ) */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
/* Hash , Key */
for (table = 0; table <= 1; table++) {
/* - 1 */
idx = hash & d->ht[table].sizemask;
/* Hash , Key Hash , -1 */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
// ReHash, Hash index , ReHash idx Hash
if (!dictIsRehashing(d)) break;
}
return idx;
}
ここで拡容が必要か否かの判断を開始する
/* , Hash */
static int _dictExpandIfNeeded(dict *d)
{
/* ReHash , */
if (dictIsRehashing(d)) return DICT_OK;
/* , 。 4 */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* Hash 1:1 , ( ) , / " " , */
/* Hash , */
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
2枚目のHashテーブルを再初期化すると、後続のReHashの要素が2枚目のHashテーブルに入ります
/* Hash */
int dictExpand(dict *d, unsigned long size)
{
/* ReHash, size * 2, -1 */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n; /* */
unsigned long realsize = _dictNextPower(size);
/* , -1 */
if (realsize == d->ht[0].size) return DICT_ERR;
/* NULL */
n.size = realsize;
n.sizemask = realsize-1;
/* */
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* ? , , , 。 */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* , Hash , ReHash */
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}
ReHashプロセスとは、状態をReHashに設定し、新しい要素を2番目のHashテーブルに書き込むことです.この場合、2番目のHashテーブルと1番目のHashテーブルを
/* ReHash , , ReHash , 100 , Hash , 100 ReHash */
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* */
/* ReHash */
if (!dictIsRehashing(d)) return 0;
/* ReHash */
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* ,rehashidx , , ht [0] .used!= 0*/
assert(d->ht[0].size > (unsigned long)d->rehashidx);
// ReHashIndex , ,rehashidx 0, Hash ReHash, 0 Hash
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
// n * 10 1,
if (--empty_visits == 0) return 1;
}
// ht[0] Hash
de = d->ht[0].table[d->rehashidx];
/* Key Value ht[1] ht[0] ht[1]*/
while(de) {
uint64_t h;
nextde = de->next;
/* */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
// +1, ht[0]
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
// ReHash, ht[0] ht[1], ht[1] ht[0], ht[1], ReHash
/* ReHash Hash .. */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* , ReHash , ReHash ( ReHash) */
// 1 ,while , ReHash
return 1;
}
そしてタスクスケジューリングReHash
サーバ内のhttps://github.com/redis/redi...
/* */
void databasesCron(void) {
/* Rehash */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
// ReHash
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
// 1 ReHash
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* 。... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* 。... */
}
return 0;
}
/* Rehash ms+"delta" 。delta , 0, 1。 dictRehash(d,100) */
int dictRehashMilliseconds(dict *d, int ms) {
if (d->iterators > 0) return 0;
//
long long start = timeInMilliseconds();
// ReHash
int rehashes = 0;
// ReHash100
while(dictRehash(d,100)) {
rehashes += 100;
// , - > 1 , Break
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}