redisソースプロファイル[1]-非同期イベント駆動フレームワーク(下)

6431 ワード

前の文章.Redisのイベント駆動モジュールによるイベント検出の実装を解析した.これらのイベント検出をredisがどのように適用するかを見てみましょう.
まず、イベント駆動モジュールを作成するインタフェースaecreateEventLoopを見てみましょう.イベント登録レコードとトリガされたイベントを格納するバッファサイズ、すなわち何個の記述子を登録できるかを示すパラメータsetsizeがあります.主に最下位のaeApiCreateを呼び出し、aeEventLoopオブジェクトを作成します.では、どこで呼び出されますか?サーバを初期化する関数initServerにあります.作成されたコレクションサイズは、最大client接続数と保存値であることがわかります.最大client接続数はよく理解でき、各ソケットは登録する必要があります.ただし、server自体も、リスニングソケットなどのソケットを作成する必要があるため、値CONFIG_を保持します.FDSET_INCR定義に96を付けるのは、安全区間を予約するためです.
/*
* desc:         
* param: @setsize     ,                        
* return: NULL/   NOT NULL/   aeEventLoop      
*/
aeEventLoop *aeCreateEventLoop(int setsize)
#define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)

main
    ->initServer
        ->aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

次に、イベントが発生したときに受信したコールバック関数procを呼び出すインタフェースaecreateFileEventが登録されています.ここで注意すべき詳細は以下の通りです.Aの実行が完了すると、記述子はカーネルに登録される.Bまでの実行区間では、ユーザデータがトリガ可読イベントに到達する可能性がある.ただし、この時点で登録された情報はeventLoop->eventsに保存されていません.したがって、検出イベント関数aeMainと登録関数aeCreateFileEventが異なるスレッドによって実行される場合、リスクがある.redisは、プライマリ・スレッドでこの操作を実行するので、この問題を回避します.
/*
* desc:          
* param: @eventLoop aeEventLoop     @fd        @mask      @proc        @clientData      
* return: 0/   -1/  
*/ 
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
	
    /*
    *  A     ,          ;     B                  
    *              eventLoop->events 。           aeMain     
    * aeCreateFileEvent             。redis                 。
    */
A)  if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
B)  fe->clientData = clientData;	
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

タイマは各アプリケーションに不可欠なモジュールであり、以下のようにタイミングサイクルmillisecondsに転送され、サイクル呼び出し時にclientDataカスタムデータに転送されます.タイマ破棄が終了するとfinalizerProc関数が呼び出されます.aeEventLoop::timeEventHeadメンバーはタイマチェーンヘッダを指し、作成した各タイマに対してチェーンヘッダに挿入されます.タイマーIDを返します.
/*
* deac:      
* param: @eventLoop aeEventLoop     @milliseconds      @proc        
         @ clientData      @finalizerProc          
* return:   0/      0/      ID
*/
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc);

最後に、イベント処理では、タイマが作成されると、イベントの発生を待つタイムアウト時間として、最も近いタイマから必要なタイミング時間が計算されます.タイマが作成されず、待ちたくない場合は、現在イベントが発生しているかどうかを確認してすぐに戻ります.そうしないと、イベントが発生するのを永遠に待ちます.トリガーされたイベントはaeEventLoop::firedメンバーに保存され、処理待ちになります.反転呼び出しフラグAE_BARRIERは、読み書き可能なイベントの呼び出し順を反転させるために使用される.通常、読み取りイベントを処理してから書き込みイベントを処理します.これは、クエリー要求を処理した後、クエリー結果をできるだけ早く返すのに役立ちます.AE_を設定するとBARRIERフラグの場合、呼び出しの前後順序が反転します.このシナリオは、読み取り可能なイベント処理後に書き込み可能なイベントがトリガーされない場合に適しています.たとえば、beforeSleepコールバックで同期ファイルをディスクに実行し、クライアントに返信します.
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;


    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
			
            /*
            *                         
            */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
			
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /*
            *                      
            *                 
            */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                tvp = NULL;
            }
        }

        /*
        *           
        */
        numevents = aeApiPoll(eventLoop, tvp);

        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /*
            *                 .            
            *             .
            *    AE_BARRIER   ,          .       
            *                  。  ,     beforeSleep
            *               ,    client。
            */
            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    
    /*
    *              
    */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}