redisソース分析(2)——イベントループ



redisはサーバプログラムとして,ネットワークIO処理が鍵となる.redisはmemcachedのようにlibeventを使用するのではなく、独自のIOイベントフレームワークを実現し、簡単でコンパクトです.select、epoll、kqueueなどの実装を選択できます.
   
IOイベントフレームワークとしては、複数のIOモデルの共通性を抽象化し、プロセス全体を主に抽象化する必要があります.
          
1)初期化
        
2)イベントの追加、削除
          
3)イベント発生待ち
     
以下、この手順に従ってコードを分析します.
(1)初期化
Redisの初期化の過程を思い出すと、initServer関数はaeCreateEventLoopを呼び出してevent loopオブジェクトを作成し、イベントループを初期化します.次にaeEventLoop構造を見て、イベントループに関連する属性を格納します.
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    // 
    //                 
    // 
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    // 
    //             
    // 
    aeTimeEvent *timeEventHead;
    // 
    //     eventLoop
    // 
    int stop;
    void *apidata; /* This is used for polling API specific data */
    // 
    //              beforesleep
    // 
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

setsize:イベントループをリスニングするファイル記述子のセットのサイズを指定します.この値は、プロファイルのmaxclientsに関係します.
     
events:登録されたすべての読み書きイベントが格納され、setsizeサイズの配列です.カーネルは、新しい接続のfdが現在使用可能な記述子の最小値であることを保証するので、setsize個の記述子を最大でリスニングすると、最大のfdはsetsize-1になります.この組織方式の利点は,fdを下付きとして対応するイベントにインデックスし,イベントトリガ後にfdに基づいて対応するイベントを迅速に検索できることである.
   
Fired:トリガーされた読み書きイベントを格納します.同じsetsizeサイズの配列です.
     
timeEventHead:redisはタイマイベントをチェーンテーブルに組織し、このプロパティはテーブルヘッダを指します.
     
apidata:epoll、selectなどの実装に関するデータを格納します.
     
beforesleep:イベントループは、反復するたびにbeforesleepを呼び出して非同期処理を実行します.
ioモデル初期化の抽象関数はaeApiCreateである.aeCreateEventLoop関数は、グローバル・イベント・ループ構造を作成して初期化し、aeApiCreateを呼び出して依存する特定のデータ構造を初期化します.
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    // 
    // setsize         fd   
    //           fd       ,      setsize  
    //    ,     event
    // 
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

epollを例にとると、aeApiCreateは主にepollのfdを作成し、傍受するepoll_event、これらのデータは次のように定義されています.
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

ここで、傍受されたイベントの組織方法とevent_loopでのリスニングイベントと同様にsetsizeサイズのデータであり、fdを下付きとする.
aeApiCreateはこれらのプロパティを初期化し、aeApiState構造をeventLoop->apidataに格納します.
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

(2)イベントの追加、削除
redisは、2つのイベント、ネットワークioイベント、タイマイベントをサポートします.タイマイベントの追加、削除は比較的簡単で、主にタイマイベントリストを維持します.まず、タイマーイベントを表す構造を見てみましょう.
/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

when_secとwhen_ms:タイマがトリガーするイベントスタンプを表し、イベントループが反復して戻った後、現在のタイムスタンプがこの値より大きい場合、イベント処理関数をコールバックします.
     
timeProc:イベント処理関数.
     
finalizerProc:関数をクリーンアップし、タイマを削除するときに呼び出す.
     
ClientData:イベント処理関数に入力するパラメータが必要です.
     
next:タイマイベントはチェーンテーブルに組織され、nextは次のイベントを指します.
aeCreateTimeEvent関数はタイマイベントを追加するために使用され、論理は簡単で、現在の時間に基づいて次のトリガイベントを計算し、イベント属性に値を割り当て、タイマチェーンテーブルヘッダの前に挿入します.削除aeDeleteTimeEvent関数、idに基づいてイベントを見つけ、チェーンテーブルからノードを削除し、関数をコールバックしてクリーンアップします.具体的なタイマイベントの処理は後述しますが、ioイベントを見てみましょう.
ioイベントの追加はaeCreateFileEventで、論理は簡単で、登録するfdに基づいてそのeventを取得し、属性を設定し、aeApiAddEvent関数を呼び出して最下位層のioモデルを追加します.
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

mask:登録されたイベントのタイプを指定します.読み取りまたは書き込みができます.
     
proc:イベント処理関数.
次にioイベントの構造を示します.登録されたイベントタイプmask、読み書きイベント処理関数、および対応するパラメータが含まれます.
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

次に、epoll追加イベントの実装を見てみましょう.主にepollを呼び出します.ctl.
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

struct epll_eventは、リスニングするイベントと、イベントがトリガーされたときに戻るファイル記述子にバインドされたdataを指定します.ここでdataを直接fdとして保存し,このデータにより対応するイベントを見つけ,その処理関数を呼び出すことができる.
     
epollの削除は追加と同様であり,後述しない.
(3)イベントトリガ待ち
aeMain関数を呼び出してイベントループに入ります.
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

関数内部はwhileループであり,aeProcessEvents関数を絶えず呼び出し,イベントの発生を待つ.反復するたびにbeforesleep関数が呼び出され、非同期タスクが処理され、serverCronとともに説明されます.
     
aeProcessEvents関数は、まずタイマイベントを処理し、ioイベントを処理します.この関数の実装について説明します.
     
まず、変数レコード処理のイベント数と、トリガされたイベントを宣言します.flagsは、タイマイベントとioイベントを処理する必要がなければ直接返すイベントタイプを表します.
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

redisにおけるタイマイベントはepollによって実現される.基本的には、イベントごとにepollを反復的に呼び出すことです.waitの場合、このホイールsleepの時間を指定する必要があります.ioイベントが発生しない場合は、sleep時間になると戻ります.次に最初に発生するイベントを算出し,現在時間までの間隔でこの値をsleepとすることで,イベントが到着した後にその処理関数をコールバックすることを保証できる.しかし、戻るたびにioイベントが処理されるため、タイマのトリガイベントは不正確であり、所定のトリガ時間よりも遅いに違いない.具体的な実装を見てみましょう.
     
まず、sleepのイベントを決定するために、次に最初に発生するタイマイベントを検索します.タイマイベントがない場合は、受信したflagsに基づいて、ioイベントの発生を常にブロックするか、ブロックしないかを選択し、チェックが完了したらすぐに戻ります.aesearchNearestTimer関数を呼び出して最初に発生したイベントを検索することで、線形検索方式を採用し、複雑度はO(n)であり、タイマイベントをスタック化し、検索を速めることができる.ただし、redisにはserverCronタイマイベントが1つしかないので、しばらく最適化する必要はありません.
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // 
    //         poll,        :
    // 1)           (maxfd != -1)
    // 2)         ,  DONT_WAIT        
    // 
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 
        //                  ,    poll     
        // 
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 
            //               
            // 
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 
            //         ,         ,  sleep   (ms  )
            // 
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            // 
            //          ,          ,      
            // 
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

次に、aeApiPoll関数を呼び出し、前に計算したsleep時間を入力し、ioイベントの再生を待つ.関数が返されると、トリガーされたイベントはeventLoopのfired配列に埋め込まれます.epollの実装は、epollを呼び出すことです.waitでは、関数が返されると、state->events配列の最初のnumevents要素にトリガーされたイベントが格納されます.次にfired配列を埋め、各トリガイベントのfdとイベントタイプを設定します.
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // 
    //   epoll_wait,state->events          fd
    // 
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        // 
        //      ,         fired  
        // 
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

イベントが戻った後、イベントを処理する必要があります.fired配列を巡回し,fdに対応するイベントを取得し,トリガしたイベントタイプに基づいてその処理関数をコールバックする.
        for (j = 0; j < numevents; j++) {
            // 
            // poll   ,            fired  
            // 
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 
            //        fd,         
            // 
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }

以上、ioイベントの処理ですが、タイマーイベントの処理を見てみましょう.プロセスタイムイベント関数が呼び出され、タイマイベントが処理されます.
     
まず、システムクロック偏差(system clock skew、システムイベントを修正すると発生しますか?イベントを過去に変更します)が発生したかどうかをチェックし、発生した場合はすべてのイベントの発生時間を0に設定し、すぐにトリガーします.
    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

次に、すべてのタイマイベントを巡り、トリガーされたイベントを検索し、処理関数をコールバックします.タイマイベント処理関数の戻り値は,このイベントが一度に行われるか,周期的に行われるかを決定する.AE_に戻るとNOMOREは、呼び出しが完了すると削除される使い捨てイベントです.そうでなければ、戻り値は次のトリガの時間を指定します.
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            // 
            //             ,            
            // 
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* After an event is processed our time event list may
             * no longer be the same, so we restart from head.
             * Still we make sure to don't process events registered
             * by event handlers itself in order to don't loop forever.
             * To do so we saved the max ID we want to handle.
             *
             * FUTURE OPTIMIZATIONS:
             * Note that this is NOT great algorithmically. Redis uses
             * a single time event so it's not a problem but the right
             * way to do this is to add the new elements on head, and
             * to flag deleted elements in a special way for later
             * deletion (putting references to the nodes to delete into
             * another linked list). */
            // 
            //                ,           。
            //   retval   -1(AE_NOMORE),            
            // now + retval(ms)
            // 
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                // 
                //     AE_NOMORE,       
                // 
                aeDeleteTimeEvent(eventLoop, id);
            }
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }

コールバック処理関数では、新しいタイマイベントが追加される可能性があり、継続的に追加されると無限ループのリスクがあるため、このような状況を回避する必要があります.この場合、ループごとに新しく追加されたイベントは処理されません.これは、次のコードによって実現されます.
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

イベントループ部分の分析はこれで終わり、直感的で明確で、完全に抽出でき、独立したライブラリとして使用できます.次のセクションでは、リクエストの処理について説明します.