Redis持続化のAOF書き換え

18732 ワード

AOF永続化は、実行される書き込みコマンドを保存することによってデータベースの状態を記録するため、サーバの稼働時間が経過するにつれて、AOFファイルの内容が元より多くなり、ファイルの体積もますます大きくなり、制御しなければ、体積が大きすぎるAOFファイルはRedisサーバ、さらにはホストコンピュータ全体に影響を与え、その体積が大きいほど、AOFファイルを使用してデータの復元に要する時間が長くなります。

aofrewriteプロセスがサーバをブロックしないようにするために、Redisサーバはforkのサブプロセスでこのプロセスを実行し、いつでも1つのサブプロセスしかこのことをできません.

server関連変数


AOFの連続性を保証するため、親プロセスはaofrewrite期間の書き込みコマンドをキャッシュし、サブプロセスが書き換えられてから新しいAOFファイルに追加します.aofrewrite期間中に書き込みコマンドの書き込み量が大きい場合、サブプロセスが終了すると、親プロセスの追加が大量の書き込みディスク操作に関連し、サービス性能が低下します.
Redisは親子プロセス間でpipeを確立しaofrewrite期間の書き込みコマンドをpipeでサブプロセスに同期させることで,追加書き込みディスクの操作もサブプロセスに転嫁する.Redis serverに関連する変数は主に以下の数,主に3つのpipeである.
int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent;
int aof_pipe_write_ack_to_parent;
int aof_pipe_read_ack_from_child;
int aof_pipe_write_ack_to_child;
int aof_pipe_read_ack_from_parent;
int aof_stop_sending_diff; /*If true stop sending accumulated diffs to child process. */
sds aof_child_diff;        /* AOF diff accumulator child side. */

実装の原理


aofrewriteのエントリロジックはrewriteAppendOnlyFileBackground関数にある.
int rewriteAppendOnlyFileBackground(void) {
    ...
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    ...
}

今回のaofrewriteは、バックグラウンドプロセスがaofrewriteまたはrdbをしていないことを確認するために考慮されます.

pipe初期化

int rewriteAppendOnlyFileBackground(void) {
   ...
   if (aofCreatePipes() != C_OK) return C_ERR; 
   ...
}
int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* children -> parent ack. */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;

    /*          ,                   */
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0; /*             */
    return C_OK;

error:
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;
}
aofCreatePipes関数ではpipeを初期化し,pipeの各変数の用途は名前からも分かるように,全部で3つのpipeがあり,1つのpipeごとに1回,2つのfdを占有する.
pipe 1は、親プロセスがサブプロセスにキャッシュされた新しいデータを送信するために使用される.サブプロセスはaofrewriteの場合、定期的にパイプからデータを読み出してキャッシュし、最後にキャッシュしたデータを書き換えた新しいAOFファイルに書き込みます.この2つのfdはいずれも非ブロックに設定されています.
pipe 2は、サブプロセスが親プロセスに終了信号を送信することを担当する.親プロセスはfds[2]をリスニングしてイベントを読み、コールバック関数はaofChildPipeReadableです.親プロセスはクライアントコマンドを絶えず受信するが、サブプロセスが親プロセスのデータを絶え間なく待つことはできないため、サブプロセスはデータベースのすべてのデータを遍歴した後、pipe 1から一定時間の読み取り操作を実行すると、pipe 2に特殊なマーク"!"を送信し、親プロセスは、子プロセスの「!するとserverが置かれます.aof_stop_sending_diffは1であり、親プロセスにキャッシュデータを送信しなくなったことを示します.
pipe 3は、親プロセスがサブプロセスに応答信号を送信する責任を負う.親プロセスは、子プロセスの「!その後、このパイプを介してサブプロセスにも「!」と応答します.停止信号が受信されたことを示します.
詳しい過程は後で詳しく話します.

親プロセス処理ロジック


rewriteAppendOnlyFileBackground関数


次に、上記の論理に従って、server forkは1つのサブプロセスを出し、2つのプロセスはそれぞれ異なる処理を行い、次に親プロセスのいくつかの主要な処理(コードが削除されている)を見ます.
int rewriteAppendOnlyFileBackground(void) {
    ...
    if ((childpid = fork()) == 0) {
        ... ...
    } else {
        server.aof_rewrite_scheduled = 0;
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    ...
}

server.aof_rewrite_scheduledはゼロにし、serverCron関数でaofrewriteを繰り返しトリガすることを防止する.この場合、serverCronには次のような論理があるため、
int rewriteAppendOnlyFileBackground(void) {
    ...
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }
    ...
}

ここで、updateDictResizePolicy関数の動作は、以下のように重要である.
void updateDictResizePolicy(void) {
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        dictEnableResize();
    else
        dictDisableResize();
}

すなわち,バックグラウンドでaofrewriteやrdbをするサブプロセスがある場合はdict rehashはしない.現在、ほとんどのオペレーティングシステムでは、サブプロセスの使用効率を向上させるために書き込み時レプリケーション(copy-on-write)が採用されています.したがって、サブプロセスが存在する間は、不要なメモリ書き込みを避ける必要があります.そうしないと、大量のメモリcopyが発生し、パフォーマンスに影響を与えます.COWの知識は、ドキュメント『Copy On Writeメカニズムを理解してください』を参照してください.
またserver.aof_selected_dbは-1に設定され、サブプロセスがデータベーススキャンを行うときにselectコマンドを挿入して正しいデータベースを選択するためです.

aofRewriteBufferAppend関数


前のブログで述べたように、feedAppendOnlyFile関数appendでコマンドを書く場合、現在aofrewriteをするサブプロセスがある場合はserverに書き込みコマンドを書く必要がある.aof_rewrite_buf_blocksの1部です.この変数はチェーンテーブルで、各ノードは最大10 MBです.
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    ...
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
}
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    ... ...
    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

serverです.aof_pipe_write_data_to_childは書き込みイベントを登録し、コールバック関数はaofChildWriteDiffDataです.
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    while(1) {
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

サブプロセスが親プロセスにデータを送信しない(server.aof_stop_sending_diff=1)またはserver.aof_rewrite_buf_blocksが空の場合、書き込みイベントを削除します.
そうでない場合、pipe 1にデータを書き込む、書き込まれたデータはserverからなる.aof_rewrite_buf_blocks削除.

サブプロセス処理ロジック

int rewriteAppendOnlyFileBackground(void) {
    ...
    char tmpfile[256];
    closeListeningSockets(0);               /* child        socket */
    redisSetProcTitle("redis-aof-rewrite"); /*        redis-aof-rewrite */
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
    ...
}

まず、temp-rewriteaof-bg-%d.aofという一時的なAOFファイルの処理を行います.
その後、正式な処理関数rewriteAppendOnlyFileに入り、以下にメインコード(削除あり)を貼ります.
int rewriteAppendOnlyFile(char *filename) {
    ...
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    server.aof_child_diff = sdsempty(); /*     aof_child_diff */
    ...
}

aof_child_diff変数にaofwriteが格納されている間、サブプロセスはpipeを介して親プロセスからキャッシュデータを受信します.
次に、データベースをスキャンする操作です.
int rewriteAppendOnlyFile(char *filename) {
    ...
    rio aof;
    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue; // skip empty database
        di = dictGetSafeIterator(d);
        while((de = dictNext(di)) != NULL) {
            ... ...
            if (aof.processed_bytes > processed+1024*10) { // 10K
                processed = aof.processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    ...
}

以上のロジックでは、サブプロセスはdbごとに各keyをスキャンし、keyのタイプに応じて異なる関数を使用してデータの書き換えを行い、期限切れのデータを持つにはappendのPEXPIREATコマンドが必要です.
前述pipeによるaofwriteの最適化については、dbを1回巡るごとにrioに書き込まれるデータ量が10 Kを超えると、pipeによって親プロセスから1回データを読み出し、serverにデータを蓄積する論理が見られる.aof_child_diff.
ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    while ((nread = read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}

クライアントが親プロセスにトラフィックを送信し続ける可能性があり、サブプロセスが親プロセスを待つことはできないため、Redisでは次のように決定します.
int rewriteAppendOnlyFile(char *filename) {
    ...
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        /*  1ms  ,           fd        ,     aeWait()    0 */
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        //          ,  nodata
        nodata = 0;
        aofReadDiffFromParent();
    }
    ...
}

1 msタイムアウトは、親プロセスがpipeからデータを送信するのを待ち、1 msで親プロセスが20回もデータを送信しなかった場合は、ReadDiffFromParentを放棄します.serverのためaof_pipe_read_data_from_parentは初期化時に非ブロックに設定されるため、aeWait呼び出しの戻りが速い.
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;

次にpipe 2で親プロセスに伝えます(特殊記号!)キャッシュデータはもう送らないでください.
前の初期化で親プロセスがserverをリスニングしていたことを覚えています.aof_pipe_read_ack_from_childの読み取り可能な事件でしょう?「!」その後、親プロセスは、処理関数aofChildPipeReadableを呼び出す.
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }
    /* Remove the handler since this can be called only one time during a
     * rewrite. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}
server.aof_stop_sending_diffが1に設定、サブプロセスにキャッシュデータを送信しないことを示し、serverを削除することがわかる.aof_pipe_read_ack_from_child上の読み取り可能なイベントは、サブプロセスに「!」を返信します.
今帰ってサブプロセスの行為を見てみましょう.
int rewriteAppendOnlyFile(char *filename) {
    ...
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 || byte != '!') 
        goto werr;
    ...
}

サブプロセスブロック5 sは、親プロセスから確認フラグ「!」が送信されるのを待っています.その後、自分の最後の仕事を始めました.以下のようにします.
int rewriteAppendOnlyFile(char *filename) {
    ...
    aofReadDiffFromParent(); /*                     */

    /*     aof_child_diff            AOF     */
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /*               temp-rewriteaof-bg-.aof */
    if (rename(tmpfile,filename) == -1) {
        unlink(tmpfile);
        return C_ERR;
    }
    ...
}

最後にpipeのデータをもう一度読み出し、サブプロセスをaofrewrite期間、aof_child_diffは、親プロセスから蓄積されたデータブラシディスクから、最後にrenameシステム呼び出しを行う.
以上の論理処理により、serverがサブプロセスに渡すaofrewrite作業が完了し、最終的にファイルtemp-rewriteaof-bg-が得られる.aof、正常に0を返します.そうしないと1を返します.

親プロセスの終了作業


サブプロセスはaofrewriteを実行した後に終了し、親プロセスwait3はサブプロセスの終了状態になった後、aofrewriteの終了作業を行う.serverCron関数には、次のような論理があります.
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { /* wait3         */
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;

        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

        if (pid == -1) {
            serverLog(LL_WARNING,"wait3() returned an error: %s. "
                      "rdb_child_pid = %d, aof_child_pid = %d",
                      strerror(errno),
                      (int) server.rdb_child_pid,
                      (int) server.aof_child_pid);
        } else if (pid == server.rdb_child_pid) {
            backgroundSaveDoneHandler(exitcode,bysignal);
        } else if (pid == server.aof_child_pid) { /* aof       */
            backgroundRewriteDoneHandler(exitcode,bysignal);
        } else {
            if (!ldbRemoveChild(pid)) {
                serverLog(LL_WARNING,
                          "Warning, detected child with unmatched pid: %ld",
                          (long)pid);
            }
        }
        updateDictResizePolicy(); /*    dict resize       */
    }
    ...
}
wait3関数は、親プロセスがすべてのサブプロセスの戻り値を待つことを示し、WNOHANGオプションは、サブプロセスexitがない場合に直ちに戻ることを示します.manでは、このオプションについて「WNOHANG return immediately if no child has exited」と説明しています.
aofwriteのサブプロセスexitまで待つと、backgroundRewriteDoneHandler関数を使用して処理され、主に以下のように処理されます(コードは削除されています).
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.aof_child_pid);
    newfd = open(tmpfile,O_WRONLY|O_APPEND);
    if (aofRewriteBufferWrite(newfd) == -1) {
        close(newfd);
        goto cleanup;
    }
    ...
}

サブプロセスで生成されたテンポラリファイルtemp-rewriteaof-bg-.aofは、aofRewriteBufferWriteを呼び出し、サーバがキャッシュした残りの新しいデータを一時ファイルに書き込むことで、AOF一時ファイルが現在のデータベースの状態と完全に一致する.
では、次の2つのことがあります.1つは臨時AOFファイルを改名すること、2つはfdを切り替えることです.
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    if (server.aof_fd == -1) {
        /* AOF disabled */
        oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
    } else {
        /* AOF enabled */
        oldfd = -1; /* We'll set this to the current AOF filedes later. */
    }
    if (rename(tmpfile,server.aof_filename) == -1) {
        close(newfd);
        if (oldfd != -1) close(oldfd);
        goto cleanup;
    }

    if (server.aof_fd == -1) {
        /* AOF disabled, we don't need to set the AOF file descriptor
         * to this new file, so we can close it. */
        close(newfd);
    } else {
        /* AOF enabled, replace the old fd with the new one. */
        oldfd = server.aof_fd;
        server.aof_fd = newfd;
        if (server.aof_fsync == AOF_FSYNC_ALWAYS)
            aof_fsync(newfd);
        else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
            aof_background_fsync(newfd);
        server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
        aofUpdateCurrentSize();
        server.aof_rewrite_base_size = server.aof_current_size;

        /* Clear regular AOF buffer since its contents was just written to
         * the new AOF from the background rewrite buffer. */
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }
    ...  ...
    /* Asynchronously close the overwritten AOF. */
    if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
    ...
}

以上のように,まず一時AOFファイルを名前変更し,次いでoldfdとnewfdの処理を2つに分けた.
AOF機能がオフの場合、元のAOFファイルを開き、oldfdを取得します.ここでは、操作が成功したかどうかは気にしません.失敗した場合、oldfd値は-1close(newfd)です.
AOF機能がオンの場合、oldfdは直接-1に設定され、aof_fdはnewfdに切り替わり,異なるデータディスクポリシーに従ってAOFディスクを行い,対応するパラメータを更新する.
次に、oldfdの論理を閉じます.oldfdは古いAOFファイルへの最後の参照である可能性があるため、直接closeがserverをブロックする可能性があるため、バックグラウンドタスクを作成してファイルを閉じます.
最後に清掃作業を行い、以下のようにします.
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    cleanup:
        aofClosePipes();
        aofRewriteBufferReset();
        aofRemoveTempFile(server.aof_child_pid);
        server.aof_child_pid = -1;
        server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
        server.aof_rewrite_time_start = -1;
        /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
        if (server.aof_state == AOF_WAIT_REWRITE)
            server.aof_rewrite_scheduled = 1;
    ...
}

以上、親プロセスは終了作業を完了し、コマンドを書くとwriteからnewfdになります.

シーケンス図


以上の親子プロセスのインタラクションを次のように整理することができる.
上図参照Redis・原理紹介・配管最適化aofrewrite

いつ書き直すか


AOF書き換えをトリガーできるのは2つの時点です.
【1】BGREWRITEAOFコマンドを手動で実行する.
自動実行は、serverCron関数において一定の論理に基づいて判定される.
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
          /* Trigger an AOF rewrite if needed */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_perc &&
        server.aof_current_size > server.aof_rewrite_min_size) /*    64M */
    {
        long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1;
        long long growth = (server.aof_current_size*100/base) - 100;
        if (growth >= server.aof_rewrite_perc) {
            rewriteAppendOnlyFileBackground();
        }
     }
}

つまりAOFファイルのサイズがserverを超える.aof_rewrite_min_size、成長率はserverより大きい.aof_rewrite_perc時にトリガーされ、成長率計算の基数server.aof_rewrite_base_sizeは前回aofrewrite終了後のAOFファイルのサイズです.

ふろく


いくつかの解釈.
ブロックモードでは、プロセスまたはスレッドがこれらの関数を実行するときにイベントの発生を待たなければなりません。イベントが発生しない場合、プロセスまたはスレッドはブロックされ(ブロックされた場所で死など)、関数はすぐに戻りません。 非ブロックnon-blockモードでは、プロセスまたはスレッドがこの関数を実行する際にイベントの発生を待つ必要はありません。一旦肯定的な戻りを実行すると、戻り値の違いで関数の実行状況を反映し、イベントが発生するとブロック方式と同じであり、イベントが発生しない場合はイベントが発生していないことを知らせるコードを返し、プロセスまたはスレッドが実行を継続するため、非ブロックモードの効率が高くなります。