redisソース分析(五)-Redisイベント駆動

10704 ワード

Redisイベント駆動
ネットワークデータベースサーバとして、redisはイベント駆動のプログラミングモデルを用いてネットワーク通信を実現し、Memcachedがlibeventを用いてネットワーク通信を実現するのとは異なり、redisは膨大なlibeventを拒否した.libevは比較的軽量であるにもかかわらず、数百行未満のコードしかないaeイベント駆動ライブラリには及ばない.
通常、イベント・ドライバ・ライブラリのコンポーネントには、次のセクションがあります.
1)イベントは、一般的に外在的な要因によってトリガーされ、例えばネットワークデータが到着する. 
2)イベント処理関数は、事件が発生した後、それによって処理しなければならない. 
3)イベントと処理関数とのマッピング関係は,上記2つの概念を結びつける. 
4)ループモニタリング、イベント駆動に基づくプログラムの一般的な主体はループであり、各ループの中でどのようなことが発生したかを検査し、それから相応の処理関数を呼び出す.
二Redisイベント駆動の実現
2.1イベントライブラリ構造体の初期化
redisでcでは、redisServer構造体のフィールドの一部を初期化するためにinitServer関数が定義されています.フィールドの1つはRedisイベントループelです.
    //     
    aeEventLoop *el;
initServer関数はaeで呼び出す.cファイルに定義されたaeCreateEventLoop関数はserverを初期化する.elフィールド.aeEventLoopの定義は次のとおりです.
/* State of an event based program 
 *
 *         
 */
typedef struct aeEventLoop {

    //            
    int maxfd;   /* highest file descriptor currently registered */

    //            
    int setsize; /* max number of file descriptors tracked */

    //          id
    long long timeEventNextId;

    //              
    time_t lastTime;     /* Used to detect system clock skew */

    //         
    aeFileEvent *events; /* Registered events */

    //         
    aeFiredEvent *fired; /* Fired events */

    //     
    aeTimeEvent *timeEventHead;

    //         
    int stop;

    //           
    void *apidata; /* This is used for polling API specific data */

    //             
    aeBeforeSleepProc *beforesleep;

} aeEventLoop;

aeCreateEventLoop
aeCreateEventLoop関数は、まずaeEventLoop構造体の空間を割り当て、ae_を呼び出します.epoll.cファイルのaeApiCreate関数.aeApiCreate関数はaeApiState構造の空間を割り当て、その中の2つのフィールド:epfdはepoll_から格納されるcreate関数が返すepollファイル記述子;eventフィールドはLinux epollライブラリで定義されたepoll_です.event構造タイプ.eventフィールドの使用方法は後述する.
2.2リスニング記述の処理
次にinitServer関数はlistenToPort関数を呼び出し、listenToPort関数はanetを呼び出す.cファイルのanetTcpServer関数で、リスニングされたファイル記述子(listening descriptor)_を返します.このディスクリプタのデフォルトは*6379ポート*で傍受されます.返される_リスニングされたファイル記述子_inserverに保存します.ipfdフィールドにあります.
int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            }
            fds[*count] = anetTcpServer(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            }
            /* Exit the loop if we were able to bind * on IPv4 or IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
            if (*count) break;
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
            redisLog(REDIS_WARNING,
                "Creating Server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
            return REDIS_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return REDIS_OK;
}

2.3時間イベントの作成
initServer関数はaeCreateTimeEvent関数を呼び出してserverにタイミングイベントを追加する.elのtimeEventHeadフィールドにあります.timeEventHeadは、タイミングイベントを指すチェーンテーブルです.redis.cファイルのinitServer関数呼び出しaeCreateTimeEvent関数の形式は以下の通りです.
aeCreateTimeEvent(server.el /*eventLoop*/, 1 /*milliseconds*/, serverCron /*proc*/, NULL /*clientData*/, NULL /*finalizerProc*/);
redis.cファイルのserverCronは、Redisが正常に動作することを保証するためにいくつかの列の操作を行います.
2.4ファイルイベント呼び出しaeCreateFileEvent関数を作成する目的はepoll_を実行することです.ctlシステムは、anetTcpServer関数によって作成されたリスニング記述子をEPOLLINイベントキューに追加するために呼び出す.同時に、生成されたepoll記述子をaeCreateEventLoop関数呼び出しに関連付けます.次に、initServerがaeCreateFileEvent関数を呼び出すときの動作を詳細に説明します.initServerは、aeCreateFileEvent関数:server.に次のパラメータを渡します.el:aecreateEventLoopによって確立されたイベントループ.epoll記述子はserver.に表示されます.server.fd:関連するファイルイベント構造体へのアクセスのインデックスとして傍受を担当する記述子.AE_READABLE:フラグserver.fdはEPOLLINイベントを監視する必要があります.acceptHandler:監視されたイベントが到着したときに実行する関数.関数ポインタはeventLoop->events[server.fd]->rfileProcに格納されます.
/*
 *    mask     ,   fd      ,
 *   fd    ,   proc   
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    if (fd >= eventLoop->setsize) return AE_ERR;

    //         
    aeFileEvent *fe = &eventLoop->events[fd];

    //      fd      
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    //         ,        
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    //     
    fe->clientData = clientData;

    //      ,           fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;

    return AE_OK;
}

2.5イベントハンドラの実行
最後にredisはイベントプロセッサ、すなわちaeMain関数を呼び出し、この関数の中にはwhileループがあり、一定の条件下でaeProcessEvents関数を絶えず呼び出し、aeProcessEvents関数は1サイクル以内にすべての準備されたファイル記述子と要求に合致する時間を処理する.
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 *             ,            。
 *
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 *         flags   ,              ,
 *           (     )。
 *
 * If flags is 0, the function does nothing and returns.
 *    flags   0 ,        ,    。
 *
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 *    flags    AE_ALL_EVENTS ,            。
 *
 * if flags has AE_FILE_EVENTS set, file events are processed.
 *    flags    AE_FILE_EVENTS ,        。
 *
 * if flags has AE_TIME_EVENTS set, time events are processed.
 *    flags    AE_TIME_EVENTS ,        。
 *
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 *    flags    AE_DONT_WAIT ,
 *                    ,    。
 *
 * The function returns the number of events processed. 
 *                
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        //          
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            //           
            //                                   
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            //                    
            //           tv    
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }

            //       0 ,           ,        0 (   )
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            
            //       ,        
            //      AE_DONT_WAIT            ,         

            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                //          
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                //                  
                tvp = NULL; /* wait forever */
            }
        }

        //       ,      tvp   
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            //            
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            //    
            if (fe->mask & mask & AE_READABLE) {
                // rfired    /           
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            //    
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }

    /* Check time events */
    //       
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}