Monix Observable で Publish/Subscribe


Monix の サブプロジェクト monix-reactive でリアクティブなストリーミングが提供されている。この記事では、Observableを使った 1→多 の Publish/Subscribe を書いてみる。

Monix Observable 概要

  • GoF の Observer パターンに相当するが、Monix Observable は遅延×非同期な関数型イベントストリーム。
  • Task[A] を複数要素のストリーム化したもの、あるいは Iterable[A] を非同期+ノンブロッキングにしたもの、とも言える。
  • FS2 の Stream が Pull 型モデルであるのに対して、Observable は Push 型。また、パフォーマンスのために関数的な純粋性の面で若干妥協していたり、副作用の扱いも FS2 ほどにはこだわっていないという。1
  • functional reactive programming (FRP)に適している。

などなど

お題

パブリッシャーは、0から3までの数を1秒毎にイベントとして発行するような Observableとする。これを購読して、名前、スレッドIDと共に標準出力するサブスクライバーを、以降、いくつかのやり方で実装してみる。

  • ソースはここに置いた
  • Monix のバージョンは 3.0.0-RC2

まずObserver は以下のように書ける。

val observable = Observable.interval(1.second).takeWhile(_ <= 3)

イベントの購読の仕方にはいくつの方式があるが、ハンドリング自体は比較しやすいように共通コードを使いまわすことにする。ここでは受け取った Long値を、名前、スレッドIDと共に標準出力する、以下のような String => Long => Task[Unit] の関数にした。

def printTick(name: String) = (n: Long) => Task.sleep(100.milli) map { _ =>
  println(s"$name: $n in ${ Thread.currentThread().getId }")
}

Task 版

まず複数の Task を、gather で並行実行するシンプルな方式。

def gather(tasks: (Long => Task[Unit])*) = observable.mapEval { n =>
  Task.gather(tasks.map(_(n)))
}
def run(args: List[String]): Task[ExitCode] = {
  import monix.execution.Scheduler.Implicits.global

  val taskA = printTick("A")
  val taskB = printTick("B")

  Task.parZip2(
    Task.sleep(5.second), // Task 全体の所要時間をこれで決める 
    Task(gather(taskA, taskB).subscribe()) // subscribe() で Observable が動き出す
  ).map(_ => ExitCode.Success)
}

以下、実行結果(コメントは後で書き足したもの)

A: 0 in 15 // A と B は別のスレッドで実行されている。
B: 0 in 13 // printTick を少し書き換えて、たとえばわずかにスリープした後で、別途
A: 1 in 13 // 終了を println するようにすれば、並行実行されていることが確認できる。
B: 1 in 15
A: 2 in 13
B: 2 in 15
A: 3 in 13
B: 3 in 15

Process finished with exit code 0 // 5秒後に終了

Consumer 版

ConsumerObservable#consumeWith で関連付ける方式。

ただし、このままの observable はいわゆる cold Observable というもので、1つめの Consumer が購読を始めた時点でイベントが発行されてしまうので、hot Observable (ConnectableObservable) に変換することにより、connect して初めてイベントの発行が始まるようにする必要がある。

def consumer(name: String): Consumer[Long, Unit] =
  Consumer.foreachEval(printTick(name))

def run(args: List[String]): Task[ExitCode] = {
  import monix.execution.Scheduler.Implicits.global

  val connectable = observable.publish // hot Observable に変換
  val countA = connectable.consumeWith[Unit](consumer("A")) // まだ始まらない
  val countB = connectable.consumeWith[Unit](consumer("B")) // まだ始まらない

  Task.parZip4(
    Task.sleep(7.second),
    countA,
    countB,
    Task(connectable.connect()) // ここで初めてイベント発行が始まる
  ).map(_ => ExitCode.Success)
}

得られる結果はTask 版と同じ。

Observer 版

Observer を使うと、イベント処理の都度、購読を続けるか止めるかを切り替えたり、またエラーや完了をハンドリングしたりできる。

case class MyObserver(name: String, until: Long = Long.MaxValue) extends Observer[Long] {
  import monix.execution.Scheduler.Implicits.global

  def onNext(elem: Long): Future[Ack] =
    printTick(name)(elem).runToFuture.flatMap { _ =>
      if (elem < until) Continue else Stop
    }
  def onError(ex: Throwable): Unit =
    println(s"$name: an error occurred ${ ex.getMessage }")

  def onComplete(): Unit =
    println(s"$name: completed")
}

上の Observer は、完了時、エラー時の処理に加えて、イベントの Long値 がコンストラクタで受け取ったしきい値以上になると購読をやめるようにしている。

これを Observable に関連付けるコードが以下。

def run(args: List[String]): Task[ExitCode] = {
  import monix.execution.Scheduler.Implicits.global

  val connectable = observable.publish
  connectable.subscribe(MyObserver("A", 2)) // ObserverA は 2 秒で打ち切り
  connectable.subscribe(MyObserver("B"))    // ObserverB は打ち切りしない

  Task.parZip2(
    Task.sleep(10.second),
    Task(connectable.connect())
  ).map(_ => ExitCode.Success)
}
A: 0 in 13
B: 0 in 16
A: 1 in 13
B: 1 in 15
B: 2 in 15   // 2秒目以降 Observer A はプリントしない
B: 3 in 18
B: completed // 完了時のハンドリング

Process finished with exit code 0

Observer でキャンセルする場合

ConnectableObservable#connectCancelable を返すので、これを使えばパブリッシャー側のイベント発行を止めることができる。

  def run(args: List[String]): Task[ExitCode] = {
    import monix.execution.Scheduler.Implicits.global

    val connectable = observable.publish

    connectable.subscribe(MyObserver("A", 2))
    connectable.subscribe(MyObserver("B"))

    val mainT   = Task.sleep(10.second)
    val cancelT = for {
      cancelable <- Task(connectable.connect())
      _          <- Task.sleep(3.second)
    } yield cancelable.cancel // 3秒後にキャンセルする

    Task.parZip2(mainT, cancelT).map(_ => ExitCode.Success)
  }
A: 0 in 13
B: 0 in 13
A: 1 in 17
B: 1 in 15
B: 2 in 17 // 2秒までで止まって、3秒以降、onCompleted などは実行されない。

Process finished with exit code 0

補足

今回は割愛したが、以下のようなものもある

  • ObserverScheduler をくっつけた Subscriber
  • ObservableObserver の性質を併せ持つ Subject (上のサンプルでも、裏では PublishSubjectが使われている。)