Redisソース分析のcluster meet

14806 ワード

Redis clusterはredisが公式に提案した分散クラスタソリューションであり、それ以前にcodis、Twemproxyなどのサードパーティのオプションソリューションがあった.cluster内部ではgossipプロトコルを用いて通信し,データの最終的な一貫性を達成した.詳しくは公式サイトRedis cluster tutorialを参照してください.本明細書では、cluster meetコマンドの実装によって、その中のいくつかの通信の詳細を探ることを試みる.Redis serverがcluster modeで起動すると、ノードAがノードBが存在するクラスタに参加したい場合は、CLUSTER MEET ip portというコマンドを実行するだけでよく、gossip通信により、最終的にBが存在するクラスタの他のノードもAを認識することが知られている.大まかなフローチャートは以下の通りです.

初期化


redis serverがcluster modeで起動されると、すなわちプロファイルのcluster-enabledオプションがtrueに設定され、サービス起動時にcluster初期化のプロセスがあり、これは前述の「Redis起動プロセス」に記載されている実行関数clusterInitである.clusterには、clusterStateclusterNode、およびclusterLinkの3つのデータ構造が重要である.各ノードにはclusterState構造が保存されており、この構造は現在のノードの視点でクラスタが現在置かれている状態、すなわち「私が見た世界がどのようになっているか」を記録している.各ノードは、clusterNode構造を使用して自分の状態を記録し、クラスタ内の他のすべてのノード(プライマリノードとスレーブノードを含む)に対応するclusterNode構造を作成し、他のノードの状態を記録します.clusterNode構造のlink属性は、ソケット記述子、入力バッファ、および出力バッファなどの接続ノードに必要な情報を格納するclusterLink構造である.詳細は、Webページ「redis設計と実装-ノード」で説明します.この初期化は簡単です.まずclusterState構造を作成し、次のようにメンバーを初期化します.
server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;     //      currentEpoch = 0
server.cluster->state = CLUSTER_FAIL; //        FAIL
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->stats_bus_messages_sent = 0;
server.cluster->stats_bus_messages_received = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots(); // Clear the migrating/importing state for all the slots

そしてnodeにconfファイルにロックをかけ、各ノードが独自のclusterプロファイルを使用していることを確認します.
if (clusterLockConfig(server.cluster_configfile) == C_ERR)
    exit(1);

この機会にredisがどのように使用するファイルロックを学ぶ.
int fd = open(filename,O_WRONLY|O_CREAT,0644);
if (fd == -1) {
    serverLog(LL_WARNING,
              "Can't open %s in order to acquire a lock: %s",
              filename, strerror(errno));
    return C_ERR;
}

if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
    if (errno == EWOULDBLOCK) {
        serverLog(LL_WARNING,
                  "Sorry, the cluster configuration file %s is already used "
                  "by a different Redis Cluster node. Please make sure that "
                  "different nodes use different cluster configuration "
                  "files.", filename);
    } else {
        serverLog(LL_WARNING,
                  "Impossible to lock %s: %s", filename, strerror(errno));
    }
    close(fd);
    return C_ERR;
}

次にnodeをロードします.confファイル、このプロセスはまたこのファイルが合理的かどうかを検査します.
ロードに失敗した場合(またはプロファイルが存在しない場合)、REDIS_NODE_MYSELF|REDIS_NODE_MASTERをタグとしてclusterNode構造を作成して自分自身を表し、メインノードに設定し、自分の名前を40バイトのランダム列に設定します.ノードをserverに追加します.cluster->nodesでは、新しく起動したノードであり、生成されたプロファイルがブラシされていることを示します.
if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
    myself = server.cluster->myself =
        createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
    serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
              myself->name);
    clusterAddNode(myself);
    saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1); //    ,          ,fsync

次に、listenToPort関数を呼び出し、クラスタgossip通信ポートにsocket fdを作成して傍受する.クラスタ内gossip通信ポートは、例えば、Redisリスニングクライアントのポートが6379である場合、クラスタリスニングポートは16379であり、他のクラスタノードから送信されたgossipメッセージを受信するために使用される.
その後、リスニングポート上の読み取り可能なイベントが登録され、イベントコールバック関数はclusterAcceptHandlerである.
#define CLUSTER_PORT_INCR 10000

if (listenToPort(server.port+CLUSTER_PORT_INCR,
                 server.cfd,&server.cfd_count) == C_ERR)
{
    exit(1);
} else {
    int j;
    for (j = 0; j < server.cfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, 
                              clusterAcceptHandler, NULL) == AE_ERR)
            serverPanic("Unrecoverable error creating Redis Cluster "
                        "file event.");
    }
}

現在のノードは、他のクラスタノードからTCPチェーン構築要求を受信すると、clusterAcceptHandler関数accept接続を呼び出す.clusterAcceptHandler関数では、accept済みのリンクごとにclusterLink構造が作成され、socket fd上の読み取り可能なイベントが登録され、イベントコールバック関数はclusterReadHandlerである.
#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    ... ...
    //          ,           ,    UPDATE             
    if (server.masterhost == NULL && server.loading) return;
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);
        ... ...
        //      link        
        //       , link->node      null,              
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

最後にreset mfに関連するパラメータです.

CLUSTER MEET


AノードCLUSTER MEETコマンド受信


Aノードは、cluster.c->clusterCommand関数で、CLUSTER MEET命令、すなわち
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
    long long port;

    // CLUSTER MEET  
    if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
        addReplyErrorFormat(c,"Invalid TCP port specified: %s", (char*)c->argv[3]->ptr);
        return;
    }
    if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && errno == EINVAL)
    {
        addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
    } else {
        addReply(c,shared.ok);
    }
}
clusterStartHandshakeという関数に重点を置いていることがわかります.
int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;
    /* IP and Port sanity check */
    ... ...
        
    //     (flag) norm_ip:port       
    if (clusterHandshakeInProgress(norm_ip,port)) { 
        errno = EAGAIN;
        return 0;
    }
    //            node,type   CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET
    //        handshake       
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));
    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        //         nodename   ,      ,              
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime(); // mstime
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->slaveof = NULL;
    ... ...
    node->link = NULL; // link   ,   clusterCron       
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->fail_reports = listCreate();
    ... ...
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

この関数はまずいくつかのipとportの合理性の検査を行って、それから見たnodesを遍歴して、このip:portの対応するnodeはCLUSTER_NODE_HANDSHAKEの状態にあるかどうか、そうであれば、これはmeetを繰り返していることを説明して、下に行く必要はありません.その後、createClusterNode関数によってCLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEETタグ付きノードが作成され、名前はランダムな40バイト文字列(この場合、AにとってBは見知らぬノードであり、情報はipとportを除いて知らないため)、clusterAddNode関数によって自分のnodesに加算される.このプロセスが成功すると,クライアントにOKを返し,他のことはgossip通信で行う必要がある.

AノードはMEET gossipメッセージをBノードに送信する


Aノードは、タイミングタスクclusterCronにおいて、いくつかのことを行う.
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

//       disconnected nodes         
di = dictGetSafeIterator(server.cluster->nodes); //       
while((de = dictNext(di)) != NULL) {
    clusterNode *node = dictGetVal(de);
    
     //     myself   noaddr      
    if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; 
    
    //      handshake   ,          handshake_timeout,    nodes    
    if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
        clusterDelNode(node);
        continue;
    }

    //      cluster meet        node ,   server    ,           
    if (node->link == NULL) { 
        int fd;
        mstime_t old_ping_sent;
        clusterLink *link;

        //    gossip       node    + 10000,   tcp   ,        client
        fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
        ... ...
        link = createClusterLink(node);
        link->fd = fd;
        node->link = link;

        //    link->fd       ,        clusterReadHandler
        aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link);
        ... ...

        //    node    MEET flag,       MEET      PING,
        //                   nodes  
        clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
        ... ...
        node->flags &= ~CLUSTER_NODE_MEET;
        ... ...
    }
}
dictReleaseIterator(di);

自分が見たnodesを巡回し、Bノードを巡回するとnode->link == NULLのため、Bの起動ポート番号+10000、すなわちgossip通信ポートが傍受され、その後、読み取り可能なイベントが登録され、処理関数はclusterReadHandlerであることがわかる.次にCLUSTER_を送りますNODE_MEETメッセージはBノードに与え,Bノードのmeet状態を消去する.

BノードはAからのMEET gossipメッセージを処理する


BノードがAノードからgossip送信を受信すると、コールバック関数clusterAcceptHandlerが処理し、accept対端のconnect(serverとして、対端をclientとする)が行われ、読み取り可能なイベントが登録され、コールバック関数はclusterReadHandlerであり、基本論理は以下の通りである.
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    //          ,           ,   UPDATE             
    if (server.masterhost == NULL && server.loading) return;
    while(max--) { // 1000    
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);
        serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
   
        //      link        
        //       , link->node      null,              
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

accept対端connectのたびにclusterLink構造が作成され、データを受信することができます.
typedef struct clusterLink {
    mstime_t ctime;             /* Link creation time */
    int fd;                     /* TCP socket file descriptor */
    sds sndbuf;                 /* Packet send buffer */
    sds rcvbuf;                 /* Packet reception buffer */
    struct clusterNode *node;   /* Node related to this link if any, or NULL */
} clusterLink;
clusterLinkにはnode自身を指すポインタがあります.BノードはAノードから送信された情報を受信し、clusterLinkrcvbufフィールドに配置し、clusterProcessPacket関数を使用して処理する(受信データプロセスは簡単で、分析を行わない).clusterProcessPacket関数の役割は、他の人から送られてきたgossipパケットを処理することです.
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
    clusterNode *node;

    //        CLUSTER_NODE_HANDSHAKE     cluster node,    
    node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
    nodeIp2String(node->ip,link); // ip   port      link    
    node->port = ntohs(hdr->port);

    clusterAddNode(node);
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
.....
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);

このときBノードはまだAノードを認識していないため,Bノードが自分のnodesからAノードを探すことは見つからないのでsenderは空であるため,このような論理に入る.同じランダムな名前でCLUSTER_NODE_HANDSHAKEはflagのためにnodeを作成し、自分のnodesに参加します.このロジックの最後にAノードにPONGメッセージが返信されます.

Aノードは、Bノードから返信されたPONG gossipメッセージを処理する


同様に、gossipメッセージはclusterProcessPacketで処理される.
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
    ... ...
    if (link->node) {
        if (nodeInHandshake(link->node)) { // node       
            ... ...
            clusterRenameNode(link->node, hdr->sender); //      
            link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; //    handshake   
            link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
}

このとき,AノードはBノードからのメッセージに基づいて,AノードnodesにおけるBノードの名前を訂正し,handshake状態を解消する.

BノードはPING gossipメッセージをAノードに送信する


BノードがclusterCronをしているとき、自分が見たAノードのlinkが空であるnode->link == NULLであることに気づいた.これは、上述したAノードがBノードにMEETメッセージを送るのと似ているが、BノードがAノードを見てmeet flagがないので、PINGメッセージが送信される.

AノードはBノードからのPINGメッセージを処理する


いくつかの論理をしますが、今回議論することとは関係なく、後で詳しく書きます.
PINGとMEETメッセージについては、どうしてもPONGメッセージが返信されます.

BノードAノードから返信されたPONGメッセージの処理


論理的には,BノードのnodesにおけるAノードの名前を訂正し,Aノードのhandshake flagを取り除く.

小結


これで、cluster meetコマンドが実行する完全なプロセスが説明され、フローチャートを描くことで、このフローをよりよく理解することができます.