【14日目】トランザクションデータベース


まえがき

今回はトランザクションのコミット処理について説明します。4つのDML(insert, select, update, delete)の実行によってローカル環境に積み上がったデータを、クエリの実行順に共有領域に書き込みます。

query_exec.erl
handle_call({exec_query, {commit_tx}}, _From, State)->
    %% トランザクションが許可されている場合のみ次に進める
    case ask_transaction(State) of
        transaction_not_found ->
            {reply, transaction_not_found, State};
        ok -> 
            TPid = get_tx_mng_pid(State),
            Txid = get_txid(State),
            QueryIdList = get_query_id_list(State),
            lists:map(fun(QueryId) -> commit_local_index(State, QueryId) end, QueryIdList),
            lists:map(fun(QueryId) -> commit_kvstore(State, QueryId) end, QueryIdList),
            Rep = tx_mng:commit_tx(TPid, Txid),
            {reply, Rep, State#state{txid=undefined, queryId = []}}
    end;

lists:map(fun(・・・commit_local_index(・・・)), QueryIdList)
lists:map(fun(・・・commit_kvstore(・・・)), QueryIdList)
となっているところがそれぞれ、ローカルカラムインデックスのコミットと、ローカルキーバリューストアのコミット処理である。

commit_local_index

ローカルのカラムインデックスを共有カラムインデックスに書き込む処理。

commit_local_index
commit_local_index(State, QueryId) ->
    LColumnIndex = get_local_column_index(State),
    LocalDataList = ets:lookup(LColumnIndex, QueryId),
    DelList = lists:filter(fun({_QueryId, Act, _TableName, _ColName, _Val, _Oid}) -> Act =:= del end, LocalDataList),
    InsList = lists:filter(fun({_QueryId, Act, _TableName, _ColName, _Val, _Oid}) -> Act =:= ins end, LocalDataList),
    io:format("DelListIndex: ~p, InsListIndex: ~p~n", [DelList, InsList]),
    FDel = fun({_QueryId, del, TableName, ColName, Val, Oid}) ->
        ColumnIndexId = simple_db_server:get_column_index_id(TableName, ColName),
        simple_db_server:delete_column_index(ColumnIndexId, Val, Oid)
    end,
    FIns = fun({_QueryId, ins, TableName, ColName, Val, Oid}) ->
        ColumnIndexId = simple_db_server:get_column_index_id(TableName, ColName),
        simple_db_server:insert_column_index(ColumnIndexId, Val, Oid)
    end,
    lists:map(FDel, DelList),
    lists:map(FIns, InsList),
    ets:delete(LColumnIndex, QueryId).

commit_local_indexはクエリによって積まれたデータがinsタグがついたものなら、共有カラムインデックスに挿入し、delタグがついたものなら共有カラムインデックスから削除する。
例えば、Fruitというテーブルに対して、以下の2つのクエリを実行した後にトランザクションをコミットすることを考える。クエリIDはそれぞれ0001,0002、オブジェクトIDはAとする。

  • 0001:insert, Fruit, [みかん、100円]
  • 0002:delete, Fruit, name, みかん

そのときローカルカラムインデックスには下記のようなデータが積まれている。ローカルカラムインデックスはETSのbag(キー重複を許すテーブル)オプションで作成している。

これらが指すところは、コミット時に下記のことをクエリIDが若い順に実行してねー、ということになる。

  • カラム名がnameで、値がみかんのデータのオブジェクトIDはAであることを共有カラムインデックスに挿入する。
  • カラム名がpriceで値が100円のデータのオブジェクトIDはAであることを共有カラムインデックスに挿入する。
  • カラム名がnameで、値がみかんのデータのオブジェクトIDはAであることを共有カラムインデックスから削除する。
  • カラム名がpriceで値が100円のデータのオブジェクトIDはAであることを共有カラムインデックスから削除する。

もし、データの更新をする場合、
UPDATE Fruit SET name = バナナ WHERE name = みかん

  • 0003:insert, Fruit, [みかん、100円]
  • 0004:update, Fruit, [name, バナナ], name, みかん

ローカルカラムインデックスは下記のようになり、クエリID0004の2行がdelとinsの組み合わせであることに気をつけて欲しい。

更新クエリにより、カラム名がnameで値がみかんのデータのオブジェクトIDがAであるという紐付けは削除し、値がバナナであるデータのオブジェクトIDがAであると紐付けなければいけない。

commit_kvstore

ローカルのキーバリューストアを共有領域に書き込む処理。

commit_kvstore
commit_kvstore(State, QueryId) ->
    LKvstore = get_local_kvstore(State),
    LocalDataList = ets:lookup(LKvstore, QueryId),
    DelList = lists:filter(fun({_QueryId, Act, _TableName, _Oid, _Val}) -> Act =:= del end, LocalDataList),
    InsList = lists:filter(fun({_QueryId, Act, _TableName, _Oid, _Val}) -> Act =:= ins end, LocalDataList),
    io:format("DelListKvstore: ~p, InsListKvstore: ~p~n", [DelList, InsList]),
    FDel = fun({_QueryId, del, TableName, Oid, _Val}) ->
        simple_db_server:delete_kvstore(TableName, Oid)
    end,
    FIns = fun({_QueryId, ins, TableName, Oid, Val}) ->
        simple_db_server:insert_kvstore(TableName, Oid, Val)
    end,
    lists:map(FDel, DelList),
    lists:map(FIns, InsList),
    ets:delete(LKvstore, QueryId).

ローカルキーバリューストアのコミット処理とほとんど変わらないので、具体例だけ出しておく。下記の2つのクエリを実行したとした時

  • 0001:insert, Fruit, [みかん、100円]
  • 0002:delete, Fruit, name, みかん

ローカルキーバリューストアは下記の状態になる。

これらをクエリIDの古い順に、アクションに応じて共有キーバリューストアに挿入もしくは削除を行えば良い。
更新の時は、delとinsの合わせ技でやる。カラムインデックスの話と同じなので説明は割愛する。

rollback_tx

ロールバックについても簡単に説明する。ロールバックを行う際は、ローカルデータ領域に積み上がったデータを単に削除する。(ローカルデータ領域の破棄。共有データ領域には触れてもいないので影響なし。)

query_exec.erl
rollback_local_index(State, QueryId) ->
    LColumnIndex = get_local_column_index(State),
    ets:delete(LColumnIndex, QueryId).

rollback_kvstore(State, QueryId) ->
    LKvstore = get_local_kvstore(State),
    ets:delete(LKvstore, QueryId).

あとがき

自トランザクション内で実行したクエリの結果は、コミットするまで他のトランザクションから参照できないようにするという方針を取り、それを実現するため下記のように実装しました。

  1. クエリを実行したらローカルデータ領域に書き込む ←前回、前々回に説明
  2. コミットしたらローカル領域のデータを共有領域へ書き込む ←今回説明

これで、コミットするまでは自トランザクションだけが参照でき、コミットすれば他のトランザクションからも参照できるという仕組みが実装できました。
次回は、トランザクションの管理を行っているtx_mngの説明をします。