RxSwiftのconcatMapはsubsequenceを得る処理自体を待つわけじゃない


やりたいこと

非同期処理のトリガー PublishRelay<Int> によって、非同期処理 Single<Int> が行われる。
トリガーは非同期処理が終わる前にも叩かれうるが、前の非同期処理が終わるまで、次の処理は待機したい。

方針

  • 非同期処理が同時に走らないようにしたい
    • つまり flatMap は使えない
  • トリガーが叩かれた回数と同じ回数だけ非同期処理自体は行いたい
    • イベントを捨てる flatMapFirst flatMapLatest は使えない
  • 処理順を制御するのに適切なoperatorは concatMap

悪い実装

悪い実装.swift
func doSomething(_ i: Int) -> Single<Int> {
    print("start [\(i)]")
    return Single<Int>.just(1)
        .delay(.seconds(2), scheduler: MainScheduler.instance)
        .do(onSuccess: { _ in print("end [\(i)]") })
}

let trigger = PublishRelay<Int>()
_ = trigger
    .concatMap { doSomething($0) }
    .subscribe()

_ = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .bind(to: trigger)
期待.log
start [0]
end [0]
start [1]
end [1]
start [2]
end [2]
...
実際.log
start [0]
start [1]
start [2]
end [0]
start [3]
start [4]
end [1]
...

何が悪いか

RxSwiftの concatMap は、
❌️前のsubsequenceがcompleteするまでsubsequenceを得るselectorの実行を待つ
⭕️selectorによって得られたsubsequenceのsubscribeを待つ

適切な実装.diff
 func doSomething(_ i: Int) -> Single<Int> {
+    Single.just(()).flatMap {
         print("start [\(i)]")
         return Single<Int>.just(1)
             .delay(.seconds(2), scheduler: MainScheduler.instance)
             .do(onSuccess: { _ in print("end [\(i)]") })
+    }
 }

 _ = trigger
     .concatMap { doSomething($0) }
     .subscribe()
結果.log
start [0]
end [0]
start [1]
end [1]
start [2]
end [2]
...

結論

concatMap に限った話ではなく、「ストリームを作る」と「ストリームが購読・実行される」のタイミングの違いは意識しましょうねという話に一般化できます。