Monix Observable で Publish/Subscribe
Monix の サブプロジェクト monix-reactive でリアクティブなストリーミングが提供されている。この記事では、Observable
を使った 1→多 の Publish/Subscribe を書いてみる。
Monix Observable 概要
- GoF の Observer パターンに相当するが、Monix Observable は遅延×非同期な関数型イベントストリーム。
- Task[A] を複数要素のストリーム化したもの、あるいは Iterable[A] を非同期+ノンブロッキングにしたもの、とも言える。
- FS2 の Stream が Pull 型モデルであるのに対して、Observable は Push 型。また、パフォーマンスのために関数的な純粋性の面で若干妥協していたり、副作用の扱いも FS2 ほどにはこだわっていないという。1
- functional reactive programming (FRP)に適している。
などなど
お題
パブリッシャーは、0から3までの数を1秒毎にイベントとして発行するような Observable
とする。これを購読して、名前、スレッドIDと共に標準出力するサブスクライバーを、以降、いくつかのやり方で実装してみる。
- ソースはここに置いた
- Monix のバージョンは 3.0.0-RC2
まずObserver
は以下のように書ける。
val observable = Observable.interval(1.second).takeWhile(_ <= 3)
イベントの購読の仕方にはいくつの方式があるが、ハンドリング自体は比較しやすいように共通コードを使いまわすことにする。ここでは受け取った Long値を、名前、スレッドIDと共に標準出力する、以下のような String => Long => Task[Unit]
の関数にした。
def printTick(name: String) = (n: Long) => Task.sleep(100.milli) map { _ =>
println(s"$name: $n in ${ Thread.currentThread().getId }")
}
Task 版
まず複数の Task
を、gather
で並行実行するシンプルな方式。
def gather(tasks: (Long => Task[Unit])*) = observable.mapEval { n =>
Task.gather(tasks.map(_(n)))
}
def run(args: List[String]): Task[ExitCode] = {
import monix.execution.Scheduler.Implicits.global
val taskA = printTick("A")
val taskB = printTick("B")
Task.parZip2(
Task.sleep(5.second), // Task 全体の所要時間をこれで決める
Task(gather(taskA, taskB).subscribe()) // subscribe() で Observable が動き出す
).map(_ => ExitCode.Success)
}
以下、実行結果(コメントは後で書き足したもの)
A: 0 in 15 // A と B は別のスレッドで実行されている。
B: 0 in 13 // printTick を少し書き換えて、たとえばわずかにスリープした後で、別途
A: 1 in 13 // 終了を println するようにすれば、並行実行されていることが確認できる。
B: 1 in 15
A: 2 in 13
B: 2 in 15
A: 3 in 13
B: 3 in 15
Process finished with exit code 0 // 5秒後に終了
Consumer 版
Consumer
を Observable#consumeWith
で関連付ける方式。
ただし、このままの observable
はいわゆる cold Observable というもので、1つめの Consumer が購読を始めた時点でイベントが発行されてしまうので、hot Observable (ConnectableObservable
) に変換することにより、connect
して初めてイベントの発行が始まるようにする必要がある。
def consumer(name: String): Consumer[Long, Unit] =
Consumer.foreachEval(printTick(name))
def run(args: List[String]): Task[ExitCode] = {
import monix.execution.Scheduler.Implicits.global
val connectable = observable.publish // hot Observable に変換
val countA = connectable.consumeWith[Unit](consumer("A")) // まだ始まらない
val countB = connectable.consumeWith[Unit](consumer("B")) // まだ始まらない
Task.parZip4(
Task.sleep(7.second),
countA,
countB,
Task(connectable.connect()) // ここで初めてイベント発行が始まる
).map(_ => ExitCode.Success)
}
得られる結果はTask 版と同じ。
Observer 版
Observer
を使うと、イベント処理の都度、購読を続けるか止めるかを切り替えたり、またエラーや完了をハンドリングしたりできる。
case class MyObserver(name: String, until: Long = Long.MaxValue) extends Observer[Long] {
import monix.execution.Scheduler.Implicits.global
def onNext(elem: Long): Future[Ack] =
printTick(name)(elem).runToFuture.flatMap { _ =>
if (elem < until) Continue else Stop
}
def onError(ex: Throwable): Unit =
println(s"$name: an error occurred ${ ex.getMessage }")
def onComplete(): Unit =
println(s"$name: completed")
}
上の Observer
は、完了時、エラー時の処理に加えて、イベントの Long値 がコンストラクタで受け取ったしきい値以上になると購読をやめるようにしている。
これを Observable に関連付けるコードが以下。
def run(args: List[String]): Task[ExitCode] = {
import monix.execution.Scheduler.Implicits.global
val connectable = observable.publish
connectable.subscribe(MyObserver("A", 2)) // ObserverA は 2 秒で打ち切り
connectable.subscribe(MyObserver("B")) // ObserverB は打ち切りしない
Task.parZip2(
Task.sleep(10.second),
Task(connectable.connect())
).map(_ => ExitCode.Success)
}
A: 0 in 13
B: 0 in 16
A: 1 in 13
B: 1 in 15
B: 2 in 15 // 2秒目以降 Observer A はプリントしない
B: 3 in 18
B: completed // 完了時のハンドリング
Process finished with exit code 0
Observer でキャンセルする場合
ConnectableObservable#connect
は Cancelable
を返すので、これを使えばパブリッシャー側のイベント発行を止めることができる。
def run(args: List[String]): Task[ExitCode] = {
import monix.execution.Scheduler.Implicits.global
val connectable = observable.publish
connectable.subscribe(MyObserver("A", 2))
connectable.subscribe(MyObserver("B"))
val mainT = Task.sleep(10.second)
val cancelT = for {
cancelable <- Task(connectable.connect())
_ <- Task.sleep(3.second)
} yield cancelable.cancel // 3秒後にキャンセルする
Task.parZip2(mainT, cancelT).map(_ => ExitCode.Success)
}
A: 0 in 13
B: 0 in 13
A: 1 in 17
B: 1 in 15
B: 2 in 17 // 2秒までで止まって、3秒以降、onCompleted などは実行されない。
Process finished with exit code 0
補足
今回は割愛したが、以下のようなものもある
-
Observer
にScheduler
をくっつけたSubscriber
-
Observable
とObserver
の性質を併せ持つSubject
(上のサンプルでも、裏ではPublishSubject
が使われている。)
Author And Source
この問題について(Monix Observable で Publish/Subscribe), 我々は、より多くの情報をここで見つけました https://qiita.com/yasuabe2613/items/7b11f774b11141191a71著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .