Redisソース解析:29トランザクション

11963 ワード

RedisはMULTl,EXEC,WATCH,DISCARDなどのコマンドによりトランザクション機能を実現する.
トランザクションはMULTIコマンドから始まり、その後、クライアントからの他のコマンドがキューに入れられ、クライアントからEXECコマンドが送られた後、Redisはキュー内のコマンドを順次実行します.また、サーバは、実行中にトランザクションを中断することなく、他のクライアントのコマンド要求を実行します.これにより、トランザクション内のすべてのコマンドが実行された後、他のクライアントのコマンド要求を処理します.
WATCHコマンドは、クライアントがEXECコマンドの実行前に任意の数のデータベースキーを監視し、EXECコマンドの実行時に、監視されているキーが他のクライアントによって変更されたかどうかを確認できるようにします.もしそうであれば、Redisはトランザクションの実行を拒否し、トランザクションの実行に失敗したことを示す空の返信をクライアントに返します.
DISCARDコマンドは、トランザクション・キューを空にし、トランザクション・モードを終了します.
 
一:MULTIとEXEC命令
1:データ構造
クライアントを表す構造体redisClientには、multiState構造のmstateプロパティがあります.
typedef struct redisClient {
    ...
    multiState mstate;      /* MULTI/EXEC state */
    ...
} redisClient;

MultiState構造は、トランザクション内のコマンドを保存するためのトランザクションキューです.MultiState構造の定義は次のとおりです.
/* Client MULTI/EXEC state */
typedef struct multiCmd {
    robj **argv;
    int argc;
    struct redisCommand *cmd;
} multiCmd;

typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

MultiStateのcommands配列プロパティは、countタグコマンドバー数を使用して各コマンドを保存します.後の2つのプロパティは使用されません.
 
2:MULTIコマンド
クライアントからMULTIコマンドが送信されると、このコマンドの処理関数はmultiCommandであり、コードは以下の通りである.
void multiCommand(redisClient *c) {
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    c->flags |= REDIS_MULTI;
    addReply(c,shared.ok);
}

コードは簡単です.クライアントのフラグビットにREDIS_を追加します.MULTIタグは、クライアントがトランザクションモードに入ることを示す.
 
3:トランザクションコマンドキュー
クライアントフラグビットにREDIS_を追加するとMULTIタグの後、コマンドを処理するprocessCommand関数には、次のような論理があります.
int processCommand(redisClient *c) {
    ...
    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,REDIS_CALL_FULL);
        ...
    }
    return REDIS_OK;
}

したがって、トランザクションモードでは、クライアントからEXEC、DISCARD、MULTI、WATCHといういくつかのトランザクションコマンドのいずれかでない場合、queueMultiCommand関数が呼び出され、コマンドがc->mstateにキューされ、クライアント「+QUEUED」情報に返信される.
 
queueMultiCommand関数のコードは次のとおりです.
void queueMultiCommand(redisClient *c) {
    multiCmd *mc;
    int j;

    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
}

コードは簡単です.コマンドをc->mstateに並べて、説明しません.
 
4:EXECコマンド
クライアントからEXECコマンドが来た場合、execCommand関数を呼び出して処理します.この関数のコードは次のとおりです.
void execCommand(redisClient *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */

    if (!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
        addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first write op.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }

        call(c,REDIS_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    discardTransaction(c);
    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    if (must_propagate) server.dirty++;

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MUTLI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}

関数では、クライアントが現在トランザクションモードにない場合、クライアントエラー情報を返信します.
クライアントフラグビットにREDIS_が設定されている場合DIRTY_CASまたはREDIS_DIRTY_EXECタグは、クライアントの対応するエラーメッセージに返信し、discardTransactionを呼び出してクライアントのトランザクションモードを終了し、最後にmonitorにメッセージを送信した後、直接戻ります.REDIS_DIRTY_CASタグは、クライアントWATCHのキーが変更されたことを示す.REDIS_DIRTY_EXECタグは、トランザクション中にprocessCommand関数のチェック中にエラーが発生したことを示します.たとえば、コマンドが見つからなかったり、コマンドパラメータの個数がエラーになったりします.
 
次に、トランザクションのコマンドの実行を開始します.
まずunwatchAllKeysを呼び出し、クライアントcがWATCHのキーを一切持たないように設定します.クライアントの現在のコマンド属性をorig_*に記録します.を選択して、その後のリカバリを実行します.
次に,輪訓c−>mstateに並んだ各コマンドについて,コマンドを順次実行する.注意:最初の書き込み操作に遭遇した場合、execCommandPropagateMulti関数を呼び出し、ノードとAOFファイルからMULTIコマンドを追加する必要があります.c->mstateのコマンドは順次実行され、あるコマンドの実行に失敗し、後続のコマンドの実行に影響しない.
最後に、orig_*に記録されたコマンド属性を復元し、discardTransactionを呼び出してクライアントのトランザクションモードを終了します.モニタにメッセージを送信した後、直接戻ります.
 
5:エラー処理
トランザクション中に、2つのタイプのエラーが発生する可能性があります.
1つのエラーは、パラメータエラー、コマンド名エラーなどの構文エラーです.これらのエラーはprocessCommand関数でコマンド処理関数を呼び出す前にチェックされます.エラーをチェックするとflagTransaction関数を呼び出し、クライアントフラグビットにREDIS_を追加します.DIRTY_EXECタグ:
void flagTransaction(redisClient *c) {
    if (c->flags & REDIS_MULTI)
        c->flags |= REDIS_DIRTY_EXEC;
}

この場合、このコマンドはエンキューされず、最終的なEXECコマンド実行関数でクライアントエラーメッセージに直接返信します.
 
もう1つのエラーは、コマンド実行関数を実際に呼び出したときにのみチェックできる実行エラーです.例えば、文字列タイプのキーに対してRPUSHのようなリストのみのコマンドを実行します.
EXECコマンド処理関数execCommandでは,キューに並ぶコマンドに対してcall関数を順次呼び出す.したがって、この場合、エラーのコマンドは、後続のコマンドの実行に影響しません.
 
二:WATCHコマンド
クライアントは、EXECコマンドが実行される前に、任意の数のデータベース・キーを監視し、EXECコマンドが実行されると、監視されているキーが他のクライアントによって変更されたかどうかを確認できます.もしそうであれば、Redisはトランザクションの実行を拒否し、トランザクションの実行に失敗したことを示す空の返信をクライアントに返します.
 
1:データ構造
WATCHコマンドに関するデータ構造は次のとおりです.
データベースを表すredisDb構造ではwatched_keys辞書のプロパティ:
typedef struct redisDb {
    ...
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    ...
} redisDb;

この辞書はデータベースキーをkeyとし、リストclientsをvalueとする.リストクライアントには、WATCHデータベースキーのすべてのクライアントが記録されています.
このような構造により、あるデータベースキーが現在WATCHにあるクライアントを迅速に得ることができる.
        
クライアントを表すredisClient構造ではwatched_keysリストのプロパティ:
typedef struct redisClient {
    ...
    multiState mstate;      /* MULTI/EXEC state */
    ...
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    ...
} redisClient;

各リスト要素は、keyおよびdb属性を有するwatchedKey構造である.リストには、現在のクライアントWATCHのデータベース・キーが記録されています.
typedef struct watchedKey {
    robj *key;
    redisDb *db;
} watchedKey;

2:WATCHコマンド
クライアントから「WATCH[...]」が送信された後、RedisはwatchCommand関数を呼び出して処理する.関連コードは次のとおりです.
void watchCommand(redisClient *c) {
    int j;

    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}

/* Watch for the specified key */
void watchForKey(redisClient *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

WATCHコマンド処理関数watchCommandでは、現在のクライアントがトランザクションモードにある場合は、クライアントエラーメッセージに返信し、直接戻ります.
次に、コマンドパラメータの各キーについてwatchForKey関数を呼び出し、クライアントWATCHのキーを設定する.
最後にクライアント「ok」に返信する.
 
watchForKey関数では、まずローテーションリストc->watched_keysは、現在のクライアントがWATCHのkeyになったかどうかを判断し、もし、直接戻る.
次にkeyを介して辞書c->db->watched_keysでWATCHキーのクライアントリストclientsを検索します.見つからない場合は、クライアントWATCHキーがまだ存在しないため、リストclientsを作成し、辞書c->db->watched_にキーとclientsを追加します.keysで;見つかったら、現在のクライアントcをリストclientsに追加する.
次にwatchedKey構造のwkを作成し、keyとその属するデータベースdbが記録される.次にwkをリストc->watched_に追加keysで;
 
 
クライアントからのコマンドがキーを変更すると、signalModifiedKey関数が呼び出されます.この関数はtouchWatchedKeyを呼び出すだけです.touchWatchedKey関数の役割は、データベースdbのkeyが変更されたときにREDIS_を増加させることです.DIRTY_CASは、すべてのWATCHキーのクライアントフラグビットにタグ付けされ、これらのクライアントがEXECコマンドを実行すると、クライアントエラーメッセージが直接返信されます.
touchWatchedKey関数のコードは次のとおりです.
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        redisClient *c = listNodeValue(ln);

        c->flags |= REDIS_DIRTY_CAS;
    }
}

関数で、辞書db->watched_keysが空の場合、直接戻ります.
そしてkeyで辞書db->watched_keysでWATCH当該keyのクライアントリストclientsを検索し、clientsが見つからない場合、クライアントWATCH当該keyがないことを説明し、直接戻る.
次に、各クライアントのフラグビットにREDIS_を追加するローテーションリストclientsです.DIRTY_CASタグ.
このように、WATCHのキーのクライアントがEXECを実行すると、クライアントフラグビットにREDIS_が設定されていることがわかるDIRTY_CASタグを付けると、クライアントエラーメッセージが返信されます.