Nimでスレッド間通信を提供する channels


はじめに

こんにちは。高校3年の樅山です。
今回は、Nimでスレッド間通信を提供する標準ライブラリ、channelsの解説をしたいと思います。
マルチスレッドの基本的な機能については、標準ライブラリ #12 threads を参照してください。

この記事は、Nim Advent Calendar 2020 その2 の14日目です。

Nim 標準ライブラリシリーズ

  1. Nimの文字列フォーマット strformat
  2. Nimで "集合" を扱う sets
  3. Nimで統計分析が行える stats
  4. Nimでジェネリックなアルゴリズムを使いこなそう algorithm
  5. Nimの配列を華麗に処理する sequtils
  6. Nimの型情報を提供する typetraits
  7. Nimで複素数を扱う complex
  8. Nimによるビット操作入門 bitops
  9. NimでCPUのコア数を調べよう cpuinfo
  10. Nimでバイトオーダーを処理する endians
  11. Nimで整数と浮動小数点数間の演算を提供する lenientops
  12. Nimでマルチスレッドプログラミングをはじめよう threads
  13. Nimでスレッド間通信を提供する channels

channels

channelsモジュールは、スレッド間通信を提供しています。
threadsと一緒に使うことを想定しているため、spawnと組み合わせることは推奨されていません。不安定な動作を起こす可能性があります。
threadsモジュールと同様に --threads:on スイッチを付与しなければコンパイルに失敗します。また、デフォルトで読み込まれているので明示的にimport文を記述してはいけません。

簡単なスレッド通信

本記事では、Channelによって生成されるスレッド通信を制御するオブジェクトをチャンネルと呼びます。

channelsモジュールが提供しているsendプロシージャによって、スレッドがチャンネルに対してメッセージを送信することができます。
また、同モジュールが提供しているrecvプロシージャによって、チャンネルがメッセージを受信することができます。

ここで、スレッドが送信したメッセージが受信可能になるまでの間、メインスレッドを占有して他の処理ができなくなる通信方法をブロッキング通信と呼びます。
recvによる受信は、ブロッキング通信となります。

簡単なスレッド通信
import os
var channel: Channel[string]

proc task1 () =
  channel.send("Simple Task!")

proc task2 () =
  sleep(5000)
  channel.send("Delay Task!")

channel.open()

var worker: Thread[void]

createThread(worker, task1)
echo channel.recv()

createThread(worker, task2)
echo channel.recv()

worker.joinThread()

channel.close()
stdout
Simple Task!
Delay Task!

送信したメッセージは、ディープコピーされる点に注意しましょう。

保持できる未処理のメッセージ数を設定する

openプロシージャは、スレッド通信用のチャンネルを開く働きがあります。定義を見てみましょう。

openプロシージャ
proc open*[TMsg](c: var Channel[TMsg]; maxItems: int = 0)

デフォルト値で0をとるパラメータ、maxItemsがあります。
maxItems0の時、保持できる未処理のメッセージは無制限であり、無制限キューとして働きます。
1以上の値を与えると、未処理のメッセージ数に上限を設けることができます。

ここで、チャンネルがいくつメッセージを保持しているか調べるためにpeekプロシージャを使いましょう。

注: peekプロシージャを用いることでスレッドが競合状態に陥りやすくなるため、危険です。ソフトウェアの実装に利用するのはなるべく避けましょう。

無制限キュー
import os
var channel: Channel[string]

proc task1 () =
  channel.send("Hello, Nim!")

channel.open()

var worker: Thread[void]

for i in 1..5:
  createThread(worker, task1)

for i in 1..5:
  echo channel.peek
  echo channel.recv()

worker.joinThread()
stdout
0
Hello, Nim!
4
Hello, Nim!
3
Hello, Nim!
2
Hello, Nim!
1
Hello, Nim!

無制限キューの場合は、全ての未処理メッセージがチャンネルに保持されました。

上限2のチャンネル
import os
var channel: Channel[string]

proc task1 () =
  channel.send("Hello, Nim!")

channel.open(2)

var worker: Thread[void]

for i in 1..5:
  createThread(worker, task1)

for i in 1..5:
  echo channel.peek
  echo channel.recv()

worker.joinThread()
stdout
0
Hello, Nim!
1
Hello, Nim!
1
Hello, Nim!
2
Hello, Nim!
1
Hello, Nim!

メッセージの上限数が2の場合、保持されるメッセージ数も最大2に制限されました。

また、peekの代わりにreadyプロシージャを用いることで、未送信のメッセージがあるかどうかを判定できます。

ノンブロッキング通信

ブロッキング通信に対して、メインスレッドをブロックせずに通信を送受信するのがノンブロッキング通信です。

recvの代わりにtryRecvで受信することでメインスレッドをブロックしなくなります。

ノンブロッキング通信
import os
var channel: Channel[string]
channel.open()

proc task1() =
  sleep(2000)
  channel.send("国家機密")

var worker: Thread[void]
createThread(worker, task1)

while true:
  let triedReceive = channel.tryRecv()
  if triedReceive.dataAvailable:
    echo triedReceive.msg
    break

  echo "重要なセキュリティ処理"
  sleep(400)

worker.joinThread()

channel.close()
stdout
重要なセキュリティ処理
重要なセキュリティ処理
重要なセキュリティ処理
重要なセキュリティ処理
重要なセキュリティ処理
国家機密

ただし、メインスレッドをブロックしない代償にメッセージが必ず受信できるとは限らない点に注意しましょう。
recvtryRecvの定義を比較してみましょう。

定義の比較
proc recv*[TMsg](c: var Channel[TMsg]): TMsg
proc tryRecv*[TMsg](c: var Channel[TMsg]): tuple[dataAvailable: bool, msg: TMsg]

ノンブロッキング通信では、受信が成功したかを表すdataAvailableと、メッセージであるmsgがタプルで返却されます。
上のコードのように、受信するまでwhileループで待ち続けて、dataAvailabletrueになった時点でループを抜けると良いでしょう。

ノンブロッキングな送信

受信と同様に、送信でもtrySendプロシージャを用いてノンブロッキングな送信を行うことができます。

ノンブロッキングな送信
import os
var channel: Channel[string]

proc task1 () =
  echo channel.trySend("Hello, Nim!")

channel.open(2)

var worker: Thread[void]

for i in 1..5:
  createThread(worker, task1)

while true:
  let triedChannel = channel.tryRecv()
  if not triedChannel.dataAvailable:
    break

worker.joinThread()
stdout
true
false
true
false
false

trySendプロシージャは、maxItemsの制限を超えるなど痩身に失敗した場合にfalseを返します。
送信時に、メインスレッドはブロックされません。

終わりに

channelsモジュールを利用することで並列操作だけでなく、スレッド間の通信も利用できるようになります。
threadschannelsも非常に低レベルなAPIなので、実際には他の高レベルなスレッドライブラリを使う場合が多いですが、Nimではスレッドがどのように扱われているかを知っておくのは良いと思います。
ぜひマルチスレッドプログラミングに取り組んでみてください!

余談

学生はGitHub Proに無料で加入できるのですが、GitHub Proユーザーは連携することでCanvaのProプランが利用できるそうです。
本記事ではこれを利用して画像を作りましたが、有料素材を気にせず使えるのは非常に良いので、学生の皆様はぜひ手続きをしてみてください。

参考文献