redisソースシリーズ(17):分身術---replication
203163 ワード
単一のサービスは、本番環境では絶対に受け入れられませんが、データベース・サービスは、マルチノードまたは分散型の導入を実現するためにstatelessサービスよりも多くの問題に直面しています.データの同期方式、一貫性、可用性の妥協には多くの制限があり、考慮しなければならない.
今日はredisの主従同期に関する内容を学びます.このセクションの内容はredisが高可用性、データセキュリティ、データパーティションを実現する礎です.ノード間で信頼できるデータ同期方法がなければ,上記のすべてが空中屋根裏になる.
プライマリノードは任意の時点で1つしかなく、スレーブノードはいくつかあってもよい.プライマリ・スレーブ・ノードはリンクを保持する必要があり、プライマリ・ノードは非同期でセカンダリ・ノードにデータを同期します.
プライマリ・スレーブ・同期とは、プライマリ・ノードのデータをスレーブ・ノードに同期することです.同期の一般的な流れは次のとおりです.マスタノードは、同期要求を受けると、スレーブノードと全量同期してBGSAVEを起動する(既に利用可能なBGSAVEが実行中である場合は起動不要) .プライマリノードは、後続のデータの変化をもたらすコマンド(またはデータの期限切れ)をスレーブノードに送信し、データ を同期し続ける必要がある.
プライマリノードは起動時に、ノードからの情報を知らない.syncまたはpsyncコマンドを受信すると、スレーブノードと同期します.
書き込みコマンドが伝播する必要がある場合、プライマリ・ノードは、伝播する必要があるデータをスレーブ・ノードに送信する必要があります.
Partial Resyncは、同期済みのプライマリ・スレーブ・ノードの最適化を回避し、リンクが一時的に切断された後に再同期する際のオーバーヘッドと効率を回避するためです.ノードからプライマリノードに同期した後、ネットワーク上の理由でプライマリノードとのリンクが一時的に切断され、ノードからプライマリノードの増分更新データが失われると仮定します.プライマリ・スレーブが再リンクされた後、プライマリ・ノードにノードから失われた情報(backlog)が保存されていることが判明した場合、今回の同期では、説明した全量同期のステップをスキップすることができ、同期効率を大幅に向上させることができます.
PSYNCは、いくつかの古いバージョンのredisではサポートされていないことを指摘する必要があります.筆者が見たソース3.0は、現在のredis 5と対照的である.xのコードは、実際には関連部分の変更が多く、redis開発チームはPSYNCに対してかなりの最適化を行った.しかし、基本的な考え方の理解の重要性は、最適化よりもはるかに高いので、筆者は3.0バージョンのコードに基づいて説明します.
プライマリノードとは異なり、スレーブノードはサービス側として、プライマリスレーブ同期中に、クライアントとしてプライマリノードと通信します.プロファイルに
主従同期に関する論理は主にreplicationCronに集中し、この関数は1 sごとに呼び出される.
プライマリノードコードを解析すると,全量同期がプライマリノードがrdbファイルを送信する過程であることが分かった.一方、スレーブノードは、全量の同期が必要になった後、受信するrdbファイルを処理するためにコールバック関数を登録します.
インクリメンタル同期とは、主ノードから伝播される書き込み命令をノードから絶えず受信するプロセスである.
プライマリノードと同期すると、ノードからpartial resyncを優先的に試行します.
クライアントを解放しようとすると、このクライアントがプライマリノードであるかどうかをチェックし、そうであれば、後続のPSYNCにキャッシュされます.
データの同期は一方向であり、プライマリノードからセカンダリノードにのみ発生し、ノードからの書き込みは最終的に失われることに注意してください.redisは、ノードから書き込み要求を処理することを禁止しない.
最新のredisコードでは、プライマリ・スレーブ同期に対して多くの最適化が行われています.たとえば、次のようになります.プライマリノードがスレーブノードから昇格する場合、以前のプライマリノードの情報が保存され、スレーブノードが以前のプライマリノードのreplication idを使用してPSYNCを行う場合、partial resyncは を行うことができる.全量同期時、プライマリノードがrdbドロップをスレーブノードに再送信する必要はなく、後続のredisはdiskless replication をサポートする.
プライマリスレーブ同期時にデータベース・サービスの分散配置が直面しなければならない問題です.redisは非同期同期同期同期を使用して最終的な一貫性を達成しますが、これにより、データが失われたタイム・ウィンドウ(単一のポイントでもこの問題があります) が永遠に存在します. redis PSYNC機構を用いてresyncのコストを低減する .スレーブノード自体も独自のスレーブノードを持つことができるが、使用シーンは 少ない.スレーブノードを追加することで、read-onlyスレーブノードを追加したり、スレーブノードにリードリクエストを配布したりするなど、redisのサービス能力を拡張することができ、プライマリノードの圧力を軽減することができます.しかし、データ同期は一方向であり、ノードからの書き込みは、最終的には を失う.
今日はredisの主従同期に関する内容を学びます.このセクションの内容はredisが高可用性、データセキュリティ、データパーティションを実現する礎です.ノード間で信頼できるデータ同期方法がなければ,上記のすべてが空中屋根裏になる.
プライマリノードは任意の時点で1つしかなく、スレーブノードはいくつかあってもよい.プライマリ・スレーブ・ノードはリンクを保持する必要があり、プライマリ・ノードは非同期でセカンダリ・ノードにデータを同期します.
Master
プライマリ・スレーブ・同期とは、プライマリ・ノードのデータをスレーブ・ノードに同期することです.同期の一般的な流れは次のとおりです.
ぜんりょうどうき
プライマリノードは起動時に、ノードからの情報を知らない.syncまたはpsyncコマンドを受信すると、スレーブノードと同期します.
void syncCommand(redisClient *c) {
// SLAVE , MONITOR ,
if (c->flags & REDIS_SLAVE) return;
// , , SYNC
// redis ,
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
addReplyError(c,"Can't SYNC while not connected with my master");
return;
}
// sync psync 。
// ( BGSAVE )
if (listLength(c->reply) != 0 || c->bufpos != 0) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
redisLog(REDIS_NOTICE,"Slave asks for synchronization");
/*
psync:
psync partial sync, 。
, , ,
。
*/
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// PSYNC
if (masterTryPartialResynchronization(c) == REDIS_OK) {
// PSYNC
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
// PSYNC
char *master_runid = c->argv[1]->ptr;
// replication id ? ,
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
} else {
// , , REPLCONF ACK
c->flags |= REDIS_PRE_PSYNC;
}
// full resynchronization ,
server.stat_sync_full++;
// BGSAVE
if (server.rdb_child_pid != -1) {
// BGSAVE, BGSAVE
// ? BGSAVE , ,
// BGSAVE , rdb
redisClient *slave;
listNode *ln;
listIter li;
// REDIS_REPL_WAIT_BGSAVE_END , BGSAVE
// , BGSAVE
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
}
if (ln) {
// , BGSAVE RDB, BGSAVE
// ( ,
// SYNC
copyClientOutputBuffer(c,slave);
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
// BGSAVE , , BGSAVE
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
} else {
// BGSAVE , BGSAVE
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
}
//
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
//
replicationScriptCacheFlush();
}
// Nagle , ,
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= REDIS_SLAVE;
// SELECT
server.slaveseldb = -1;
// slave
listAddNodeTail(server.slaves,c);
// slave , backlog, backlog,
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}
/*
BGSAVE ,
*/
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
// BGSAVE
redisLog(REDIS_NOTICE,
"Background saving terminated with success");
// dirty BGSAVE
server.dirty = server.dirty - server.dirty_before_bgsave;
server.lastsave = time(NULL);
server.lastbgsave_status = REDIS_OK;
} else if (!bysignal && exitcode != 0) {
// BGSAVE
redisLog(REDIS_WARNING, "Background saving error");
server.lastbgsave_status = REDIS_ERR;
} else {
// BGSAVE
redisLog(REDIS_WARNING,
"Background saving terminated by signal %d", bysignal);
//
rdbRemoveTempFile(server.rdb_child_pid);
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* tirggering an error conditon. */
if (bysignal != SIGUSR1)
server.lastbgsave_status = REDIS_ERR;
}
//
server.rdb_child_pid = -1;
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
server.rdb_save_time_start = -1;
// BGSAVE
updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}
void updateSlavesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listIter li;
// slave
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
// RDB slave , BGSAVE
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
// , slave BGSAVE
struct redis_stat buf;
if (bgsaveerr != REDIS_OK) {
// BGSAVE slave
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
// RDB
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
// , , rdb
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r
",
(unsigned long long) slave->repldbsize);
// ,
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
// BGSAVE
if (startbgsave) {
// BGSAVE ,
replicationScriptCacheFlush();
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
// BGSAVE ,
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}
// rdb
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *slave = privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
char buf[REDIS_IOBUF_LEN];
ssize_t nwritten, buflen;
// rdb
if (slave->replpreamble) {
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
strerror(errno));
freeClient(slave);
return;
}
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
/* fall through sending data. */
} else {
return;
}
}
// rdb
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
// RDB
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
if (buflen <= 0) {
redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
// rdb
if ((nwritten = write(fd,buf,buflen)) == -1) {
if (errno != EAGAIN) {
redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
strerror(errno));
freeClient(slave);
}
return;
}
// offset
slave->repldboff += nwritten;
//
if (slave->repldboff == slave->repldbsize) {
// RDB
close(slave->repldbfd);
slave->repldbfd = -1;
//
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
// rdb , online
slave->replstate = REDIS_REPL_ONLINE;
//
slave->repl_ack_time = server.unixtime;
// BGSAVE
// , ,
//
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
// (slave->repl_ack_time )
refreshGoodSlavesCount();
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
}
}
インクリメンタル同期
書き込みコマンドが伝播する必要がある場合、プライマリ・ノードは、伝播する必要があるデータをスレーブ・ノードに送信する必要があります.
/*
backlog , BGSAVE 。
backlog, 。 , PSYNC
offset 。
redis :
, , replication id
offset, 。
replication id ,offset
*/
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
// server.master_repl_offset ,
server.master_repl_offset += len;
// buffer , , idx
// , ,
//
while(len) {
// idx backlog
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
// idx backlog
// len
// len , while
if (thislen > len) thislen = len;
// p thislen backlog
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
// idx ,
server.repl_backlog_idx += thislen;
// ,
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
//
len -= thislen;
// ,
p += thislen;
//
server.repl_backlog_histlen += thislen;
}
// server.repl_backlog_histlen , backlog
// ,
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
// server.repl_backlog_off backlog , PSYNC
// offset [server.repl_backlog_off,server.master_repl_offset)
// partial resync
server.repl_backlog_off = server.master_repl_offset) -
server.repl_backlog_histlen + 1;
}
/*
,
*/
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[REDIS_LONGSTR_SIZE];
// backlog , ,
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
// , backlog
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
// , SELECT ,
if (server.slaveseldb != dictid) {
robj *selectcmd;
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),
"*2\r
$6\r
SELECT\r
$%d\r
%s\r
",
dictid_len, llstr));
}
// SELECT backlog
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
//
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
addReply(slave,selectcmd);
}
if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
// backlog
if (server.repl_backlog) {
//
char aux[REDIS_LONGSTR_SIZE+3];
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '
';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
//
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '
';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
//
listRewind(slaves,&li);
while((ln = listNext(&li))) {
//
redisClient *slave = ln->value;
// BGSAVE ,
// BGSAVE
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
// , rdb
addReplyMultiBulkLen(slave,argc);
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
Partial Resync
Partial Resyncは、同期済みのプライマリ・スレーブ・ノードの最適化を回避し、リンクが一時的に切断された後に再同期する際のオーバーヘッドと効率を回避するためです.ノードからプライマリノードに同期した後、ネットワーク上の理由でプライマリノードとのリンクが一時的に切断され、ノードからプライマリノードの増分更新データが失われると仮定します.プライマリ・スレーブが再リンクされた後、プライマリ・ノードにノードから失われた情報(backlog)が保存されていることが判明した場合、今回の同期では、説明した全量同期のステップをスキップすることができ、同期効率を大幅に向上させることができます.
PSYNCは、いくつかの古いバージョンのredisではサポートされていないことを指摘する必要があります.筆者が見たソース3.0は、現在のredis 5と対照的である.xのコードは、実際には関連部分の変更が多く、redis開発チームはPSYNCに対してかなりの最適化を行った.しかし、基本的な考え方の理解の重要性は、最適化よりもはるかに高いので、筆者は3.0バージョンのコードに基づいて説明します.
/*
PSYNC , parital resync,
+ 。PSYNC :
PSYNC replication_id offset
replication_id , offset
*/
int masterTryPartialResynchronization(redisClient *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr;
char buf[128];
int buflen;
// master id runid , PSYNC
if (strcasecmp(master_runid, server.runid)) {
if (master_runid[0] != '?') {
// replication_id ,
// PSYNC replication_id
redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
} else {
// replication_id ?
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
}
// full resync
goto need_full_resync;
}
// psync_offset , offset
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
REDIS_OK) goto need_full_resync;
/*
backlog offset , offset
backlog offset , partial resync
*/
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
// FULL RESYNC
redisLog(REDIS_NOTICE,
"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
if (psync_offset > server.master_repl_offset) {
redisLog(REDIS_WARNING,
"Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
}
goto need_full_resync;
}
// partial resync , ONLINE ,
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
c->repl_ack_time = server.unixtime;
listAddNodeTail(server.slaves,c);
// +CONTINUE , PSYNC
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r
");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
// backlog ( )
psync_len = addReplyReplicationBacklog(c,psync_offset);
redisLog(REDIS_NOTICE,
"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
//
refreshGoodSlavesCount();
return REDIS_OK; /* The caller can return, no full resync needed. */
need_full_resync:
// psync offset
psync_offset = server.master_repl_offset;
// repl_backlog,offset 1
if (server.repl_backlog == NULL) psync_offset++;
// +FULLRESYNC ,
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r
",
server.runid,psync_offset);
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
return REDIS_ERR;
}
// partial resync,
long long addReplyReplicationBacklog(redisClient *c, long long offset) {
long long j, skip, len;
redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
if (server.repl_backlog_histlen == 0) {
redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
return 0;
}
redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
server.repl_backlog_size);
redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
server.repl_backlog_off);
redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
server.repl_backlog_histlen);
redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
server.repl_backlog_idx);
// server.repl_backlog_off backlog olddest offset
// offset - server.repl_backlog_off
skip = offset - server.repl_backlog_off;
redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);
// j offset backlog
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
server.repl_backlog_size;
redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);
j = (j + skip) % server.repl_backlog_size;
//
len = server.repl_backlog_histlen - skip;
redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);
while(len) {
long long thislen =
((server.repl_backlog_size - j) < len) ?
(server.repl_backlog_size - j) : len;
redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
len -= thislen;
j = 0;
}
return server.repl_backlog_histlen - skip;
}
Slave
プライマリノードとは異なり、スレーブノードはサービス側として、プライマリスレーブ同期中に、クライアントとしてプライマリノードと通信します.プロファイルに
salveof HOST PORT
命令を追加することで、ノードが起動すると指定されたプライマリノードのスレーブノードになります. // , server.masterhost
} else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
slaveof_linenum = linenum;
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.repl_state = REDIS_REPL_CONNECT;
}
メインノードのリンク
主従同期に関する論理は主にreplicationCronに集中し、この関数は1 sごとに呼び出される.
/*
, TCP , 。
*/
void undoConnectWithMaster(void) {
int fd = server.repl_transfer_s;
//
redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
server.repl_state == REDIS_REPL_RECEIVE_PONG);
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
close(fd);
server.repl_transfer_s = -1;
// CONNECT
server.repl_state = REDIS_REPL_CONNECT;
}
/*repl_state
REDIS_REPL_CONNECT -> REDIS_REPL_CONNECTING -> REDIS_REPL_RECEIVE_PONG
|
REDIS_REPL_CONNECTED
void replicationCron(void) {
// ,
if (server.masterhost &&
(server.repl_state == REDIS_REPL_CONNECTING ||
server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
//
undoConnectWithMaster();
}
// RDB
if (server.masterhost && server.repl_state == &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
// ,
replicationAbortSyncTransfer();
}
// , ( , PSYNC)
if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
//
freeClient(server.master);
}
//
if (server.repl_state == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
//
if (connectWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER SLAVE sync started");
}
}
// , PSYNC, ACK
if (server.masterhost && server.master &&
!(server.master->flags & REDIS_PRE_PSYNC))
replicationSendAck();
// , PING ,
if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
listIter li;
listNode *ln;
robj *ping_argv[1];
// PING
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
// REDIS_REPL_WAIT_BGSAVE_START REDIS_REPL_WAIT_BGSAVE_END
// , PING ,
, ,
//
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
if (write(slave->fd, "
", 1) == -1) {
/* Don't worry, it's just a ping. */
}
}
}
}
//
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
//
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
// ONLINE
if (slave->replstate != REDIS_REPL_ONLINE) continue;
//
if (slave->flags & REDIS_PRE_PSYNC) continue;
//
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
char ip[REDIS_IP_STR_LEN];
int port;
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
redisLog(REDIS_WARNING,
"Disconnecting timedout slave: %s:%d",
ip, slave->slave_listening_port);
}
//
freeClient(slave);
}
}
}
// , backlog ( )
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;
if (idle > server.repl_backlog_time_limit) {
//
freeReplicationBacklog();
redisLog(REDIS_NOTICE,
"Replication backlog freed after %d seconds "
"without connected slaves.",
(int) server.repl_backlog_time_limit);
}
}
// ,AOF , script
if (listLength(server.slaves) == 0 &&
server.aof_state == REDIS_AOF_OFF &&
listLength(server.repl_scriptcache_fifo) != 0)
{
replicationScriptCacheFlush();
}
//
refreshGoodSlavesCount();
}
int connectWithMaster(void) {
int fd;
// , connect , connect
// TCP
fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
if (fd == -1) {
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return REDIS_ERR;
}
// fd , , 3 ,fd
// syncWithMaster
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
return REDIS_ERR;
}
//
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
// REDIS_REPL_CONNECTING
server.repl_state = REDIS_REPL_CONNECTING;
return REDIS_OK;
}
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
// , , server.repl_state REDIS_REPL_NONE
// , ,
if (server.repl_state == REDIS_REPL_NONE) {
close(fd);
return;
}
// , connect
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
strerror(sockerr));
goto error;
}
// REDIS_REPL_CONNECTING, ( partial resync)
// , PING, PING ,
if (server.repl_state == REDIS_REPL_CONNECTING) {
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
// , , PONG
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
//
server.repl_state = REDIS_REPL_RECEIVE_PONG;
// PING, poll , ,
syncWrite(fd,"PING\r
",6,100);
// ,
return;
}
// PING
if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
char buf[1024];
// ,
aeDeleteFileEvent(server.el,fd,AE_READABLE);
//
buf[0] = '\0';
if (syncReadLine(fd,buf,sizeof(buf),
server.repl_syncio_timeout*1000) == -1)
{
redisLog(REDIS_WARNING,
"I/O error reading PING reply from master: %s",
strerror(errno));
goto error;
}
/* 3 :
1. +PONG
2. —NOAUTH
3. -ERR operation not permitted
,
*/
if (buf[0] != '+' &&
strncmp(buf,"-NOAUTH",7) != 0 &&
strncmp(buf,"-ERR operation not permitted",28) != 0)
{
//
redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
goto error;
} else {
redisLog(REDIS_NOTICE,
"Master replied to PING, replication can continue...");
}
}
// , AUTH ,
if(server.masterauth) {
err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
if (err[0] == '-') {
// AUTH
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
}
// redis ,
{
sds port = sdsfromlonglong(server.port);
err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
NULL);
sdsfree(port);
// redis , ,
if (err[0] == '-') {
redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
}
sdsfree(err);
}
// PSYNC
psync_result = slaveTryPartialResynchronization(fd);
// partial resync, ( )
if (psync_result == PSYNC_CONTINUE) {
redisLog(REDIS_NOTICE, "MASTER SLAVE sync: Master accepted a Partial Resynchronization.");
return;
}
// PSYNC, SYNC
if (psync_result == PSYNC_NOT_SUPPORTED) {
redisLog(REDIS_NOTICE,"Retrying with SYNC...");
if (syncWrite(fd,"SYNC\r
",6,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}
// PSYNC partial resync, ,
// rdb , rdb
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER SLAVE synchronization: %s",strerror(errno));
goto error;
}
// , RDB
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
redisLog(REDIS_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
//
server.repl_state = REDIS_REPL_TRANSFER;
//
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
close(fd);
server.repl_transfer_s = -1;
server.repl_state = REDIS_REPL_CONNECT;
return;
}
ぜんりょうどうき
プライマリノードコードを解析すると,全量同期がプライマリノードがrdbファイルを送信する過程であることが分かった.一方、スレーブノードは、全量の同期が必要になった後、受信するrdbファイルを処理するためにコールバック関数を登録します.
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen;
off_t left;
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
// rdb , server.repl_transfer_size == -1
// rdb
if (server.repl_transfer_size == -1) {
//
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}
// ?
if (buf[0] == '-') {
redisLog(REDIS_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
} else if (buf[0] == '\0') {
// replicationCron BGSAVE_START BGSAVE_END
server.repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
// ,
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
// RDB
server.repl_transfer_size = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
"MASTER SLAVE sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
// ,
return;
}
// rdb ( , EAGAIN )
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
nread = read(fd,buf,readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
replicationAbortSyncTransfer();
return;
}
// rdb
server.repl_transfer_lastio = server.unixtime;
if (write(server.repl_transfer_fd,buf,nread) != nread) {
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER SLAVE synchronization: %s", strerror(errno));
goto error;
}
//
server.repl_transfer_read += nread;
// fsync , buffer , IO
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}
// RDB
if (server.repl_transfer_read == server.repl_transfer_size) {
// , dump.rdb
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}
//
redisLog(REDIS_NOTICE, "MASTER SLAVE sync: Flushing old data");
signalFlushedDb(-1);
emptyDb(replicationEmptyDbCallback);
// , rdbLoad() rdbLoadProgressCallback
// eventLoop , ,
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
// RDB
if (rdbLoad(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
replicationAbortSyncTransfer();
return;
}
//
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
// ,createClient
server.master = createClient(server.repl_transfer_s);
//
server.master->flags |= REDIS_MASTER;
//
server.master->authenticated = 1;
//
server.repl_state = REDIS_REPL_CONNECTED;
//
server.master->reploff = server.repl_master_initial_offset;
// RUN ID
memcpy(server.master->replrunid, server.repl_master_runid,
sizeof(server.repl_master_runid));
// offset -1 , 2.8
// PSYNC ,
if (server.master->reploff == -1)
server.master->flags |= REDIS_PRE_PSYNC;
redisLog(REDIS_NOTICE, "MASTER SLAVE sync: Finished with success");
// AOF, AOF
if (server.aof_state != REDIS_AOF_OFF) {
int retry = 10;
//
stopAppendOnly();
//
while (retry-- && startAppendOnly() == REDIS_ERR) {
redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
sleep(1);
}
if (!retry) {
redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
exit(1);
}
}
}
return;
error:
replicationAbortSyncTransfer();
return;
}
インクリメンタル同期
インクリメンタル同期とは、主ノードから伝播される書き込み命令をノードから絶えず受信するプロセスである.
Partial Resync
プライマリノードと同期すると、ノードからpartial resyncを優先的に試行します.
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
char *psync_runid;
char psync_offset[32];
sds reply;
// PSYNC, FULLRESYNC partial resync
// repl_master_initial_offset -1. PSYNC
// repl_master_initial_offset 1
server.repl_master_initial_offset = -1;
if (server.cached_master) {
/*
,resync ,
replication id offset PSYNC
*/
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
} else {
// ,replication id ?
redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_runid = "?";
memcpy(psync_offset,"-1",3);
}
// PSYNC ,
reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
if (!strncmp(reply,"+FULLRESYNC",11)) {
// PSYNC , partial resync,
char *runid = NULL, *offset = NULL;
// run id
runid = strchr(reply,' ');
if (runid) {
runid++;
offset = strchr(runid,' ');
if (offset) offset++;
}
// run id
if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
redisLog(REDIS_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
// PSYNC , run id
// run id 0 , PSYNC
memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
} else {
// run id
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
// initial offset
server.repl_master_initial_offset = strtoll(offset,NULL,10);
// , FULL resync
redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
server.repl_master_runid,
server.repl_master_initial_offset);
}
// , master ,
replicationDiscardCachedMaster();
sdsfree(reply);
//
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply,"+CONTINUE",9)) {
// partial resync, offset
redisLog(REDIS_NOTICE,
"Successful partial resynchronization with master.");
sdsfree(reply);
// master master
replicationResurrectCachedMaster(fd);
//
return PSYNC_CONTINUE;
}
// PSYNC
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
redisLog(REDIS_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
redisLog(REDIS_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
//
replicationDiscardCachedMaster();
// PSYNC
return PSYNC_NOT_SUPPORTED;
}
void replicationResurrectCachedMaster(int newfd) {
// master
server.master = server.cached_master;
server.cached_master = NULL;
server.master->fd = newfd;
server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
server.master->authenticated = 1;
server.master->lastinteraction = server.unixtime;
//
server.repl_state = REDIS_REPL_CONNECTED;
//
listAddNodeTail(server.clients,server.master);
//
if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
readQueryFromClient, server.master)) {
redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
// ,
if (server.master->bufpos || listLength(server.master->reply)) {
if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
sendReplyToClient, server.master)) {
redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
}
}
クライアントを解放しようとすると、このクライアントがプライマリノードであるかどうかをチェックし、そうであれば、後続のPSYNCにキャッシュされます.
void freeClient(redisClient *c) {
//
if (server.master && c->flags & REDIS_MASTER) {
redisLog(REDIS_WARNING,"Connection with master lost.");
if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
REDIS_CLOSE_ASAP|
REDIS_BLOCKED|
REDIS_UNBLOCKED)))
{
replicationCacheMaster(c);
return;
}
}
}
void replicationCacheMaster(redisClient *c) {
listNode *ln;
redisAssert(server.master != NULL && server.cached_master == NULL);
redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
//
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
// master
server.cached_master = server.master;
// , socket
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
close(c->fd);
c->fd = -1;
// peerid
if (c->peerid) {
sdsfree(c->peerid);
c->peerid = NULL;
}
replicationHandleMasterDisconnection();
}
void replicationHandleMasterDisconnection(void) {
// replication , master
server.master = NULL;
server.repl_state = REDIS_REPL_CONNECT;
server.repl_down_since = server.unixtime;
// ,
if (server.masterhost != NULL) disconnectSlaves();
}
データの同期は一方向であり、プライマリノードからセカンダリノードにのみ発生し、ノードからの書き込みは最終的に失われることに注意してください.redisは、ノードから書き込み要求を処理することを禁止しない.
後続の最適化
最新のredisコードでは、プライマリ・スレーブ同期に対して多くの最適化が行われています.たとえば、次のようになります.