Redisシリーズ(二)---クライアント要求を受け入れ、処理関数を呼び出す方法
10212 ワード
前編では、redisがクライアント要求を受け入れ、処理関数を呼び出してコマンドを実行する方法について説明したredisの起動プロセスを要約します.
前編では、initServer()という関数の中で、anetTcpServerとanetUnixServerの2つの関数を呼び出してtcpポートとunixドメインソケットの傍受を作成すると述べたが、ここではまず、この2つの関数の具体的な実装を重点的に分析する.
コードから、まずanetCreateSocket()を呼び出してソケットを作成し、sに複素値を与え、saというsockaddr_inタイプの構造体を初期化し、リスニングするポート、アドレス、アドレスファミリーを設定し、anetListen()関数を呼び出してアドレスをバインドし、ポートをリスニングする.これらの作業が完了するとanetCreateSocket関数が戻り、作成したソケットをserverにコピーする.ipfd.anetUnixServer()という関数で完了した作業はanetCreateSocketに似ており、バインドされたunix socketにすぎないことに注意してください.
次に、initServer関数で、この関数を呼び出しました:aeCreateFileEvent、ここでは最初の、2番目の類似を重点的に分析します.
まず、eventLoopのeventというaeFileEvent配列から、現在のfdに対応するacFileEventを取り出し、主に下に対応するイベントの処理関数を設定するためである.すなわち,伝達されたmaskに基づいてどのイベントであるかを判断する.
次にacApiEventというイベント登録関数を呼び出します.
この関数では、主にepoll_が呼び出されていることがわかります.ctl(state->epfd,op,fd,&ee)現在のfdを関心のあるイベントに設定してepoll_に登録するcreateが返すepollのハンドルにあります.ここに登録されているのはAEですからREADALBLEイベントであるため、このfd(すなわちredisリスニングポートのソケット)にデータ読み取り可能な場合(ここではクライアント接続が到着したことを理解する)、対応するイベント処理関数がトリガーされ、ここでのイベント処理関数はacceptTcpHandlerである.
次にacceptTcpHandlerという関数を見てみましょう.
この関数では、まずanetTcpAcceptを呼び出してクライアントがredisに接続するsocket fd(実際の呼び出し::accept(s,sa,len)関数が返すfd)を取得し、次にacceptCommonHandler()関数を呼び出し、この関数ではcreateClient()を呼び出してredisClient*cインスタンスを作成し、現在のredisサーバの接続総数が最大値を超えない場合、グローバル変数serverに接続数のstat_を記録するnumconnectionsに1を追加します.超過した場合は、クライアントがエラーメッセージを出力し、redisClientインスタンスを解放し、関数が返されます.
次にcreateClientという関数を分析します.
名前の通り、この関数はクライアント接続を表すredisClientを作成します.この関数の中には、必要な初期化のほかに、aecreateFileEvent()という関数を呼び出して、前に取得したクライアント接続のソケットノートAE_に呼び出します.READALBLEイベントは、イベント処理関数readQueryFromClientを設定し、クライアントがデータを送信して到着すると、readQueryFromClientという関数を呼び出してユーザ入力を処理する.readQueryFromClient()はredisがクライアントから送信されたコマンドを処理する開始点である.
これでredisリスニングポートからクライアント受信までが続き,クライアント送信の処理を開始するコマンドまで解析が完了する.
前編では、initServer()という関数の中で、anetTcpServerとanetUnixServerの2つの関数を呼び出してtcpポートとunixドメインソケットの傍受を作成すると述べたが、ここではまず、この2つの関数の具体的な実装を重点的に分析する.
int anetTcpServer(char *err, int port, char *bindaddr)
{
int s;
struct sockaddr_in sa;
if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
return ANET_ERR;
memset(&sa,0,sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
sa.sin_addr.s_addr = htonl(INADDR_ANY);
if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {
anetSetError(err, "invalid bind address");
close(s);
return ANET_ERR;
}
if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
return ANET_ERR;
return s;
}
コードから、まずanetCreateSocket()を呼び出してソケットを作成し、sに複素値を与え、saというsockaddr_inタイプの構造体を初期化し、リスニングするポート、アドレス、アドレスファミリーを設定し、anetListen()関数を呼び出してアドレスをバインドし、ポートをリスニングする.これらの作業が完了するとanetCreateSocket関数が戻り、作成したソケットをserverにコピーする.ipfd.anetUnixServer()という関数で完了した作業はanetCreateSocketに似ており、バインドされたunix socketにすぎないことに注意してください.
次に、initServer関数で、この関数を呼び出しました:aeCreateFileEvent、ここでは最初の、2番目の類似を重点的に分析します.
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
まず、eventLoopのeventというaeFileEvent配列から、現在のfdに対応するacFileEventを取り出し、主に下に対応するイベントの処理関数を設定するためである.すなわち,伝達されたmaskに基づいてどのイベントであるかを判断する.
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= AE_SETSIZE) return AE_ERR;
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
次にacApiEventというイベント登録関数を呼び出します.
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
この関数では、主にepoll_が呼び出されていることがわかります.ctl(state->epfd,op,fd,&ee)現在のfdを関心のあるイベントに設定してepoll_に登録するcreateが返すepollのハンドルにあります.ここに登録されているのはAEですからREADALBLEイベントであるため、このfd(すなわちredisリスニングポートのソケット)にデータ読み取り可能な場合(ここではクライアント接続が到着したことを理解する)、対応するイベント処理関数がトリガーされ、ここでのイベント処理関数はacceptTcpHandlerである.
次にacceptTcpHandlerという関数を見てみましょう.
この関数では、まずanetTcpAcceptを呼び出してクライアントがredisに接続するsocket fd(実際の呼び出し::accept(s,sa,len)関数が返すfd)を取得し、次にacceptCommonHandler()関数を呼び出し、この関数ではcreateClient()を呼び出してredisClient*cインスタンスを作成し、現在のredisサーバの接続総数が最大値を超えない場合、グローバル変数serverに接続数のstat_を記録するnumconnectionsに1を追加します.超過した場合は、クライアントがエラーメッセージを出力し、redisClientインスタンスを解放し、関数が返されます.
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[128];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
if (cfd == AE_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd);
}
次にcreateClientという関数を分析します.
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
c->bufpos = 0;
anetNonBlock(NULL,fd);
anetTcpNoDelay(NULL,fd);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
selectDb(c,0);
c->fd = fd;
c->querybuf = sdsempty();
c->reqtype = 0;
c->argc = 0;
c->argv = NULL;
c->cmd = c->lastcmd = NULL;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
c->lastinteraction = time(NULL);
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->slave_listening_port = 0;
c->reply = listCreate();
c->reply_bytes = 0;
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
c->bpop.keys = NULL;
c->bpop.count = 0;
c->bpop.timeout = 0;
c->bpop.target = NULL;
c->io_keys = listCreate();
c->watched_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
c->pubsub_channels = dictCreate(&setDictType,NULL);
c->pubsub_patterns = listCreate();
listSetFreeMethod(c->pubsub_patterns,decrRefCount);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
listAddNodeTail(server.clients,c);
initClientMultiState(c);
return c;
}
名前の通り、この関数はクライアント接続を表すredisClientを作成します.この関数の中には、必要な初期化のほかに、aecreateFileEvent()という関数を呼び出して、前に取得したクライアント接続のソケットノートAE_に呼び出します.READALBLEイベントは、イベント処理関数readQueryFromClientを設定し、クライアントがデータを送信して到着すると、readQueryFromClientという関数を呼び出してユーザ入力を処理する.readQueryFromClient()はredisがクライアントから送信されたコマンドを処理する開始点である.
これでredisリスニングポートからクライアント受信までが続き,クライアント送信の処理を開始するコマンドまで解析が完了する.