fs2 の並行処理 〜 Balance, Broadcast編


Scala の Streaming I/O ライブラリ fs2 の並行処理についての記事。今回は Balance と Broardcastにについて。

はじめに

fs2 の concurrency primitives には、これまでに紹介した QueueTopic、Signalの他に、比較的新しい1BalanceBroadcast といったものもある。

概要

どちらもファンアウト(fan-out)系2で、一本の源流のストリームを複数のワーカー3につなげるものだが、以下のような違いがある。

  • Balance: 上から流れてきた要素を、複数のワーカーからどれか一つ適当なもの選んで流す4
  • Broadcast: 上から流れてきた要素を、全てのワーカーが受け取る

これらを扱うための balance*系やbroadCast*系などのメソッドがStream クラスで提供されていて、直接 BalanceBroadcast を使うより簡単なので、今回はそれを使う。

お題

以下のような標準入力由来のストリームのファンアウトを考えてみる。

  1. ソース・ストリームでは、標準入力からのバイト列を、(1)文字列にして、(2)行で分割して、(3)トリムして、(4)空行を取り除いて、後続のストリームに流す。
  2. 下記のようなワーカーをソース・ストリームにつなげる
    1. 識別 id と秒数を指定してパイプを生成する
    2. 各ワーカーは指定秒数だけ待ってから、流れてきた文字列を修飾して標準出力に書き出す
    3. 結果として文字列の長さを整数として返す
  3. 全体のストリーム構成としては、ソース・ストリームを Balance または Broadcast を介してワーカーに接続し、流れてきた文字列長を適当に修飾して表示する。

(fs2 は v2.0.0 を使った。動くコードはここに置いた)

実装

以下のような共通コードを使う。

def readlineStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] =
  io.stdin[F](4096, bl)
    .through(text.utf8Decode)
    .through(text.lines)
    .map(_.trim)
    .filter(_.nonEmpty)
    .takeWhile(_ != ":q", takeFailure = true)

def printlnSink[F[_]: Sync : ContextShift](implicit bl: Blocker): Pipe[F, String, Unit] =
  _.map(s => s"$s\n")
    .through(text.utf8Encode)
    .through(io.stdout[F](bl))

readlineStream は、コンソール入力を「お題」のように加工している。printlnSink は、上流ストリームから流れてきた文字列に改行を付けて標準出力に書き出している。

Balance と Broadcast で共用するワーカーは、以下のようなパイプとした。

def worker[F[_]: Sync: ContextShift: Timer](id: String, sec: Int)
          (implicit bl: Blocker): Pipe[F, String, Int] = _ flatMap { s =>
  val sleep = Stream.sleep_[F](sec.second)
  val print = Stream(s"worker#$id processing '$s' (${sec}s)") through printlnSink
  (sleep ++ print) zipRight Stream(s).map(_.length)
}

パイプをファンアウトするには、StreambalanceThroughbroadcastThrough を使う。5

(どちらについても、最大並列数を指定して、ある一つのワーカーから複数のインナーストリームを生成する版6もあるが、今回は異なる複数のワーカーを個別に指定する版を使う。)

Balance

Balance では、以下のように balanceThrough メソッドを使って3つのワーカーにファンアウトする。

def stream[F[_] : Concurrent : Timer : ContextShift]
  (implicit bl: Blocker): Stream[F, Unit] =
    readlineStream[F]
      .balanceThrough(1)(worker("A", 1), worker("B", 2), worker("C", 3))
      .map(n => s"length: $n")
      .through(printlnSink)

結果

動かしてみると以下のようになる。(コメントは後で書き足したもの)

hello                            // hello と入力
worker#A processing 'hello' (1s) // ワーカーAが一秒待って文字列長をエミット
length: 5                        // アウター側で表示
bye                              // bye と入力
worker#B processing 'bye' (2s)   // 同様にワーカーB
length: 3
1                                // "1" と "1234" を立て続けに入力
1234
worker#A processing '1234' (1s)  // 順序としてはワーカーCだが、待ち時間が小さいワーカーAが先に終わる
length: 4                        // アウター側も終わった順にストリーム要素を受け取る
worker#C processing '1' (3s)     // 遅れてワーカーC が終了
length: 1                        // アウター側は同様に先行ワーカーが終わり次第処理

A, B, C、A, B, C ...の順にファンアウトされ、アウターストリームは終ったもの順に処理を続行する。

Broadcast

Broadcast では以下のように broadcastThrough を使って、3つのワーカーを接続する。

def stream[F[_] : Concurrent : Timer : ContextShift]
  (implicit bl: Blocker): Stream[F, Unit] =
    readlineStream[F]
      .broadcastThrough(worker("A", 1), worker("B", 2), worker("C", 3))
      .map(n => s"length: $n")
      .through(printlnSink)

結果

Balance とは違って、3つのワーカーそれぞれがアウターストリームから流れてきた要素を処理する。

hello                             // ”hello” と入力
worker#A processing 'hello' (1s)  // ワーカーが3つともアウターからのストリームを処理
length: 5                         // アウターでは終わった順に処理
worker#B processing 'hello' (2s)
length: 5
worker#C processing 'hello' (3s)
length: 5

ストリーム中の一つの要素(あるいはイベント)を複数のワーカー(あるいは購読者)が処理するという点で、なんとなくトピックに似ている気もするが、Topic のようにオブジェクトを明示的には作らなくても Stream のメソッドで直接つなげられる点や、ワーカーの出力がアウターに戻ってくる点などが違う。


  1. v1.0.0 以降でサポート 

  2. 扇状に一本から複数に分岐する形や流れ 

  3. ワーカーは PipeSink として、ソース・ストリームにつなげる。 

  4. ワーカーの選出基準の詳細などはここでは触れない。APIドキュメントに詳しい。 

  5. シンクにつなげるには、balanceTobroadcastTo を使う。 

  6. balanceThrough, broadcastThrough