Redisソース分析のcluster meet
14806 ワード
Redis clusterはredisが公式に提案した分散クラスタソリューションであり、それ以前にcodis、Twemproxyなどのサードパーティのオプションソリューションがあった.cluster内部ではgossipプロトコルを用いて通信し,データの最終的な一貫性を達成した.詳しくは公式サイトRedis cluster tutorialを参照してください.本明細書では、
redis serverがcluster modeで起動されると、すなわちプロファイルの
そしてnodeにconfファイルにロックをかけ、各ノードが独自のclusterプロファイルを使用していることを確認します.
この機会にredisがどのように使用するファイルロックを学ぶ.
次にnodeをロードします.confファイル、このプロセスはまたこのファイルが合理的かどうかを検査します.
ロードに失敗した場合(またはプロファイルが存在しない場合)、
次に、
その後、リスニングポート上の読み取り可能なイベントが登録され、イベントコールバック関数は
現在のノードは、他のクラスタノードからTCPチェーン構築要求を受信すると、
最後にreset mfに関連するパラメータです.
Aノードは、
この関数はまずいくつかのipとportの合理性の検査を行って、それから見たnodesを遍歴して、このip:portの対応するnodeは
Aノードは、タイミングタスク
自分が見たnodesを巡回し、Bノードを巡回すると
BノードがAノードからgossip送信を受信すると、コールバック関数
accept対端connectのたびに
このときBノードはまだAノードを認識していないため,Bノードが自分のnodesからAノードを探すことは見つからないのでsenderは空であるため,このような論理に入る.同じランダムな名前でCLUSTER_NODE_HANDSHAKEはflagのためにnodeを作成し、自分のnodesに参加します.このロジックの最後にAノードにPONGメッセージが返信されます.
同様に、gossipメッセージは
このとき,AノードはBノードからのメッセージに基づいて,AノードnodesにおけるBノードの名前を訂正し,handshake状態を解消する.
Bノードが
いくつかの論理をしますが、今回議論することとは関係なく、後で詳しく書きます.
PINGとMEETメッセージについては、どうしてもPONGメッセージが返信されます.
論理的には,BノードのnodesにおけるAノードの名前を訂正し,Aノードのhandshake flagを取り除く.
これで、
cluster meet
コマンドの実装によって、その中のいくつかの通信の詳細を探ることを試みる.Redis serverがcluster modeで起動すると、ノードAがノードBが存在するクラスタに参加したい場合は、CLUSTER MEET ip port
というコマンドを実行するだけでよく、gossip通信により、最終的にBが存在するクラスタの他のノードもAを認識することが知られている.大まかなフローチャートは以下の通りです.初期化
redis serverがcluster modeで起動されると、すなわちプロファイルの
cluster-enabled
オプションがtrue
に設定され、サービス起動時にcluster初期化のプロセスがあり、これは前述の「Redis起動プロセス」に記載されている実行関数clusterInit
である.clusterには、clusterState
、clusterNode
、およびclusterLink
の3つのデータ構造が重要である.各ノードにはclusterState
構造が保存されており、この構造は現在のノードの視点でクラスタが現在置かれている状態、すなわち「私が見た世界がどのようになっているか」を記録している.各ノードは、clusterNode
構造を使用して自分の状態を記録し、クラスタ内の他のすべてのノード(プライマリノードとスレーブノードを含む)に対応するclusterNode
構造を作成し、他のノードの状態を記録します.clusterNode
構造のlink
属性は、ソケット記述子、入力バッファ、および出力バッファなどの接続ノードに必要な情報を格納するclusterLink
構造である.詳細は、Webページ「redis設計と実装-ノード」で説明します.この初期化は簡単です.まずclusterState
構造を作成し、次のようにメンバーを初期化します.server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0; // currentEpoch = 0
server.cluster->state = CLUSTER_FAIL; // FAIL
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->stats_bus_messages_sent = 0;
server.cluster->stats_bus_messages_received = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots(); // Clear the migrating/importing state for all the slots
そしてnodeにconfファイルにロックをかけ、各ノードが独自のclusterプロファイルを使用していることを確認します.
if (clusterLockConfig(server.cluster_configfile) == C_ERR)
exit(1);
この機会にredisがどのように使用するファイルロックを学ぶ.
int fd = open(filename,O_WRONLY|O_CREAT,0644);
if (fd == -1) {
serverLog(LL_WARNING,
"Can't open %s in order to acquire a lock: %s",
filename, strerror(errno));
return C_ERR;
}
if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
if (errno == EWOULDBLOCK) {
serverLog(LL_WARNING,
"Sorry, the cluster configuration file %s is already used "
"by a different Redis Cluster node. Please make sure that "
"different nodes use different cluster configuration "
"files.", filename);
} else {
serverLog(LL_WARNING,
"Impossible to lock %s: %s", filename, strerror(errno));
}
close(fd);
return C_ERR;
}
次にnodeをロードします.confファイル、このプロセスはまたこのファイルが合理的かどうかを検査します.
ロードに失敗した場合(またはプロファイルが存在しない場合)、
REDIS_NODE_MYSELF|REDIS_NODE_MASTER
をタグとしてclusterNode構造を作成して自分自身を表し、メインノードに設定し、自分の名前を40バイトのランダム列に設定します.ノードをserverに追加します.cluster->nodesでは、新しく起動したノードであり、生成されたプロファイルがブラシされていることを示します.if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
myself = server.cluster->myself =
createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1); // , ,fsync
次に、
listenToPort
関数を呼び出し、クラスタgossip通信ポートにsocket fdを作成して傍受する.クラスタ内gossip通信ポートは、例えば、Redisリスニングクライアントのポートが6379である場合、クラスタリスニングポートは16379であり、他のクラスタノードから送信されたgossipメッセージを受信するために使用される.その後、リスニングポート上の読み取り可能なイベントが登録され、イベントコールバック関数は
clusterAcceptHandler
である.#define CLUSTER_PORT_INCR 10000
if (listenToPort(server.port+CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == C_ERR)
{
exit(1);
} else {
int j;
for (j = 0; j < server.cfd_count; j++) {
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
serverPanic("Unrecoverable error creating Redis Cluster "
"file event.");
}
}
現在のノードは、他のクラスタノードからTCPチェーン構築要求を受信すると、
clusterAcceptHandler
関数accept接続を呼び出す.clusterAcceptHandler
関数では、accept済みのリンクごとにclusterLink
構造が作成され、socket fd上の読み取り可能なイベントが登録され、イベントコールバック関数はclusterReadHandler
である.#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
clusterLink *link;
... ...
// , , UPDATE
if (server.masterhost == NULL && server.loading) return;
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_VERBOSE,
"Error accepting cluster node: %s", server.neterr);
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
... ...
// link
// , link->node null,
link = createClusterLink(NULL);
link->fd = cfd;
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
}
}
最後にreset mfに関連するパラメータです.
CLUSTER MEET
AノードCLUSTER MEETコマンド受信
Aノードは、
cluster.c
->clusterCommand
関数で、CLUSTER MEET
命令、すなわちif (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
long long port;
// CLUSTER MEET
if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s", (char*)c->argv[3]->ptr);
return;
}
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && errno == EINVAL)
{
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
addReply(c,shared.ok);
}
}
clusterStartHandshake
という関数に重点を置いていることがわかります.int clusterStartHandshake(char *ip, int port) {
clusterNode *n;
char norm_ip[NET_IP_STR_LEN];
struct sockaddr_storage sa;
/* IP and Port sanity check */
... ...
// (flag) norm_ip:port
if (clusterHandshakeInProgress(norm_ip,port)) {
errno = EAGAIN;
return 0;
}
// node,type CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET
// handshake
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);
return 1;
}
clusterNode *createClusterNode(char *nodename, int flags) {
clusterNode *node = zmalloc(sizeof(*node));
if (nodename)
memcpy(node->name, nodename, CLUSTER_NAMELEN);
else
// nodename , ,
getRandomHexChars(node->name, CLUSTER_NAMELEN);
node->ctime = mstime(); // mstime
node->configEpoch = 0;
node->flags = flags;
memset(node->slots,0,sizeof(node->slots));
node->slaveof = NULL;
... ...
node->link = NULL; // link , clusterCron
memset(node->ip,0,sizeof(node->ip));
node->port = 0;
node->fail_reports = listCreate();
... ...
listSetFreeMethod(node->fail_reports,zfree);
return node;
}
この関数はまずいくつかのipとportの合理性の検査を行って、それから見たnodesを遍歴して、このip:portの対応するnodeは
CLUSTER_NODE_HANDSHAKE
の状態にあるかどうか、そうであれば、これはmeetを繰り返していることを説明して、下に行く必要はありません.その後、createClusterNode
関数によってCLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET
タグ付きノードが作成され、名前はランダムな40バイト文字列(この場合、AにとってBは見知らぬノードであり、情報はipとportを除いて知らないため)、clusterAddNode
関数によって自分のnodesに加算される.このプロセスが成功すると,クライアントにOKを返し,他のことはgossip通信で行う必要がある.AノードはMEET gossipメッセージをBノードに送信する
Aノードは、タイミングタスク
clusterCron
において、いくつかのことを行う.handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
// disconnected nodes
di = dictGetSafeIterator(server.cluster->nodes); //
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
// myself noaddr
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
// handshake , handshake_timeout, nodes
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}
// cluster meet node , server ,
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
// gossip node + 10000, tcp , client
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
... ...
link = createClusterLink(node);
link->fd = fd;
node->link = link;
// link->fd , clusterReadHandler
aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link);
... ...
// node MEET flag, MEET PING,
// nodes
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
... ...
node->flags &= ~CLUSTER_NODE_MEET;
... ...
}
}
dictReleaseIterator(di);
自分が見たnodesを巡回し、Bノードを巡回すると
node->link == NULL
のため、Bの起動ポート番号+10000、すなわちgossip通信ポートが傍受され、その後、読み取り可能なイベントが登録され、処理関数はclusterReadHandler
であることがわかる.次にCLUSTER_を送りますNODE_MEETメッセージはBノードに与え,Bノードのmeet状態を消去する.BノードはAからのMEET gossipメッセージを処理する
BノードがAノードからgossip送信を受信すると、コールバック関数
clusterAcceptHandler
が処理し、accept対端のconnect(serverとして、対端をclientとする)が行われ、読み取り可能なイベントが登録され、コールバック関数はclusterReadHandler
であり、基本論理は以下の通りである.void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
clusterLink *link;
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
// , , UPDATE
if (server.masterhost == NULL && server.loading) return;
while(max--) { // 1000
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_VERBOSE,
"Error accepting cluster node: %s", server.neterr);
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
// link
// , link->node null,
link = createClusterLink(NULL);
link->fd = cfd;
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
}
}
accept対端connectのたびに
clusterLink
構造が作成され、データを受信することができます.typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
int fd; /* TCP socket file descriptor */
sds sndbuf; /* Packet send buffer */
sds rcvbuf; /* Packet reception buffer */
struct clusterNode *node; /* Node related to this link if any, or NULL */
} clusterLink;
clusterLink
にはnode自身を指すポインタがあります.BノードはAノードから送信された情報を受信し、clusterLink
のrcvbuf
フィールドに配置し、clusterProcessPacket
関数を使用して処理する(受信データプロセスは簡単で、分析を行わない).clusterProcessPacket
関数の役割は、他の人から送られてきたgossipパケットを処理することです.if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
// CLUSTER_NODE_HANDSHAKE cluster node,
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
nodeIp2String(node->ip,link); // ip port link
node->port = ntohs(hdr->port);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
.....
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
このときBノードはまだAノードを認識していないため,Bノードが自分のnodesからAノードを探すことは見つからないのでsenderは空であるため,このような論理に入る.同じランダムな名前でCLUSTER_NODE_HANDSHAKEはflagのためにnodeを作成し、自分のnodesに参加します.このロジックの最後にAノードにPONGメッセージが返信されます.
Aノードは、Bノードから返信されたPONG gossipメッセージを処理する
同様に、gossipメッセージは
clusterProcessPacket
で処理される.if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
... ...
if (link->node) {
if (nodeInHandshake(link->node)) { // node
... ...
clusterRenameNode(link->node, hdr->sender); //
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; // handshake
link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
このとき,AノードはBノードからのメッセージに基づいて,AノードnodesにおけるBノードの名前を訂正し,handshake状態を解消する.
BノードはPING gossipメッセージをAノードに送信する
Bノードが
clusterCron
をしているとき、自分が見たAノードのlinkが空であるnode->link == NULL
であることに気づいた.これは、上述したAノードがBノードにMEETメッセージを送るのと似ているが、BノードがAノードを見てmeet flagがないので、PINGメッセージが送信される.AノードはBノードからのPINGメッセージを処理する
いくつかの論理をしますが、今回議論することとは関係なく、後で詳しく書きます.
PINGとMEETメッセージについては、どうしてもPONGメッセージが返信されます.
BノードAノードから返信されたPONGメッセージの処理
論理的には,BノードのnodesにおけるAノードの名前を訂正し,Aノードのhandshake flagを取り除く.
小結
これで、
cluster meet
コマンドが実行する完全なプロセスが説明され、フローチャートを描くことで、このフローをよりよく理解することができます.