Redisコード読み取り3-Redisネットワークリスニング(2)
9456 ワード
この記事では、Redisネットワークのリスニングプロセスの各ステップについて説明します. aeCreateEventLoop:ループリスニング用のeventLoopを作成し、Redisは主流の3つのイベントトリガメカニズムをサポートする:select,epoll,kqueue、config.h奥配置HAVE_EPOLL/HAVE_KQUEUEは、異なるオペレーティングシステムに基づいて適切なメカニズムを選択する:aeを呼び出すepoll.c/ae_select.c/ae_kqueue.c中のaeApiCreate;eventLoopを作成するときにbeforesleepが指定されていません.ループリスニングを開始する前に、関数beforeSleepをeventLoopにバインドします.この関数も後述 です. aeCreateTimeEvent/:タイミングイベントを作成し、タイミング関数serverCronを登録し、後で説明します. aeCreateFileEvent:読み取りI/Oイベントを登録し、関数acceptTcpHandler(同様に後述)をバインドし、多重化がepollメカニズムを採用すれば、LTモードでトリガする. serverを作成します.ae後、aeMainという方法でネットワーク傍受を開始する.ここでのコードは、 Beforesleep:名前の通り、この方法はRedisがsleep/waitに入るたびにリスニングを待つポートでI/Oイベントが発生する前に呼び出されます(この話はあまりにも拗ねています...).やはりコードを見てみましょう: beforesleepが実行された後aeprocessEventsは、主に様々な傍受されたファイルの読み書きイベントと期限切れ応答のタイミングイベントを処理する.この方法はコードが長く、論理が簡単で、貼り付けられないため、手順を簡単に説明します.a)まずeventLoopに登録されているtimeEventを巡回することで、現在に最も近いtimeEvent(shortest)を見つけます.b)epoll_を呼び出すwait()メソッド、I/Oイベントの発生を待つ、できるだけ早く時間イベントに応答するため、epoll_wait()メソッドの待機時間はshortestと現在の時間の差であり、差がゼロ未満の場合epoll_wait()はI/Oイベントが発生するまでポーリングします.c)eventLoopのfiredのaeFileEventに応答し,ここで呼び出されるのが,以前に設定したファイル処理関数accepttTcpHandlerである.d)I/Oイベントに応答した後、timeEventHeadを介してtimeEventを巡回し、timeProc-serverCronに逐一応答する.タイミングイベントに応答する際の注意点: は、1つのタイミングイベントに応答するとeventLoop内のタイミングイベントチェーンテーブルが変更される可能性があるため、タイミングイベントチェーンテーブルを最初からノードから巡回する. は、毎回、タイミングイベントチェーンテーブルを先頭ノードから巡回するので、応答ループ呼び出しを回避する方法、すなわち、応答タイミングイベントaの場合、aの処理関数timeProcに新たなタイミングイベントbがregisterされ、イベントaに応答した後にbが鳴ると、ループ応答をもたらすことを考慮する.この状況を解決するために、redisはeventLoopの中で1つのtimeEventNextId、すなわち次のタイミングイベントのidを維持した.例えば、現在のeventLoopのtimeEvent aが1つしかない場合、timeEventNextId=2、a->id=1 aのtimeProcメソッドがtimeEvent bを登録すると、timeEventNextId=3、b->id=2となる.では、redisは、遍歴タイミングイベントの開始時に遍歴前のeventLoopのmaxId=timeEventNextId-1を保存し、遍歴タイミングイベントの時にあるtimeEvent->id>maxIdがあれば、このイベントをスキップする. 著者らも、毎回、タイミングイベントを最初からループするのは良いアルゴリズムではないことを認識しているが、現在Redisにはタイミングイベントが1つしかないため、redisにとって問題ではないが、著者らは将来のバージョンで を改善すると述べている.
acceptTcpHandler:この方法は主にネットワークポートを傍受する:i.anetTcpAccept方法を呼び出すことによって傍受ポート上のclient connectionを得る;ii.次にacceptCommonHandlerを呼び出してredisClientオブジェクトを作成し、現在接続されているclientの数が構成されている最大clientの数より大きい場合、現在の接続を拒否し、「max number of clients reached」プロンプト情報を返します.iii.createClientメソッドを呼び出してredisClientを作成し、同時に新しいfileEvent(AE_READADADABLE)を登録し、バインド処理関数はreadQueryFromClientである. を実行する. _installWriteEvent:上記のファイルイベントはすべてAEです.READADBLEイベントですが、Redisはクライアント要求後のコマンドを実行した後、クライアント側にreturnデータを書き込み、Socketにデータを書き込み、AE_AE_WRITABLEイベント.Redisはcommandを実行した後、addReplyメソッドを呼び出し、このメソッドでinstallWriteEventを呼び出してAE_を登録します.WRITABLEイベントは、クライアントにデータを送信するためにイベント処理関数sendReplyToClientをバインドします. serverCron:fileEventの処理関数を説明した後、最後にtimeEventの処理関数を紹介します.名前の通り、serverCronはRedis Serverのスケジュールタスクです.この方法は比較的複雑で,処理することも多く,主にRedisの動作状況(memory,clientsなど),AOF write,VM Swap,BGSAVEなどRedisの正常な動作に密接に関係する事項を記録することに集中している.この方法のコードはたくさんあります.だから単独で紹介しますです.
/* test for polling API */
#ifdef __linux__
#define HAVE_EPOLL 1
#endif
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
は、イベントを処理するたびにserverを1回実行する.aeで設定したbeforesleepの方法について、beforesleepを紹介します.void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
listNode *ln;
redisClient *c;
/* Awake clients that got all the swapped keys they requested */
if (server.vm_enabled && listLength(server.io_ready_clients)) {
listIter li;
listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) {
c = ln->value;
struct redisCommand *cmd;
/* Resume the client. */
listDelNode(server.io_ready_clients,ln);
c->flags &= (~REDIS_IO_WAIT);
server.vm_blocked_clients--;
aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c);
cmd = lookupCommand(c->argv[0]->ptr);
redisAssert(cmd != NULL);
call(c,cmd);
resetClient(c);
/* There may be more data to process in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}
}
/* Try to process pending commands for clients that were just unblocked. */
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
redisAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients,ln);
c->flags &= ~REDIS_UNBLOCKED;
/* Process remaining data in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}
/* Write the AOF buffer on disk */
flushAppendOnlyFile();
}
この方法は3つのことをしました:I.RediがVirtual memoryを開いた場合、いくつかのclientsが要求したkeysはswapされたため、これらのclientはblockに閉じ込められ、これらのclientsが要求したkeysがメモリにswapされた場合、これらのblockに閉じ込められたclientsはunblockされ、処理されるべきです.io_ready_Clientsは、これらのclientsを維持するために用いるものであり、clientの要求にできるだけ早く応答するために、sleepのたびにこれらの要求IIを先に処理する.一部のRedis操作はblockingであり、BLPOPのように、これらの操作を実行するclientsはblockによってブロックされる可能性があります.unblocked_Clientsというlistはunblockされたばかりのclientsを維持するために使用され、このlistが空でなければ、できるだけ早くこれらのclients III.flushAppendOnlyFileに応答しなければならない.ClientsのSocketのwriteはeventLoopでしか行われないが、flushAppendOnlyFileはsleepのたびに呼び出されるため、eventLoopのすべてのAOF writesはメモリの中の1つのbufferに先に書かれており、flushAppendOnlyFileはこのbufferの内容flushをdiskに伝える責任を負っている.static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId)
{
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* After an event is processed our time event list may
* no longer be the same, so we restart from head.
* Still we make sure to don't process events registered
* by event handlers itself in order to don't loop forever.
* To do so we saved the max ID we want to handle.
*
* FUTURE OPTIMIZATIONS:
* Note that this is NOT great algorithmically. Redis uses
* a single time event so it's not a problem but the right
* way to do this is to add the new elements on head, and
* to flag deleted elements in a special way for later
* deletion (putting references to the nodes to delete into
* another linked list). */
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
c->bufpos = 0;
anetNonBlock(NULL,fd);
anetTcpNoDelay(NULL,fd);
if (!c) return NULL;
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
.........
}
iv.readQueryFromClient:指定されたSocketからclientから送信されたデータを読み出し、Redisのプロトコル(後述)に従ってRedisに組み立てる各commandを解析し、commandTableを検索することでcommand