Elixirで並行コマンド実行サーバーを作ったら感動した話


この記事はElixir Advent Calendar 2020の11日目です。

昨日は @zacky1972 さんの「Elixir から Swift 5.3のコードを呼び出す方法(Autotoolsを使って / Apple Silicon M1チップにも対応)」でした。
明日は @MzRyuKa さんの「書評:プログラミングElixir第2版」です。


本記事では「Taskを使うとすごく簡単に並行処理が書ける!しかもサーバー化も簡単!Elixirすごい!」と感動したので、その作成過程を紹介しつつ、感動をお伝えできたらと思います。


Taskのおさらい

Task 使ってますか? ざっくり復習します。

Task.asyncで別プロセスを生成できる

 
IExで、Process.sleepするとIExのプロセスがsleepするので入力できなくなります。

iex()> Process.sleep(5000)
# 5秒間入力できない

Task.asyncを使うと別プロセスで処理ができる(つまり並行に処理できている)ので、入力が継続できます。

iex()> require Logger
...()> Task.async(fn ->
...()>   Process.sleep(10000)
...()>   Logger.info("yey")
...()> end)
%Task{
  owner: #PID<0.111.0>,
  pid: #PID<0.144.0>,
  ref: #Reference<0.3890488730.554434561.238327>
}
...()> "saki ni yey" # 入力できます。
"saki ni yey"
iex()>
22:39:23.952 [info]  yey

Task.awaitでTask.asyncの結果を受け取れる

Task.asyncが返すtask構造体をTask.awaitに渡すと、結果を受け取ることができます。

iex()> task = Task.async(fn ->
...()>   Process.sleep(5000)
...()>   2+3
...()> end)
%Task{
  owner: #PID<0.111.0>,
  pid: #PID<0.137.0>,
  ref: #Reference<0.3890488730.554434561.238283>
}
iex()> Task.await(task)
5

チョット工夫してみる

おさらいできたので、チョット工夫してみます。

Taskを使うと「mfa(module, function, arguments)のタプルのリスト」(以降コマンドリストと呼ぶ)を並行に実行してくれる関数を以下のように書くことができます。

defmodule CommandExecutor do
  @spec do_async([tuple()]) :: [any()]
  def do_async(commands) when is_list(commands) do
    commands
    |> Enum.map(fn {m, f, a} -> Task.async(m, f, a) end) # コマンドリストは処理が開始されタスクのリストに
    |> Enum.map(fn task -> Task.await(task) end)         # タスクの結果を一つずつ回収
  end
end
# Task.async/1は匿名関数を、Task.async/3はmfaを処理することができます。

※コマンドの実行は並行でできますが、結果の回収は一番遅いtaskの処理に引っ張られます。
※do_asyncの呼び出し側は結果の回収が完了するまでブロックします。

コマンド実行サーバーにしてみる

作ったdo_asyncを使って、メッセージとしてコマンドリストを投げたら、処理してくれるサーバーを作ってみます。
※リスト内のコマンドは並行処理しますが、リスト単位では逐次処理します。(GenServerは受け取ったメッセージを逐次処理する)

defmodule CommandExecutor do
  use GenServer

  def start_link(state), do: GenServer.start_link(__MODULE__, state, name: __MODULE__)

  def init(state), do: {:ok, state}

  # 結果を返さない
  def handle_cast({:commands, commands, :async}, state) when is_list(commands) do
    do_async(commands)
    {:noreply, state}
  end

  # 結果を返す
  def handle_call({:commands, commands, :async}, _from, state) when is_list(commands) do
    {:reply, do_async(commands), state}
  end

  @spec do_async([tuple()]) :: [any()]
  def do_async(commands) when is_list(commands) do
    commands
    |> Enum.map(fn {m, f, a} -> Task.async(m, f, a) end)
    |> Enum.map(fn task -> Task.await(task) end)
  end
end

コマンドをぽいぽい投げ込めば、リスト単位で逐次処理してくれます。
※コマンドリストをたくさん投げる可能性がある場合は、GenServerのキュー(mailbox)にたまるので、その限界を知っておく必要があります。(知ってたら教えてくださいm(- -)m

コマンド実行で例外が起きたら?

実行できないmfaが投げ込まれる可能性があるので対策しておきます。

  @spec do_async([tuple()]) :: [any()]
  def do_async(commands) when is_list(commands) do
    commands
    |> Enum.map(fn {m, f, a} ->
         try do
           Task.async(m, f, a)
         rescue
           error in [UndefinedFunctionError] -> error
           other_error -> reraise("あちゃー", System.stacktrace)
         end
     end)
    |> Enum.map(fn task -> Task.await(task) end)
  end

でも、これだと予見できない例外でGenServerが落ちるかもしれません。

予見できない例外で落ちる場合はSupervisorに復帰してもらう

「僕に分かる対策はした、あとは頼んだよSupervisor君!!」という感じで、基本はお作法に従って書くだけです。

defmodule CommandExecutorSupervisor do
  use Supervisor

  def start_link(start_state) do
    Supervisor.start_link(__MODULE__, start_state, name: __MODULE__)
  end

  def init(_start_state) do
    children = [{CommandExecutor, nil}]
    Supervisor.init(children, strategy: :one_for_one)
  end
end

※復帰動作の細かい設定をしたい場合はドキュメントを読みましょう。

Applicationに組み込む

アプリ−ケーションの起動時からコマンド実行サーバーが動作するようにApplicationに組み込みます。

defmodule Oreore.Application do
  use Application

  def start(_type, _args) do
    children = [..., {CommandExecutorSupervisor, []}]
    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

できました。

うぉお、めっちゃ簡単、並行コマンド実行サーバーできた!

おわり

雑にさらさら進めましたが、ポイントは以下でした。

  • Taskを使うことで、スレッドを意識せずに並行処理が書ける!
  • GenServerのメッセージキューに頼ることで、自身でキューの実装をせずにすむ!
  • SupervisorにGenServerを監視してもらえるので自動復帰も余裕!

Elixir, Erlang OTPすごいですね!!これからもElixirやっていきましょう♪♪