redisソース分析(6)——aof rewrite

19960 ワード

redisの実行に伴い、aofは膨張し続け(1つのkeyに対して複数のaofログがある)、aofによってデータを復元するのに多くの不要な時間を費やす.redisが提供する解決策はaof rewriteである.dbの内容に基づいて、各keyに対して1つのログを生成する.aofがトリガーするタイミング:
1)ユーザ呼び出しBGREWRITEAOFコマンド
2)aofログサイズが予め設定された限度額を超える
1.AOF Rewriteトリガタイミング
まず、BGREWRITEAOFの処理関数を見てみましょう.
void bgrewriteaofCommand(redisClient *c) {
    if (server.aof_child_pid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
    } else if (server.rdb_child_pid != -1) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
    } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
        addReplyStatus(c,"Background append only file rewriting started");
    } else {
        addReply(c,shared.err);
    }
}

aof_child_pidはaof rewriteプロセスを行うpid,rdb_を示すchild_pidはrdb dumpを行うプロセスpidを示す.
1)aof rewriteが現在進行中の場合、クライアントエラーが返されます.
2)rdb dumpが現在進行中の場合、ディスクへの圧力を避けるためaof_rewrite_scheduledを1に設定し、aof rewriteおよびrdb dumpが行われない場合にrewriteをオンにします.
3)aof rewriteおよびrdb dumpが現在実行されていない場合は、aof rewriteのためにrewriteAppendOnlyFileBackgroundが呼び出されます.
4)異常の場合、直接エラーを返す.
次に、serverCronでaof rewriteがどのようにトリガーされているかを見てみましょう.最初のトリガポイントはrdb dumpとの衝突を回避し、rewriteを遅延トリガすることである.
    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

aof rewriteとrdb dumpが現在行われていないことを確認し、aof_が設定されていることを確認する必要があります.rewrite_scheduled、rewirteAppendOnlyFileBackgroundを呼び出してaof rewriteを行います.
2番目のトリガ位置はaofファイルのサイズが所定のパーセントを超えることです.
         /* Trigger an AOF rewrite if needed */
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }

aofファイルが所定の最小値を超え、前回のaofファイルの一定パーセントを超えると、aof rewriteがトリガーされます.
2. AOF Rewrite
rewriteの大まかな流れは、サブプロセスを作成し、現在のスナップショットを取得し、その後のコマンドをaof_に記録することです.rewrite_bufでは、サブプロセスはdbを遍歴してaof一時ファイルを生成し、終了する.親プロセスwaitサブプロセス、終了後aof_rewrite_bufのデータはaofファイルに追加され、最後に一時ファイルの名前を正式なaofファイルに変更します.
具体的なコードを見てみましょう.まずrewriteAppendOnlyFileBackgroundです.
    pid_t childpid;
    long long start;

    // <MM>
    //           rewrite
    // </MM>
    if (server.aof_child_pid != -1) return REDIS_ERR;

他のaof rewriteプロセスが進行中の場合は、エラーを直接返します.
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        // <MM>
        //          
        // </MM>
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");

        // <MM>
        //     aof   
        // </MM>
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }

現在の時間を除いて、forkの統計に時間がかかります.次にforkを呼び出し、サブプロセスのプロセスに入ります.サブプロセスは、クライアント接続の受信を回避するために、まずリスニングsocketを閉じます.プロセスのtitleも同時に設定します.次に、rewirteが書き込む一時ファイル名を生成します.次にrewriteAppendOnlyFileを呼び出してrewriteを行います.rewriteが成功した場合、copy-on-writeの汚れたページを統計し、ログを記録し、コード0でプロセスを終了します.rewriteが失敗した場合、プロセスを終了し、終了コードとして1を返します.
親プロセスのプロセスを見てみましょう.
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return REDIS_OK;
    }

親プロセスはまずforkの時間とサンプリングを統計します.forkが失敗した場合、ログを記録し、エラーを返します.forkが成功すればaof_rewrite_scheduledクリア、rewrite開始時間およびaof_の記録child_pid(redisはこの属性でaof rewriteが進行しているかどうかを判断します).updateDictResizePolicyを呼び出してdbのkey spaceを調整するrehashポリシーは、サブプロセスが作成されているため、copy-on-writeが大量のメモリページをコピーすることを避けるため、dictのrehashは禁止されます.
aof_selected_dbは-1に設定され、次のaofはまずselect dbのログを生成し、aof_に書き込むことを目的としています.rewrite_bufではaof_rewrite_bufは正常にrewriteに追加されたファイルです.ReplicationScriptCacheFlushはしばらくこれを見ていないので、後で補充します.
次に、aof rewriteを一気に行うプロセスを見て、rewriteAppendOnlyFile関数に入ります.大体、すべてのkeyを巡り、シーケンス化してaofファイルに記録します.
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }

現在の時刻を取得し、一時ファイル名を生成してファイルを作成します.
    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);

rioはストリーム向けのI/Oインタフェースであり,下位層では異なる実装が可能であり,現在はファイルとメモリbufferの実装が提供されている.ここでrioを初期化します.サーバが構成されている場合.aof_rewrite_incremental_fsyncは、aofを書くときにfsyncをインクリメンタルに行い、ここでは32 Mを書き込むたびにsyncを1回構成する.集中syncによるディスクの満タン化を回避します.
次に、redisの各dbを巡回してrewirteを行うループです.ループの内部を直接見る:
        char selectcmd[] = "*2\r
$6\r
SELECT\r
"; redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* SELECT the new DB */ if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

まず、dbに対応するselectコマンドを生成し、dbが空であればスキップし、rewriteの次のdbを表示します.その後、dbの反復器を取得し、取得に失敗した場合、エラーを直接返します.最後にselect dbのコマンドをファイルに書き込みます.
次に、dbの各keyを巡回し、対応するコマンドを生成するためのループです.
while ((de = dictNext(di)) != NULL) {
    // ...
}
dictReleaseIterator(di);

ループの内部を見続けます.
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;

deはdictのentryで、keyとvalueが含まれています.ここで、まずkeyとvalueを取得し、keyをrobjタイプに変換する.次にkeyに対応するタイムアウト時間を取得する.タイムアウトした場合は、このkeyをスキップします.
            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r
$3\r
SET\r
"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { if (rewriteListObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_SET) { if (rewriteSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_ZSET) { if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_HASH) { if (rewriteHashObject(&aof,&key,o) == 0) goto werr; } else { redisPanic("Unknown object type"); }

次に、オブジェクトのタイプに応じて、対応するコマンドにシーケンス化します.コマンドをaofファイルに書き込みます.具体的な各オブジェクトのシーケンス化については、ここでは詳しく説明しない.
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r
$9\r
PEXPIREAT\r
"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; }

タイムアウト時間がある場合は、同じようにコマンドにシーケンス化してaofファイルに記録します.
すべてのdbのrewriteが終了したら、クリーンアップ作業を行います.
    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

fflushを呼び出し、fsyncはデータをディスクに着陸し、最後にcloseファイルを呼び出します.一時ファイルの名前を変更し、生成されたaofファイルが完全にokであることを確認し、aofが不完全な場合を避ける.最後に、ログを印刷して戻ります.
werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;

ファイルを開くと、どのステップでもエラーが発生し、werrにジャンプしてエラー処理を行います.ここでは、ファイルcloseを削除し、dictの反復器が解放されていない場合は解放する必要があります.最後にerrorを返します.
これで、サブプロセスのaof rewriteタスクは完了し、rewrite後のファイルは生成されますが、rewriteプロセスで得られたログはaofファイルに記録されていないため、メインプロセスで完了する作業も一部完了する必要があります.
3.AOF Rewrite Buffer追加
マルチプロセスプログラミングでは、サブプロセスが終了すると、親プロセスがクリーンアップする必要があります.そうしないと、サブプロセスはゾンビプロセスをプログラミングします.同様にserverCron関数では、メインプロセスはrewriteプロセスのクリーンアップを完了します.
    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            updateDictResizePolicy();
        }
    } else {

プロセスrdb dumpまたはaof rewriteの場合、メインプロセスはブロックされていないwait 3関数を呼び出し、サブプロセスが終了した後、その終了状態を取得します.終了したプロセスがaof rewriteプロセスである場合、backgroundRewriteDoneHandler関数が呼び出され、最後の終了作業が行われます.この関数を見てみましょう.
正常に終了した場合、被信号killはなく、終了コードは0に等しい.
        int newfd, oldfd;
        char tmpfile[256];
        long long now = ustime();
        mstime_t latency;

        redisLog(REDIS_NOTICE,
            "Background AOF rewrite terminated with success");

        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        latencyStartMonitor(latency);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            redisLog(REDIS_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }

まずログを記録し、一時的に書き込まれたrewriteファイルを開きます.
        // <MM>
        //  rewrite buf     
        // </MM>
        if (aofRewriteBufferWrite(newfd) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

        redisLog(REDIS_NOTICE,
            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());

次にaof rewrite bufferをファイルに追加します.
        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
         * use a background thread to take care of this. First, we
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
        if (server.aof_fd == -1) {
            // <MM>
            //     AOF,      aof rewrite
            // </MM>
            /* AOF disabled */

             /* Don't care if this fails: oldfd will be -1 and we handle that.
              * One notable case of -1 return is if the old file does
              * not exist. */
             oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
        } else {
            /* AOF enabled */
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
        }

        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        latencyStartMonitor(latency);
        if (rename(tmpfile,server.aof_filename) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to rename the temporary AOF file: %s", strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename",latency);

        if (server.aof_fd == -1) {
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it. */
            close(newfd);
        } else {
            /* AOF enabled, replace the old fd with the new one. */
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
                aof_fsync(newfd);
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
            aofUpdateCurrentSize();
            server.aof_rewrite_base_size = server.aof_current_size;

            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }

その後、一時ファイルの名前を最終aofファイルに変更します.
        server.aof_lastbgrewrite_status = REDIS_OK;

        redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
        /* Change state from WAIT_REWRITE to ON if needed */
        if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
            server.aof_state = REDIS_AOF_ON;

        /* Asynchronously close the overwritten AOF. */
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

        redisLog(REDIS_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);

最後に、ステータスを更新し、前のaofファイルを非同期で閉じます.
rewriteサブプロセスが異常に終了し、信号killまたは終了コードが0でない場合、ログのみが記録されます.
    } else if (!bysignal && exitcode != 0) {
        server.aof_lastbgrewrite_status = REDIS_ERR;

        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        server.aof_lastbgrewrite_status = REDIS_ERR;

        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }

rewrite bufferの追加やファイルの名前変更に失敗した場合は、cleanupブランチ処理が必要です.
cleanup:
    aofRewriteBufferReset();
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;

主にステータスをリセットして、次のrewriteを行います.
上はaof rewriteの全体的な流れで、rdb関連部分を紹介します.