Redisシリーズ(二)---クライアント要求を受け入れ、処理関数を呼び出す方法

10212 ワード

前編では、redisがクライアント要求を受け入れ、処理関数を呼び出してコマンドを実行する方法について説明したredisの起動プロセスを要約します.
前編では、initServer()という関数の中で、anetTcpServerとanetUnixServerの2つの関数を呼び出してtcpポートとunixドメインソケットの傍受を作成すると述べたが、ここではまず、この2つの関数の具体的な実装を重点的に分析する.
int anetTcpServer(char *err, int port, char *bindaddr)

{

    int s;

    struct sockaddr_in sa;



    if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)

        return ANET_ERR;



    memset(&sa,0,sizeof(sa));

    sa.sin_family = AF_INET;

    sa.sin_port = htons(port);

    sa.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {

        anetSetError(err, "invalid bind address");

        close(s);

        return ANET_ERR;

    }

    if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)

        return ANET_ERR;

    return s;

}

コードから、まずanetCreateSocket()を呼び出してソケットを作成し、sに複素値を与え、saというsockaddr_inタイプの構造体を初期化し、リスニングするポート、アドレス、アドレスファミリーを設定し、anetListen()関数を呼び出してアドレスをバインドし、ポートをリスニングする.これらの作業が完了するとanetCreateSocket関数が戻り、作成したソケットをserverにコピーする.ipfd.anetUnixServer()という関数で完了した作業はanetCreateSocketに似ており、バインドされたunix socketにすぎないことに注意してください.
次に、initServer関数で、この関数を呼び出しました:aeCreateFileEvent、ここでは最初の、2番目の類似を重点的に分析します.
 if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,

        acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");

    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,

        acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");

まず、eventLoopのeventというaeFileEvent配列から、現在のfdに対応するacFileEventを取り出し、主に下に対応するイベントの処理関数を設定するためである.すなわち,伝達されたmaskに基づいてどのイベントであるかを判断する.
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,

        aeFileProc *proc, void *clientData)

{

    if (fd >= AE_SETSIZE) return AE_ERR;

    aeFileEvent *fe = &eventLoop->events[fd];



    if (aeApiAddEvent(eventLoop, fd, mask) == -1)

        return AE_ERR;

    fe->mask |= mask;

    if (mask & AE_READABLE) fe->rfileProc = proc;

    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    fe->clientData = clientData;

    if (fd > eventLoop->maxfd)

        eventLoop->maxfd = fd;

    return AE_OK;

}

次にacApiEventというイベント登録関数を呼び出します.
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {

    aeApiState *state = eventLoop->apidata;

    struct epoll_event ee;

    /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */

    int op = eventLoop->events[fd].mask == AE_NONE ?

            EPOLL_CTL_ADD : EPOLL_CTL_MOD;



    ee.events = 0;

    mask |= eventLoop->events[fd].mask; /* Merge old events */

    if (mask & AE_READABLE) ee.events |= EPOLLIN;

    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;

    ee.data.u64 = 0; /* avoid valgrind warning */

    ee.data.fd = fd;

    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

    return 0;

}

この関数では、主にepoll_が呼び出されていることがわかります.ctl(state->epfd,op,fd,&ee)現在のfdを関心のあるイベントに設定してepoll_に登録するcreateが返すepollのハンドルにあります.ここに登録されているのはAEですからREADALBLEイベントであるため、このfd(すなわちredisリスニングポートのソケット)にデータ読み取り可能な場合(ここではクライアント接続が到着したことを理解する)、対応するイベント処理関数がトリガーされ、ここでのイベント処理関数はacceptTcpHandlerである.
次にacceptTcpHandlerという関数を見てみましょう.
この関数では、まずanetTcpAcceptを呼び出してクライアントがredisに接続するsocket fd(実際の呼び出し::accept(s,sa,len)関数が返すfd)を取得し、次にacceptCommonHandler()関数を呼び出し、この関数ではcreateClient()を呼び出してredisClient*cインスタンスを作成し、現在のredisサーバの接続総数が最大値を超えない場合、グローバル変数serverに接続数のstat_を記録するnumconnectionsに1を追加します.超過した場合は、クライアントがエラーメッセージを出力し、redisClientインスタンスを解放し、関数が返されます.
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

    int cport, cfd;

    char cip[128];

    REDIS_NOTUSED(el);

    REDIS_NOTUSED(mask);

    REDIS_NOTUSED(privdata);



    cfd = anetTcpAccept(server.neterr, fd, cip, &cport);

    if (cfd == AE_ERR) {

        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);

        return;

    }

    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);

    acceptCommonHandler(cfd);

}

次にcreateClientという関数を分析します.
redisClient *createClient(int fd) {

    redisClient *c = zmalloc(sizeof(redisClient));

    c->bufpos = 0;



    anetNonBlock(NULL,fd);

    anetTcpNoDelay(NULL,fd);

    if (aeCreateFileEvent(server.el,fd,AE_READABLE,

        readQueryFromClient, c) == AE_ERR)

    {

        close(fd);

        zfree(c);

        return NULL;

    }



    selectDb(c,0);

    c->fd = fd;

    c->querybuf = sdsempty();

    c->reqtype = 0;

    c->argc = 0;

    c->argv = NULL;

    c->cmd = c->lastcmd = NULL;

    c->multibulklen = 0;

    c->bulklen = -1;

    c->sentlen = 0;

    c->flags = 0;

    c->lastinteraction = time(NULL);

    c->authenticated = 0;

    c->replstate = REDIS_REPL_NONE;

    c->slave_listening_port = 0;

    c->reply = listCreate();

    c->reply_bytes = 0;

    listSetFreeMethod(c->reply,decrRefCount);

    listSetDupMethod(c->reply,dupClientReplyValue);

    c->bpop.keys = NULL;

    c->bpop.count = 0;

    c->bpop.timeout = 0;

    c->bpop.target = NULL;

    c->io_keys = listCreate();

    c->watched_keys = listCreate();

    listSetFreeMethod(c->io_keys,decrRefCount);

    c->pubsub_channels = dictCreate(&setDictType,NULL);

    c->pubsub_patterns = listCreate();

    listSetFreeMethod(c->pubsub_patterns,decrRefCount);

    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);

    listAddNodeTail(server.clients,c);

    initClientMultiState(c);

    return c;

}

名前の通り、この関数はクライアント接続を表すredisClientを作成します.この関数の中には、必要な初期化のほかに、aecreateFileEvent()という関数を呼び出して、前に取得したクライアント接続のソケットノートAE_に呼び出します.READALBLEイベントは、イベント処理関数readQueryFromClientを設定し、クライアントがデータを送信して到着すると、readQueryFromClientという関数を呼び出してユーザ入力を処理する.readQueryFromClient()はredisがクライアントから送信されたコマンドを処理する開始点である.
これでredisリスニングポートからクライアント受信までが続き,クライアント送信の処理を開始するコマンドまで解析が完了する.