Redisソース(十一)-Redisの主従レプリケーション

15563 ワード

Redisでは、SLAVEOFコマンドを送信して、あるサーバAを別のサーバBの「slave」にして、Aをスレーブサーバ(slave)、Bをマスターサーバ(master)と呼ぶBをコピーすることができる
レプリケーションについては、2.8バージョン以前と以降で実装が異なり、2.8バージョン以前のレプリケーションは断線再接続後に効率が低いため、ここでは2.8バージョン以降のレプリケーションのみを分析します.
レプリケーション機能は、同期(sync)とコマンド伝播(command propagate)の2つのステップに分けられます.
 
一、同期の実現
Redisは2.8以降のバージョンでPSYNC命令を用いて同期している(旧版ではSYNCを用いている).
PSYNC命令は、完全再同期(full resync)と部分再同期(partial resync)の2つのモードを有する.
1)完全な再同期は,通常,1回目のレプリケーション時にマスタサーバにRDBファイルを作成してスレーブサーバに送信させ,バッファ内の書き込みコマンドをサーバに送信することによって実現される.
2)部分再同期は一般的に断線再接続のレプリケーションに用いられ、サーバーから断線した後に再びオンラインになる場合、一定の条件を満たす場合、主サーバーは断線した時間の書き込みコマンドを従サーバーに送信し、主従データベースの状態を一致させることができる.
 
1.部分再同期実現
部分的な再同期機能は主に以下のいくつかの重要な属性から構成されている:1)サーバがid(runid)を実行する;2)部分再同期オフセット量(psync_offset);3)プライマリ・サーバのレプリケーション・バックアップ(replication backlog)
  
1)サーバ運転id
各Redisサーバには独自の実行idがあり、実行idはサーバ起動時に自動的に生成され、長さ40のランダム16進文字列である.プライマリ・サーバをサーバから最初にコピーすると、プライマリ・サーバは自分の実行idをセカンダリ・サーバに渡し、サーバからこのidが保存されます.部分再同期の要求を行う場合、まず現在接続されているプライマリ・サーバの実行idと以前に保存されている実行idを比較します.同じであれば、サーバの前からコピーされたのがこのプライマリ・サーバであり、部分再同期操作を継続することができます.異なる場合、スレーブ・サーバはこのプライマリ・サーバに対して部分的な再同期操作を実行できません.
2)部分再同期オフセット(psync_offset)
プライマリ・サーバとセカンダリ・サーバは、再同期のオフセット量を維持します.プライマリ・サーバがサーバからNバイトのデータを伝播しようとすると、自分のオフセット量にNを追加します.同様に、サーバからNバイトのデータを受信すると、独自のオフセット量にNを加算し、オフセット量に基づいて、サーバとプライマリサーバのデータベース状態が一致するかどうかを決定することができます.
3)プライマリ・サーバのレプリケーション・バックアップ(replication backlog)
サーバから断線再接続し、自分の同期オフセット量をプライマリサーバに送信すると、プライマリ・サービスはどのようにして完全な再同期を使用するか、または部分的に再同期するかを決定します.これは、プライマリ・サーバのレプリケーション・バックアップに関連しています.プライマリ・サーバは、1024*1024バイト、すなわち1 Mの長さ固定キューを維持します.プライマリ・サーバがコマンド伝播を行うと、ライト・コマンドがセカンダリ・サーバに送信されると同時に、ライト・コマンドがレプリケーション・バックアップ・キューにプッシュされます.そのため、キューには最近の伝播コマンドの一部が保存されています.
サーバのpsync_からoffsetはまた、バックアップキューの範囲内でコピーされ、部分的な再同期操作が実行されます.逆に、完全な再同期が実行されます.
部分的に再同期された最下位コードは、次のように実装されます.
/* Trya partial resynchronization with the master if we are about to reconnect.
 *
 *       ,         。
 *
 * If there is no cached master structure, atleast try to issue a
 * "PSYNC ? -1" command in order totrigger a full resync using the PSYNC
 * command in order to obtain the master run idand the master replication
 * global offset.
 *
 *    master     ,     "PSYNC ? -1"         full resync ,
 *        run id                 。
 *
 * This function is designed to be called fromsyncWithMaster(), so the
 * following assumptions are made:
 *
 *       syncWithMaster()     ,       :
 *
 * 1) We pass the function an already connectedsocket "fd".
 *             fd       
 * 2) This function does not close the filedescriptor "fd". However in case
 *    ofsuccessful partial resynchronization, the function will reuse
 *   'fd' as file descriptor of the server.master client structure.
 *           fd 。
 *            ,     fd    server.master        
 *         。
 *
 * The function returns:
 *          :
 *
 * PSYNC_CONTINUE: If the PSYNC commandsucceded and we can continue.
 *                 PSYNC     ,    。
 * PSYNC_FULLRESYNC: If PSYNC is supported buta full resync is needed.
 *                   In this case the masterrun_id and global replication
 *                   offset is saved.
 *                          PSYNC   ,          full resync 。
 *                         , run_id             。
 * PSYNC_NOT_SUPPORTED: If the server does notunderstand PSYNC at all and
 *                      the caller should fallback to SYNC.
 *                              PSYNC ,         SYNC   。
 */
 
#definePSYNC_CONTINUE 0
#definePSYNC_FULLRESYNC 1
#definePSYNC_NOT_SUPPORTED 2
intslaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;
 
    /* Initially set repl_master_initial_offsetto -1 to mark the current
     * master run_id and offset as not valid.Later if we'll be able to do
     * a FULL resync using the PSYNC commandwe'll set the offset at the
     * right value, so that this informationwill be propagated to the
     * client structure representing the masterinto server.master. */
    server.repl_master_initial_offset = -1;
 
    if (server.cached_master) {
        //     ,       
        //     "PSYNC  "
        psync_runid =server.cached_master->replrunid;
       snprintf(psync_offset,sizeof(psync_offset),"%lld",server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying apartial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        //      
        //    "PSYNC ? -1" ,       
        redisLog(REDIS_NOTICE,"Partialresynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }
 
    /* Issue the PSYNC command */
    //         PSYNC   
    reply =sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
 
    //     FULLRESYNC ,   full-resync
    if(!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;
 
        /* FULL RESYNC, parse the reply inorder to extract the run id
         * and the replication offset. */
        //            run id
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        //    run id     
        if (!runid || !offset ||(offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong+FULLRESYNC syntax.");
            /* This is an unexpected condition,actually the +FULLRESYNC
             * reply means that the mastersupports PSYNC, but the reply
             * format seems wrong. To stay safe we blankthe master
             * runid to make sure next PSYNCswill fail. */
            //        PSYNC ,          run id
            //     run id    0 ,    PSYNC    
           memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            //    run id
            memcpy(server.repl_master_runid,runid, offset-runid-1);
           server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            //    initial offset
            server.repl_master_initial_offset =strtoll(offset,NULL,10);
            //     ,     FULL resync
            redisLog(REDIS_NOTICE,"Fullresync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discardthe cached master structure. */
        //         ,     master      ,   
        replicationDiscardCachedMaster();
        sdsfree(reply);
       
        //     
        return PSYNC_FULLRESYNC;
    }
 
    //     CONTINUE ,   partial resync
    if(!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set thereplication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronizationwith master.");
        sdsfree(reply);
        //       master      master
        replicationResurrectCachedMaster(fd);
 
        //     
        return PSYNC_CONTINUE;
    }
 
    /* If we reach this point we receied eitheran error since the master does
     * not understand PSYNC, or an unexpectedreply from the master.
     * Return PSYNC_NOT_SUPPORTED to the callerin both cases. */
 
    //      ?
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log theunexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNCfrom master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNCor is in "
            "error state (reply:%s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
 
    //         PSYNC
    return PSYNC_NOT_SUPPORTED;
}
 
//                  
voidsyncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);
 
    /* If this event fired after the userturned the instance into a master
     * with SLAVEOF NO ONE we must just returnASAP. */
    //      SLAVEOF NO ONE   ,     fd
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }
 
    /* Check for errors in the socket. */
    //        
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR,&sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
       aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Errorcondition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }
 
    /* If we were connecting, it's time to senda non blocking PING, we want to
     * make sure the master is able to replybefore going into the actual
     * replication process where we have longtimeouts in the order of
     * seconds (in the meantime the slave wouldblock). */
    //       CONNECTING ,           ,
    //               PONG
    //        RDB         ,                
    if (server.repl_state ==REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Nonblocking connect for SYNC fired the event.");
        /* Delete the writable event so thatthe readable event remains
         * registered and we can wait for thePONG reply. */
        //        PING ,         
       aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        //     
        server.repl_state =REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check forerrors at all, we have the timeout
         * that will take care about this. */
        //      PING
       syncWrite(fd,"PING\r
",6,100); // , PONG return; } /* Receive the PONG command. */ // PONG if (server.repl_state ==REDIS_REPL_RECEIVE_PONG) { char buf[1024]; /* Delete the readable event, we nolonger need it now that there is * the PING reply to read. */ // PONG , aeDeleteFileEvent(server.el,fd,AE_READABLE); /* Read the reply with explicittimeout. */ // PONG buf[0] = '\0'; // PONG if (syncReadLine(fd,buf,sizeof(buf), server.repl_syncio_timeout*1000) ==-1) { redisLog(REDIS_WARNING, "I/O error reading PINGreply from master: %s", strerror(errno)); goto error; } /* We accept only two replies as valid,a positive +PONG reply * (we just check for "+") oran authentication error. * Note that older versions of Redisreplied with "operation not * permitted" instead of using aproper error code, so we test * both. */ // : // +PONG , -NOAUTH if (buf[0] != '+' && strncmp(buf,"-NOAUTH",7)!= 0 && strncmp(buf,"-ERR operationnot permitted",28) != 0) { // redisLog(REDIS_WARNING,"Errorreply to PING from master: '%s'",buf); goto error; } else { // PONG redisLog(REDIS_NOTICE, "Master replied to PING,replication can continue..."); } } /* AUTH with the master if required. */ // if(server.masterauth) { err =sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); if (err[0] == '-') { redisLog(REDIS_WARNING,"Unableto AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); } /* Set the slave port, so that Master'sINFO command can list the * slave listening port correctly. */ // , // INFO { sds port = sdsfromlonglong(server.port); err =sendSynchronousCommand(fd,"REPLCONF","listening-port",port, NULL); sdsfree(port); /* Ignore the error if any, not all theRedis versions support * REPLCONF listening-port. */ if (err[0] == '-') { redisLog(REDIS_NOTICE,"(Noncritical) Master does not understand REPLCONF listening-port: %s", err); } sdsfree(err); } /* Try a partial resynchonization. If wedon't have a cached master * slaveTryPartialResynchronization() willat least try to use PSYNC * to start a full resynchronization sothat we get the master run id * and the global offset, to try a partialresync at the next * reconnection attempt. */ // resync , full-resync psync_result =slaveTryPartialResynchronization(fd); // resync if (psync_result == PSYNC_CONTINUE) { redisLog(REDIS_NOTICE, "MASTER SLAVE sync: Master accepted a Partial Resynchronization."); // return; } /* Fall back to SYNC if needed. Otherwisepsync_result == PSYNC_FULLRESYNC * and the server.repl_master_runid andrepl_master_initial_offset are * already populated. */ // PSYNC , SYNC if (psync_result == PSYNC_NOT_SUPPORTED) { redisLog(REDIS_NOTICE,"Retryingwith SYNC..."); // SYNC if(syncWrite(fd,"SYNC\r
",6,server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING,"I/Oerror writing to MASTER: %s", strerror(errno)); goto error; } } // , // psync_result == PSYNC_FULLRESYNC PSYNC_NOT_SUPPORTED /* Prepare a suitable temp file for bulktransfer */ // , RDB while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { redisLog(REDIS_WARNING,"Openingthe temp file needed for MASTER SLAVE synchronization:%s",strerror(errno)); goto error; } /* Setup the non blocking download of thebulk file. */ // , RDB if (aeCreateFileEvent(server.el,fd,AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { redisLog(REDIS_WARNING, "Can't create readable eventfor SYNC: %s (fd=%d)", strerror(errno),fd); goto error; } // server.repl_state = REDIS_REPL_TRANSFER; // server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_fd = dfd; server.repl_transfer_lastio =server.unixtime; server.repl_transfer_tmpfile =zstrdup(tmpfile); return; error: close(fd); server.repl_transfer_s = -1; server.repl_state = REDIS_REPL_CONNECT; return; }

二、命令伝播
一般に、同期コマンドを実行した後、プライマリ・スレーブ・サーバのデータベースの状態は一致するはずですが、プライマリ・スレーブ・サーバが同期を完了したばかりの後、クライアントがプライマリ・サーバに関連する書き込みコマンドを送信したと仮定すると、プライマリ・スレーブ・データベースの状態は一致しません.このとき、プライマリ・サーバは、プライマリ・サーバから実行された書き込みコマンドをセカンダリ・サーバに送信する必要があります.サーバから実行すると、両方のデータベースのステータスが一致します.