REDISマスタコピー
16531 ワード
REDISは現在、非同期のプライマリ・スレーブ・レプリケーション・バージョン・システムを提供しています.redisでは、この仕事を完了するためのいくつかの方法が提供されています.主従レプリケーションは主にredis/replicationに対応する.cこの書類の中にあります.ソースフレームには3つの部分があります:マスター部分/SLAVE部分/複製コア部分
実はコピーから個人的に難しいのは、再起動するたびにmaster/slaveがデータを渡すパターンです
まずslaveにとってアクティブに彼を接続するmasterです
int connectWithMaster(void) {
int fd;
fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
if (fd == -1) {
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return REDIS_ERR;
}
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
return REDIS_ERR;
}
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
server.repl_state = REDIS_REPL_CONNECTING;
return REDIS_OK;
}
aecreateFileEvent(server.el,fd,AE_READADBLE|AE_WRITABLE,syncWithMaster,NULL)は、読み取り可能かつ書き込み可能なイベント注意処理イベント関数をsyncwithMasterとして登録する.
rep_state状態がREDIS_になりましたPREPL_CONNECTING.関連するサーバのreplicationは、対応する調整を行います.
syncwithMasterに入ってみましょう
if (server.repl_state == REDIS_REPL_CONNECTING) {
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
server.repl_state = REDIS_REPL_RECEIVE_PONG;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
syncWrite(fd,"PING\r
",6,100);
return;
}
まずクライアントslaveがserver masterにPINGを送信しますこれはタイムアウトの1つの応答で、状態はREDIS_に変わりましたREPL_RECEIVE_PONG本来ならMasterが受け取ったらそれなりの動きをする.slave端子にとって次はREDIS_REPL_RECEIVE_PONGという状態になりました.実はある値を受け入れるつもりです
buf[0] = '\0';
if (syncReadLine(fd,buf,sizeof(buf),
server.repl_syncio_timeout*1000) == -1)
{
redisLog(REDIS_WARNING,
"I/O error reading PING reply from master: %s",
strerror(errno));
goto error;
}
これはPONG状態での核心的なことです.それを読んで、それに合った内容があるかどうかを判断します.
if (syncWrite(fd,"SYNC\r
",6,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
……if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
MASETERにSYNCを送信すると、readSyncBulkPayloadが読み取り可能な状態に登録されます.ちょっと待って、このイベント関数の後にしなければならないことは、対応するビットを設定することです.
server.repl_state = REDIS_REPL_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
repl_transfer_size設定-1は、masterから受信したファイルサイズが-1であることを示します.状態がREPL_になりましたTRANSFER. readSyncBulkPayloadに入って、この関数がどのように受け入れられているかを見てみましょう.
server.repl_transfer_size = strtol(buf+1,NULL,10);
まず,相手がどれだけのファイルを送信するかを決定し,bufがrdbに書かれた対応するファイルに読み込まれる.
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
nread = read(fd,buf,readlen);
...............................................................
...............................................................
...............................................................
write(server.repl_transfer_fd,buf,nread) != nread)
server.repl_transfer_read += nread;
/* Check if the transfer is now complete */
if (server.repl_transfer_read == server.repl_transfer_size) {
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
signalFlushedDb(-1);
emptyDb();
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
if (rdbLoad(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
replicationAbortSyncTransfer();
return;
}
2つが等しい場合readとtransfer_sizeが等しい場合は、まずrdb_に名前を置き換えます.filenameの名前を空にしてdb emptyDb()を破棄し、読み取り可能なイベントを最後にrdbLoadを呼び出してローカルでkey_を再構築します.valueデータベースのコピー.【マスターの】すべての動作の操作が完了しました.ここでの送信サイズは双方に限定されるはずです.マスターセクションから対応するイベントを見つけることができます.
プライマリ・サーバにとって、クライアントがPINGを送信した後、ホストからの返信を期待する以外に、このプライマリ・セカンダリ・レプリケーションに本当に役立つのは、サーバからの操作です.
write(fd,“SYNC\r”,buf).このアクションが発行されると、masterはSYNcommand()を呼び出して対応するコピーアクションを完了します.まずSYNcommand()関数に入って、どんな状況なのか見てみましょう.
レプリケーションを完了するには、まず適切なタイミングでmasterがbgsave操作に入ります.rdbファイルが最新のファイルであることを保証します.マスターにとってrdb_を見てみましょうpid!=-1条件が満たされている場合、この操作をしていることを示すmasterは、syncでなければbgsave操作をトリガーするのではなく、完了してから対応する動作を行う必要があります.次に、メインプロセスでは、ステータスが「WAIT_」に設定されます.BGSAVE_END.この時点でsyncCommandは完了していますが、コピー操作はまだ始まっていません.下を見てください.
bgsaveCommand操作を行うとfunction:updateSlavesWaitingBgsaveが呼び出され、同期待ち現象は発生しません.
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
対応するrepldbfdを開き、ファイルをコピーする準備ができました.
aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR)
sendBulkToSlaveを登録しました.salveに送るこの部分に注意してください.
注意点:1)送信バッファサイズをslaveと同じように設定する方法2)送信が同期化されたプロセスなのか非同期化されたプロセスなのか
REDIS_IOBUF_LEN:1024*16この変数は1回のrdb量なのかsendBulkToSlave()に入ってみましょう.
{
redisClient *slave = privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
char buf[REDIS_IOBUF_LEN];
ssize_t nwritten, buflen;
if (slave->repldboff == 0) {
/* Write the bulk write count before to transfer the DB. In theory here
* we don't know how much room there is in the output buffer of the
* socket, but in practice SO_SNDLOWAT (the minimum count for output
* operations) will never be smaller than the few bytes we need. */
sds bulkcount;
bulkcount = sdscatprintf(sdsempty(),"$%lld\r
",(unsigned long long)
slave->repldbsize);
if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
{
sdsfree(bulkcount);
freeClient(slave);
return;
}
sdsfree(bulkcount);
}
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
if (buflen <= 0) {
redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
strerror(errno));
freeClient(slave);
return;
}
slave->repldboff += nwritten;
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
slave->replstate = REDIS_REPL_ONLINE;
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
freeClient(slave);
return;
}
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
}
}
repldoff=0が最初の初期化であることを示す場合、すなわち、送信すべき長さのデータが相手slaveに送信される.発送は初めてです.ここでwrite呼び出しが成功したら続行することに注意してください.lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 毎回対応する場所にナビゲートします.これはディスクのランダムな操作を呼び出すのに非常に腹を立てています.ファイルが大きい場合、パフォーマンスに大きな影響を与える場合、比較的時間がかかります.buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); メモリに読み込んでwrite操作をします.イベントモデルを閉じていないため、EPOLLポーリングでは、このイベントを実行する必要があると考えられます.まだ準備ができているので、この関数を呼び出し続け、本質的には非同期の操作と言えるでしょう.したがって、サービスの割り込みは発生しませんが、lseekは比較的時間がかかり、コピーが完了したらfdの読み取り可能な状態を閉じ、replstate状態をREPL_とマークします.ONLINE、この状態がコマンド伝播状態です.新しい関数sendReplyToClientを登録し、もちろん前の関数イベントdelを削除します.したがって、サーバ側が与えるbufはslave側よりずっと小さい.主従レプリケーションコアはここです.