Redis AE非同期イベントモジュール

12032 ワード

まず質問ですが、なぜRedisがMemcachedより速いのでしょうか.一般的な考え方:Memcachedはメモリに完全に基づいているが、Redisは持続的な保存特性を有しており、非同期であってもRedisはMemcachedより速くはならない.実際にテストできる状況は、基本的にはRedisが絶対的な優位性を占めています.原因は2つあるかもしれません.1、Libevent:Memcachedが使用していますが、Redisは選択していません.Libeventは汎用性に迎合するためにコードの膨大さと特定のプラットフォームでの多くの性能を犠牲にした.Redisは、コンパクトな設計を堅持し、ライブラリに依存しています.2、CAS問題:CASはMemcachedの中で比較的便利な競争修正資源を防止する方法である.CAS実装では、各cache keyに非表示のcas tokenを設定する必要があります.casはvalueバージョン番号に相当し、setごとにtokenをインクリメントする必要があります.
そのため、CPUとメモリの二重オーバーヘッドをもたらすが、単機10 G+cacheとQPSの数万に達した後、これらのオーバーヘッドは双方に相対的にいくつかをもたらす.
微細な性能の違い.
Redisはパッケージングイベントの処理にReactorモードを採用し,タイミングイベントの処理を追加した.Redis処理イベントは単一プロセスの単一スレッドであり、古典的なReatorモードはイベントをシリアル処理する.すなわち,1つのイベントが長すぎるとRedis全体がブロックされる.以下にRedis AEイベント処理モデルを簡単に解析する.コードから、epoll、select、kqueue、Solarisベースのevent portsが主にサポートされていることがわかります.主に、IOの読み取りイベントと書き込みイベントを含む1、IOイベント(ファイルイベント)の2種類のイベントドライバが提供されます.2、タイマーイベント.使い捨てタイマーと循環タイマーを含む.
基本的なデータ構造:@ae.h
//ファイルイベント処理インタフェースの定義(関数ポインタ)
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

//        (    ),          
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

//aeMain   ,          
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
//       
typedef struct aeFileEvent {
    //    ,                 
    int mask; /* one of AE_(READABLE|WRITABLE) */
    //        
    aeFileProc *rfileProc;
    //        
    aeFileProc *wfileProc;
    //            
    void *clientData;
} aeFileEvent;

//    
typedef struct aeTimeEvent {
    //       ,           ,          
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    //          
    aeTimeProc *timeProc;
    //             ,    ,            
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

//            
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
/* State of an event based program */
typedef struct aeEventLoop {
    //         
    int maxfd;   /* highest file descriptor currently registered */
    //           
    int setsize; /* max number of file descriptors tracked */
    //             id
    long long timeEventNextId;
    //            (     now

1、aeCreateEventLoopの下位epoll多重化を初期化し、aeEventLoopにvoid*タイプのapidataを格納し、下位層の実装を非表示にする.
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;
//ae            
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    //  setsize epoll_event
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

//      ,setsize         ,  epoll    epoll_event   
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

	//           
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    
    //     setsize   
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    
    //        IO     , epoll,  epoll_event, epfd
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

ここで、最大ファイル記述子をパラメータsetSizeとし、後に作成するeventLoop->events、eventLoop->firedは、ファイル記述子をインデックスとして作成し、最大メモリsizeof(aeFiredEvent)+sizeof(aeFileEvent)40バイトに合計fd数を乗じたメモリでo(1)検索効率を置き換える価値がある.
2、aeCreateFileEventファイルイベントを作成するには、イベントが発生すると、対応するコールバック関数が呼び出されるイベントに対応するハンドラを転送する必要があります.ここで設計したaeFileEvent構造体は,イベントソース(FD),イベント,イベントハンドラを関連付ける.
//       ,     fd         ,          
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    //  fd            
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

//         
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

//      ,        eventLoop 
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    
    //    fd   FileEvent,               (    )
    aeFileEvent *fe = &eventLoop->events[fd];

	//      eventLoop           (    )
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    
    //                
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    
    //                 
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

3、aeProcessEventsこれはコア部分で、epollを通じてwaitはイベントを分離してfiredに保存する、文aeFileEvent*fe=&eventLoop->events[eventLoop->fired[j]について.fd];イベントをトリガするfdがeventsに直接マッピングされることによって、イベント割り当てが実現される.Reactorの核心はイベントの分離配分を実現することである.
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

	//      
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            //   fired         
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

//      
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

	//        ,     
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

	//                      DONT_WAIT  
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
        	//           
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
			//                   epoll wait  
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
            	//             、        Timer event,      ?!
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
            	//              
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

まとめは以下の通りである:1.Reactorモード、シリアル処理イベント2.タイミングイベント機能を有する(ただし、チェーンテーブルを用いて実現されるため)O(N)複雑度3.リードイベントの優先処理