Redisソース分析のPSYNC同期
52983 ワード
Redis master-slave同期ソース分析
(1)slaveフロー分析(2)masterフロー分析
Redisが起動すると、replicationCron(redisに付属のserverCronバックグラウンドスレッドを介して)が1 sおきに呼び出されます.すなわち、この関数は、スタンドアロンでもMasterでもSlaveでも呼び出されます.まず,SlaveとしてreplicationCron関数が論理を実行する場合について議論する.slaveとして、この関数の機能は、Masterと接続を維持し、握手し、Masterストレージデータを受け入れる+Masterインクリメンタルデータを受け入れることであると推測できる.
syncWithMasterはSlaveとMasterを接続する握手関数である.syncWithMaster関数はコードを詳細に貼らず、ここで論理1、ping 2の送信、pongの読み取りはステップ3に成功したことを示し、masterがrequirepassを構成している場合、slaveはmasterauthを構成しなければならないため、auth情報を送信する必要がある.authを送信する必要がなければ第5ステップ4、authの結果を受信し、正しくは5を継続し、slaveのport情報を送信し、このportはmasterがslaveに接続する際にどのポート6を使用するか、判断5の結果7、同5を通知するために使用され、slaveのipアドレスのみが送信される.5と7のポートは、実際には現在Redisソースコードをめくっていますが、使用されていません.8、判断7の返却結果9、slaveは自分の同期能力をmasterに送信し、Masterと同期を合わせる方法で、最新バージョンはeofとpsync 2をサポートしている.psyncはpartial resynchronizationの意味10、「PSYNC psync_replid psync_offset-1」をMaster(全量化は「PSYNC?-1)11、Masterが「+FULLRESYNC psync_replid psync_offset」と返信した場合、全量コピーを表す.返信+CONTINE psync_replidは部分コピーを表します
握手が完了すると、Masterと確立されたfdのread eventはreadSyncBulkPayload関数になります.
まとめて、Redisのpsyncモード(1)slaveはmaster connectに行き、握手します.(2)slaveはsync/psyncをMasterに送信する.(3)マスターは全量か増分かを判断してslaveに返信し,全量モードではマスターが直ちにrdbファイルを送信し,増分モードではマスターが命令を送信する.(4)フルモードであれば,rdbファイルをプッシュした後,Masterはrdbファイルというウィンドウ期間内のデータ+後続のデータをプッシュし,命令的にプッシュするが,実は増分部分である.(5)増分部の送信は、call->propagate->replicationFeedSlaves->addReplyを呼び出してslaveに送信され、addReplyは通常Clientの要求を処理して応答を送信するために使用され、ここではSalveにデータを送信するために使用される.しかしaddReplyはRedis自身のバッファに書き込み、メインサイクルの次のサイクルを待つだけで、beforeSleep->handleClientsWithPendingWritesがデータを送信します.
Salveフローを見ると、マスターのフローがほぼ推測できます.
1、Masterはpsync命令を処理し、全量同期かインクリメンタル同期かを判断する2、m a s t e r TryPartialResynchronization->addReplyReplicationBacklog
実際,Masterが開かないAOFとRDBは同期に影響を及ぼさない.
Masterのbacklogロジックは、Masterの任意の書き込み操作に対して
Redisのbacklogはリング状のデータなのでオーバーフローは存在しませんが、もし1つの断網期間中にMasterが書き込んだデータが、実質的に1周を超えると、救いようがありません.1周を書くと、必然的に古いデータの一部をカバーし、データを失うことに相当するので、増分同期ができないのが見え、全量が必要です.
いくつかoffsetで意味を説明します(1)repl_backlog_idxは、次回書き込まれるデータの先頭アドレスなので、次のコードで見ることができます.repl_backlog_idxがいっぱいになったら、0にして、0から書きます.(2)repl_backlog_size,bufferの全長,プロファイル構成,repl_backlogはこの大きさで申請したbuffer(3)master_repl_offset,0から加算される値(4)repl_backlog_histlen値も0から始まる値で、最大repl_backlog_size
いずれにしても、文字の角度やソースの角度からこれらの値を理解しようとしないでください.複雑ですが、核心的な目的は簡単です.これらの値は、Slave psyncの時に送られてきたoffsetからの命令で、Master自身が保存しているかどうかをMaster自身に知らせる必要があります.
(1)slaveフロー分析(2)masterフロー分析
Slave分析
Redisが起動すると、replicationCron(redisに付属のserverCronバックグラウンドスレッドを介して)が1 sおきに呼び出されます.すなわち、この関数は、スタンドアロンでもMasterでもSlaveでも呼び出されます.まず,SlaveとしてreplicationCron関数が論理を実行する場合について議論する.slaveとして、この関数の機能は、Masterと接続を維持し、握手し、Masterストレージデータを受け入れる+Masterインクリメンタルデータを受け入れることであると推測できる.
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
static long long replication_cron_loops = 0;
/* , */
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake();
}
......
/* slave,connectWithMaster Master socket, syncWithMaster, REPL_STATE_CONNECTING */
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER REPLICA sync started");
}
}
/* Send ACK to master from time to time.
* Note that we do not send periodic acks to masters that don't
* support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
....
}
syncWithMasterはSlaveとMasterを接続する握手関数である.syncWithMaster関数はコードを詳細に貼らず、ここで論理1、ping 2の送信、pongの読み取りはステップ3に成功したことを示し、masterがrequirepassを構成している場合、slaveはmasterauthを構成しなければならないため、auth情報を送信する必要がある.authを送信する必要がなければ第5ステップ4、authの結果を受信し、正しくは5を継続し、slaveのport情報を送信し、このportはmasterがslaveに接続する際にどのポート6を使用するか、判断5の結果7、同5を通知するために使用され、slaveのipアドレスのみが送信される.5と7のポートは、実際には現在Redisソースコードをめくっていますが、使用されていません.8、判断7の返却結果9、slaveは自分の同期能力をmasterに送信し、Masterと同期を合わせる方法で、最新バージョンはeofとpsync 2をサポートしている.psyncはpartial resynchronizationの意味10、「PSYNC psync_replid psync_offset-1」をMaster(全量化は「PSYNC?-1)11、Masterが「+FULLRESYNC psync_replid psync_offset」と返信した場合、全量コピーを表す.返信+CONTINE psync_replidは部分コピーを表します
握手が完了すると、Masterと確立されたfdのread eventはreadSyncBulkPayload関数になります.
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
......
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
}
readSyncBulkPayload
はデータの読み取りを担当し、replicationCreateMasterClient
を呼び出し、現在masterに接続されているfdのcallbackをreadQueryFromClient
に変更すると、この接続の後にMasterからの命令が受け入れられる.void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen, nwritten;
off_t left;
UNUSED(el);
UNUSED(privdata);
UNUSED(mask);
/* Static vars used to hold the EOF mark, and the last bytes received
* form the server: when they match, we reached the end of the transfer. */
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
/* repl_transfer_size -1
* Master rdb ,
* 2 , $len。 $EOF:<40 bytes delimiter>, , slave , 40
*Master rdb 40 。
*
*/
if (server.repl_transfer_size == -1) {
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}
if (buf[0] == '-') {
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
} else if (buf[0] == '\0') {
/* At this stage just a newline works as a PING in order to take
* the connection live. So we refresh our last interaction
* timestamp. */
server.repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
/* There are two possible forms for the bulk payload. One is the
* usual $ bulk format. The other is used for diskless transfers
* when the master does not know beforehand the size of the file to
* transfer. In the latter case, the following format is used:
*
* $EOF:<40 bytes delimiter>
*
* At the end of the file the announced delimiter is transmitted. The
* delimiter is long and random enough that the probability of a
* collision with the actual file content can be ignored. */
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
usemark = 1;
memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
/* Set any repl_transfer_size to avoid entering this code path
* at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER REPLICA sync: receiving streamed RDB from master");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER REPLICA sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
}
return;
}
/*
* usemark: EOF
*repl_transfer_size:
*repl_transfer_read:
*/
/* Read bulk data */
if (usemark) {
readlen = sizeof(buf);
} else {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}
nread = read(fd,buf,readlen);
if (nread <= 0) {
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake();
return;
}
server.stat_net_input_bytes += nread;
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
int eof_reached = 0;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
}
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
server.repl_transfer_read += nread;
/* Delete the last 40 bytes from the file if we reached EOF. */
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
goto error;
}
}
/* Sync data on disk from time to time, otherwise at the end of the transfer
* we may suffer a big delay as the memory buffers are copied into the
* actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}
/* Check if the transfer is now complete */
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
/* , rdb*/
if (eof_reached) {
int aof_is_enabled = server.aof_state != AOF_OFF;
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid "
"any race",
(long) server.rdb_child_pid);
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER REPLICA synchronization: %s", strerror(errno));
cancelReplicationHandshake();
return;
}
serverLog(LL_NOTICE, "MASTER REPLICA sync: Flushing old data");
/* We need to stop any AOFRW fork before flusing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
if(aof_is_enabled) stopAppendOnly();
signalFlushedDb(-1);
emptyDb(
-1,
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
/* Re-enable the AOF if we disabled it earlier, in order to restore
* the original configuration. */
if (aof_is_enabled) restartAOFAfterSYNC();
return;
}
/* Final setup of the connected slave
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER REPLICA sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (aof_is_enabled) restartAOFAfterSYNC();
}
return;
error:
cancelReplicationHandshake();
return;
}
まとめて、Redisのpsyncモード(1)slaveはmaster connectに行き、握手します.(2)slaveはsync/psyncをMasterに送信する.(3)マスターは全量か増分かを判断してslaveに返信し,全量モードではマスターが直ちにrdbファイルを送信し,増分モードではマスターが命令を送信する.(4)フルモードであれば,rdbファイルをプッシュした後,Masterはrdbファイルというウィンドウ期間内のデータ+後続のデータをプッシュし,命令的にプッシュするが,実は増分部分である.(5)増分部の送信は、call->propagate->replicationFeedSlaves->addReplyを呼び出してslaveに送信され、addReplyは通常Clientの要求を処理して応答を送信するために使用され、ここではSalveにデータを送信するために使用される.しかしaddReplyはRedis自身のバッファに書き込み、メインサイクルの次のサイクルを待つだけで、beforeSleep->handleClientsWithPendingWritesがデータを送信します.
Masterプロセス分析
Salveフローを見ると、マスターのフローがほぼ推測できます.
1、Masterはpsync命令を処理し、全量同期かインクリメンタル同期かを判断する2、m a s t e r TryPartialResynchronization->addReplyReplicationBacklog
実際,Masterが開かないAOFとRDBは同期に影響を及ぼさない.
Masterのbacklogロジックは、Masterの任意の書き込み操作に対して
feedReplicationBacklog
をトリガーし、データを自分のバッファに書き込み、オフセット量を記録することを目的としています.バッファ作用(1)全量同期rdbの場合は比較的時間がかかり、RDBファイル処理が完了する前に、この部分のデータを保持する必要があり、インクリメンタルデータ転送(2)SlaveとMaster全量同期後にインクリメンタル同期を行い、このときネットワークが切断された場合、Slave再接続後、無条件全量同期は絶対に受け入れられず、Masterは部分データを保存する必要があり、この部分のデータは、ネットワーク切断中にSalveが受信していないデータが適切に含まれている場合、インクリメンタル同期を実行します.Redisのbacklogはリング状のデータなのでオーバーフローは存在しませんが、もし1つの断網期間中にMasterが書き込んだデータが、実質的に1周を超えると、救いようがありません.1周を書くと、必然的に古いデータの一部をカバーし、データを失うことに相当するので、増分同期ができないのが見え、全量が必要です.
いくつかoffsetで意味を説明します(1)repl_backlog_idxは、次回書き込まれるデータの先頭アドレスなので、次のコードで見ることができます.repl_backlog_idxがいっぱいになったら、0にして、0から書きます.(2)repl_backlog_size,bufferの全長,プロファイル構成,repl_backlogはこの大きさで申請したbuffer(3)master_repl_offset,0から加算される値(4)repl_backlog_histlen値も0から始まる値で、最大repl_backlog_size
いずれにしても、文字の角度やソースの角度からこれらの値を理解しようとしないでください.複雑ですが、核心的な目的は簡単です.これらの値は、Slave psyncの時に送られてきたoffsetからの命令で、Master自身が保存しているかどうかをMaster自身に知らせる必要があります.
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
printf("p %s
",p);
server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
while(len) {
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
if (thislen > len) thislen = len;
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
server.repl_backlog_idx += thislen;
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
len -= thislen;
p += thislen;
server.repl_backlog_histlen += thislen;
}
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
/* Set the offset of the first byte we have in the backlog. */
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
}