Redisソース分析:マスターコピー


ソースバージョン:2.4.4更新2014.3.17,base 2.8.7
redisの主従レプリケーションは簡単だが機能が強く、以下の特徴がある:1.1つのmasterは複数のslave接続をサポートし、slaveは他のslaveの接続を受け入れることができる.プライマリスレーブ同期時、masterとslaveはいずれも非ブロックのredisプライマリスレーブレプリケーションに用いることができる:1.data redundancy 2. slaveはmasterの拡張としてread-onlyのサービスを提供している.データの永続化をslaveで行うことで、masterのパフォーマンスを向上させることができます.簡単な構成でslave(master側は構成する必要はありません)を使用することで、ユーザーはredisの主従レプリケーション関連構成(redis.conf):slaveofを使用して、このredisサービスをslaveとして表すことができます.マスターipとマスターportがそれぞれマスターのipとportマスターauth<マスター-password>マスターにセキュリティパスワードが設定されている場合は、ここでは対応するパスワードslave-serve-stale-data yes slaveがマスターを失うか同期が進行中の場合に、slaveに対するサービス要求が発生した場合:slave-serve-stale-dataがyesに設定されている場合、slaveは依然として正常にサービスを提供していますslave-serve-stale-dataがnoに設定されている場合、slaveはclientエラーを返します:“SYNC with master in progress”repl-ping-slave-period 10 slaveはPINGSをmasterに送信する時間間隔repl-timeout 60 IOタイムアウト時間
コード:
slave端子
slaveステータス:
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0/* No active replication */
#define REDIS_REPL_CONNECT 1/* Must connect to master */
#define REDIS_REPL_CONNECTING 2/* Connecting to master */
#define REDIS_REPL_TRANSFER 3/* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 4/* Connected to master */
初期化時の設定
server.replstate = REDIS_REPL_CONNECT
すなわちslaveはmasterに接続する必要がある
slaveはreplicationCronを周期的に呼び出し、slaveステータスを表示します.
void replicationCron(void) {
    /*    IO  */
    if (server.masterhost && server.replstate == REDIS_REPL_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
        replicationAbortSyncTransfer(); //    ,   server.replstate = REDIS_REPL_CONNECT;
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.replstate == REDIS_REPL_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"MASTER time out: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (connectWithMaster() == REDIS_OK) { //  master
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    
    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    if (!(server.cronloops % (server.repl_ping_slave_period*10))) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            /* Don't ping slaves that are in the middle of a bulk transfer
             * with the master for first synchronization. */
            if (slave->replstate == REDIS_REPL_SEND_BULK) continue;
            if (slave->replstate == REDIS_REPL_ONLINE) {
                /* If the slave is online send a normal ping */
                addReplySds(slave,sdsnew("PING\r
")); } else { /* Otherwise we are in the pre-synchronization stage. * Just a newline will do the work of refreshing the * connection last interaction time, and at the same time * we'll be sure that being a single char there are no * short-write problems. */ if (write(slave->fd, "
", 1) == -1) { /* Don't worry, it's just a ping. */ } } } } }

server.replstate == REDIS_REPL_CONNECTではslaveがマスターに接続され、接続に成功するとslaveがsyncWithMaster関数を実行し、syncWithMasterがSYNCコマンドをマスターに送信します
int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    server.repl_transfer_s = fd;
    server.replstate = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}

マスターエンド:
マスターはslaveの接続とclientの接続の統一処理について、slaveから発行されたSYNCコマンドを受信した後、syncCommandを実行すると、syncCommandは現在の状態を表示し、スナップショットをしている場合は待機し、そうでない場合はバックグラウンドプロセスを起動してスナップショットを行います.
void syncCommand(redisClient *c) {
    /* ignore SYNC if aleady slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0) {
        addReplyError(c,"SYNC is invalid with pending input");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave ask for synchronization");
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.bgsavechildpid != -1) {
       .....
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
    return;
}

スナップショットが完了したら、updateSlavesWaitingBgsave関数を実行します.updateSlavesWaitingBgsaveは、現在のマスターの各slaveの状態を表示します.bgsaveの完了を待っている場合は、イベントsendBulkToSlaveを登録します.sendBulkToSlaveはスナップショットファイルをslaveに送信します.
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); //        
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { //       
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}
ブロックアプリケーションを回避するために、毎回16 Kデータのみが伝送される
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    ......
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET); //            
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); //  16K  
    ......
    if ((nwritten = write(fd,buf,buflen)) == -1) { //     slave
        if (errno != EAGAIN) {
            redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
                strerror(errno));
            freeClient(slave);
        }
        return;
    }
    slave->repldboff += nwritten; //       
    ......
}

slaveが最初の同期を完了した後、masterがdb状態を変更するコマンドを受信した場合、replicationFeedSlavesを呼び出して対応する変更をslaveに送信する
/* Call() is the core of Redis execution of a command */
void call(redisClient *c) {
    long long dirty, start = ustime(), duration;

    dirty = server.dirty;
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    slowlogPushEntryIfNeeded(c->argv,c->argc,duration);

    if (server.appendonly && dirty > 0)
        feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
    if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
        listLength(server.slaves))
        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;
}

まとめ:1.redis主従レプリケーションは、あまり追加コードを追加していませんが、機能が強く、複数のslaveをサポートし、masterとしてslaveをサポートしています.2.redisは主従レプリケーションがブロックされていないと主張するが、redisは単一スレッドサービスを使用するため、slaveとのインタラクションは処理スレッドによって統一的に処理されるため、性能に一定の影響を及ぼす