redisプロトコルの解析

43831 ワード

redisクライアントがサーバにコマンドを送信場合、redisサーバはいずれもコマンドを解析する、対応するコマンド処理関数を呼び出して処理する必要がある.なお、redisのいずれのプロトコルコマンドも\r
で終了する.
以前のクライアントの接続プロセスで説明したように、redisサーバは、新しく接続されたクライアントのファイルイベントオブジェクトを作成し、その読み取り可能な状態をリスニングします.このファイルイベントオブジェクトのトリガコールバック関数はreadQueryFromClientです.つまり、クライアントが次のtelnet命令でredisサーバに接続されたと仮定します.
telnet 10.7.7.132 6379
Trying 10.7.7.132...
Connected to 10.7.7.132.
Escape character is '^]'.

次に、制御端末でredis setコマンドを入力し続ける(ここではsetコマンドを例にクライアントの接続をトリガする):
set key 3

redisサーバのselectシステム呼び出しが返され、クライアントコマンド入力を読み出す汎用関数(redis.c)として実装される関数readQueryFromClientが呼び出される.
832 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
833     redisClient *c = (redisClient*) privdata;
834     char buf[REDIS_QUERYBUF_LEN];
835     int nread;
836     REDIS_NOTUSED(el);
837     REDIS_NOTUSED(mask);
838 
839     nread = read(fd, buf, REDIS_QUERYBUF_LEN);
840     if (nread == -1) {
841         if (errno == EAGAIN) {
842             nread = 0;
843         } else {
844             redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
845             freeClient(c);
846             return;
847         }
848     } else if (nread == 0) {
849         redisLog(REDIS_DEBUG, "Client closed connection");
850         freeClient(c);
851         return;
852     }
853     if (nread) {
854         c->querybuf = sdscatlen(c->querybuf, buf, nread);
855         c->lastinteraction = time(NULL);
856     } else {
857         return;
858     }
859 
860 again:
861     if (c->bulklen == -1) {
862         /* Read the first line of the query */
863         char *p = strchr(c->querybuf,'
'); 864 size_t querylen; 865 if (p) { 866 sds query, *argv; 867 int argc, j; 868 869 query = c->querybuf; 870 c->querybuf = sdsempty(); 871 querylen = 1+(p-(query)); 872 if (sdslen(query) > querylen) { 873 /* leave data after the first line of the query in the buffer */ 874 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen); 875 } 876 *p = '\0'; /* remove "
"
*/ 877 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */ 878 sdsupdatelen(query); 879 880 /* Now we can split the query in arguments */ 881 if (sdslen(query) == 0) { 882 /* Ignore empty query */ 883 sdsfree(query); 884 return; 885 } 886 argv = sdssplitlen(query,sdslen(query)," ",1,&argc); 887 sdsfree(query); 888 if (argv == NULL) oom("Splitting query in token"); 889 for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) { 890 if (sdslen(argv[j])) { 891 c->argv[c->argc] = argv[j]; 892 c->argc++; 893 } else { 894 sdsfree(argv[j]); 895 } 896 } 897 free(argv); 898 /* Execute the command. If the client is still valid 899 * after processCommand() return and there is something 900 * on the query buffer try to process the next command. */ 901 if (processCommand(c) && sdslen(c->querybuf)) goto again; 902 return; 903 } else if (sdslen(c->querybuf) >= 1024) { 904 redisLog(REDIS_DEBUG, "Client protocol error"); 905 freeClient(c); 906 return; 907 } 908 } else { 909 /* Bulk read handling. Note that if we are at this point 910 the client already sent a command terminated with a newline, 911 we are reading the bulk data that is actually the last 912 argument of the command. */ 913 int qbl = sdslen(c->querybuf); 914 915 if (c->bulklen <= qbl) { 916 /* Copy everything but the final CRLF as final argument */ 917 c->argv[c->argc] = sdsnewlen(c->querybuf,c->bulklen-2); 918 c->argc++; 919 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1); 920 processCommand(c); 921 return; 922 } 923 } 924 }

Line 839は、readシステムによって読み取りクライアントの入力を呼び出す、読み取り操作は非ブロックとなり、エラーが発生しなければ、Line 854は、読み出したデータをクライアントオブジェクトのフィールドquerybufに保存し、後続の分析コマンドおよびそのパラメータを使用する.関数sdscatlenは、sdsを操作する一連の関数クラスタの1つである、その論理は、指定されたsdsにバイトを追加し、指定するsdsのスペースが足りなければ自動的にその容量を調整し、最後にその関数は追加データを格納sdsの対象に戻る.関数sdscatlenの実装は(sds.c):
114 sds sdscatlen(sds s, void *t, size_t len) {
115     struct sdshdr *sh;
116     size_t curlen = sdslen(s);
117 
118     s = sdsMakeRoomFor(s,len);
119     if (s == NULL) return NULL;
120     sh = (void*) (s-(sizeof(struct sdshdr)));
121     memcpy(s+curlen, t, len);
122     sh->len = curlen+len;
123     sh->free = sh->free-len;
124     s[curlen+len] = '\0';
125     return s;
126 }

Line 118は、sdsのオブジェクトに新しい空間を割り当てる追加のデータを格納し、sdsのオブジェクト内の空き空間に追加するデータを格納できない場合、この関数は新しい空間を割り当て、sdsのオブジェクト内の空き空間に追加するデータを格納できれば、新しい空間を割り当てる必要はない.関数sdsMakeRoomForの実装は(sds.c):
 94 static sds sdsMakeRoomFor(sds s, size_t addlen) {
 95     struct sdshdr *sh, *newsh;
 96     size_t free = sdsavail(s);
 97     size_t len, newlen;
 98 
 99     if (free >= addlen) return s;
100     len = sdslen(s);
101     sh = (void*) (s-(sizeof(struct sdshdr)));
102     newlen = (len+addlen)*2;
103     newsh = realloc(sh, sizeof(struct sdshdr)+newlen+1);
104 #ifdef SDS_ABORT_ON_OOM
105     if (newsh == NULL) sdsOomAbort();
106 #else
107     if (newsh == NULL) return NULL;
108 #endif
109 
110     newsh->free = newlen - len;
111     return newsh->buf;
112 }

Line 96はsdsのオブジェクトの空きスペースの大きさを取得する、Line 99はその空きスペースと追加するデータ量の大きさを比較し、空きスペースが追加するデータを格納できれば、そのまま戻る.Line 100:103は、新たな空間を動的に割り当てる、その前のsdsのオブジェクトが格納データと、追加するデータとを格納するようにする、その前に新たな空間が予約する.Line 103のライブラリ関数reallocは、sdsのオブジェクトの既存のデータを新しい割り当て空間にコピーし、以前のsdsのオブジェクトを解放する、ここでのコピーはsdsのメタデータ、すなわちsdshdrのオブジェクト情報を含むことに注意する.Line 110は、空きスペースのサイズを調整する、すなわち、新たに割り当てられたサイズから、前のsdsのオブジェクトに存在するデータの長さを減算する.Line 111はsdsのオブジェクトを返す、sdsのオブジェクトは、実はタイプsdshdrのオブジェクトを指すbufフィールドである.
関数sdscatlenに戻る、Line 121:123は、追加するデータを新しいsdsオブジェクトの空きスペースに格納し、sdsオブジェクトの格納データ長と空きスペースのサイズを調整する.関数readQueryFromClientに戻ると、Line 854フィールドquerybufは、クライアントコマンドおよびそのパラメータデータを格納sdsのオブジェクトを指す.Line 861は、非bulkタイプのredisコマンドまたはbulkタイプのコマンドの初回コマンド読み取りに対して、クライアントオブジェクトのbulklenフィールドの値が-1であると判断する.Line 862:907は、フィールドquerybufに読み出すredisコマンドおよびそのパラメータを解析する.Line 863は、新しい行バイト’’’’に位置付けられ、新しい行バイトが見つかれば、収益データにredisコマンド文字列が必ず含まれることを示す、このコマンドのパラメータは完全に入力されているか、完全に入力されていないかのいずれかである.具体的には、bulkタイプのredisコマンドについては、パラメータが完全に入力されていない可能性がある、非bulkタイプのredisコマンドについては、通常、完全な入力である.入力中に新しい行バイトに位置決めされていない場合は、redisコマンドの文字が完全に入力されていないことを示し、メインイベントループに戻り、selectリスニングを呼び出し続ける必要がある.Line 869:870型sdsのオブジェクトqueryは、クライアントオブジェクトのフィールドquerybufを指すとともに、空のsdsのオブジェクトを指す.Line 871は、sdsのオブジェクトが保持するデータの先頭から新しい行バイトまでのデータ長を算出する.Line 872:875の条件分岐が満たされると、新しい行バイトの後にデータがあることを示す、Line 874は、クライアントオブジェクトのフィールドquerybufを新しい行バイトの後のデータに向けることを意味する.Line 876:877タイプchar*のローカル変数pは、新しい行バイトの開始データを指し、新しい行バイトを0に置き換え、新しい行バイトの前がリターンバイトである場合、0に置き換える.ローカルsdsオブジェクトqueryが指すデータの'r'および''が0に置き換えられるため、Line 878は関数sdsupdatelenを呼び出してその長さを調整し、それを(sds.c)として実現する.
 87 void sdsupdatelen(sds s) {
 88     struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
 89     int reallen = strlen(s);
 90     sh->free += (sh->len-reallen);
 91     sh->len = reallen;
 92 }

実装から分かるように、この関数は'0'で終わるC言語の文字逃げしか正しく処理できない.関数readQueryFromClientに戻る、Line 880:885長さ調整後、タイプsdsのローカルオブジェクトqueryデータの内容が空である、メインイベントループに戻り、selectリスニングを呼び出し続ける.Line 886は関数sdssplitlenを呼び出し、スペース分割文字分割入力のredisコマンドとそのパラメータを使用し、この関数の実装は(sds.c):
266 sds *sdssplitlen(char *s, int len, char *sep, int seplen, int *count) {
267     int elements = 0, slots = 5, start = 0, j;
268 
269     sds *tokens = malloc(sizeof(sds)*slots);
270 #ifdef SDS_ABORT_ON_OOM
271     if (tokens == NULL) sdsOomAbort();
272 #endif
273     if (seplen < 1 || len < 0 || tokens == NULL) return NULL;
274     for (j = 0; j < (len-(seplen-1)); j++) {
275         /* make sure there is room for the next element and the final one */
276         if (slots < elements+2) {
277             slots *= 2;
278             sds *newtokens = realloc(tokens,sizeof(sds)*slots);
279             if (newtokens == NULL) {
280 #ifdef SDS_ABORT_ON_OOM
281                 sdsOomAbort();
282 #else
283                 goto cleanup;
284 #endif
285             }
286             tokens = newtokens;
287         }
288         /* search the separator */
289         if ((seplen == 1 && *(s+j) == sep[0]) || (memcmp(s+j,sep,seplen) == 0)) {
290             tokens[elements] = sdsnewlen(s+start,j-start);
291             if (tokens[elements] == NULL) {
292 #ifdef SDS_ABORT_ON_OOM
293                 sdsOomAbort();
294 #else
295                 goto cleanup;
296 #endif
297             }
298             elements++;
299             start = j+seplen;
300             j = j+seplen-1; /* skip the separator */
301         }
302     }
303     /* Add the final element. We are sure there is room in the tokens array. */
304     tokens[elements] = sdsnewlen(s+start,len-start);
305     if (tokens[elements] == NULL) {
306 #ifdef SDS_ABORT_ON_OOM
307                 sdsOomAbort();
308 #else
309                 goto cleanup;
310 #endif
311     }
312     elements++;
313     *count = elements;
314     return tokens;
315 
316 #ifndef SDS_ABORT_ON_OOM
317 cleanup:
318     {
319         int i;
320         for (i = 0; i < elements; i++) sdsfree(tokens[i]);
321         free(tokens);
322         return NULL;
323     }
324 #endif
325 }

この関数の基本的な考え方は、Line 269は、redisコマンドおよびそのパラメータを指すために予めタイプsdsの配列を割り当てる、予め割り当てられた配列の大きさは5であり、ほとんどのredisコマンドおよびそのパラメータがこの数より小さいためである.個別のredisコマンドのパラメータが比較的多い場合、Line 278はこの配列のサイズを動的に調整する.Line 304配列のsdsオブジェクトは、redisコマンドおよびそのパラメータを記憶する呼び出し関数sdsnewlenによって新たに作成されたsdsオブジェクトを指す.
関数sdsに戻る、Lline 886が実行された後、局所変数argcは、分割するパラメータの数(redisコマンドを含む)を保存し、局所オブジェクトargvは、分割されたreadQueryFromClientオブジェクトを指し、これらのオブジェクトは動的に割り当てられるので、Line 887呼び出し関数sdsは、元のsdsfreeオブジェクトを解放することができる.Line 889:897は、ローカルオブジェクトargvにおいて指向するsdsのオブジェクトをクライアントオブジェクトのフィールドargvに記録し、パラメータ個数をargcフィールドに記録し、完了するとローカル管理配列argvを解放する.これで、今回のreadシステム呼び出しで読み出すデータのうち、新しい行バイト以前のデータ解析は終了し、redisコマンドが含まれているに違いない、そのパラメータは完全であるか、完全でないかのいずれかである.Line 901は、関数sdsを呼び出す具体的なredisコマンドを処理する準備をしており、この関数が戻るとクライアントオブジェクトのフィールドprocessCommandにデータが残っていると、Line 860にジャンプして先ほどの解析処理を継続する.Lline 909:922クライアントオブジェクトのquerybufの値は-1ではなく、bulkタイプのredisコマンドのみ可能であり、以前のredisコマンドおよびその一部のパラメータ値が読み取られた場合、Line 913はクライアントオブジェクト受信キャッシュにおけるデータ長を判断し、(今回読み取ったデータ長さかもしれないし、以前に読み取ったデータと今回読み取ったデータかもしれない)、Lline 915:922クライアントキャッシュオブジェクトにおける累積読み出しの長いがbulkタイプバイトデータの要求を満たす場合、クライアントオブジェクトのargvおよびargcを更新し続け、関数bulklenの処理を呼び出すと、その関数に伝達されたクライアントオブジェクトはredisコマンドおよびそのパラメータの準備が完了する.
関数processCommandの実装は(redis.c):
774 /* If this function gets called we already read a whole
775  * command, argments are in the client argv/argc fields.
776  * processCommand() execute the command or prepare the
777  * server for a bulk read from the client.
778  *
779  * If 1 is returned the client is still alive and valid and
780  * and other operations can be performed by the caller. Otherwise
781  * if 0 is returned the client was destroied (i.e. after QUIT). */
782 static int processCommand(redisClient *c) {
783     struct redisCommand *cmd;
784 
785     sdstolower(c->argv[0]);
786     /* The QUIT command is handled as a special case. Normal command
787      * procs are unable to close the client connection safely */
788     if (!strcmp(c->argv[0],"quit")) {
789         freeClient(c);
790         return 0;
791     }
792     cmd = lookupCommand(c->argv[0]);
793     if (!cmd) {
794         addReplySds(c,sdsnew("-ERR unknown command\r
"
)); 795 resetClient(c); 796 return 1; 797 } else if ((cmd->arity > 0 && cmd->arity != c->argc) || 798 (c->argc < -cmd->arity)) { 799 addReplySds(c,sdsnew("-ERR wrong number of arguments\r
"
)); 800 resetClient(c); 801 return 1; 802 } else if (cmd->type == REDIS_CMD_BULK && c->bulklen == -1) { 803 int bulklen = atoi(c->argv[c->argc-1]); 804 805 sdsfree(c->argv[c->argc-1]); 806 if (bulklen < 0 || bulklen > 1024*1024*1024) { 807 c->argc--; 808 c->argv[c->argc] = NULL; 809 addReplySds(c,sdsnew("-ERR invalid bulk write count\r
"
)); 810 resetClient(c); 811 return 1; 812 } 813 c->argv[c->argc-1] = NULL; 814 c->argc--; 815 c->bulklen = bulklen+2; /* add two bytes for CR+LF */ 816 /* It is possible that the bulk read is already in the 817 * buffer. Check this condition and handle it accordingly */ 818 if ((signed)sdslen(c->querybuf) >= c->bulklen) { 819 c->argv[c->argc] = sdsnewlen(c->querybuf,c->bulklen-2); 820 c->argc++; 821 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1); 822 } else { 823 return 1; 824 } 825 } 826 /* Exec the command */ 827 cmd->proc(c); 828 resetClient(c); 829 return 1; 830 }

Line 785は、redisコマンドを小文字フォーマットに変換するので、クライアントが入力redisコマンド文字は、大文字フォーマットでも小文字フォーマットでもよく、クライアントオブジェクトのフィールドargv[0]が指すprocessCommandは、redisコマンド文字列を保存し、argv[1]、argv[2]などがそのコマンドパラメータの順である.Line 788:791クライアントがsdsコマンドを送信して接続をアクティブに終了した場合、呼び出し関数quitはクライアントに関連するすべてのリソースを解放し、TCP接続をオフにします.
701 static void freeClient(redisClient *c) {
702     listNode *ln;
703 
704     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
705     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
706     sdsfree(c->querybuf);
707     listRelease(c->reply);
708     freeClientArgv(c);
709     close(c->fd);
710     ln = listSearchKey(server.clients,c);
711     assert(ln != NULL);
712     listDelNode(server.clients,ln);
713     free(c);
714 }

呼び出し関数freeClientは、クライアントオブジェクトに関するファイルイベントオブジェクトを解放する.解放フィールドaeDeleteFileEventは、入力受信キャッシュ領域を指す.呼び出し関数querybufは、クライアントオブジェクトの応答リストを解放する.呼び出し関数listReleaseは、クライアントオブジェクトのredisコマンドおよびそのパラメータを解放する.システム呼び出しfreeClientArgvはTCP接続を閉じる.Line 710:712このクライアントオブジェクトをグローバルチェーンテーブルから削除する.Line 713は、クライアントオブジェクトがスタック上で動的に割り当てるので、メモリを解放する.
関数closeに戻り、Line 792はコマンド文字列に対応する処理関数を検索し、関数processCommandの実装は(redis.c):
759 static struct redisCommand *lookupCommand(char *name) {
760     int j = 0;
761     while(cmdTable[j].name != NULL) {
762         if (!strcmp(name,cmdTable[j].name)) return &cmdTable[j];
763         j++;
764     }
765     return NULL;
766 }

コマンド文字列を比較することにより、一致するものが見つかれば、そのコマンド処理関数を返す.グローバルオブジェクトlookupCommandは(redis.c)として定義される.
180 static struct redisCommand cmdTable[] = {
181     {"get",getCommand,2,REDIS_CMD_INLINE},
182     {"set",setCommand,3,REDIS_CMD_BULK},
183     {"setnx",setnxCommand,3,REDIS_CMD_BULK},
184     {"del",delCommand,2,REDIS_CMD_INLINE},
185     {"exists",existsCommand,2,REDIS_CMD_INLINE},
186     {"incr",incrCommand,2,REDIS_CMD_INLINE},
187     {"decr",decrCommand,2,REDIS_CMD_INLINE},
188     {"rpush",rpushCommand,3,REDIS_CMD_BULK},
189     {"lpush",lpushCommand,3,REDIS_CMD_BULK},
190     {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
191     {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
192     {"llen",llenCommand,2,REDIS_CMD_INLINE},
193     {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
194     {"lset",lsetCommand,4,REDIS_CMD_BULK},
195     {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
196     {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
197     {"sadd",saddCommand,3,REDIS_CMD_BULK},
198     {"srem",sremCommand,3,REDIS_CMD_BULK},
199     {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
200     {"scard",scardCommand,2,REDIS_CMD_INLINE},
201     {"sinter",sinterCommand,-2,REDIS_CMD_INLINE},
202     {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
203     {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
204     {"select",selectCommand,2,REDIS_CMD_INLINE},
205     {"move",moveCommand,3,REDIS_CMD_INLINE},
206     {"rename",renameCommand,3,REDIS_CMD_INLINE},
207     {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
208     {"keys",keysCommand,2,REDIS_CMD_INLINE},
209     {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
210     {"ping",pingCommand,1,REDIS_CMD_INLINE},
211     {"echo",echoCommand,2,REDIS_CMD_BULK},
212     {"save",saveCommand,1,REDIS_CMD_INLINE},
213     {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
214     {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
215     {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
216     {"type",typeCommand,2,REDIS_CMD_INLINE},
217     {"",NULL,0,0}
218 };

タイプcmdTableの定義は(redis.c):
114 typedef void redisCommandProc(redisClient *c);
115 struct redisCommand {
116     char *name;
117     redisCommandProc *proc;
118     int arity;
119     int type;
120 };

フィールドredisCommandはredisコマンド文字列、フィールドnameはこのコマンドの処理関数、フィールドprocはredisコマンドパラメータの個数であり、コマンド自体を含む、フィールドarityはこのコマンドがbulkコマンドであるか否かを説明する.
関数typeに戻り、Line 793:802 redisサービス側がクライアントのコマンドをサポートしていない場合、またはコマンドパラメータの個数が正しくない場合、呼び出し関数processCommandはクライアントオブジェクトをリセットし、この関数の実装は(redis.c):
768 /* resetClient prepare the client to process the next command */
769 static void resetClient(redisClient *c) {
770     freeClientArgv(c);
771     c->bulklen = -1;
772 }

リセットクライアントオブジェクトは、解析するredisコマンドおよびそのパラメータを解放し、付与フィールドresetClientを初期値とし、この関数はクライアントとの接続を切断する.
関数bulklenに戻る、Line 802は、コマンドがbulkタイプであり、そのパラメータ、すなわちフィールドbulklenが-1である初めて解析された場合、解析の最後のパラメータは必ずvalueバイトデータの長さである、Line 803は文字列パラメータを整数に変換し、変換が完了すると、Line 805はprocessCommandのオブジェクトを解放することができる.Line 806:812から分かるように、valueバイト数は最大1 GBをサポートする.Line 815は、次に読み出す必要があるvalueのバイト数を設定し、rで終わる2バイトを含む.Line 816:822クライアントオブジェクト受信キャッシュに既に読み出したいバイトが存在する場合、新しいsdsオブジェクトを作成してvalueバイトを格納し、valueバイトはr終了バイトを含まないことに注意し、同時にクライアントオブジェクトはこのsdsオブジェクトを記録し、これでこのredisコマンドの持つパラメータはすべて準備が完了し、Line 827はこのコマンドの処理関数を呼び出し、処理が完了すると、sdsを呼び出すクライアントオブジェクトをリセットし、動的resetClientオブジェクトを解放する.Line 818の条件が満たされていないと判断する、パラメータの入力が不完全であることを示し、パラメータを読み取る必要がある場合、メインイベントループに戻り、ファイルイベントオブジェクトのリスニングを継続し、クライアントの入力を待つ.Line 802の条件判定が満たされていない場合は、そのコマンドまたは非bulkタイプのコマンド、またはbulkコマンドである、そのすべてのパラメータがクライアントオブジェクトに記録されていることを示す、Line 827コマンド処理関数を直接呼び出す.