redisソースシリーズ(17):分身術---replication

203163 ワード

単一のサービスは、本番環境では絶対に受け入れられませんが、データベース・サービスは、マルチノードまたは分散型の導入を実現するためにstatelessサービスよりも多くの問題に直面しています.データの同期方式、一貫性、可用性の妥協には多くの制限があり、考慮しなければならない.
今日はredisの主従同期に関する内容を学びます.このセクションの内容はredisが高可用性、データセキュリティ、データパーティションを実現する礎です.ノード間で信頼できるデータ同期方法がなければ,上記のすべてが空中屋根裏になる.
プライマリノードは任意の時点で1つしかなく、スレーブノードはいくつかあってもよい.プライマリ・スレーブ・ノードはリンクを保持する必要があり、プライマリ・ノードは非同期でセカンダリ・ノードにデータを同期します.

Master


プライマリ・スレーブ・同期とは、プライマリ・ノードのデータをスレーブ・ノードに同期することです.同期の一般的な流れは次のとおりです.
  • マスタノードは、同期要求を受けると、スレーブノードと全量同期してBGSAVEを起動する(既に利用可能なBGSAVEが実行中である場合は起動不要)
  • .
  • プライマリノードは、後続のデータの変化をもたらすコマンド(またはデータの期限切れ)をスレーブノードに送信し、データ
  • を同期し続ける必要がある.

    ぜんりょうどうき


    プライマリノードは起動時に、ノードからの情報を知らない.syncまたはpsyncコマンドを受信すると、スレーブノードと同期します.
    void syncCommand(redisClient *c) {
        //     SLAVE ,     MONITOR   ,  
        if (c->flags & REDIS_SLAVE) return;
        //           ,             ,     SYNC
        // redis              ,       
        if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
            addReplyError(c,"Can't SYNC while not connected with my master");
            return;
        }
        //     sync    psync                。        
        //            (     BGSAVE       )              
        if (listLength(c->reply) != 0 || c->bufpos != 0) {
            addReplyError(c,"SYNC and PSYNC are invalid with pending output");
            return;
        }
        redisLog(REDIS_NOTICE,"Slave asks for synchronization");
    		/*
    			     psync:
    			psync      partial sync,     。                    
    			           ,         ,           ,        
    			    。
    		*/
        if (!strcasecmp(c->argv[0]->ptr,"psync")) {
            //      PSYNC
            if (masterTryPartialResynchronization(c) == REDIS_OK) {
                //     PSYNC
                server.stat_sync_partial_ok++;
                return; /* No full resync needed, return. */
            } else {
                //      PSYNC
                char *master_runid = c->argv[1]->ptr;
              	// replication id   ?             ,            
                if (master_runid[0] != '?') server.stat_sync_partial_err++;
            }
        } else {
            //     ,    ,     REPLCONF ACK 
            c->flags |= REDIS_PRE_PSYNC;
        }
        //    full resynchronization ,    
        server.stat_sync_full++;
        //       BGSAVE    
        if (server.rdb_child_pid != -1) {
    		//          BGSAVE,          BGSAVE                   
            //         ?       BGSAVE    ,       ,                
            //       BGSAVE           ,      rdb           
            redisClient *slave;
            listNode *ln;
            listIter li;
    		//           REDIS_REPL_WAIT_BGSAVE_END     ,     BGSAVE      
            //             ,          BGSAVE
            listRewind(server.slaves,&li);
            while((ln = listNext(&li))) {
                slave = ln->value;
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
            }
    
            if (ln) {
                //      ,       BGSAVE      RDB,          BGSAVE    
                //                         (           ,    
                //        SYNC                   
                copyClientOutputBuffer(c,slave);
                c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
                redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
            } else {
                //      BGSAVE    ,      ,          BGSAVE   
                c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
                redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
            }
        } else {
            //    BGSAVE    ,       BGSAVE
            redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
            if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
                redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
                addReplyError(c,"Unable to perform background save");
                return;
            }
            //     
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            //       
            replicationScriptCacheFlush();
        }
        //    Nagle   ,      ,       
        if (server.repl_disable_tcp_nodelay)
            anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
        c->repldbfd = -1;
        c->flags |= REDIS_SLAVE;
        //                  SELECT   
        server.slaveseldb = -1; 
        //     slave    
        listAddNodeTail(server.slaves,c);
        //        slave ,      backlog,       backlog,             
        if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
            createReplicationBacklog();
        return;
    }
    /*
    	  BGSAVE    ,       
    */
    void backgroundSaveDoneHandler(int exitcode, int bysignal) {
        if (!bysignal && exitcode == 0) {
             // BGSAVE   
            redisLog(REDIS_NOTICE,
                "Background saving terminated with success");
            // dirty     BGSAVE        
            server.dirty = server.dirty - server.dirty_before_bgsave;
            server.lastsave = time(NULL);
            server.lastbgsave_status = REDIS_OK;
        } else if (!bysignal && exitcode != 0) {
            // BGSAVE   
            redisLog(REDIS_WARNING, "Background saving error");
            server.lastbgsave_status = REDIS_ERR;
        } else {
          	// BGSAVE    
            redisLog(REDIS_WARNING,
                "Background saving terminated by signal %d", bysignal);
            //       
            rdbRemoveTempFile(server.rdb_child_pid);
            /* SIGUSR1 is whitelisted, so we have a way to kill a child without
             * tirggering an error conditon. */
            if (bysignal != SIGUSR1)
                server.lastbgsave_status = REDIS_ERR;
        }
        //        
        server.rdb_child_pid = -1;
        server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
        server.rdb_save_time_start = -1;
        //        BGSAVE       
        updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
    }
    
    void updateSlavesWaitingBgsave(int bgsaveerr) {
        listNode *ln;
        int startbgsave = 0;
        listIter li;
        //      slave
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
                //     RDB       slave   ,        BGSAVE
                startbgsave = 1;
                slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
                //      ,    slave     BGSAVE   
                struct redis_stat buf;
                if (bgsaveerr != REDIS_OK) {
                    //    BGSAVE         slave
                    freeClient(slave);
                    redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                    continue;
                }
                //    RDB   
                if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                    redis_fstat(slave->repldbfd,&buf) == -1) {
                    freeClient(slave);
                    redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                    continue;
                }
                //      ,   ,     rdb       
                slave->repldboff = 0;
                slave->repldbsize = buf.st_size;
                slave->replstate = REDIS_REPL_SEND_BULK;
                slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r
    "
    , (unsigned long long) slave->repldbsize); // , aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { freeClient(slave); continue; } } } // BGSAVE if (startbgsave) { // BGSAVE , replicationScriptCacheFlush(); if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { // BGSAVE , listIter li; listRewind(server.slaves,&li); redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) freeClient(slave); } } } } // rdb void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *slave = privdata; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); char buf[REDIS_IOBUF_LEN]; ssize_t nwritten, buflen; // rdb if (slave->replpreamble) { nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); if (nwritten == -1) { redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s", strerror(errno)); freeClient(slave); return; } sdsrange(slave->replpreamble,nwritten,-1); if (sdslen(slave->replpreamble) == 0) { sdsfree(slave->replpreamble); slave->replpreamble = NULL; /* fall through sending data. */ } else { return; } } // rdb lseek(slave->repldbfd,slave->repldboff,SEEK_SET); // RDB buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); if (buflen <= 0) { redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", (buflen == 0) ? "premature EOF" : strerror(errno)); freeClient(slave); return; } // rdb if ((nwritten = write(fd,buf,buflen)) == -1) { if (errno != EAGAIN) { redisLog(REDIS_WARNING,"Write error sending DB to slave: %s", strerror(errno)); freeClient(slave); } return; } // offset slave->repldboff += nwritten; // if (slave->repldboff == slave->repldbsize) { // RDB close(slave->repldbfd); slave->repldbfd = -1; // aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); // rdb , online slave->replstate = REDIS_REPL_ONLINE; // slave->repl_ack_time = server.unixtime; // BGSAVE // , , // if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) { redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); freeClient(slave); return; } // (slave->repl_ack_time ) refreshGoodSlavesCount(); redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); } }

    インクリメンタル同期


    書き込みコマンドが伝播する必要がある場合、プライマリ・ノードは、伝播する必要があるデータをスレーブ・ノードに送信する必要があります.
    /*
    	backlog        ,     BGSAVE          。            
    	      backlog,         。          ,     PSYNC     
    	    offset     。
    	        redis      :
    	         ,      ,        replication id  
    	offset,               。
    	replication id             ,offset            
    */
    void feedReplicationBacklog(void *ptr, size_t len) {
        unsigned char *p = ptr;
        // server.master_repl_offset        ,         
        server.master_repl_offset += len;
        //    buffer ,          ,         idx      
        //             ,      ,              
        //                
        while(len) {
            //   idx   backlog       
            size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
            //    idx   backlog                 
            //               len
            //      len       ,   while      
            if (thislen > len) thislen = len;
            //   p    thislen         backlog
            memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
            //    idx ,          
            server.repl_backlog_idx += thislen;
            //         ,          
            if (server.repl_backlog_idx == server.repl_backlog_size)
                server.repl_backlog_idx = 0;
            //          
            len -= thislen;
            //                ,           
            p += thislen;
            //       
            server.repl_backlog_histlen += thislen;
        }
    		// server.repl_backlog_histlen            ,       backlog    
        //       ,              
        if (server.repl_backlog_histlen > server.repl_backlog_size)
            server.repl_backlog_histlen = server.repl_backlog_size;
    		// server.repl_backlog_off   backlog          ,       PSYNC  
        //    offset   [server.repl_backlog_off,server.master_repl_offset)   
        //      partial resync
        server.repl_backlog_off = server.master_repl_offset) -
                                  server.repl_backlog_histlen + 1;
    }
    /*
    	         ,              
    */
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
        listNode *ln;
        listIter li;
        int j, len;
        char llstr[REDIS_LONGSTR_SIZE];
    		// backlog   ,       ,    
        if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
        //       ,          backlog   
        redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
        //        ,   SELECT   ,     
        if (server.slaveseldb != dictid) {
            robj *selectcmd;
            if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
                selectcmd = shared.select[dictid];
            } else {
                int dictid_len;
                dictid_len = ll2string(llstr,sizeof(llstr),dictid);
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),
                    "*2\r
    $6\r
    SELECT\r
    $%d\r
    %s\r
    "
    , dictid_len, llstr)); } // SELECT backlog if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); // listRewind(slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; addReply(slave,selectcmd); } if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } server.slaveseldb = dictid; // backlog if (server.repl_backlog) { // char aux[REDIS_LONGSTR_SIZE+3]; aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '
    '
    ; feedReplicationBacklog(aux,len+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); // aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '
    '
    ; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } } // listRewind(slaves,&li); while((ln = listNext(&li))) { // redisClient *slave = ln->value; // BGSAVE , // BGSAVE if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; // , rdb addReplyMultiBulkLen(slave,argc); for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } }

    Partial Resync


    Partial Resyncは、同期済みのプライマリ・スレーブ・ノードの最適化を回避し、リンクが一時的に切断された後に再同期する際のオーバーヘッドと効率を回避するためです.ノードからプライマリノードに同期した後、ネットワーク上の理由でプライマリノードとのリンクが一時的に切断され、ノードからプライマリノードの増分更新データが失われると仮定します.プライマリ・スレーブが再リンクされた後、プライマリ・ノードにノードから失われた情報(backlog)が保存されていることが判明した場合、今回の同期では、説明した全量同期のステップをスキップすることができ、同期効率を大幅に向上させることができます.
    PSYNCは、いくつかの古いバージョンのredisではサポートされていないことを指摘する必要があります.筆者が見たソース3.0は、現在のredis 5と対照的である.xのコードは、実際には関連部分の変更が多く、redis開発チームはPSYNCに対してかなりの最適化を行った.しかし、基本的な考え方の理解の重要性は、最適化よりもはるかに高いので、筆者は3.0バージョンのコードに基づいて説明します.
    /*
    	   PSYNC    ,               parital resync,      
    	        +      。PSYNC       :
    	PSYNC replication_id offset
    	         replication_id    ,  offset            
    */
    int masterTryPartialResynchronization(redisClient *c) {
        long long psync_offset, psync_len;
        char *master_runid = c->argv[1]->ptr;
        char buf[128];
        int buflen;
        //    master id     runid   ,           PSYNC    
        if (strcasecmp(master_runid, server.runid)) {
            if (master_runid[0] != '?') {
                // replication_id    ,                      
                //       PSYNC                 replication_id 
                redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                    "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
                    master_runid, server.runid);
            } else {
                // replication_id   ?               
                redisLog(REDIS_NOTICE,"Full resync requested by slave.");
            }
            //    full resync
            goto need_full_resync;
        }
        //    psync_offset   ,         offset       
        if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
           REDIS_OK) goto need_full_resync;
    
        /*
        	backlog      offset         ,         offset
        	  backlog     offset    ,      partial resync
        */
        if (!server.repl_backlog ||
            psync_offset < server.repl_backlog_off ||
            psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
        {
            //    FULL RESYNC
            redisLog(REDIS_NOTICE,
                "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
            if (psync_offset > server.master_repl_offset) {
                redisLog(REDIS_WARNING,
                    "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
            }
            goto need_full_resync;
        }
    		//      partial resync     ,    ONLINE   ,         
        c->flags |= REDIS_SLAVE;
        c->replstate = REDIS_REPL_ONLINE;
        c->repl_ack_time = server.unixtime;
        listAddNodeTail(server.slaves,c);
        //             +CONTINUE ,   PSYNC     
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r
    "
    ); if (write(c->fd,buf,buflen) != buflen) { freeClientAsync(c); return REDIS_OK; } // backlog ( ) psync_len = addReplyReplicationBacklog(c,psync_offset); redisLog(REDIS_NOTICE, "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset); // refreshGoodSlavesCount(); return REDIS_OK; /* The caller can return, no full resync needed. */ need_full_resync: // psync offset psync_offset = server.master_repl_offset; // repl_backlog,offset 1 if (server.repl_backlog == NULL) psync_offset++; // +FULLRESYNC , buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r
    "
    , server.runid,psync_offset); if (write(c->fd,buf,buflen) != buflen) { freeClientAsync(c); return REDIS_OK; } return REDIS_ERR; } // partial resync, long long addReplyReplicationBacklog(redisClient *c, long long offset) { long long j, skip, len; redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset); if (server.repl_backlog_histlen == 0) { redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero"); return 0; } redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size); redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld", server.repl_backlog_off); redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld", server.repl_backlog_histlen); redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld", server.repl_backlog_idx); // server.repl_backlog_off backlog olddest offset // offset - server.repl_backlog_off skip = offset - server.repl_backlog_off; redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip); // j offset backlog j = (server.repl_backlog_idx + (server.repl_backlog_size-server.repl_backlog_histlen)) % server.repl_backlog_size; redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j); j = (j + skip) % server.repl_backlog_size; // len = server.repl_backlog_histlen - skip; redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len); while(len) { long long thislen = ((server.repl_backlog_size - j) < len) ? (server.repl_backlog_size - j) : len; redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen); addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); len -= thislen; j = 0; } return server.repl_backlog_histlen - skip; }

    Slave


    プライマリノードとは異なり、スレーブノードはサービス側として、プライマリスレーブ同期中に、クライアントとしてプライマリノードと通信します.プロファイルにsalveof HOST PORT命令を追加することで、ノードが起動すると指定されたプライマリノードのスレーブノードになります.
       //     ,  server.masterhost              
    				} else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
                slaveof_linenum = linenum;
                server.masterhost = sdsnew(argv[1]);
                server.masterport = atoi(argv[2]);
                server.repl_state = REDIS_REPL_CONNECT;
             }
    

    メインノードのリンク


    主従同期に関する論理は主にreplicationCronに集中し、この関数は1 sごとに呼び出される.
    /*
    	    ,         TCP   ,        。
    */
    void undoConnectWithMaster(void) {
        int fd = server.repl_transfer_s;
        //             
        redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
                    server.repl_state == REDIS_REPL_RECEIVE_PONG);
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        close(fd);
        server.repl_transfer_s = -1;
        //    CONNECT   
        server.repl_state = REDIS_REPL_CONNECT;
    }
    
    /*repl_state     
    	REDIS_REPL_CONNECT -> REDIS_REPL_CONNECTING -> REDIS_REPL_RECEIVE_PONG 
    																												|
    														REDIS_REPL_CONNECTED 
    void replicationCron(void) {
        //         ,    
        if (server.masterhost &&
            (server.repl_state == REDIS_REPL_CONNECTING ||
             server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
            (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
        {
            redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
            //     
            undoConnectWithMaster();
        }
    
        // RDB         
        if (server.masterhost && server.repl_state ==  &&
            (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
        {
            redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
            //     ,       
            replicationAbortSyncTransfer();
        }
    
        //              ,     (        ,     PSYNC)
        if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
            (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
        {
            redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
            //       
            freeClient(server.master);
        }
        //            
        if (server.repl_state == REDIS_REPL_CONNECT) {
            redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
                server.masterhost, server.masterport);
            //          
            if (connectWithMaster() == REDIS_OK) {
                redisLog(REDIS_NOTICE,"MASTER  SLAVE sync started");
            }
        }
    		//          ,        PSYNC,   ACK     
        if (server.masterhost && server.master &&
            !(server.master->flags & REDIS_PRE_PSYNC))
            replicationSendAck();
        //        ,            PING       ,           
        if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
            listIter li;
            listNode *ln;
            robj *ping_argv[1];
            //        PING
            ping_argv[0] = createStringObject("PING",4);
            replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
            decrRefCount(ping_argv[0]);
            //      REDIS_REPL_WAIT_BGSAVE_START   REDIS_REPL_WAIT_BGSAVE_END   
            //     ,      PING   ,         
    , ,
    // listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START || slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { if (write(slave->fd, "
    "
    , 1) == -1) { /* Don't worry, it's just a ping. */ } } } } // if (listLength(server.slaves)) { listIter li; listNode *ln; // listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; // ONLINE if (slave->replstate != REDIS_REPL_ONLINE) continue; // if (slave->flags & REDIS_PRE_PSYNC) continue; // if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { char ip[REDIS_IP_STR_LEN]; int port; if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) { redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s:%d", ip, slave->slave_listening_port); } // freeClient(slave); } } } // , backlog ( ) if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && server.repl_backlog) { time_t idle = server.unixtime - server.repl_no_slaves_since; if (idle > server.repl_backlog_time_limit) { // freeReplicationBacklog(); redisLog(REDIS_NOTICE, "Replication backlog freed after %d seconds " "without connected slaves.", (int) server.repl_backlog_time_limit); } } // ,AOF , script if (listLength(server.slaves) == 0 && server.aof_state == REDIS_AOF_OFF && listLength(server.repl_scriptcache_fifo) != 0) { replicationScriptCacheFlush(); } // refreshGoodSlavesCount(); } int connectWithMaster(void) { int fd; // , connect , connect // TCP fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport); if (fd == -1) { redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return REDIS_ERR; } // fd , , 3 ,fd // syncWithMaster if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); return REDIS_ERR; } // server.repl_transfer_lastio = server.unixtime; server.repl_transfer_s = fd; // REDIS_REPL_CONNECTING server.repl_state = REDIS_REPL_CONNECTING; return REDIS_OK; } void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { char tmpfile[256], *err; int dfd, maxtries = 5; int sockerr = 0, psync_result; socklen_t errlen = sizeof(sockerr); REDIS_NOTUSED(el); REDIS_NOTUSED(privdata); REDIS_NOTUSED(mask); // , , server.repl_state REDIS_REPL_NONE // , , if (server.repl_state == REDIS_REPL_NONE) { close(fd); return; } // , connect if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) sockerr = errno; if (sockerr) { aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s", strerror(sockerr)); goto error; } // REDIS_REPL_CONNECTING, ( partial resync) // , PING, PING , if (server.repl_state == REDIS_REPL_CONNECTING) { redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); // , , PONG aeDeleteFileEvent(server.el,fd,AE_WRITABLE); // server.repl_state = REDIS_REPL_RECEIVE_PONG; // PING, poll , , syncWrite(fd,"PING\r
    "
    ,6,100); // , return; } // PING if (server.repl_state == REDIS_REPL_RECEIVE_PONG) { char buf[1024]; // , aeDeleteFileEvent(server.el,fd,AE_READABLE); // buf[0] = '\0'; if (syncReadLine(fd,buf,sizeof(buf), server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING, "I/O error reading PING reply from master: %s", strerror(errno)); goto error; } /* 3 : 1. +PONG 2. —NOAUTH 3. -ERR operation not permitted , */ if (buf[0] != '+' && strncmp(buf,"-NOAUTH",7) != 0 && strncmp(buf,"-ERR operation not permitted",28) != 0) { // redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf); goto error; } else { redisLog(REDIS_NOTICE, "Master replied to PING, replication can continue..."); } } // , AUTH , if(server.masterauth) { err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); if (err[0] == '-') { // AUTH redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); } // redis , { sds port = sdsfromlonglong(server.port); err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port, NULL); sdsfree(port); // redis , , if (err[0] == '-') { redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err); } sdsfree(err); } // PSYNC psync_result = slaveTryPartialResynchronization(fd); // partial resync, ( ) if (psync_result == PSYNC_CONTINUE) { redisLog(REDIS_NOTICE, "MASTER SLAVE sync: Master accepted a Partial Resynchronization."); return; } // PSYNC, SYNC if (psync_result == PSYNC_NOT_SUPPORTED) { redisLog(REDIS_NOTICE,"Retrying with SYNC..."); if (syncWrite(fd,"SYNC\r
    "
    ,6,server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", strerror(errno)); goto error; } } // PSYNC partial resync, , // rdb , rdb while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER SLAVE synchronization: %s",strerror(errno)); goto error; } // , RDB if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { redisLog(REDIS_WARNING, "Can't create readable event for SYNC: %s (fd=%d)", strerror(errno),fd); goto error; } // server.repl_state = REDIS_REPL_TRANSFER; // server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; server.repl_transfer_tmpfile = zstrdup(tmpfile); return; error: close(fd); server.repl_transfer_s = -1; server.repl_state = REDIS_REPL_CONNECT; return; }

    ぜんりょうどうき


    プライマリノードコードを解析すると,全量同期がプライマリノードがrdbファイルを送信する過程であることが分かった.一方、スレーブノードは、全量の同期が必要になった後、受信するrdbファイルを処理するためにコールバック関数を登録します.
    #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
    void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
        char buf[4096];
        ssize_t nread, readlen;
        off_t left;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(privdata);
        REDIS_NOTUSED(mask);
        //             rdb      ,   server.repl_transfer_size == -1
        //         rdb     
        if (server.repl_transfer_size == -1) {
            //      
            if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
                redisLog(REDIS_WARNING,
                    "I/O error reading bulk count from MASTER: %s",
                    strerror(errno));
                goto error;
            }
            //   ?
            if (buf[0] == '-') {
                redisLog(REDIS_WARNING,
                    "MASTER aborted replication with an error: %s",
                    buf+1);
                goto error;
            } else if (buf[0] == '\0') {
    						//      replicationCron     BGSAVE_START   BGSAVE_END         
    server.repl_transfer_lastio = server.unixtime; return; } else if (buf[0] != '$') { // , redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; } // RDB server.repl_transfer_size = strtol(buf+1,NULL,10); redisLog(REDIS_NOTICE, "MASTER SLAVE sync: receiving %lld bytes from master", (long long) server.repl_transfer_size); // , return; } // rdb ( , EAGAIN ) left = server.repl_transfer_size - server.repl_transfer_read; readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); nread = read(fd,buf,readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); replicationAbortSyncTransfer(); return; } // rdb server.repl_transfer_lastio = server.unixtime; if (write(server.repl_transfer_fd,buf,nread) != nread) { redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER SLAVE synchronization: %s", strerror(errno)); goto error; } // server.repl_transfer_read += nread; // fsync , buffer , IO if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off; rdb_fsync_range(server.repl_transfer_fd, server.repl_transfer_last_fsync_off, sync_size); server.repl_transfer_last_fsync_off += sync_size; } // RDB if (server.repl_transfer_read == server.repl_transfer_size) { // , dump.rdb if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER SLAVE synchronization: %s", strerror(errno)); replicationAbortSyncTransfer(); return; } // redisLog(REDIS_NOTICE, "MASTER SLAVE sync: Flushing old data"); signalFlushedDb(-1); emptyDb(replicationEmptyDbCallback); // , rdbLoad() rdbLoadProgressCallback // eventLoop , , aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); // RDB if (rdbLoad(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); replicationAbortSyncTransfer(); return; } // zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); // ,createClient server.master = createClient(server.repl_transfer_s); // server.master->flags |= REDIS_MASTER; // server.master->authenticated = 1; // server.repl_state = REDIS_REPL_CONNECTED; // server.master->reploff = server.repl_master_initial_offset; // RUN ID memcpy(server.master->replrunid, server.repl_master_runid, sizeof(server.repl_master_runid)); // offset -1 , 2.8 // PSYNC , if (server.master->reploff == -1) server.master->flags |= REDIS_PRE_PSYNC; redisLog(REDIS_NOTICE, "MASTER SLAVE sync: Finished with success"); // AOF, AOF if (server.aof_state != REDIS_AOF_OFF) { int retry = 10; // stopAppendOnly(); // while (retry-- && startAppendOnly() == REDIS_ERR) { redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); sleep(1); } if (!retry) { redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); exit(1); } } } return; error: replicationAbortSyncTransfer(); return; }

    インクリメンタル同期


    インクリメンタル同期とは、主ノードから伝播される書き込み命令をノードから絶えず受信するプロセスである.

    Partial Resync


    プライマリノードと同期すると、ノードからpartial resyncを優先的に試行します.
    #define PSYNC_CONTINUE 0
    #define PSYNC_FULLRESYNC 1
    #define PSYNC_NOT_SUPPORTED 2
    int slaveTryPartialResynchronization(int fd) {
        char *psync_runid;
        char psync_offset[32];
        sds reply;
    		//         PSYNC,           FULLRESYNC    partial resync
        // repl_master_initial_offset      -1.        PSYNC     
        // repl_master_initial_offset     1
        server.repl_master_initial_offset = -1;
    
        if (server.cached_master) {
     				/*
     					      ,resync               ,              
     					       replication id   offset        PSYNC
     				*/
            psync_runid = server.cached_master->replrunid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
        } else {
            //      ,replication id     ?       
            redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_runid = "?";
            memcpy(psync_offset,"-1",3);
        }
        //         PSYNC   ,     
        reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
        if (!strncmp(reply,"+FULLRESYNC",11)) {
            //      PSYNC   ,       partial resync,      
            char *runid = NULL, *offset = NULL;
            //            run id
            runid = strchr(reply,' ');
            if (runid) {
                runid++;
                offset = strchr(runid,' ');
                if (offset) offset++;
            }
            //    run id     
            if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
                redisLog(REDIS_WARNING,
                    "Master replied with wrong +FULLRESYNC syntax.");
                //        PSYNC ,          run id
                //     run id    0 ,    PSYNC    
                memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
            } else {
                //    run id
                memcpy(server.repl_master_runid, runid, offset-runid-1);
                server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
                //    initial offset
                server.repl_master_initial_offset = strtoll(offset,NULL,10);
                //     ,     FULL resync
                redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                    server.repl_master_runid,
                    server.repl_master_initial_offset);
            }
            //         ,     master      ,   
            replicationDiscardCachedMaster();
            sdsfree(reply);
            //     
            return PSYNC_FULLRESYNC;
        }
        if (!strncmp(reply,"+CONTINUE",9)) {
            //      partial resync,          offset      
            redisLog(REDIS_NOTICE,
                "Successful partial resynchronization with master.");
            sdsfree(reply);
            //       master      master
            replicationResurrectCachedMaster(fd);
            //     
            return PSYNC_CONTINUE;
        }
        //        PSYNC   
        if (strncmp(reply,"-ERR",4)) {
            /* If it's not an error, log the unexpected event. */
            redisLog(REDIS_WARNING,
                "Unexpected reply to PSYNC from master: %s", reply);
        } else {
            redisLog(REDIS_NOTICE,
                "Master does not support PSYNC or is in "
                "error state (reply: %s)", reply);
        }
        sdsfree(reply);
        //          
        replicationDiscardCachedMaster();
        //         PSYNC
        return PSYNC_NOT_SUPPORTED;
    }
    
    void replicationResurrectCachedMaster(int newfd) {
        //    master
        server.master = server.cached_master;
        server.cached_master = NULL;
        server.master->fd = newfd;
        server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
        server.master->authenticated = 1;
        server.master->lastinteraction = server.unixtime;
    
        //        
        server.repl_state = REDIS_REPL_CONNECTED;
        //              
        listAddNodeTail(server.clients,server.master);
        //                            
        if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
                              readQueryFromClient, server.master)) {
            redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    		//               ,     
        if (server.master->bufpos || listLength(server.master->reply)) {
            if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
                              sendReplyToClient, server.master)) {
                redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
                freeClientAsync(server.master); /* Close ASAP. */
            }
        }
    }
    

    クライアントを解放しようとすると、このクライアントがプライマリノードであるかどうかをチェックし、そうであれば、後続のPSYNCにキャッシュされます.
    void freeClient(redisClient *c) {
     	//  
          if (server.master && c->flags & REDIS_MASTER) {
            redisLog(REDIS_WARNING,"Connection with master lost.");
            if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                              REDIS_CLOSE_ASAP|
                              REDIS_BLOCKED|
                              REDIS_UNBLOCKED)))
            {
                replicationCacheMaster(c);
                return;
            }
        }
    }
    
    void replicationCacheMaster(redisClient *c) {
        listNode *ln;
        redisAssert(server.master != NULL && server.cached_master == NULL);
        redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
        //              
        ln = listSearchKey(server.clients,c);
        redisAssert(ln != NULL);
        listDelNode(server.clients,ln);
        //    master
        server.cached_master = server.master;
        //       ,   socket
        aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        close(c->fd);
        c->fd = -1;
    		//    peerid
        if (c->peerid) {
            sdsfree(c->peerid);
            c->peerid = NULL;
        }
        replicationHandleMasterDisconnection();
    }
    
    void replicationHandleMasterDisconnection(void) {
        //    replication   ,   master
        server.master = NULL;
        server.repl_state = REDIS_REPL_CONNECT;
        server.repl_down_since = server.unixtime;
    		//              ,           
        if (server.masterhost != NULL) disconnectSlaves();
    }
    

    データの同期は一方向であり、プライマリノードからセカンダリノードにのみ発生し、ノードからの書き込みは最終的に失われることに注意してください.redisは、ノードから書き込み要求を処理することを禁止しない.

    後続の最適化


    最新のredisコードでは、プライマリ・スレーブ同期に対して多くの最適化が行われています.たとえば、次のようになります.
  • プライマリノードがスレーブノードから昇格する場合、以前のプライマリノードの情報が保存され、スレーブノードが以前のプライマリノードのreplication idを使用してPSYNCを行う場合、partial resyncは
  • を行うことができる.
  • 全量同期時、プライマリノードがrdbドロップをスレーブノードに再送信する必要はなく、後続のredisはdiskless replication
  • をサポートする.

    まとめ

  • プライマリスレーブ同期時にデータベース・サービスの分散配置が直面しなければならない問題です.redisは非同期同期同期同期を使用して最終的な一貫性を達成しますが、これにより、データが失われたタイム・ウィンドウ(単一のポイントでもこの問題があります)
  • が永遠に存在します.
  • redis PSYNC機構を用いてresyncのコストを低減する
  • .
  • スレーブノード自体も独自のスレーブノードを持つことができるが、使用シーンは
  • 少ない.
  • スレーブノードを追加することで、read-onlyスレーブノードを追加したり、スレーブノードにリードリクエストを配布したりするなど、redisのサービス能力を拡張することができ、プライマリノードの圧力を軽減することができます.しかし、データ同期は一方向であり、ノードからの書き込みは、最終的には
  • を失う.