Redisソースの剖析と注釈(19)---Redisイベント処理の実現


Redisイベント処理実装
1.Redisイベントの紹介Redisサーバは です.まず、イベントドライバとは何かを簡単に説明します.
イベント駆動とは、コマンドを入力してリターンを押すと、メッセージがRedisプロトコルのフォーマットに組み込まれてRedisサーバに送信され、Redisサーバがコマンドを受信し、コマンドを処理して返信を送信し、サーバと対話していない場合、サーバはブロック待機状態になります.CPUを出してスリープ状態にし、イベントがトリガーされるとオペレーティングシステムに起動します.イベント駆動によりCPUをより効率的に利用できる.
イベント駆動は要約と抽象であり、I/O多重(I/O multiplexing)とも呼ばれ、その実現方式はシステムごとに異なり、Redisの方式については後述する.redisサーバでは、2つのイベントが処理されます.
  • ファイルイベント(file event):Redisサーバは、クライアント(または他のRedisサーバ)にソケットを介して接続され、ファイルイベントは、サーバがソケットを操作する抽象的なものである.
  • タイムイベント(time event):Redisサーバのいくつかの操作は、所与のイベントポイントで実行する必要があり、タイムイベントは、サーバがこのようなタイミング操作を抽象化するものである.

  • 2.イベントの抽象化Redisは、2つのイベントをそれぞれ1つのデータ構造に抽象化して管理する.redisすべてのソースコメント
    2.1ファイルイベント構造
    /* File event structure */
    typedef struct aeFileEvent {
        //       :AE_NONE,AE_READABLE,AE_WRITABLE
        int mask; /* one of AE_(READABLE|WRITABLE) */
        //       
        aeFileProc *rfileProc;
        //       
        aeFileProc *wfileProc;
        //         
        void *clientData;
    } aeFileEvent;  //    

    そのうちrfileProcwfileProcのメンバーはそれぞれ2つの関数ポインタであり、彼らのプロトタイプは
    typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

    この関数は であり、現在のファイルイベントで指定されたイベントタイプが発生した場合、対応する がイベントを処理するように呼び出されます.関数ポインタとコールバック関数の詳細
    イベントが準備されている場合、イベントをロックするには、ファイルイベントのファイル記述子とイベントタイプを知る必要があります.したがって、aeFiredEvent構造の統合管理が定義されています.
    /* A fired event */
    typedef struct aeFiredEvent {
        //           
        int fd;
        //       :AE_NONE,AE_READABLE,AE_WRITABLE
        int mask;
    } aeFiredEvent; //    

    2.2時間イベント構造
    /* Time event structure */
    typedef struct aeTimeEvent {
        //      id
        long long id; /* time event identifier. */
        //             
        long when_sec; /* seconds */
        //              
        long when_ms; /* milliseconds */
        //         
        aeTimeProc *timeProc;
        //         
        aeEventFinalizerProc *finalizerProc;
        //         
        void *clientData;
        //          
        struct aeTimeEvent *next;
    } aeTimeEvent;  //    

    この構造から、時間イベントテーブルは、nextポインタドメインがあり、次の時間イベントを指すため、チェーンテーブルであることがわかる.
    ファイルイベントと同様に、時間イベントが指定したイベントが発生すると、対応する も呼び出され、構造メンバーtimeProcfinalizerProcはいずれもコールバック関数であり、関数の原型は以下の通りである:関数ポインタとコールバック関数の詳細
    typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
    typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

    ファイルイベントと時間イベントの両方を抽象化したが、Redisはイベント全体を抽象化する必要があり、aeEventLoop構造を定義した.
    2.3イベントステータス構造
    /* State of an event based program */
    typedef struct aeEventLoop {
        //               
        int maxfd;   /* highest file descriptor currently registered */
        //             
        int setsize; /* max number of file descriptors tracked */
        //         ID
        long long timeEventNextId;
        //            
        time_t lastTime;     /* Used to detect system clock skew */
        //         
        aeFileEvent *events; /* Registered events */
        //          
        aeFiredEvent *fired; /* Fired events */
        //           
        aeTimeEvent *timeEventHead;
        //       
        int stop;
        //             
        void *apidata; /* This is used for polling API specific data */
        //            
        aeBeforeSleepProc *beforesleep;
    } aeEventLoop;  //         
    aeEventLoop構造は、ポーリングイベントの状態を保存するためのvoid *型の万能ポインタapidata、すなわち、最下位呼び出しの多重化ライブラリのイベント状態を保存するためのものであり、Redisの多重化ライブラリの選択について、Redisは一般的なselect epoll evport kqueue INFO serverをパッケージし、それらはコンパイル段階において、異なるシステムに応じて最も性能の高い多重化ライブラリをRedisの多重化プログラムの実装として選択し,すべてのライブラリ実装のインタフェース名が同じであるため,Redis多重化プログラムの下位実装は互換性がある.具体的な選択ライブラリのソースコードは
    // IO     ,      ,Linux   "ae_epoll.c"   "ae_select.c"
    #ifdef HAVE_EVPORT
    #include "ae_evport.c"
    #else
        #ifdef HAVE_EPOLL
        #include "ae_epoll.c"
        #else
            #ifdef HAVE_KQUEUE
            #include "ae_kqueue.c"
            #else
            #include "ae_select.c"
            #endif
        #endif
    #endif

    Redisクライアントのコマンドを使用して、現在選択されている多重化ライブラリを表示することもできます.apidata
    127.0.0.1:6379> INFO server
    # Server
    ……
    multiplexing_api:epoll
    ……

    では、多重化ライブラリの選択が分かった以上、epollに保存されているstruct epoll_eventモデルのイベントステータス構造を見てみましょう.ae_epoll.cファイル中
    typedef struct aeApiState {
        // epoll        
        int epfd;
        //    
        struct epoll_event *events;
    } aeApiState;   //     

    epollのI/O多重化モデルについては、Linuxネットワークプログラミング-I/O多重化モデルのepollを参照してください.
    epollモデルのEPOLLINの構造では、POLLOUT aeFileEventなどの独自のイベントタイプが定義されているが、Redisのファイルイベント構造maskでもAE_READABLEで独自のイベントタイプが定義されている.これは、前述したAE_WRITABLEファイルで実装された同じAPIです.
    //     epoll  ,   eventLoop 
    static int aeApiCreate(aeEventLoop *eventLoop)
    //         
    static int aeApiResize(aeEventLoop *eventLoop, int setsize)  
    //   epoll        
    static void aeApiFree(aeEventLoop *eventLoop)
    //  epfd         fd   
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
    //  epfd          fd   
    static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
    //                 
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
    //        IO        
    static char *aeApiName(void)

    これらのAPIは、対応する下位多重ライブラリを呼び出してRedisイベント状態構造ae_epoll.cを関連付けるものであり、aeEventLoopの下位関数をカプセル化し、Redisがイベントを実装する場合、これらのインタフェースを呼び出すだけでよい.2つの重要な関数のソースコードを見て、どのように実現されているかを見てみましょう.
    redisすべてのソースコメント
  • は、Redisイベント状態構造epollのイベントテーブルaeEventLoopに、event関数
  • に対応するイベントを登録する.
    //  epfd         fd   
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        aeApiState *state = eventLoop->apidata;
        struct epoll_event ee = {0}; /* avoid valgrind warning */
        /* If the fd was already monitored for some event, we need a MOD
         * operation. Otherwise we need an ADD operation. */
        // EPOLL_CTL_ADD, epfd  fd   event
        // EPOLL_CTL_MOD,  fd    event
        // #define AE_NONE 0           //   
        // #define AE_READABLE 1       //    
        // #define AE_WRITABLE 2       //    
        //   fd     ,        ,     mask    ,      
        int op = eventLoop->events[fd].mask == AE_NONE ?
                EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    
        // struct epoll_event {
        //      uint32_t     events;      /* Epoll events */
        //      epoll_data_t data;        /* User data variable */
        // };
        ee.events = 0;
        //        ,         
        mask |= eventLoop->events[fd].mask; /* Merge old events */
        //   mask  epoll     
        if (mask & AE_READABLE) ee.events |= EPOLLIN;   //   
        if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  //   
        ee.data.fd = fd;    //               
        //  ee     epoll 
        if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
        return 0;
    }
  • は、受信ファイル記述子にイベントが発生するのを待つ、下位epoll_ctl関数
  • に対応する.
    //                 
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, numevents = 0;
    
        //              
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        //           
        if (retval > 0) {
            int j;
    
            numevents = retval;
            //         ,     eventLoop       
            for (j = 0; j < numevents; j++) {
                int mask = 0;
                struct epoll_event *e = state->events+j;
    
                //          ,  mask
                if (e->events & EPOLLIN) mask |= AE_READABLE;
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
                if (e->events & EPOLLERR) mask |= AE_WRITABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
                //          
                eventLoop->fired[j].fd = e->data.fd;
                eventLoop->fired[j].mask = mask;
            }
        }
        //          
        return numevents;
    }

    3.イベントのソース実装
    redisすべてのソースコメントepoll_waitイベントのソースコードはすべてRedisファイルに定義されており、イベントのメイン関数ae.cから言えば、一歩一歩深く分析されています.
    //         
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        //       
        while (!eventLoop->stop) {
            //            
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            //                 
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }

    このイベントのメイン関数aeMainは、サーバがイベントを処理している場合、デッドサイクルであり、最も典型的なイベント駆動であり、デッドサイクルであることが明らかである.イベントを処理する関数aeMainが呼び出され、それらのパラメータはイベント状態構造aeProcessEventsaeEventLoopであり、ソースコードは以下の通りである.
    //                  
    //            
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
    
        /* Nothing to do? return ASAP */
        //                 
        if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    
        /* Note that we want call select() even if there are no
         * file events to process as long as we want to process time
         * events, in order to sleep until the next time event is ready
         * to fire. */
        //    ,           ,            ,      select(),                  
    
        //              ,                    
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            int j;
            aeTimeEvent *shortest = NULL;
            struct timeval tv, *tvp;
    
            //                    
            if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                //            
                shortest = aeSearchNearestTimer(eventLoop);
            //              
            if (shortest) {
                long now_sec, now_ms;
                //       
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
    
                /* How many milliseconds we need to wait for the next
                 * time event to fire? */
                //                
                long long ms =
                    (shortest->when_sec - now_sec)*1000 +
                    shortest->when_ms - now_ms;
    
                //      
                if (ms > 0) {
                    //      tvp 
                    tvp->tv_sec = ms/1000;
                    tvp->tv_usec = (ms % 1000)*1000;
                //       ,  tvp      0
                } else {
                    tvp->tv_sec = 0;
                    tvp->tv_usec = 0;
                }
    
            //                ,        
            } else {
                /* If we have to check for events but need to return
                 * ASAP because of AE_DONT_WAIT we need to set the timeout
                 * to zero */
                //           
                if (flags & AE_DONT_WAIT) {
                    //  tvp      0,     
                    tv.tv_sec = tv.tv_usec = 0;
                    tvp = &tv;
                } else {
                    //              
                    /* Otherwise we can block */
                    tvp = NULL; /* wait forever */
                }
            }
    
            //                 
            //   tvp NULL,     ,    tvp       ,         
            //             
            numevents = aeApiPoll(eventLoop, tvp);
            //          
            for (j = 0; j < numevents; j++) {
                //            
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                //            ,     
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;
    
            /* note the fe->mask & mask & ... code: maybe an already processed
                 * event removed an element that fired and we still didn't
                 * processed, so we check if the event is still valid. */
                //            
                if (fe->mask & mask & AE_READABLE) {
                    //                       
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                //            
                if (fe->mask & mask & AE_WRITABLE) {
                    //            ,      ,           
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
                processed++;    //        1
            }
        }
        /* Check time events */
        //       
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
    
        return processed; /* return the number of processed file/time events */
    }

    先ほど述べた関数のパラメータの1つはAE_ALL_EVENTSで、彼の定義はAE_ALL_EVENTSで、以下のように定義されています.
    #define AE_FILE_EVENTS 1                                //    
    #define AE_TIME_EVENTS 2                                //    
    #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)   //       
    #define AE_DONT_WAIT 4                                  //       

    明らかに、ae.hflagsAE_FILE_EVENTSまたはその結果であり、彼らの意味は以下の通りである.
  • flags=0の場合、関数は何もせず、
  • に戻ります.
  • flagsがAE_を設定した場合ALL_EVENTSは、すべてのタイプのイベント
  • を実行する.
  • flagsがAE_を設定した場合FILE_EVENTSは、ファイルイベント
  • を実行する.
  • flagsがAE_を設定した場合TIME_EVENTSは、タイムイベント
  • を実行する.
  • flagsがAE_を設定した場合DONT_WAIT、それでは関数は事件を処理した後に直接帰って、ブロックしないで
  • 待ちます
    Redisサーバは、AE_TIME_EVENTS識別子が設定されていないため、イベントによってトリガされていない場合、待機をブロックします.しかし、彼はずっと待つことはありません.ファイルイベントの到来を待っています.彼はまだ時間を処理しなければならないので、AE_DONT_WAITを呼び出して傍受する前に、まず時間イベントテーブルから最近到着した時間を取得し、待つ時間に基づいてaeApiPoll構造の変数を構築します.この変数は、サーバブロック待ちファイルイベントの最長時間を保持し、時間が到来してファイルイベントがトリガーされないと、struct timeval tv, *tvp関数はブロックを停止し、さらにaeApiPollを呼び出して時間イベントを処理する.Redisサーバは、自身のリソースと状態を検査する周期的な検査の時間イベントを設定するため、この関数はprocessTimeEventsが指すコールバック関数です
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)

    ブロック待ちの最長時間の間にファイルイベントがトリガーされると、ファイルイベントが先に実行され、その後にタイムイベントが実行されるため、処理時間イベントは通常、プリセットよりも遅くなります.
    実行ファイルイベントtimeProcrfileProcもコールバック関数を呼び出し、Redisはファイルイベントの処理をいくつかに分けて、異なるネットワーク通信需要を処理し、以下にコールバック関数の原型をリストする.
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
    void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
    void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
  • acceptTcpHandler:wfileProc client用accept.
  • acceptUnixHandler:connect client用ローカルaccept.
  • sendReplyToClient:クライアントにコマンド返信を送信します.
  • readQueryFromClient:clientが送信したリクエストを読み込むために使用されます.

  • 次に、最も速く到達した時間イベントを取得する関数connect実装を参照する.
    //              
    //                                      。
    //              NULL。
    static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
    {
        //          
        aeTimeEvent *te = eventLoop->timeEventHead;
        aeTimeEvent *nearest = NULL;
    
        //          
        while(te) {
            //              ,   nearest 
            if (!nearest || te->when_sec < nearest->when_sec ||
                    (te->when_sec == nearest->when_sec &&
                     te->when_ms < nearest->when_ms))
                nearest = te;
            te = te->next;
        }
        return nearest;
    }

    この関数は何もありません.チェーンテーブルを巡って、最小値を見つけます.時間イベントを実行する関数aeSearchNearestTimerの実装に重点を置いた.
    /* Process time events */
    //       
    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te, *prev;
        long long maxId;
        time_t now = time(NULL);
    
        //              ,                  
        //              
        if (now < eventLoop->lastTime) {
            te = eventLoop->timeEventHead;
            while(te) {
                te->when_sec = 0;
                te = te->next;
            }
        }
        //                    
        eventLoop->lastTime = now;
    
        prev = NULL;
        te = eventLoop->timeEventHead;
        maxId = eventLoop->timeEventNextId-1;   //           ID
        //         
        while(te) {
            long now_sec, now_ms;
            long long id;
    
            /* Remove events scheduled for deletion. */
            //            
            if (te->id == AE_DELETED_EVENT_ID) {
                aeTimeEvent *next = te->next;
                //              
                if (prev == NULL)
                    eventLoop->timeEventHead = te->next;
                else
                    prev->next = te->next;
                //                
                if (te->finalizerProc)
                    te->finalizerProc(eventLoop, te->clientData);
                zfree(te);
                te = next;
                continue;
            }
    
            //                         。    ,       :               ,            ,           :            
            if (te->id > maxId) {
                te = te->next;
                continue;
            }
            //       
            aeGetTime(&now_sec, &now_ms);
            //            
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms))
            {
                int retval;
    
                id = te->id;
                //           
                retval = te->timeProc(eventLoop, id, te->clientData);
                //        1
                processed++;
                //         ,           
                if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                //        , retval -1,         ,    
                } else {
                    te->id = AE_DELETED_EVENT_ID;
                }
            }
            //                
            prev = te;
            te = te->next;
        }
        return processed;   //         
    }

    タイムイベントが存在しない場合は、processTimeEventsが指すコールバック関数を呼び出し、現在のタイムイベントを削除します.存在する場合、finalizerProcが指すコールバック関数を呼び出して時間イベントを処理する.Redisのタイムイベントは2種類に分けられます
  • タイミングイベント:特定の時間後にプログラムを1回実行させる.
  • 周期的イベント:特定の時間ごとにプログラムを実行させます.

  • 現在のタイムイベントが周期的である場合、サイクルイベントにタイムサイクルが追加された時点になります.タイミングイベントの場合は、タイムイベントが削除されます.
    これで、Redisイベントの実装は完了しますが、イベントの他のAPI、例えば、イベントの作成、イベントの削除、イベントテーブルのサイズの調整などはリストされていません.すべてのソースコードのプロファイリングはgithubで表示できます.redisのすべてのソースコードコメント