Redisコマンドの要求と実行プロセス

20905 ワード

一、クライアントの作成


Redisサーバとクライアントが接続を確立すると、RedisはI/O多重化であるため、サーバはクライアントごとに1つの状態を維持する必要があるため、クライアントが作成されます.作成したらEventLoopに読み込みイベントをバインドします.このとき,クライアントにリードイベントが発生すると,サーバはクライアントのデータを読み出して次の処理を行うことができる.
/*
 *         
 */
redisClient *createClient(int fd) {

    //     
    redisClient *c = zmalloc(sizeof(redisClient));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the Redis commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    //   fd    -1  ,           
    //    fd   -1 ,              
    //    Redis                 ,      Lua        
    //          
    if (fd != -1) {
        //    
        anetNonBlock(NULL,fd);
        //    Nagle   
        anetEnableTcpNoDelay(NULL,fd);
        //    keep alive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        //          loop (        )
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
//           .......
return c;
}

二、クライアントデータを読み取る


読み取りイベントが発生するとreadQueryFromClient関数が呼び出され、この関数は接続記述子のデータを読み出し、クライアントのqueryBufに格納します.さらにprocessInputBuffer関数を呼び出してクエリーバッファからコンテンツを読み出し、パラメータを作成し、コマンドを実行します.
/*
 *              
 */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    //            
    server.current_client = c;

    //     (    16 MB)
    readlen = REDIS_IOBUF_LEN;

    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    //               
    //        short read ,                  
    //                        ,
    qblen = sdslen(c->querybuf);
    //      ,            (peak)
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    //           
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    //          
    nread = read(fd, c->querybuf+qblen, readlen);

    //     
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    //    EOF
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    if (nread) {
        //     ,       (SDS) free   len   
        //    '\0'           
        sdsIncrLen(c->querybuf,nread);
        //                   
        c->lastinteraction = server.unixtime;
        //        master   ,         
        if (c->flags & REDIS_MASTER) c->reploff += nread;
    } else {
        //   nread == -1   errno == EAGAIN    
        server.current_client = NULL;
        return;
    }

    //                    
    //            
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    //           ,    ,     
    //                      
    processInputBuffer(c);

    server.current_client = NULL;
}

三、バッファの内容を読み取り、パラメータリストを作成する


processInputBuffer関数.この関数は3つの重要な関数を呼び出します.1つはprocessInlineBufferで、1つはprocessMultibulkBufferで、もう1つはprocessCommand関数です.redisはinlineとmultibulkプロトコルの2つのプロトコルをサポートします.Inlineプロトコルは古いプロトコルで、ここでは議論しません.現在は主にmultibulkプロトコルであり、このプロトコルはクライアントが入力したコマンド、例えば「set mykey myvalue」を「3r$3rSETr$5rmykeyr$7rmyvaluer」に変換し、「」の後の「3」は合計3つのパラメータを表す.プロトコルフォーマットに変換してサーバに転送します.サーバは、この文字列を解析して実行可能なコマンドにします.processInlineBufferとprocessmultibulkの2つの関数は,クライアントから送信されたプロトコルをargc(パラメータ総数)とargv(パラメータリスト)に解析する.RedisClientに保存します.ここでは、クライアント構造体に関連するメンバー定義を確認する必要があります.
/* With multiplexing we need to take per-client state.
 * Clients are taken in a liked list.
 *
 *    I/O      ,              。
 *
 *                   。
 */
typedef struct redisClient {

    //       
    int fd;

    //           
    redisDb *db;

    //             id (  )
    int dictid;

    //       
    robj *name;             /* As set by CLIENT SETNAME */

    //      
    //   ,sds  char*
    sds querybuf;

    //          
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size */

    //     
    int argc;

    //       
    robj **argv;

    //            
    struct redisCommand *cmd, *lastcmd;

    //      :          
    int reqtype;

//         。。。。。
    } redisClient;

すなわち、この2つの関数は*3r$3rSETr$5rmykeyr$7rmyvaluer"の3(3つのパラメータ)をargc,SET,mykey,myvalueにargvに与える.
processInputBufferのソースコード
//             
void processInputBuffer(redisClient *c) {

    /* Keep processing while there is something in the input buffer */
    //                
    //        short read ,                  
    //                        ,
    //             
    while(sdslen(c->querybuf)) {

        /* Return if clients are paused. */
        //             ,      
        if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;

        /* Immediately abort if the client is in the middle of something. */
        // REDIS_BLOCKED             
        if (c->flags & REDIS_BLOCKED) return;

        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands). */
        //            FLAG ,         
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

        /* Determine request type when unknown. */
        //        
        //            Redis         :
        // http://redis.readthedocs.org/en/latest/topic/protocol.html
        //     ,              ,
        //         TELNET     

        //   querybuf[0]       *  ,  multibulk,   inline
        // redis       ,   inline,   mutibulk  ,inline    
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                //     
                c->reqtype = REDIS_REQ_MULTIBULK;
            } else {
                //     
                c->reqtype = REDIS_REQ_INLINE;
            }
        }

        //              ,      
        if (c->reqtype == REDIS_REQ_INLINE) {
            //          ,    REDIS_OK
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            //          
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            //     ,      
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}

四、命令を実行する


関数、processCommandを呼び出し、コマンドを実行します.この関数は、lookupCommandを呼び出してコマンドを検索し、call関数を呼び出してコマンドを実行し、最後にaddReplyを呼び出してクライアントに返信します.
このプロセスを理解するには,Redisにおける命令がどのように格納されているかを知る必要がある.
redisは、コマンドに関する情報を格納する構造体を定義します.たとえば,コマンド名,コマンドの実装関数,コマンドのパラメータ個数などである.
/*
 * Redis   
 */
struct redisCommand {

    //     
    char *name;

    //     
    redisCommandProc *proc;

    //     
    int arity;

    //        FLAG
    char *sflags; /* Flags as string representation, one char per flag. */

    //    FLAG
    int flags;    /* The actual flags, obtained from the 'sflags' field. */

    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    //             。  Redis        。
    redisGetKeysProc *getkeys_proc;

    /* What keys should be loaded in background when calling this command? */
    //         key
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */

    //     
    // microseconds                
    // calls           
    long long microseconds, calls;
};

次にstruct redisServerでdict辞書タイプのコマンドテーブルを定義します.ここでcommandsはrename構成オプションで構成され、orig_commandsは元のコマンドテーブルです.受けないconfの影響.
struct redisServer {

      。。。。。。。

    //    (   rename        )
    dict *commands;             /* Command table */
    //    (  rename        )
    dict *orig_commands;        /* Command table before command renaming. */

    。。。。。。。。。
    }

そしてredisでcにはredisCommandTableの初期化テーブルが定義されている
struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    ........,}

このredisCommandTableのredisCommandをpopulateCommandTable関数でredisServerのdict型orig_に追加します.commandsでは、検索するたびに辞書で対応するredisCommandsを検索し、対応するproc関数を呼び出してコマンドの処理を実現し、最後にaddReplyでクライアントの処理状況に返信することができます.