【16日目】トランザクションサポートデータベース実装 ~waitingの実装~


まえがき

シリアライザブルなトランザクションを実装するための道具として、waitingを実装しました、という話です。下記のように、複数のトランザクションを並行して実行している時、実行順序をシーケンシャルにするため、同時に実行されるトランザクションは単一になるよう、他のトランザクションを待たせます。

システム上はtx_mngのms_tx_mngテーブルのTx①だけがactive、それ以外のトランザクションはinactiveで表現される。

シリアライザブルなトランザクション

説明が前後していますが、トランザクションには分離レベルという概念があります。この概念は、I/O効率を上げるためにトランザクションの並列度を上げると、読み込み/書き込み競合などの望まない挙動が発生するというトレードオフを、段階的に定義したものです。そのうちの、シリアライザブルという分離レベルに関してPostgreSQLの文書より引用します。

SQLの標準規格では、トランザクションの分離について4つのレベルを定義しています。 標準規格で定義されているもののうち最も厳密なものはシリアライザブルです。 1セットのシリアライザブルなトランザクションを同時実行した場合には、ある順番でひとつずつそれらを実行した場合と同じ結果となることが保証されるものです。
PostgreSQL 11.5文書 13.2. トランザクションの分離

今回実装しようとしているトランザクションサポートデータベースは、あるトランザクションの実行中は他のトランザクションの実行をブロックするものなので、シリアライザブルということになります。
(一つ注意として、あるトランザクションの実行中に他のトランザクションの実行を実際に、本当に、ブロックすることと、分離レベルがシリアライザブルであることは同値ではありません。シリアライザブルを実装する一つの方法として、そのような愚直な実装方法があるということです。普通のRDBMSではもっと効率の良いアルゴリズムでシリアライザブルを実現しているはずです。)

waiting

waitingとは、あるトランザクションを開始しようとしたときに、他のトランザクションがすでに実行中の場合、そのトランザクションが終了するまで待つ機能のことを指します。高々一つのトランザクションしか実行させないようにするために、実行中ではないトランザクションは列に並んで待機していなさい、ということです。
例として、トランザクション1が先に開始し、次にトランザクション2が開始したとする。

Tx②のallow_txを送ってからokが返ってくるまでの灰色の部分がwaitingである。

Erlangはもともとプロセス間でメッセージをやり取りして処理を進めていく思想があるため、waitingも簡単に実装できます。実装の方針は、query_execからtx_mngへallow_txのメッセージが飛んできた時、トランザクションがactiveではない場合は返信メッセージを返さない。query_execはtx_mngからメッセージが返ってくるまで、次の処理を進めることができず、ブロックされるということになる。tx_mngは別の処理を進めなければいけないので、Tx②から飛んできたallow_txに対して、okが返せないことがわかったら、すぐにTx②のプロセスIDをローカルテーブルにスタックして処理を完了する。そして、Tx①がコミットされて、Tx②がactiveになったら、スタックしておいたプロセスIDに対してokを送る。

それでは、ソースを説明する。query_execサーバからはask_transactionという関数によって、tx_mngのallow_txを呼び出す。以前説明したが、各DML処理のはじめに実行され、自トランザクションがactiveであることをチェックするための関数である。

query_exec.erl
ask_transaction(#state{txMngPid=TPid, txid=Txid}) ->
    tx_mng:allow_tx(TPid, Txid).

それによって呼び出されるtx_mngのソースが次である。

tx_mng.erl
allow_tx(Pid, Txid) ->
    gen_server:call(Pid, {allow_tx, Txid}, infinity).

handle_call({allow_tx, Txid}, From, _State) ->
    case is_active_tx(Txid) of
        true -> {reply, ok, []};
        false -> 
            stack_waiting_process(Txid, From),
            io:format("waiting list added~n"),
            {noreply, [], infinity};
        transaction_not_found ->
            {reply, transaction_not_found, []}
    end;

まずはallow_tx(Pid, Txid)の関数のなかで、gen_server:callにinfinityを引数に入れているところがポイントである。ここで、呼び出しがwaitingされる可能性があるので、タイムアウトの設定を無限にしている。
トランザクションがinactiveだった場合、stack_waiting_processを呼び出し、トランザクションがactiveになるまで、waiting状態に入る。query_execにはまだ返信しない。

stack_waiting_process
stack_waiting_process(Txid, From) ->
    ets:insert(ms_waiting_proc, {Txid, From}).

stack_waiting_process関数はms_waiting_procテーブルにトランザクションIDとメッセージの返信先を保存する。メッセージの返信先は、handle_callの第2引数のFromである。(その中身はquery_execのプロセスIDと一意のタグから成るタプルである。)

ms_waiting_procテーブルにメッセージの返信先を保存するのは、トランザクションがactiveになったときに、そのトランザクションのプロセスにokを返信するために、宛先を覚えておく必要があるからである。
これがいつ使われるかというと、activeなトランザクションがコミットされたあと、inactiveだったトランザクションがactivate_tx関数によってactiveになったときである。

activate_tx関数中のトランザクションをactiveにする処理
{OldestTxid, OldestTimestamp, _Status} = get_oldest_tx(InactiveTxList),
ets:insert(ms_tx_mng, {OldestTxid, OldestTimestamp, active}),
notify_tx_active(ets:lookup(ms_waiting_proc, OldestTxid))

InactiveTxListがinactiveなトランザクションからactiveにするトランザクションを1つ選択する処理である。1番古いトランアクションがactiveにされる。activeにするトランザクションが選ばれるということは、返信を待っているquery_execプロセスがいるということなので、メッセージokを返信する。それが次のnotify_tx_active関数である。

notify_tx_active
notify_tx_active([]) ->
    transaction_not_found;
notify_tx_active([{Txid, From}]) ->
    ets:delete(ms_waiting_proc, Txid),
    gen_server:reply(From, ok).

gen_server:reply(From, ok)
でokを返信する。
これによって、waitingされていたquery_execプロセスは、トランザクションがactiveになったので、次の処理に進むことができる。

あとがき

ここまでで、シリアライザブルなトランザクションを実装したDDL,DMLの実行ができるように一通り実装できました。
次は、実際にクライアントからクエリを実行してテストしていきます。