RxSwift `a.withLatestFrom(a)` 同じ上流元の同期的合流問題


以下は、社内で話題になった RxSwiftの「同じ上流元の同期的合流問題」の自分用メモです。
(文法とか型の形がおかしい等は、一旦忘れてください)


a, bをhot observable (e.g. PublishSubject)とする。
一般に、

let c = a.withLatestFrom(b) { ($0, $1) }
c.subscribe()

は、bが先にsubscribeされるため、$0部分よりも前に$1(内部キャッシュ)が生成される。
よって、もし b = a の場合でも、 a.onNext("hello")時にc同期的にonNext を行い、$0$1はともに最新値"hello"を取る。
ただしこの挙動は、 RxSwiftの実装を見ないと分かりにくい 欠点がある。

ちなみに 非同期 flatMapがある場合は、

a
  .flatMap { async($0) }
  .withLatestFrom(a)
  .subscribe()
// NOTE: f(a).withLatestFrom(a) の形だが、 fは「非同期」変換 

があり、async が通常、非同期的な合流のため、理解しやすい。
問題となるのは、 同じ上流元が「同期的」に合流する場合 のみ。

問題を一般化すると、

h(f(a), g(a)).subscribe()
// h = merge, combineLatest, zip, withLatestFrom

において、f, gが任意の 同期変換 (mapなど) のとき、subscribeに流れるnext値は hの実装に依存する ことになり、理解が難しくなる。

例:

// # 理解しにくい例
merge(a.map { true }, a.map { false }) 
// true, falseのどちらが先に送られるか?
// 直感的にはtrueだが、mergeの中身がflipされている可能性もある=内部実装を見ないといけない

// # 例外
zip(a, a.skip(1)) 
// 「最新の値」と「1つ前の値」のタプル。
// skip(1)とzipによって、キャッシュ生成+利用が「非同期的」に行われるため、この合成は理解しやすい。

今回、社内で話題に上がった問題は、

// # 理解しにくい
let c1 = merge(a, b).withLatestFrom(a)

// # 理解しやすい
let c2 = merge(a, b.withLatestFrom(a))

というパターン。
どちらも、同じ上流aを合流させた形だが、前者c1よりも後者c2が理解しやすいのは、 a.onNext 時に

  • c1は、実質 a.withLatestFrom(a) の形と等価で、前述の通り、同期処理のタイミング判断が難しい
  • c2b.withLatestFrom(a) は、a.onNext時には内部キャッシュを更新するのみ。つまり、b.onNexta.onNextに対して非同期)+最新キャッシュのみを考慮すれば良いので、 a.onNextb.onNextを分解したmerge について考えられる

という違いがあるため。
(注意: 先にb.onNextが起きた場合の挙動についても注意を払う必要がある)