レプリケーション機能は、同期(sync)とコマンド伝播(command propagate)の2つのステップに分けられます.
PSYNC命令は、完全再同期(full resync)と部分再同期(partial resync)の2つのモードを有する.
部分的な再同期機能は主に以下のいくつかの重要な属性から構成されている:1)サーバがid(runid)を実行する;2)部分再同期オフセット量(psync_offset);3)プライマリ・サーバのレプリケーション・バックアップ(replication backlog)
サーバから断線再接続し、自分の同期オフセット量をプライマリサーバに送信すると、プライマリ・サービスはどのようにして完全な再同期を使用するか、または部分的に再同期するかを決定します.これは、プライマリ・サーバのレプリケーション・バックアップに関連しています.プライマリ・サーバは、1024*1024バイト、すなわち1 Mの長さ固定キューを維持します.プライマリ・サーバがコマンド伝播を行うと、ライト・コマンドがセカンダリ・サーバに送信されると同時に、ライト・コマンドがレプリケーション・バックアップ・キューにプッシュされます.そのため、キューには最近の伝播コマンドの一部が保存されています.
/* Trya partial resynchronization with the master if we are about to reconnect.
 *       ,         。
 * If there is no cached master structure, atleast try to issue a
 * "PSYNC ? -1" command in order totrigger a full resync using the PSYNC
 * command in order to obtain the master run idand the master replication
 * global offset.
 *    master     ,     "PSYNC ? -1"         full resync ,
 *        run id                 。
 * This function is designed to be called fromsyncWithMaster(), so the
 * following assumptions are made:
 *       syncWithMaster()     ,       :
 * 1) We pass the function an already connectedsocket "fd".
 *             fd       
 * 2) This function does not close the filedescriptor "fd". However in case
 *    ofsuccessful partial resynchronization, the function will reuse
 *   'fd' as file descriptor of the server.master client structure.
 *           fd 。
 *            ,     fd    server.master        
 *         。
 * The function returns:
 *          :
 * PSYNC_CONTINUE: If the PSYNC commandsucceded and we can continue.
 *                 PSYNC     ,    。
 * PSYNC_FULLRESYNC: If PSYNC is supported buta full resync is needed.
 *                   In this case the masterrun_id and global replication
 *                   offset is saved.
 *                          PSYNC   ,          full resync 。
 *                         , run_id             。
 * PSYNC_NOT_SUPPORTED: If the server does notunderstand PSYNC at all and
 *                      the caller should fallback to SYNC.
 *                              PSYNC ,         SYNC   。
intslaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;
    /* Initially set repl_master_initial_offsetto -1 to mark the current
     * master run_id and offset as not valid.Later if we'll be able to do
     * a FULL resync using the PSYNC commandwe'll set the offset at the
     * right value, so that this informationwill be propagated to the
     * client structure representing the masterinto server.master. */
    server.repl_master_initial_offset = -1;
    if (server.cached_master) {
        //     ,       
        //     "PSYNC  "
        psync_runid =server.cached_master->replrunid;
        redisLog(REDIS_NOTICE,"Trying apartial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        //    "PSYNC ? -1" ,       
        redisLog(REDIS_NOTICE,"Partialresynchronization not possible (no cached master)");
        psync_runid = "?";
    /* Issue the PSYNC command */
    //         PSYNC   
    reply =sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
    //     FULLRESYNC ,   full-resync
    if(!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;
        /* FULL RESYNC, parse the reply inorder to extract the run id
         * and the replication offset. */
        //            run id
        runid = strchr(reply,' ');
        if (runid) {
            offset = strchr(runid,' ');
            if (offset) offset++;
        //    run id     
        if (!runid || !offset ||(offset-runid-1) != REDIS_RUN_ID_SIZE) {
                "Master replied with wrong+FULLRESYNC syntax.");
            /* This is an unexpected condition,actually the +FULLRESYNC
             * reply means that the mastersupports PSYNC, but the reply
             * format seems wrong. To stay safe we blankthe master
             * runid to make sure next PSYNCswill fail. */
            //        PSYNC ,          run id
            //     run id    0 ,    PSYNC    
        } else {
            //    run id
            memcpy(server.repl_master_runid,runid, offset-runid-1);
           server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            //    initial offset
            server.repl_master_initial_offset =strtoll(offset,NULL,10);
            //     ,     FULL resync
            redisLog(REDIS_NOTICE,"Fullresync from master: %s:%lld",
        /* We are going to full resync, discardthe cached master structure. */
        //         ,     master      ,   
        return PSYNC_FULLRESYNC;
    //     CONTINUE ,   partial resync
    if(!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set thereplication state accordingly */
            "Successful partial resynchronizationwith master.");
        //       master      master
        return PSYNC_CONTINUE;
    /* If we reach this point we receied eitheran error since the master does
     * not understand PSYNC, or an unexpectedreply from the master.
     * Return PSYNC_NOT_SUPPORTED to the callerin both cases. */
    //      ?
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log theunexpected event. */
            "Unexpected reply to PSYNCfrom master: %s", reply);
    } else {
            "Master does not support PSYNCor is in "
            "error state (reply:%s)", reply);
    //         PSYNC
voidsyncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    /* If this event fired after the userturned the instance into a master
     * with SLAVEOF NO ONE we must just returnASAP. */
    //      SLAVEOF NO ONE   ,     fd
    if (server.repl_state == REDIS_REPL_NONE) {
    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR,&sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        redisLog(REDIS_WARNING,"Errorcondition on socket for SYNC: %s",
        goto error;
    /* If we were connecting, it's time to senda non blocking PING, we want to
     * make sure the master is able to replybefore going into the actual
     * replication process where we have longtimeouts in the order of
     * seconds (in the meantime the slave wouldblock). */
    //       CONNECTING ,           ,
    //               PONG
    //        RDB         ,                
    if (server.repl_state ==REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Nonblocking connect for SYNC fired the event.");
        /* Delete the writable event so thatthe readable event remains
         * registered and we can wait for thePONG reply. */
        //        PING ,         
        server.repl_state =REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check forerrors at all, we have the timeout
         * that will take care about this. */
        //      PING
",6,100); // , PONG return; } /* Receive the PONG command. */ // PONG if (server.repl_state ==REDIS_REPL_RECEIVE_PONG) { char buf[1024]; /* Delete the readable event, we nolonger need it now that there is * the PING reply to read. */ // PONG , aeDeleteFileEvent(server.el,fd,AE_READABLE); /* Read the reply with explicittimeout. */ // PONG buf[0] = '\0'; // PONG if (syncReadLine(fd,buf,sizeof(buf), server.repl_syncio_timeout*1000) ==-1) { redisLog(REDIS_WARNING, "I/O error reading PINGreply from master: %s", strerror(errno)); goto error; } /* We accept only two replies as valid,a positive +PONG reply * (we just check for "+") oran authentication error. * Note that older versions of Redisreplied with "operation not * permitted" instead of using aproper error code, so we test * both. */ // : // +PONG , -NOAUTH if (buf[0] != '+' && strncmp(buf,"-NOAUTH",7)!= 0 && strncmp(buf,"-ERR operationnot permitted",28) != 0) { // redisLog(REDIS_WARNING,"Errorreply to PING from master: '%s'",buf); goto error; } else { // PONG redisLog(REDIS_NOTICE, "Master replied to PING,replication can continue..."); } } /* AUTH with the master if required. */ // if(server.masterauth) { err =sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); if (err[0] == '-') { redisLog(REDIS_WARNING,"Unableto AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); } /* Set the slave port, so that Master'sINFO command can list the * slave listening port correctly. */ // , // INFO { sds port = sdsfromlonglong(server.port); err =sendSynchronousCommand(fd,"REPLCONF","listening-port",port, NULL); sdsfree(port); /* Ignore the error if any, not all theRedis versions support * REPLCONF listening-port. */ if (err[0] == '-') { redisLog(REDIS_NOTICE,"(Noncritical) Master does not understand REPLCONF listening-port: %s", err); } sdsfree(err); } /* Try a partial resynchonization. If wedon't have a cached master * slaveTryPartialResynchronization() willat least try to use PSYNC * to start a full resynchronization sothat we get the master run id * and the global offset, to try a partialresync at the next * reconnection attempt. */ // resync , full-resync psync_result =slaveTryPartialResynchronization(fd); // resync if (psync_result == PSYNC_CONTINUE) { redisLog(REDIS_NOTICE, "MASTER SLAVE sync: Master accepted a Partial Resynchronization."); // return; } /* Fall back to SYNC if needed. Otherwisepsync_result == PSYNC_FULLRESYNC * and the server.repl_master_runid andrepl_master_initial_offset are * already populated. */ // PSYNC , SYNC if (psync_result == PSYNC_NOT_SUPPORTED) { redisLog(REDIS_NOTICE,"Retryingwith SYNC..."); // SYNC if(syncWrite(fd,"SYNC\r
",6,server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING,"I/Oerror writing to MASTER: %s", strerror(errno)); goto error; } } // , // psync_result == PSYNC_FULLRESYNC PSYNC_NOT_SUPPORTED /* Prepare a suitable temp file for bulktransfer */ // , RDB while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { redisLog(REDIS_WARNING,"Openingthe temp file needed for MASTER SLAVE synchronization:%s",strerror(errno)); goto error; } /* Setup the non blocking download of thebulk file. */ // , RDB if (aeCreateFileEvent(server.el,fd,AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { redisLog(REDIS_WARNING, "Can't create readable eventfor SYNC: %s (fd=%d)", strerror(errno),fd); goto error; } // server.repl_state = REDIS_REPL_TRANSFER; // server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_fd = dfd; server.repl_transfer_lastio =server.unixtime; server.repl_transfer_tmpfile =zstrdup(tmpfile); return; error: close(fd); server.repl_transfer_s = -1; server.repl_state = REDIS_REPL_CONNECT; return; }
