Elixirで並行処理をやってみた


はじめに

Elixirといえば並行処理ですよね。
プロセスを使って並行処理をするプログラムを書いてみました。

コード

プロセスを10個生成し、それぞれ適当な処理をしてもらうと見立てて、ランダムで1~10秒後にメッセージを返すプログラムになっています。

parallel.exs
defmodule Service do
  def do_some_task do
    receive do
      {sender, index} ->
        # 1~10のランダムな数字
        number = :rand.uniform(10)
        # ランダムな数字の秒数待機
        :timer.sleep(number * 1000)

        send sender, {:ok, "#{index}番目 #{inspect self()} タスクが#{number}秒で完了しました。"}
    end
  end
end

1..10
|> Enum.map(fn index -> 
    send spawn(Service, :do_some_task, []), {self(), index} end)
|> Enum.map(fn _ -> 
    receive do {:ok, message} -> IO.puts message end end)
結果
$ elixir parallel.exs
10番目 #PID<0.107.0> タスクが1秒で完了しました。
6番目 #PID<0.103.0> タスクが4秒で完了しました。
3番目 #PID<0.100.0> タスクが5秒で完了しました。
4番目 #PID<0.101.0> タスクが5秒で完了しました。
2番目 #PID<0.99.0> タスクが6秒で完了しました。
5番目 #PID<0.102.0> タスクが6秒で完了しました。
9番目 #PID<0.106.0> タスクが6秒で完了しました。
7番目 #PID<0.104.0> タスクが7秒で完了しました。
1番目 #PID<0.98.0> タスクが10秒で完了しました。
8番目 #PID<0.105.0> タスクが10秒で完了しました。

コードの解説

プロセスの生成

プロセスはspawn関数を使って生成します。
プロセスといってもOSのスレッドやプロセスとは違い、Elixir独自に実装されているものです。
とても軽量でプロセスはオブジェクトを作る感覚で、数百万のプロセスを作ることも珍しくないみたいです。
また、関数に渡した引数は呼び出し先でコピーされるという不変性を備えているおかげで、値の整合性も取られています。

ちなみに詳細な解説はこちらの記事が分かりやすかったです。
Elixir試飲録 (7) – Erlangの軽量プロセスはどのように実現されているのか?

今回のコードではdo_some_taskという関数をspawn関数に渡し、10個生成しています。
同時にsend(後述)をしているので分かりにくくなってしまいました。。。

アクターモデル

Elixirの並行処理はアクターと呼ばれるデザインパターンで実装されています。
アクターと呼ばれるオブジェクト(ここでは親プロセス, 子プロセスがそれにあたるのかな?)がメッセージを送受信して処理を行なっていきます。

図やもう少し詳しい説明はこちらの記事が分かりやすかったです。
ScalaのAkkaというFWも同じくアクターモデルを実装しており、それについての説明です。
Akkaで始める並行処理(1) - アクターモデルとAkkaの概要

またこちらの記事も分かりやすいです。
【プログラミングErlang7章より】並行処理とアクターモデルの概念について

本格的に並行処理について学ぶなら、Erlangの本が良さげみたいです。
ちなみにElixirはErlangで実装されており、並行処理についてはErlangと一緒みたいですね。
プログラミングErlang

メッセージの送受信

アクターモデルのメッセージの送受信ですが、send, receiveを使います。
sendには送りたいプロセスとメッセージを引数として渡します。
receiveにはメッセージを受け取り、何か処理を行います。

今回のコードを見ながら順をおって解説します。

親から子へメッセージを送る

1..10
|> Enum.map(fn index -> 
    send spawn(Service, :do_some_task, []), {self(), index} end)

spawnで生成した子プロセスを、そのままsendで送っています。
メッセージには自分のプロセスを表すself()とindexを渡しています。

子で親のメッセージを受け取る

  def do_some_task do
    receive do
      {sender, index} -> 省略
    end
  end

こちらで親からのメッセージを受け取るようにしています。

子から親へメッセージを送る

send sender, {:ok, "#{index}番目 #{inspect self()} タスクが#{number}秒で完了しました。"}

何か処理を行なったあと、最後にsendで親にメッセージを送ります。
senderはself()を呼び出して親プロセスを渡しました。
そのため、呼び出し元にメッセージを送ることが可能です。

親で子のメッセージを受け取る

|> Enum.map(fn _ -> 
    receive do {:ok, message} -> IO.puts message end end)

最後に親で子のメッセージを受け取るようにし、受け取ったメッセージを表示しました。
説明は以上になります。

まとめ

Elixirで並行処理のコードを書いてました。
レベルとしては並行処理のHello worldくらいですが、アクターモデルなどの単語を知りました。
ちなみに並行処理を調べる過程で、JSのPromiseはFutureというデザインパターンであると知り、これもアクターモデルと同じ並行処理のデザインパターンです。今回はできませんでしたが、比較したら面白そうですね。

並行処理は知っていて損はない技術だと思うので、Elixirを使って勉強していきたいです。
単語だけ聞いたことあるOTP, スーパーバイザなどに挑戦したいですね。
あとElixirは並行処理以外にもメタプログラミングも特徴してあるので、こちらも触ってみたいです。