[Redis source] A walkthrough of the rehash source code
If you are not yet familiar with rehashing, see "rehash illustrated" first.
1. Types of rehash
To keep performance acceptable, Redis splits rehashing into two kinds of incremental operation, lazy and active, which work together until the rehash completes:
- lazy rehashing: every operation on a dict also rehashes one slot
- active rehashing: every 100 ms, spend 1 ms performing rehash steps (in the serverCron function)
2. lazy rehashing
Call chain: _dictRehashStep -> dictRehash
(1) Source of dict.c/_dictRehashStep:
/* This function performs just a step of rehashing, and only if there are
 * no safe iterators bound to our hash table. When we have iterators in the
 * middle of a rehashing we can't mess with the two hash tables otherwise
 * some element can be missed or duplicated.
 *
 * This function is called by common lookup or update operations in the
 * dictionary so that the hash table automatically migrates from H1 to H2
 * while it is actively used.
 *
 * T = O(1)
 */
static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}
(2) Source of dict.c/dictRehash:
/* Performs N steps of incremental rehashing. Returns 1 if there are still
 * keys to move from the old to the new hash table, otherwise 0 is returned.
 *
 * Note that a rehashing step consists in moving a bucket (that may have more
 * than one key as we use chaining) from the old to the new hash table.
 *
 * T = O(N)
 */
int dictRehash(dict *d, int n) {
    /* Only proceed if a rehash is actually in progress. */
    if (!dictIsRehashing(d)) return 0;

    /* Migrate at most N buckets. T = O(N) */
    while(n--) {
        dictEntry *de, *nextde;

        /* Check if we already rehashed the whole table... */
        if (d->ht[0].used == 0) {
            /* Free the old table. */
            zfree(d->ht[0].table);
            /* Make ht[1] become the new ht[0]. */
            d->ht[0] = d->ht[1];
            /* Reset ht[1]. */
            _dictReset(&d->ht[1]);
            /* Mark rehashing as finished. */
            d->rehashidx = -1;
            /* Return 0 to tell the caller the rehash is complete. */
            return 0;
        }

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned)d->rehashidx);

        /* Skip empty buckets. */
        while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;

        /* Point at the head of the bucket's chain. */
        de = d->ht[0].table[d->rehashidx];

        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            unsigned int h;

            /* Save the next entry in the chain. */
            nextde = de->next;

            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;

            /* Insert at the head of the destination bucket. */
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;

            /* Update the counters of both tables. */
            d->ht[0].used--;
            d->ht[1].used++;

            /* Continue with the next entry. */
            de = nextde;
        }

        /* Mark the migrated bucket as empty. */
        d->ht[0].table[d->rehashidx] = NULL;
        /* Advance to the next bucket. */
        d->rehashidx++;
    }
    return 1;
}
(3) Analysis
_dictRehashStep calls dictRehash with n = 1, so each call migrates just one bucket's worth of entries from ht[0] to ht[1]. Because _dictRehashStep is invoked by dictGetRandomKey, dictFind, dictGenericDelete and dictAdd, every lookup, insertion or deletion on the dict pushes the rehash forward, which is bound to speed up its completion.
Each call to dictRehash incrementally rehashes up to n buckets. Since ht[1] was already sized when the resize was triggered, the core of rehashing is to walk ht[0], take each key, remap it against ht[1]'s bucket count (via sizemask), and insert it into ht[1]; once ht[0] is empty, ht[0] takes over ht[1]'s table and ht[1] is reset. Throughout this process rehashidx is essential: it records the position in ht[0] where the previous rehash step left off.
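The bucket-migration loop in dictRehash can be reduced to a miniature, self-contained sketch. All names here (mini_dict, mini_rehash_step, ...) are hypothetical stand-ins, not the real dict.h API, but the step itself follows the same shape: skip empty buckets, then head-insert every chained entry into the new table.

```c
#include <assert.h>
#include <stdlib.h>

/* Toy chained hash table with two bucket arrays, mimicking ht[0]/ht[1]. */
typedef struct mini_entry {
    long key;
    struct mini_entry *next;
} mini_entry;

typedef struct mini_dict {
    mini_entry **table[2];   /* table[0] = old, table[1] = new */
    unsigned long size[2];   /* bucket counts, powers of two */
    unsigned long used0;     /* entries still in the old table */
    long rehashidx;          /* next old bucket to migrate */
} mini_dict;

void mini_init(mini_dict *d, unsigned long oldsz, unsigned long newsz) {
    d->table[0] = calloc(oldsz, sizeof(mini_entry*));
    d->table[1] = calloc(newsz, sizeof(mini_entry*));
    d->size[0] = oldsz; d->size[1] = newsz;
    d->used0 = 0; d->rehashidx = 0;
}

/* Head-insert a key into the old table, like adding before the resize. */
void mini_add_old(mini_dict *d, long key) {
    mini_entry *e = malloc(sizeof(*e));
    unsigned long h = (unsigned long)key & (d->size[0] - 1);
    e->key = key;
    e->next = d->table[0][h];
    d->table[0][h] = e;
    d->used0++;
}

/* Migrate exactly one non-empty bucket from table[0] to table[1],
 * just like the while(de) loop in dictRehash. Returns 0 once the
 * whole old table has been migrated, 1 otherwise. */
int mini_rehash_step(mini_dict *d) {
    if (d->used0 == 0) { d->rehashidx = -1; return 0; }
    while (d->table[0][d->rehashidx] == NULL) d->rehashidx++;
    mini_entry *de = d->table[0][d->rehashidx];
    while (de) {
        mini_entry *nextde = de->next;
        unsigned long h = (unsigned long)de->key & (d->size[1] - 1);
        de->next = d->table[1][h];   /* head insertion, like Redis */
        d->table[1][h] = de;
        d->used0--;
        de = nextde;
    }
    d->table[0][d->rehashidx++] = NULL;
    return d->used0 != 0;
}

/* Look a key up in the new table only. */
int mini_find_new(mini_dict *d, long key) {
    mini_entry *e = d->table[1][(unsigned long)key & (d->size[1] - 1)];
    while (e) { if (e->key == key) return 1; e = e->next; }
    return 0;
}
```

Driving mini_rehash_step once per dict operation is exactly the lazy strategy: the rehash finishes after at most `size[0]` calls, with each call bounded by one bucket's chain length.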
3. active rehashing
Call chain: serverCron -> databasesCron -> incrementallyRehash -> dictRehashMilliseconds -> dictRehash
(1) Source of redis.c/serverCron:
/* This is our timer interrupt, called server.hz times per second.
 *
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* Update the time cache. */
    updateCachedTime();

    /* Sample the operations-per-second counter. */
    run_with_period(100) trackOperationsPerSecond();

    /* We have just REDIS_LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed to the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define. */
    server.lruclock = getLRUClock();

    /* Record the max memory used since the server was started. */
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();

    /* Sample the RSS here since this is a relatively slow call. */
    server.resident_set_size = zmalloc_get_rss();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    if (server.shutdown_asap) {
        if (prepareForShutdown(0) == REDIS_OK) exit(0);
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }

    /* Show some info about non-empty databases */
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }

    /* Show information about connected clients */
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            redisLog(REDIS_VERBOSE,
                "%lu clients connected (%lu slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* We need to do a few operations on clients asynchronously. */
    clientsCron();

    /* Handle background operations on Redis databases. */
    databasesCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            /* The BGSAVE child terminated. */
            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            /* The BGREWRITEAOF child terminated. */
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            updateDictResizePolicy();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 REDIS_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == REDIS_OK))
            {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveBackground(server.rdb_filename);
                break;
            }
        }

        /* Trigger an AOF rewrite if needed */
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }

    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == REDIS_ERR)
            flushAppendOnlyFile(0);
    }

    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();

    /* Clear the paused clients flag if needed. */
    clientsArePaused(); /* Don't check return value, just use the side effect. */

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    run_with_period(1000) replicationCron();

    /* Run the Redis Cluster cron. */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    /* Cleanup expired MIGRATE cached sockets. */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* Update the cron loop counter. */
    server.cronloops++;
    return 1000/server.hz;
}
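The run_with_period() throttle used throughout serverCron can be modeled in a few lines. The sketch below is a simplified stand-alone version: the real macro lives in redis.c and keys off server.cronloops and server.hz, while here plain globals and the hypothetical helper fires_per are used instead.

```c
#include <assert.h>

/* serverCron runs `hz` times per second; a task tagged with period `ms`
 * should only fire on every (ms / (1000/hz))-th iteration. */
static int cronloops = 0;
static const int hz = 10;               /* serverCron fires every 100 ms */

/* Simplified stand-in for redis.c's run_with_period(ms) { ... } macro:
 * periods at or below one tick fire every tick; longer periods fire
 * when the loop counter hits a multiple of the tick ratio. */
#define run_with_period(ms) \
    if ((ms) <= 1000/hz || cronloops % ((ms)/(1000/hz)) == 0)

/* Count how often a task with the given period fires over n cron ticks. */
int fires_per(int n, int period_ms) {
    int fired = 0;
    for (cronloops = 0; cronloops < n; cronloops++) {
        run_with_period(period_ms) fired++;
    }
    return fired;
}
```

So with hz = 10, a `run_with_period(1000)` block such as replicationCron runs once per second even though serverCron itself runs ten times per second.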
(2) Source of redis.c/databasesCron:
/* This function handles 'background' operations we are required to do
 * incrementally in Redis databases, such as active key expiring, resizing,
 * rehashing. */
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for slaves
     * as master will synthesize DELs for us. */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);

    /* Perform hash tables rehashing if needed, but only if there are no
     * other processes saving the DB on disk. Otherwise rehashing is bad
     * as will cause a lot of copy-on-write of memory pages. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        /* We use global counters so if we stop the computation at a given
         * DB we'll be able to start from the successive in the next
         * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
        unsigned int j;

        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db % server.dbnum);
                rehash_db++;
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                }
            }
        }
    }
}
(3) Source of redis.c/incrementallyRehash:
/* Our hash table implementation performs rehashing incrementally while
 * we write/read from the hash table. Still if the server is idle, the hash
 * table will use two tables for a long time. So we try to use 1 millisecond
 * of CPU time at every call of this function to perform some rehahsing.
 *
 * The function returns 1 if some rehashing was performed, otherwise 0
 * is returned. */
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}
(4) Source of dict.c/dictRehashMilliseconds:
/* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}
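The time-boxed pattern of dictRehashMilliseconds — do work in 100-step chunks and check the clock only between chunks — can be sketched with a fake clock. Here clock_ms, do_chunk and rehash_with_budget are hypothetical stand-ins for timeInMilliseconds() and dictRehash(); the loop itself has the exact shape of the real function.

```c
#include <assert.h>

/* Fake monotonic clock, advanced manually by the work function. */
static long long fake_now = 0;

static long long clock_ms(void) { return fake_now; }

/* Pretend each 100-step chunk of rehash work costs 1 ms of clock time
 * and consumes 100 units from `remaining`; returns 1 while work remains,
 * like dictRehash returning 1 while keys are still left to move. */
static int do_chunk(int *remaining) {
    fake_now += 1;
    *remaining -= 100;
    return *remaining > 0;
}

/* Budget loop in the shape of dictRehashMilliseconds: keep taking
 * chunks until the work is done or the time budget is exceeded. */
int rehash_with_budget(int *remaining, int ms) {
    long long start = clock_ms();
    int steps = 0;

    while (do_chunk(remaining)) {
        steps += 100;
        if (clock_ms() - start > ms) break;
    }
    return steps;
}
```

Because the elapsed time is only checked between 100-step chunks, the loop can overshoot the budget by at most one chunk, which is why the real comment says "between ms milliseconds and ms+1 milliseconds".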
(5) The source of dictRehash was already shown above.
(6) Analysis
In serverCron, as long as no background child process (RDB save or AOF rewrite) is running, databasesCron is called, which calls incrementallyRehash, which calls dictRehashMilliseconds, and finally dictRehash. Compared with the lazy path, this active path runs longer per invocation and rehashes more entries: each call spends about 1 millisecond on rehash work, and if the rehash is still unfinished it simply resumes in the next cron loop.
I am still inexperienced, so if there are any mistakes, please point them out — thank you. If you have better suggestions, leave a comment so we can discuss them and improve together! Thank you very much for patiently reading this post.
Reference book: "Redis Design and Implementation" (2nd edition), Huang Jianhong
Reference link: Redis-resize source code analysis