【15日目】トランザクションデータベース 〜トランザクションマネジャー〜


まえがき

tx_mngの説明をします。tx_mngトランザクションをマネジメントするサーバです。query_execサーバからメッセージを受け取って処理をします。

tx_mngは内部的にms_tx_mngというテーブルで各query_execのトランザクションを管理します。そして、トランザクションが高々1つのみしか実行できないよう制御します。実装方法はたくさんあると思いますが、今回は単純に他のトランザクションがactiveな状態になっているときは、自トランザクションは待たされる、という方法で実装します。

  • トランザクションの管理をしていない場合:各トランザクションがそれぞれ好きなタイミングで開始、コミットできる。
  • トランザクションを高々1つしか実行できないように管理する場合:もし、実行中のトランザクションがある場合、他のトランザクションは実行できないよう、トランザクションの開始を止める。

想定しているクエリは下記の4つです。

  • begin_tx - トランザクションを開始する。
  • commit_tx - トランザクションをコミットする。ローカルデータを共有データへマージする。
  • rollback_tx - トランザクションをロールバックする。ローカルデータを破棄する。
  • allow_tx - 自分のトランザクションが許可されている場合はokを返す。

allow_txが、activeではないトランザクションを待たせる処理になります。
トランザクションの取りうるステータスはactive, inactive, committed, abortedの4つです。
ステータスは基本的に、inactive→active→committed/abortedの順に変わっていきます。
それでは各クエリを順番に説明します。

begin_tx

トランザクションを開始する。query_execからメッセージ{begin_tx}が飛んでくる想定である。ソースは下記。

tx_mng.erl
handle_call({begin_tx}, _From, _State) ->
    Txid = generate_txid(),
    register_tx(Txid),
    {reply, Txid,[]};

トランザクションIDを振り出して、tx_mngサーバのローカルテーブルms_tx_mngに登録する。ms_tx_mngは様々なquery_execから飛んでくるトランザクションIDを一元管理するテーブルである。begin_txでトランザクションを開始した時は、まだinactiveというステータスで登録される。

register_tx
register_tx(Txid) ->
    Timestamp = get_timestamp(),
    ets:insert(ms_tx_mng, {Txid, Timestamp, inactive}),
    activate_tx().

ms_tx_mngにトランザクションを登録したら、すぐactivate_txという関数を呼ぶ。

activate_tx
activate_tx() ->
    case ets:match_object(ms_tx_mng, {'_', '_', active}) of 
        %% activeなトランザクションが存在する
        [_] -> 
            transaction_not_acquired;
        %% activeになっているトランザクションがいない
        [] -> 
            case ets:match_object(ms_tx_mng, {'_', '_', inactive}) of 
                %% inactiveなトランザクションが存在しない場合、なにもしない
                [] -> no_transaction_waiting;
                %% inactiveのトランザクションが存在する場合、一番古いトランザクションをactiveにする
                %% アクティベートされたトランザクションに通知する
                InactiveTxList ->
                    {OldestTxid, OldestTimestamp, _Status} = get_oldest_tx(InactiveTxList),
                    ets:insert(ms_tx_mng, {OldestTxid, OldestTimestamp, active}),
                    io:format("Oldest Txid: ~p~n", [OldestTxid]),
                    notify_tx_active(ets:lookup(ms_waiting_proc, OldestTxid))
            end
    end.

トランザクションを登録した時点ではまだステータスがinactiveなので、activate_txを呼び出し、他のトランザクションの状況を見て、今登録されたトランザクションをactiveにするかどうか処理する。

  • もし他にactiveになっているトランザクションがあれば、何もしない
  • 他にactiveになっているトランザクションがなく、inactiveのトランザクションがあれば、最も古い時刻に登録されたトランザクションをactiveにする
  • 他にactiveになっているトランザクションがなく、inactiveなトランザクションもなければ、何もしない

他にトランザクションが実行されていないとき、begin_txを実行した場合、他にactiveなトランザクションはないので、ステータスはすぐactiveになる。

シリアライザブルなトランザクションを実行できるようにするため、activate_txは高々一つのトランザクションだけを実行することを保証する責務を負う。この後説明するが、activate_txは、commit_tx, rollback_txを行った後にも呼び出される。

commit_tx

トランザクションをコミット(確定)する。query_execから「このトランザクション(Txid)をコミットしてちょうだい」というメッセージ{commit_tx, Txid}が飛んでくる想定である。ソースは下記。

tx_mng.erl
handle_call({commit_tx, Txid}, _From, _State) ->
    commit_tx(Txid),
    {reply, ok, []}.

commit_tx(Txid) ->
    case ets:lookup(ms_tx_mng, Txid) of
        [{Txid, Timestamp, _Status}] ->
            ets:insert(ms_tx_mng, {Txid, Timestamp, committed});
        [] -> transaction_not_found
    end,
    activate_tx().

中身は単純で、ms_tx_mngのquery_execから渡されたTxidのレコードのステータスをcommittedに更新する。

そして、activate_txを呼び出し、もし他のトランザクションがactiveになることを待っていれば、そのトランザクションをactiveにし通知する。

rollback_tx

トランザクションをロールバック(放棄)する。query_execからメッセージ{rollback_tx, Txid}が飛んでくる想定である。ソースは下記。

tx_mng.erl
handle_call({rollback_tx, Txid}, _From, _State) ->
    rollback_tx(Txid),
    {reply, ok, []}.

rollback_tx(Txid) ->
    case ets:lookup(ms_tx_mng, Txid) of
       [{Txid, Timestamp, _Status}] ->
            ets:insert(ms_tx_mng, {Txid, Timestamp, aborted});
        [] -> transaction_not_found
    end,
    activate_tx().

こちらも中身は単純で、ステータスをabortedにして、activate_txを呼び出すだけである。

allow_tx

トランザクションが許可されているか確認する。query_execからメッセージ{allow_tx, Txid}が飛んでくる想定である。

allow_tx
handle_call({allow_tx, Txid}, From, _State) ->
    case is_active_tx(Txid) of
        true -> {reply, ok, []};
        false -> 
            io:format("transaction not allowed~n"),
            stack_waiting_process(Txid, From),
            {noreply, [], infinity};
        transaction_not_found ->
            {reply, transaction_not_found, []}
    end.

is_active_tx(Txid) ->
    case ets:lookup(ms_tx_mng, Txid) of
        [] -> transaction_not_found;
        [{Txid, _Timestamp, active}] -> true;
        [{Txid, _Timestamp, _}] -> false
    end.

is_active_txがトランザクションが許可されているか確認する関数である。tx_mngのms_tx_mngテーブルに指定されたトランザクションがactiveになっていればtrueを返す。そうでないときはfalseを返す。そもそもトランザクションがない時はtransaction_not_foundを返す。

  • trueとtransaction_not_foundの場合はquery_execにokを返す。
  • falseの場合は、他のトランザクションがactiveになっているということなので、query_execにすぐメッセージを返さずに、メッセージを返す宛先を一旦スタックする。そして、他のトランザクションがコミットされるなどして、自トランザクションがactiveになったらokのメッセージを返す。メッセージを同期的にすぐ返さず、他のトランザクションのコミットを起点にメッセージを返す仕組みは別日で説明する。

あとがき

tx_mngの仕組みを簡単に説明しました。
query_execからallow_txを受け取り、そのトランザクションがactiveではなかったときは、メッセージを返却せずにトランザクションがactiveになるまで待つ仕組みを次に説明します。