Redisソースの剖析と注釈(19)---Redisイベント処理の実現
31533 ワード
Redisイベント処理実装
1.Redisイベントの紹介
イベント駆動とは、コマンドを入力してリターンを押すと、メッセージが
イベント駆動は要約と抽象であり、I/O多重(I/O multiplexing)とも呼ばれ、その実現方式はシステムごとに異なり、Redisの方式については後述する.ファイルイベント(file event): タイムイベント(time event):Redisサーバのいくつかの操作は、所与のイベントポイントで実行する必要があり、タイムイベントは、サーバがこのようなタイミング操作を抽象化するものである.
2.イベントの抽象化
2.1ファイルイベント構造
そのうち
この関数は
イベントが準備されている場合、イベントをロックするには、ファイルイベントのファイル記述子とイベントタイプを知る必要があります.したがって、
2.2時間イベント構造
この構造から、時間イベントテーブルは、
ファイルイベントと同様に、時間イベントが指定したイベントが発生すると、対応する
ファイルイベントと時間イベントの両方を抽象化したが、
2.3イベントステータス構造
Redisクライアントのコマンドを使用して、現在選択されている多重化ライブラリを表示することもできます.
では、多重化ライブラリの選択が分かった以上、
epollのI/O多重化モデルについては、Linuxネットワークプログラミング-I/O多重化モデルのepollを参照してください.
epollモデルの
これらのAPIは、対応する下位多重ライブラリを呼び出してRedisイベント状態構造
redisすべてのソースコメントは、Redisイベント状態構造 に対応するイベントを登録する.は、受信ファイル記述子にイベントが発生するのを待つ、下位 に対応する.
3.イベントのソース実装
redisすべてのソースコメント
このイベントのメイン関数
先ほど述べた関数のパラメータの1つは
明らかに、 flags=0の場合、関数は何もせず、 に戻ります. flagsがAE_を設定した場合ALL_EVENTSは、すべてのタイプのイベント を実行する. flagsがAE_を設定した場合FILE_EVENTSは、ファイルイベント を実行する. flagsがAE_を設定した場合TIME_EVENTSは、タイムイベント を実行する. flagsがAE_を設定した場合DONT_WAIT、それでは関数は事件を処理した後に直接帰って、ブロックしないで 待ちます
Redisサーバは、
ブロック待ちの最長時間の間にファイルイベントがトリガーされると、ファイルイベントが先に実行され、その後にタイムイベントが実行されるため、処理時間イベントは通常、プリセットよりも遅くなります.
実行ファイルイベント acceptTcpHandler: acceptUnixHandler: sendReplyToClient:クライアントにコマンド返信を送信します. readQueryFromClient:clientが送信したリクエストを読み込むために使用されます.
次に、最も速く到達した時間イベントを取得する関数
この関数は何もありません.チェーンテーブルを巡って、最小値を見つけます.時間イベントを実行する関数
タイムイベントが存在しない場合は、タイミングイベント:特定の時間後にプログラムを1回実行させる. 周期的イベント:特定の時間ごとにプログラムを実行させます.
現在のタイムイベントが周期的である場合、サイクルイベントにタイムサイクルが追加された時点になります.タイミングイベントの場合は、タイムイベントが削除されます.
これで、Redisイベントの実装は完了しますが、イベントの他のAPI、例えば、イベントの作成、イベントの削除、イベントテーブルのサイズの調整などはリストされていません.すべてのソースコードのプロファイリングはgithubで表示できます.redisのすべてのソースコードコメント
1.Redisイベントの紹介
Redis
サーバは
です.まず、イベントドライバとは何かを簡単に説明します.イベント駆動とは、コマンドを入力してリターンを押すと、メッセージが
Redis
プロトコルのフォーマットに組み込まれてRedis
サーバに送信され、Redis
サーバがコマンドを受信し、コマンドを処理して返信を送信し、サーバと対話していない場合、サーバはブロック待機状態になります.CPUを出してスリープ状態にし、イベントがトリガーされるとオペレーティングシステムに起動します.イベント駆動によりCPUをより効率的に利用できる.イベント駆動は要約と抽象であり、I/O多重(I/O multiplexing)とも呼ばれ、その実現方式はシステムごとに異なり、Redisの方式については後述する.
redis
サーバでは、2つのイベントが処理されます.Redis
サーバは、クライアント(または他のRedisサーバ)にソケットを介して接続され、ファイルイベントは、サーバがソケットを操作する抽象的なものである.2.イベントの抽象化
Redis
は、2つのイベントをそれぞれ1つのデータ構造に抽象化して管理する.redisすべてのソースコメント2.1ファイルイベント構造
/* File event structure */
typedef struct aeFileEvent {
// :AE_NONE,AE_READABLE,AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
//
aeFileProc *rfileProc;
//
aeFileProc *wfileProc;
//
void *clientData;
} aeFileEvent; //
そのうち
rfileProc
とwfileProc
のメンバーはそれぞれ2つの関数ポインタであり、彼らのプロトタイプはtypedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
この関数は
であり、現在のファイルイベントで指定されたイベントタイプが発生した場合、対応する
がイベントを処理するように呼び出されます.関数ポインタとコールバック関数の詳細イベントが準備されている場合、イベントをロックするには、ファイルイベントのファイル記述子とイベントタイプを知る必要があります.したがって、
aeFiredEvent
構造の統合管理が定義されています./* A fired event */
typedef struct aeFiredEvent {
//
int fd;
// :AE_NONE,AE_READABLE,AE_WRITABLE
int mask;
} aeFiredEvent; //
2.2時間イベント構造
/* Time event structure */
typedef struct aeTimeEvent {
// id
long long id; /* time event identifier. */
//
long when_sec; /* seconds */
//
long when_ms; /* milliseconds */
//
aeTimeProc *timeProc;
//
aeEventFinalizerProc *finalizerProc;
//
void *clientData;
//
struct aeTimeEvent *next;
} aeTimeEvent; //
この構造から、時間イベントテーブルは、
next
ポインタドメインがあり、次の時間イベントを指すため、チェーンテーブルであることがわかる.ファイルイベントと同様に、時間イベントが指定したイベントが発生すると、対応する
も呼び出され、構造メンバーtimeProc
とfinalizerProc
はいずれもコールバック関数であり、関数の原型は以下の通りである:関数ポインタとコールバック関数の詳細typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
ファイルイベントと時間イベントの両方を抽象化したが、
Redis
はイベント全体を抽象化する必要があり、aeEventLoop
構造を定義した.2.3イベントステータス構造
/* State of an event based program */
typedef struct aeEventLoop {
//
int maxfd; /* highest file descriptor currently registered */
//
int setsize; /* max number of file descriptors tracked */
// ID
long long timeEventNextId;
//
time_t lastTime; /* Used to detect system clock skew */
//
aeFileEvent *events; /* Registered events */
//
aeFiredEvent *fired; /* Fired events */
//
aeTimeEvent *timeEventHead;
//
int stop;
//
void *apidata; /* This is used for polling API specific data */
//
aeBeforeSleepProc *beforesleep;
} aeEventLoop; //
aeEventLoop
構造は、ポーリングイベントの状態を保存するためのvoid *
型の万能ポインタapidata
、すなわち、最下位呼び出しの多重化ライブラリのイベント状態を保存するためのものであり、Redisの多重化ライブラリの選択について、Redisは一般的なselect
epoll
evport
kqueue
INFO server
をパッケージし、それらはコンパイル段階において、異なるシステムに応じて最も性能の高い多重化ライブラリをRedisの多重化プログラムの実装として選択し,すべてのライブラリ実装のインタフェース名が同じであるため,Redis多重化プログラムの下位実装は互換性がある.具体的な選択ライブラリのソースコードは// IO , ,Linux "ae_epoll.c" "ae_select.c"
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
Redisクライアントのコマンドを使用して、現在選択されている多重化ライブラリを表示することもできます.
apidata
127.0.0.1:6379> INFO server
# Server
……
multiplexing_api:epoll
……
では、多重化ライブラリの選択が分かった以上、
epoll
に保存されているstruct epoll_event
モデルのイベントステータス構造を見てみましょう.ae_epoll.cファイル中typedef struct aeApiState {
// epoll
int epfd;
//
struct epoll_event *events;
} aeApiState; //
epollのI/O多重化モデルについては、Linuxネットワークプログラミング-I/O多重化モデルのepollを参照してください.
epollモデルの
EPOLLIN
の構造では、POLLOUT
aeFileEvent
などの独自のイベントタイプが定義されているが、Redisのファイルイベント構造mask
でもAE_READABLE
で独自のイベントタイプが定義されている.これは、前述したAE_WRITABLE
ファイルで実装された同じAPIです.// epoll , eventLoop
static int aeApiCreate(aeEventLoop *eventLoop)
//
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
// epoll
static void aeApiFree(aeEventLoop *eventLoop)
// epfd fd
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// epfd fd
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
//
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// IO
static char *aeApiName(void)
これらのAPIは、対応する下位多重ライブラリを呼び出してRedisイベント状態構造
ae_epoll.c
を関連付けるものであり、aeEventLoop
の下位関数をカプセル化し、Redisがイベントを実装する場合、これらのインタフェースを呼び出すだけでよい.2つの重要な関数のソースコードを見て、どのように実現されているかを見てみましょう.redisすべてのソースコメント
epoll
のイベントテーブルaeEventLoop
に、event
関数// epfd fd
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
// EPOLL_CTL_ADD, epfd fd event
// EPOLL_CTL_MOD, fd event
// #define AE_NONE 0 //
// #define AE_READABLE 1 //
// #define AE_WRITABLE 2 //
// fd , , mask ,
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
// struct epoll_event {
// uint32_t events; /* Epoll events */
// epoll_data_t data; /* User data variable */
// };
ee.events = 0;
// ,
mask |= eventLoop->events[fd].mask; /* Merge old events */
// mask epoll
if (mask & AE_READABLE) ee.events |= EPOLLIN; //
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; //
ee.data.fd = fd; //
// ee epoll
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
epoll_ctl
関数//
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
//
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
//
if (retval > 0) {
int j;
numevents = retval;
// , eventLoop
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
// , mask
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
//
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
//
return numevents;
}
3.イベントのソース実装
redisすべてのソースコメント
epoll_wait
イベントのソースコードはすべてRedis
ファイルに定義されており、イベントのメイン関数ae.c
から言えば、一歩一歩深く分析されています.//
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
//
while (!eventLoop->stop) {
//
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
このイベントのメイン関数
aeMain
は、サーバがイベントを処理している場合、デッドサイクルであり、最も典型的なイベント駆動であり、デッドサイクルであることが明らかである.イベントを処理する関数aeMain
が呼び出され、それらのパラメータはイベント状態構造aeProcessEvents
とaeEventLoop
であり、ソースコードは以下の通りである.//
//
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
//
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
// , , , select(),
// ,
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
//
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
//
shortest = aeSearchNearestTimer(eventLoop);
//
if (shortest) {
long now_sec, now_ms;
//
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
//
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
//
if (ms > 0) {
// tvp
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
// , tvp 0
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
// ,
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
//
if (flags & AE_DONT_WAIT) {
// tvp 0,
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
//
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
//
// tvp NULL, , tvp ,
//
numevents = aeApiPoll(eventLoop, tvp);
//
for (j = 0; j < numevents; j++) {
//
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
// ,
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
//
if (fe->mask & mask & AE_READABLE) {
//
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
//
if (fe->mask & mask & AE_WRITABLE) {
// , ,
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++; // 1
}
}
/* Check time events */
//
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
先ほど述べた関数のパラメータの1つは
AE_ALL_EVENTS
で、彼の定義はAE_ALL_EVENTS
で、以下のように定義されています.#define AE_FILE_EVENTS 1 //
#define AE_TIME_EVENTS 2 //
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) //
#define AE_DONT_WAIT 4 //
明らかに、
ae.h
はflags
とAE_FILE_EVENTS
またはその結果であり、彼らの意味は以下の通りである.Redisサーバは、
AE_TIME_EVENTS
識別子が設定されていないため、イベントによってトリガされていない場合、待機をブロックします.しかし、彼はずっと待つことはありません.ファイルイベントの到来を待っています.彼はまだ時間を処理しなければならないので、AE_DONT_WAIT
を呼び出して傍受する前に、まず時間イベントテーブルから最近到着した時間を取得し、待つ時間に基づいてaeApiPoll
構造の変数を構築します.この変数は、サーバブロック待ちファイルイベントの最長時間を保持し、時間が到来してファイルイベントがトリガーされないと、struct timeval tv, *tvp
関数はブロックを停止し、さらにaeApiPoll
を呼び出して時間イベントを処理する.Redisサーバは、自身のリソースと状態を検査する周期的な検査の時間イベントを設定するため、この関数はprocessTimeEvents
が指すコールバック関数ですint serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
ブロック待ちの最長時間の間にファイルイベントがトリガーされると、ファイルイベントが先に実行され、その後にタイムイベントが実行されるため、処理時間イベントは通常、プリセットよりも遅くなります.
実行ファイルイベント
timeProc
とrfileProc
もコールバック関数を呼び出し、Redisはファイルイベントの処理をいくつかに分けて、異なるネットワーク通信需要を処理し、以下にコールバック関数の原型をリストする.void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
wfileProc
client用accept
.connect
client用ローカルaccept
.次に、最も速く到達した時間イベントを取得する関数
connect
実装を参照する.//
// 。
// NULL。
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
//
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;
//
while(te) {
// , nearest
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}
この関数は何もありません.チェーンテーブルを巡って、最小値を見つけます.時間イベントを実行する関数
aeSearchNearestTimer
の実装に重点を置いた./* Process time events */
//
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL);
// ,
//
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
//
eventLoop->lastTime = now;
prev = NULL;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1; // ID
//
while(te) {
long now_sec, now_ms;
long long id;
/* Remove events scheduled for deletion. */
//
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
//
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
//
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
// 。 , : , , :
if (te->id > maxId) {
te = te->next;
continue;
}
//
aeGetTime(&now_sec, &now_ms);
//
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
//
retval = te->timeProc(eventLoop, id, te->clientData);
// 1
processed++;
// ,
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
// , retval -1, ,
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
//
prev = te;
te = te->next;
}
return processed; //
}
タイムイベントが存在しない場合は、
processTimeEvents
が指すコールバック関数を呼び出し、現在のタイムイベントを削除します.存在する場合、finalizerProc
が指すコールバック関数を呼び出して時間イベントを処理する.Redisのタイムイベントは2種類に分けられます現在のタイムイベントが周期的である場合、サイクルイベントにタイムサイクルが追加された時点になります.タイミングイベントの場合は、タイムイベントが削除されます.
これで、Redisイベントの実装は完了しますが、イベントの他のAPI、例えば、イベントの作成、イベントの削除、イベントテーブルのサイズの調整などはリストされていません.すべてのソースコードのプロファイリングはgithubで表示できます.redisのすべてのソースコードコメント