Redisコマンドの要求と実行プロセス
20905 ワード
一、クライアントの作成
Redisサーバとクライアントが接続を確立すると、RedisはI/O多重化であるため、サーバはクライアントごとに1つの状態を維持する必要があるため、クライアントが作成されます.作成したらEventLoopに読み込みイベントをバインドします.このとき,クライアントにリードイベントが発生すると,サーバはクライアントのデータを読み出して次の処理を行うことができる.
/*
*
*/
redisClient *createClient(int fd) {
//
redisClient *c = zmalloc(sizeof(redisClient));
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the Redis commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
// fd -1 ,
// fd -1 ,
// Redis , Lua
//
if (fd != -1) {
//
anetNonBlock(NULL,fd);
// Nagle
anetEnableTcpNoDelay(NULL,fd);
// keep alive
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// loop ( )
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// .......
return c;
}
二、クライアントデータを読み取る
読み取りイベントが発生するとreadQueryFromClient関数が呼び出され、この関数は接続記述子のデータを読み出し、クライアントのqueryBufに格納します.さらにprocessInputBuffer関数を呼び出してクエリーバッファからコンテンツを読み出し、パラメータを作成し、コマンドを実行します.
/*
*
*/
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
int nread, readlen;
size_t qblen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
//
server.current_client = c;
// ( 16 MB)
readlen = REDIS_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= REDIS_MBULK_BIG_ARG)
{
int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
//
// short read ,
// ,
qblen = sdslen(c->querybuf);
// , (peak)
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
//
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
//
nread = read(fd, c->querybuf+qblen, readlen);
//
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
} else {
redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
// EOF
} else if (nread == 0) {
redisLog(REDIS_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
if (nread) {
// , (SDS) free len
// '\0'
sdsIncrLen(c->querybuf,nread);
//
c->lastinteraction = server.unixtime;
// master ,
if (c->flags & REDIS_MASTER) c->reploff += nread;
} else {
// nread == -1 errno == EAGAIN
server.current_client = NULL;
return;
}
//
//
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
// , ,
//
processInputBuffer(c);
server.current_client = NULL;
}
三、バッファの内容を読み取り、パラメータリストを作成する
processInputBuffer関数.この関数は3つの重要な関数を呼び出します.1つはprocessInlineBufferで、1つはprocessMultibulkBufferで、もう1つはprocessCommand関数です.redisはinlineとmultibulkプロトコルの2つのプロトコルをサポートします.Inlineプロトコルは古いプロトコルで、ここでは議論しません.現在は主にmultibulkプロトコルであり、このプロトコルはクライアントが入力したコマンド、例えば「set mykey myvalue」を「3r$3rSETr$5rmykeyr$7rmyvaluer」に変換し、「」の後の「3」は合計3つのパラメータを表す.プロトコルフォーマットに変換してサーバに転送します.サーバは、この文字列を解析して実行可能なコマンドにします.processInlineBufferとprocessmultibulkの2つの関数は,クライアントから送信されたプロトコルをargc(パラメータ総数)とargv(パラメータリスト)に解析する.RedisClientに保存します.ここでは、クライアント構造体に関連するメンバー定義を確認する必要があります.
/* With multiplexing we need to take per-client state.
* Clients are taken in a liked list.
*
* I/O , 。
*
* 。
*/
typedef struct redisClient {
//
int fd;
//
redisDb *db;
// id ( )
int dictid;
//
robj *name; /* As set by CLIENT SETNAME */
//
// ,sds char*
sds querybuf;
//
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */
//
int argc;
//
robj **argv;
//
struct redisCommand *cmd, *lastcmd;
// :
int reqtype;
// 。。。。。
} redisClient;
すなわち、この2つの関数は*3r$3rSETr$5rmykeyr$7rmyvaluer"の3(3つのパラメータ)をargc,SET,mykey,myvalueにargvに与える.
processInputBufferのソースコード
//
void processInputBuffer(redisClient *c) {
/* Keep processing while there is something in the input buffer */
//
// short read ,
// ,
//
while(sdslen(c->querybuf)) {
/* Return if clients are paused. */
// ,
if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;
/* Immediately abort if the client is in the middle of something. */
// REDIS_BLOCKED
if (c->flags & REDIS_BLOCKED) return;
/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands). */
// FLAG ,
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
/* Determine request type when unknown. */
//
// Redis :
// http://redis.readthedocs.org/en/latest/topic/protocol.html
// , ,
// TELNET
// querybuf[0] * , multibulk, inline
// redis , inline, mutibulk ,inline
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
//
c->reqtype = REDIS_REQ_MULTIBULK;
} else {
//
c->reqtype = REDIS_REQ_INLINE;
}
}
// ,
if (c->reqtype == REDIS_REQ_INLINE) {
// , REDIS_OK
if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
//
if (processMultibulkBuffer(c) != REDIS_OK) break;
} else {
redisPanic("Unknown request type");
}
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
// ,
if (processCommand(c) == REDIS_OK)
resetClient(c);
}
}
}
四、命令を実行する
関数、processCommandを呼び出し、コマンドを実行します.この関数は、lookupCommandを呼び出してコマンドを検索し、call関数を呼び出してコマンドを実行し、最後にaddReplyを呼び出してクライアントに返信します.
このプロセスを理解するには,Redisにおける命令がどのように格納されているかを知る必要がある.
redisは、コマンドに関する情報を格納する構造体を定義します.たとえば,コマンド名,コマンドの実装関数,コマンドのパラメータ個数などである.
/*
* Redis
*/
struct redisCommand {
//
char *name;
//
redisCommandProc *proc;
//
int arity;
// FLAG
char *sflags; /* Flags as string representation, one char per flag. */
// FLAG
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
// 。 Redis 。
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
// key
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
//
// microseconds
// calls
long long microseconds, calls;
};
次にstruct redisServerでdict辞書タイプのコマンドテーブルを定義します.ここでcommandsはrename構成オプションで構成され、orig_commandsは元のコマンドテーブルです.受けないconfの影響.
struct redisServer {
。。。。。。。
// ( rename )
dict *commands; /* Command table */
// ( rename )
dict *orig_commands; /* Command table before command renaming. */
。。。。。。。。。
}
そしてredisでcにはredisCommandTableの初期化テーブルが定義されている
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wm",0,NULL,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
........,}
このredisCommandTableのredisCommandをpopulateCommandTable関数でredisServerのdict型orig_に追加します.commandsでは、検索するたびに辞書で対応するredisCommandsを検索し、対応するproc関数を呼び出してコマンドの処理を実現し、最後にaddReplyでクライアントの処理状況に返信することができます.