[Redis Source] A Walkthrough of the rehash Source Code


If you are not yet familiar with rehash, see "Rehash Illustrated" first.
1. Classification of rehash
To keep performance steady, Redis splits rehashing into two kinds of operations, lazy and active, which run side by side until the rehash completes:
  • lazy rehashing: perform one step of slot rehashing every time the dict is operated on
  • active rehashing: every 100 ms, spend 1 ms performing rehash (the serverCron function)
  2. lazy rehashing
    Call chain: _dictRehashStep -> dictRehash
    (1) Source of dict.c/_dictRehashStep:
    /* This function performs just a step of rehashing, and only if there are
     * no safe iterators bound to our hash table. When we have iterators in the
     * middle of a rehashing we can't mess with the two hash tables otherwise
     * some element can be missed or duplicated.
     *
     * This function is called by common lookup or update operations in the
     * dictionary so that the hash table automatically migrates from H1 to H2
     * while it is actively used.
     *
     * T = O(1)
     */
    static void _dictRehashStep(dict *d) {
        if (d->iterators == 0) dictRehash(d,1);
    }
    

    (2) Source of dict.c/dictRehash:
    /* Performs N steps of incremental rehashing. Returns 1 if there are still
     * keys to move from the old to the new hash table, otherwise 0 is returned.
     *
     * Note that a rehashing step consists in moving a bucket (that may have more
     * than one key as we use chaining) from the old to the new hash table.
     *
     * T = O(N)
     */
    int dictRehash(dict *d, int n) {
    
        // Can only run while a rehash is in progress
        if (!dictIsRehashing(d)) return 0;
    
        // Migrate at most N buckets
        // T = O(N)
        while(n--) {
            dictEntry *de, *nextde;
    
            /* Check if we already rehashed the whole table... */
            // If ht[0] is empty, the whole table has been rehashed
            // T = O(1)
            if (d->ht[0].used == 0) {
                // Free the old table of ht[0]
                zfree(d->ht[0].table);
                // Make ht[0] point at the new table (ht[1])
                d->ht[0] = d->ht[1];
                // Reset ht[1]
                _dictReset(&d->ht[1]);
                // Mark the rehash as finished
                d->rehashidx = -1;
                // Return 0 to signal that rehashing is complete
                return 0;
            }
    
            /* Note that rehashidx can't overflow as we are sure there are more
             * elements because ht[0].used != 0 */
            // Make sure rehashidx does not go out of bounds
            assert(d->ht[0].size > (unsigned)d->rehashidx);
    
            // Skip empty buckets until a non-empty one is found
            while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
    
            // Head of the chain in the current bucket
            de = d->ht[0].table[d->rehashidx];
            /* Move all the keys in this bucket from the old to the new hash HT */
            // Move every entry in this bucket to the new table
            // T = O(1)
            while(de) {
                unsigned int h;
    
                // Save the next entry in the chain
                nextde = de->next;
    
                /* Get the index in the new hash table */
                // Compute the entry's bucket index in the new table
                h = dictHashKey(d, de->key) & d->ht[1].sizemask;
    
                // Insert at the head of the new bucket's chain
                de->next = d->ht[1].table[h];
                d->ht[1].table[h] = de;
    
                // Update the counters
                d->ht[0].used--;
                d->ht[1].used++;
    
                // Continue with the next entry in the chain
                de = nextde;
            }
            // Mark the migrated bucket in the old table as empty
            d->ht[0].table[d->rehashidx] = NULL;
            // Advance to the next bucket
            d->rehashidx++;
        }
    
        return 1;
    }

    (3) Source analysis
  • In _dictRehashStep, dictRehash is called with n = 1, so each call migrates at most one bucket from ht[0] to ht[1]. Since _dictRehashStep is invoked by dictGetRandomKey, dictFind, dictGenericDelete and dictAdd, every one of these dict operations advances the migration, which inevitably speeds up the rehash process.
  • Each call to dictRehash incrementally rehashes up to n buckets. Because ht[1] was already sized when the resize was triggered, the core of the loop is: walk ht[0], take each key, hash it against ht[1]'s sizemask to find its new bucket, and move it there; once ht[0] is empty, point ht[0] at ht[1] and reset ht[1]. Throughout this process rehashidx is crucial: it records the bucket index in ht[0] where the next rehash step will resume.

  3. active rehashing
    Call chain: serverCron -> databasesCron -> incrementallyRehash -> dictRehashMilliseconds -> dictRehash
    (1) Source of redis.c/serverCron:
    /* This is our timer interrupt, called server.hz times per second.
     *
     * Here is where we do a number of things that need to be done asynchronously.
     * For instance:
     *
     * - Active expired keys collection (it is also performed in a lazy way on
     *   lookup).
     *
     * - Software watchdog.
     *
     * - Update some statistic.
     *
     * - Incremental rehashing of the DBs hash tables.
     *
     * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
     *
     * - Clients timeout of different kinds.
     *
     * - Replication reconnection.
     *
     * - Many more...
     *
     * Everything directly called here will be called server.hz times per second,
     * so in order to throttle execution of things we want to do less frequently
     * a macro is used: run_with_period(milliseconds) { .... }
     */
    
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        int j;
        REDIS_NOTUSED(eventLoop);
        REDIS_NOTUSED(id);
        REDIS_NOTUSED(clientData);
    
        /* Software watchdog: deliver the SIGALRM that will reach the signal
         * handler if we don't return here fast enough. */
        if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
    
        /* Update the time cache. */
        updateCachedTime();
    
        // Sample the number of operations performed per second (every 100 ms)
        run_with_period(100) trackOperationsPerSecond();
    
        /* We have just REDIS_LRU_BITS bits per object for LRU information.
         * So we use an (eventually wrapping) LRU clock.
         *
         * Note that even if the counter wraps it's not a big problem,
         * everything will still work but some object will appear younger
         * to Redis. However for this to happen a given object should never be
         * touched for all the time needed to the counter to wrap, which is
         * not likely.
         *
         * Note that you can change the resolution altering the
         * REDIS_LRU_CLOCK_RESOLUTION define.
         */
        server.lruclock = getLRUClock();
    
        /* Record the max memory used since the server was started. */
        // Update the recorded peak memory usage
        if (zmalloc_used_memory() > server.stat_peak_memory)
            server.stat_peak_memory = zmalloc_used_memory();
    
        /* Sample the RSS here since this is a relatively slow call. */
        server.resident_set_size = zmalloc_get_rss();
    
        /* We received a SIGTERM, shutting down here in a safe way, as it is
         * not ok doing so inside the signal handler. */
        // A SIGTERM was received earlier; shut down safely here
        if (server.shutdown_asap) {
    
            // Attempt to shut the server down
            if (prepareForShutdown(0) == REDIS_OK) exit(0);
    
            // Shutdown failed: log the error and clear the flag
            redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
            server.shutdown_asap = 0;
        }
    
        /* Show some info about non-empty databases */
        // Periodically log statistics about non-empty databases
        run_with_period(5000) {
            for (j = 0; j < server.dbnum; j++) {
                long long size, used, vkeys;
    
                // Number of slots in the keyspace dictionary
                size = dictSlots(server.db[j].dict);
                // Number of keys stored in the database
                used = dictSize(server.db[j].dict);
                // Number of keys with an expire set
                vkeys = dictSize(server.db[j].expires);
    
                // Log the stats when the database is non-empty
                if (used || vkeys) {
                    redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                    /* dictPrintStats(server.dict); */
                }
            }
        }
    
        /* Show information about connected clients */
        // Unless running in Sentinel mode, log information about connected clients
        if (!server.sentinel_mode) {
            run_with_period(5000) {
                redisLog(REDIS_VERBOSE,
                    "%lu clients connected (%lu slaves), %zu bytes in use",
                    listLength(server.clients)-listLength(server.slaves),
                    listLength(server.slaves),
                    zmalloc_used_memory());
            }
        }
    
        /* We need to do a few operations on clients asynchronously. */
        // Perform client housekeeping, such as handling timeouts
        clientsCron();
    
        /* Handle background operations on Redis databases. */
        // Perform background operations on the databases
        databasesCron();
    
        /* Start a scheduled AOF rewrite if this was requested by the user while
         * a BGSAVE was in progress. */
        // If no BGSAVE or BGREWRITEAOF is running and a BGREWRITEAOF was
        // scheduled earlier, start the BGREWRITEAOF now
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
            server.aof_rewrite_scheduled)
        {
            rewriteAppendOnlyFileBackground();
        }
    
        /* Check if a background saving or AOF rewrite in progress terminated. */
        // Check whether a running BGSAVE or BGREWRITEAOF has finished
        if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
            int statloc;
            pid_t pid;
    
            // Reap the child process, if one has exited
            if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
                int exitcode = WEXITSTATUS(statloc);
                int bysignal = 0;
    
                if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
    
                // The BGSAVE child finished
                if (pid == server.rdb_child_pid) {
                    backgroundSaveDoneHandler(exitcode,bysignal);
    
                // The BGREWRITEAOF child finished
                } else if (pid == server.aof_child_pid) {
                    backgroundRewriteDoneHandler(exitcode,bysignal);
    
                } else {
                    redisLog(REDIS_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
                updateDictResizePolicy();
            }
        } else {
    
            /* If there is not a background saving/rewrite in progress check if
             * we have to save/rewrite now */
            // No BGSAVE or BGREWRITEAOF in progress; check whether one should start
    
            // Walk every save condition to see whether a BGSAVE is needed
             for (j = 0; j < server.saveparamslen; j++) {
                struct saveparam *sp = server.saveparams+j;
    
                /* Save if we reached the given amount of changes,
                 * the given amount of seconds, and if the latest bgsave was
                 * successful or if, in case of an error, at least
                 * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
                // Check whether the save condition has been met
                if (server.dirty >= sp->changes &&
                    server.unixtime-server.lastsave > sp->seconds &&
                    (server.unixtime-server.lastbgsave_try >
                     REDIS_BGSAVE_RETRY_DELAY ||
                     server.lastbgsave_status == REDIS_OK))
                {
                    redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                        sp->changes, (int)sp->seconds);
                    // Start a BGSAVE
                    rdbSaveBackground(server.rdb_filename);
                    break;
                }
             }
    
             /* Trigger an AOF rewrite if needed */
            // Check whether a BGREWRITEAOF should be triggered
             if (server.rdb_child_pid == -1 &&
                 server.aof_child_pid == -1 &&
                 server.aof_rewrite_perc &&
                 // the AOF file must be larger than the BGREWRITEAOF minimum size
                 server.aof_current_size > server.aof_rewrite_min_size)
             {
                // AOF size after the last rewrite; at least 1 to avoid division by zero
                long long base = server.aof_rewrite_base_size ?
                                server.aof_rewrite_base_size : 1;
    
                // Growth of the AOF since the last rewrite, as a percentage of base
                long long growth = (server.aof_current_size*100/base) - 100;
    
                // If the growth exceeds the configured percentage, start a BGREWRITEAOF
                if (growth >= server.aof_rewrite_perc) {
                    redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                    // Start a BGREWRITEAOF
                    rewriteAppendOnlyFileBackground();
                }
             }
        }
    
        /* AOF postponed flush: Try at every cron cycle if the slow fsync
         * completed. */
        // If an AOF flush was postponed, retry it now that the slow fsync may be done
        if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    
        /* AOF write errors: in this case we have a buffer to flush as well and
         * clear the AOF error in case of success to make the DB writable again,
         * however to try every second is enough in case of 'hz' is set to
         * an higher frequency. */
        run_with_period(1000) {
            if (server.aof_last_write_status == REDIS_ERR)
                flushAppendOnlyFile(0);
        }
    
        /* Close clients that need to be closed asynchronous */
        // Close clients that were scheduled for asynchronous freeing
        freeClientsInAsyncFreeQueue();
    
        /* Clear the paused clients flag if needed. */
        clientsArePaused(); /* Don't check return value, just use the side effect. */
    
        /* Replication cron function -- used to reconnect to master and
         * to detect transfer failures. */
        // Replication cron: reconnect to the master, send ACKs,
        // detect timed-out masters and slaves, and so on
        run_with_period(1000) replicationCron();
    
        /* Run the Redis Cluster cron. */
        // Run the cluster cron if cluster mode is enabled
        run_with_period(100) {
            if (server.cluster_enabled) clusterCron();
        }
    
        /* Run the Sentinel timer if we are in sentinel mode. */
        // Run the Sentinel timer when running in Sentinel mode
        run_with_period(100) {
            if (server.sentinel_mode) sentinelTimer();
        }
    
        /* Cleanup expired MIGRATE cached sockets. */
        // Close MIGRATE cached sockets that have timed out
        run_with_period(1000) {
            migrateCloseTimedoutSockets();
        }
    
        // Increment the cron loop counter
        server.cronloops++;
    
        return 1000/server.hz;
    }

    (2) Source of redis.c/databasesCron:
    /* This function handles 'background' operations we are required to do
     * incrementally in Redis databases, such as active key expiring, resizing,
     * rehashing. */
    // Handle database background tasks: expiring keys, resizing, and incremental rehashing
    void databasesCron(void) {
    
        /* Expire keys by random sampling. Not required for slaves
         * as master will synthesize DELs for us. */
        // Slaves skip active expiry; the master sends explicit DEL commands instead
        if (server.active_expire_enabled && server.masterhost == NULL)
            // Run the expire cycle in slow mode, doing as much work as the cycle allows
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
    
        /* Perform hash tables rehashing if needed, but only if there are no
         * other processes saving the DB on disk. Otherwise rehashing is bad
         * as will cause a lot of copy-on-write of memory pages. */
        // Rehash only while no BGSAVE or BGREWRITEAOF child is running,
        // to avoid triggering heavy copy-on-write in the child
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
            /* We use global counters so if we stop the computation at a given
             * DB we'll be able to start from the successive in the next
             * cron loop iteration. */
            static unsigned int resize_db = 0;
            static unsigned int rehash_db = 0;
            unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
            unsigned int j;
    
            /* Don't test more DBs than we have. */
            if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
    
            /* Resize */
            // Resize the hash tables of up to dbs_per_call databases
            for (j = 0; j < dbs_per_call; j++) {
                tryResizeHashTables(resize_db % server.dbnum);
                resize_db++;
            }
    
            /* Rehash */
            // Incrementally rehash the dictionaries of the databases
            if (server.activerehashing) {
                for (j = 0; j < dbs_per_call; j++) {
                    int work_done = incrementallyRehash(rehash_db % server.dbnum);
                    rehash_db++;
                    if (work_done) {
                        /* If the function did some work, stop here, we'll do
                         * more at the next cron loop. */
                        break;
                    }
                }
            }
        }
    }

    (3) Source of redis.c/incrementallyRehash:
    /* Our hash table implementation performs rehashing incrementally while
     * we write/read from the hash table. Still if the server is idle, the hash
     * table will use two tables for a long time. So we try to use 1 millisecond
     * of CPU time at every call of this function to perform some rehashing.
     *
     * The function returns 1 if some rehashing was performed, otherwise 0
     * is returned.
     */
    int incrementallyRehash(int dbid) {
    
        /* Keys dictionary */
        if (dictIsRehashing(server.db[dbid].dict)) {
            dictRehashMilliseconds(server.db[dbid].dict,1);
            return 1; /* already used our millisecond for this loop... */
        }
    
        /* Expires */
        if (dictIsRehashing(server.db[dbid].expires)) {
            dictRehashMilliseconds(server.db[dbid].expires,1);
            return 1; /* already used our millisecond for this loop... */
        }
    
        return 0;
    }

    (4) Source of dict.c/dictRehashMilliseconds:
    /* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
    int dictRehashMilliseconds(dict *d, int ms) {
        long long start = timeInMilliseconds();
        int rehashes = 0;

        // Rehash in batches of 100 buckets until the time budget is used up
        while(dictRehash(d,100)) {
            rehashes += 100;
            // Stop once more than ms milliseconds have elapsed
            if (timeInMilliseconds()-start > ms) break;
        }
        return rehashes;
    }

    (5) The source of dictRehash was already shown above.
    (6) Analysis
  • In serverCron, when no background child process is running, databasesCron is called; it calls incrementallyRehash, which calls dictRehashMilliseconds, which finally calls dictRehash.
  • Compared with lazy rehashing, incrementallyRehash works longer per invocation and migrates more entries: each call spends up to 1 millisecond on rehashing, and if the rehash is still unfinished it resumes in the next cron loop.

  • My understanding is still limited, so if you find any mistakes, please point them out; thank you. If you have better suggestions, leave a comment so we can discuss and improve together. Thank you very much for patiently reading this post!
    Reference book: "Redis Design and Implementation (2nd Edition)" by Huang Jianhong. Reference link: Redis resize source code analysis.