Redisコード読解3-Redisネットワークリスニング(3)

11272 ワード

Redisネットワーク傍受を紹介する最後の文章で、タイミング処理関数serverCronの分析に重点を置いています.この関数は実はネットワーク傍受とはあまり関係ありません.当時、Redisがカスタマイズしたイベントライブラリのタイミングイベントにバインドされていたので、一緒に話します.serverCronのこの関数はRedisの正常な動作にとって重要であり,Redisの利用者にとって最も重要なのはRedisの現在の動作状況(keys,sizes,memoryなど)を迅速かつ直感的に見ることができ,serverCronはこれらの情報を利用者に知らせることができ,またserverCronという方法は定時周期的に動作し,AOF Write,VM Swap,BGSAVE,Rehashの動作により、Redisの動作がよりスムーズになります.コードによって直接分析します.
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j, loops = server.cronloops;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* We take a cached value of the unix time in the global state because
     * with virtual memory and aging there is to store the current time
     * in objects at every object access, and accuracy is not needed.
     * To access a global var is faster than calling time(NULL) */
    server.unixtime = time(NULL);
    /* We have just 22 bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
     * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
     *
     * Note that even if this will wrap after 1.5 years it's not a problem,
     * everything will still work but just some object will appear younger
     * to Redis. But for this to happen a given object should never be touched
     * for 1.5 years.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define.
     */
    updateLRUClock();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    if (server.shutdown_asap) {
        if (prepareForShutdown() == REDIS_OK) exit(0);
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
    }

    /* Show some info about non-empty databases */
    for (j = 0; j < server.dbnum; j++) {
        long long size, used, vkeys;

        size = dictSlots(server.db[j].dict);
        used = dictSize(server.db[j].dict);
        vkeys = dictSize(server.db[j].expires);
        if (!(loops % 50) && (used || vkeys)) {
            redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
            /* dictPrintStats(server.dict); */
        }
    }

    /* We don't want to resize the hash tables while a bacground saving
     * is in progress: the saving child is created using fork() that is
     * implemented with a copy-on-write semantic in most modern systems, so
     * if we resize the HT while there is the saving child at work actually
     * a lot of memory movements in the parent will cause a lot of pages
     * copied. */
    if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) {
        if (!(loops % 10)) tryResizeHashTables();
        if (server.activerehashing) incrementallyRehash();
    }

    /* Show information about connected clients */
    if (!(loops % 50)) {
        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
            listLength(server.clients)-listLength(server.slaves),
            listLength(server.slaves),
            zmalloc_used_memory());
    }

    /* Close connections of timedout clients */
    if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
        closeTimedoutClients();

    /* Check if a background saving or AOF rewrite in progress terminated */
    if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            if (pid == server.bgsavechildpid) {
                backgroundSaveDoneHandler(statloc);
            } else {
                backgroundRewriteDoneHandler(statloc);
            }
            updateDictResizePolicy();
        }
    } else {
        /* If there is not a background saving in progress check if
         * we have to save now */
         time_t now = time(NULL);
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            if (server.dirty >= sp->changes &&
                now-server.lastsave > sp->seconds) {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, sp->seconds);
                rdbSaveBackground(server.dbfilename);
                break;
            }
         }
    }

    /* Expire a few keys per cycle, only if this is a master.
     * On slaves we wait for DEL operations synthesized by the master
     * in order to guarantee a strict consistency. */
    if (server.masterhost == NULL) activeExpireCycle();

    /* Swap a few keys on disk if we are over the memory limit and VM
     * is enbled. Try to free objects from the free list first. */
    if (vmCanSwapOut()) {
        while (server.vm_enabled && zmalloc_used_memory() >
                server.vm_max_memory)
        {
            int retval = (server.vm_max_threads == 0) ?
                        vmSwapOneObjectBlocking() :
                        vmSwapOneObjectThreaded();
            if (retval == REDIS_ERR && !(loops % 300) &&
                zmalloc_used_memory() >
                (server.vm_max_memory+server.vm_max_memory/10))
            {
                redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
            }
            /* Note that when using threade I/O we free just one object,
             * because anyway when the I/O thread in charge to swap this
             * object out will finish, the handler of completed jobs
             * will try to swap more objects if we are still out of memory. */
            if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
        }
    }

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    if (!(loops % 10)) replicationCron();

    server.cronloops++;
    return 100;
}

/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    REDIS_NOTUSED(eventLoop);
    listNode *ln;
    redisClient *c;

    /* Awake clients that got all the swapped keys they requested */
    if (server.vm_enabled && listLength(server.io_ready_clients)) {
        listIter li;

        listRewind(server.io_ready_clients,&li);
        while((ln = listNext(&li))) {
            c = ln->value;
            struct redisCommand *cmd;

            /* Resume the client. */
            listDelNode(server.io_ready_clients,ln);
            c->flags &= (~REDIS_IO_WAIT);
            server.vm_blocked_clients--;
            aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                readQueryFromClient, c);
            cmd = lookupCommand(c->argv[0]->ptr);
            redisAssert(cmd != NULL);
            call(c,cmd);
            resetClient(c);
            /* There may be more data to process in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }
    }

    /* Try to process pending commands for clients that were just unblocked. */
    while (listLength(server.unblocked_clients)) {
        ln = listFirst(server.unblocked_clients);
        redisAssert(ln != NULL);
        c = ln->value;
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~REDIS_UNBLOCKED;

        /* Process remaining data in the input buffer. */
        if (c->querybuf && sdslen(c->querybuf) > 0)
            processInputBuffer(c);
    }

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile();
}

i.まずserver.cronloopsの値はloops,serverに与えられる.cronloopsはserverCron関数の実行回数を指し、serverCron関数を実行するたびにserver.cronloops++,server.cronloopsの内部実行ロジックはserver.cronloops値の違いによって変化します.ii.serverでunixtime=time(NULL)は現在の時間を保存します.virtual memory and agingの場合、Objectごとのaccess時間を知る必要がありますが、この時間は正確ではありませんので、グローバル変数による取得時間はtime(NULL)を呼び出すよりもずっと速いです.iii.Redisの最大メモリ使用量を記録する.SIGTERM信号が受信された場合、Redis iv.serverCronメソッドは、Redis内の各非空のDBの使用状況(used,keys,sizes)および現在接続されているclientsを50回実行ごとに表示し、使用するメモリサイズを終了しようとする.v.serverCronメソッドは10回実行するごとにRehashを試み、a bacground savingが進行中であればrehashを行わず、一部のデータが失われないようにする.vi.timeoutのclientsを閉じる;vii.BGSAVEの実行中にクライアントがbgrewriteaofというコマンドを実行すると、serverCronでa scheduled AOF rewrite viiiの実行が開始する.現在のRedisがBGSAVEまたはAOF rewriteを行っている場合、check BGSAVEまたはAOF rewriteが終了しているかどうか、終了している場合は対応する関数処理(backgroundSaveDoneHandler/backgroundRewriteDoneHandler)を呼び出し、現在BGSAVEまたはAOF rewrite操作がない場合は、そのような操作を行うかどうかを判断し、必要であればそのような操作をトリガーする.ix.AOF buffer flush操作が一時停止された場合、serverCronが呼び出されるたびに、AOF buffer flush操作xを復元します.Masterであれば、周期的にいくつかのkey(すぐに選択された)を期限切れにします.この操作はMasterのみに注意してください.slavesであれば、masterのdel操作でkeyを同期して、強い一致性を実現します.xi.VMのSwap操作xii.運転10回ごとにreplicationCronを行い、slavesがあればxiii.100を返します.serverCronメソッドが100ミリ秒ごとに呼び出されることを示します.これはprocesstimeEventというメソッドで体現されています.
if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                aeDeleteTimeEvent(eventLoop, id);
            }
            
以上の分析により、Server CronはRehash、VM Swap、AOF write、BGSAVEなどの操作に重点を置いているが、これらの操作はいずれも時間がかかり、RedisのClientsに対する応答速度に影響を与える.従って、実際に適用する場合、「loops%10」と同様の操作を変更することで、上記の時間のかかる操作の実行周波数を決定することができ、暇があれば、異なる周波数でredisの圧力テストでの性能をテストします.
 
今回、Redisのネット傍受部分はすべて紹介しました.前に述べたいくつかの問題を振り返ってみましょう.
1.Redisはepoll,select,kququeをサポートし、プロファイルによってどちらを取るかを決定する
2.ファイル読み書きイベントとタイミングイベントのサポート
3.配列を使用してファイルイベントを維持し、チェーンテーブルを使用してタイミングイベントを保存する(タイミングイベントを検索する場合、パフォーマンスが高くなく、向上する必要がある)
4.Redis Serverの単一スレッドはイベントに応答し、前後順にイベントに応答するため、単一のRedisサーバのスループットは接続されたclientsが多くなるにつれて低下し、より多くのRedisサーバを増やすことでこの問題を解決することができる
5.Redisは多くのコードの中で、aeProcessEventのように、ポーリングされたwait時間が現在の時間と最近のタイミングイベント応答時間の差に等しいなど、できるだけ早く様々なイベントに応答することを考慮している.ポーリングwaitに入るたびにbeforesleepメソッドでunblockのclientsなどに応答します.