redisサーバモデル

12115 ワード

redisはデータベースとして、サービス側が大量の顧客の同時性をサポートする必要があるため、できるだけ効率的なサーバモデルを採用する必要があります.redisは、kqueue、evport、select、epollなどの複数のIO多重方式をサポートします.異なるオペレーティングシステムで実行されると、最適なIO多重方式が選択されます.redisは、これらのIO多重化された呼び出しインタフェースを統合されたインタフェースにカプセル化し、使用するたびに統合されたインタフェースを呼び出すだけで、プログラムは異なる実行環境に基づいて最適な下位多重化方式を選択します.次にselectとepollを例に共通インタフェースを説明します.
typedef struct aeApiState {
    fd_set rfds, wfds; //  add del  fd ,    ,    
    fd_set _rfds, _wfds; //    select ,      copy  
} aeApiState;

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop);
static int aeApiResize(aeEventLoop *eventLoop, int setsize);
static void aeApiFree(aeEventLoop *eventLoop) ;

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);

static char *aeApiName(void);

まず、開始時にaeApiState構造体を作成し、前のselectとepollの操作分析から、IO多重化後の増加、削除、待機などの操作はaeApiStateのメンバー変数に依存して完了できることが分かった.selectとepollの具体的な実現は以下の通りである:select
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    FD_ZERO(&state->rfds);
    FD_ZERO(&state->wfds);
    eventLoop->apidata = state;
    return 0;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
    if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
    return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    if (mask & AE_READABLE) FD_CLR(fd,&state->rfds);
    if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds);
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); //copy  
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}

  epoll
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    state->epfd = epoll_create(1024);
    eventLoop->apidata = state;
    return 0;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

selectとepollのコードを比較すると、aeApiPoll()を呼び出すと、selectとepollは準備イベントをeventLoop->fired配列に格納し、後続の戻り処理を統一していることがわかります.サーバがaeApiPoll()を呼び出す主な操作は次のとおりです.
numevents = aeApiPoll(eventLoop, tvp); //     
for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;

    if (fe->mask & mask & AE_READABLE) {
        rfired = 1;
        fe->rfileProc(eventLoop,fd,fe->clientData,mask); //   
    }
    if (fe->mask & mask & AE_WRITABLE) {
        if (!rfired || fe->wfileProc != fe->rfileProc)
            fe->wfileProc(eventLoop,fd,fe->clientData,mask); //   
    }
    processed++;
}

具体的な使用方法1)まずaeApiCreate()を呼び出してaeApiState構造体を作成し、初期化を完了する2)aeApiAddEvent()を呼び出し、aeApiDelEvent()を呼び出してリスニングの記述子を追加または削除する3)aeApiPoll()を呼び出してイベントの準備ができているかどうかを待って、裏面はselectまたはepoll_を呼び出すwaitではaeApiPoll()中にfd読み取りイベントをリスニングする準備ができている場合、新しい接続が到来していることを示します.次にacceptを呼び出し、接続fdをリスニングリストに追加する.クライアントとの接続fdが完了した場合、定義されたコールバック関数が呼び出されます.イベントを読むとreadQueryFromClient()が呼び出され、クライアントのリクエストが処理されます.