RxSwiftのflatMapって、どんな時に使ってる??


RxSwiftのflatMapって理解しにくくないですか?
僕は理解するのに大分苦労しました。

前提:複数のストリームを一箇所に流すには

複数のObservableの変更を、一箇所に集約するには下記のようにそれぞれbindしては駄目です。

//出力用のObserver
let output = PublishSubject<String>()

output
    .subscribe(
        onNext: { print($0) },
        onError: nil,
        onCompleted: { print("onCompleted") }
)
//入力用のObservable
let stream1 = PublishSubject<String>()
let stream2 = PublishSubject<String>()
stream1.bind(to: output)//stream1の変化をoutputに通知
stream2.bind(to: output)//stream2の変化をoutputに通知

一応想定通りに動きますが、どちらかのObservableがcompleteすると、流れが止まります。

stream1.onNext("stream1-1")
stream2.onNext("stream2-1")
stream2.onNext("stream2-2")
stream2.onCompletced()
stream1.onNext("stream1-2")
stream1.onNext("stream1-3")
stream1.onNext("stream1-4")
//結果
stream1-1
stream2-1
stream2-2
onCompleted

stream2側でonCompletcedを呼び出してますが、その後のstream1からのイベントも止まってしまうのです。

複数のストリームはちゃんとオペレーターで束ねましょう

ではどうすればいいかと言うと、merge オペレータで束ねます

let output = PublishSubject<String>()

output
    .subscribe(
        onNext: { print($0) },
        onError: nil,
        onCompleted: { print("onCompleted") }
)
//入力用のObservable
let stream1 = PublishSubject<String>()
let stream2 = PublishSubject<String>()

//stream1, stream2を束ねた新しいObservableを作り
let merged = Observable
                .of(stream1, stream2)
                .merge()
//これをoutputにbindする
let mergeBind = merged.bind(to: output)

stream1.onNext("stream1-1")
stream2.onNext("stream2-1")
stream2.onNext("stream2-2")
stream2.onCompletced()
//途中でstream2にonCompletcedさせる
stream1.onNext("stream1-2")
stream1.onNext("stream1-3")
//結果
merged-stream1-1
merged-stream2-1
merged-stream2-2
merged-stream1-2
merged-stream1-3

しかし、merge は後からObservableを追加する事ができません。新しく作り直したとしても、一旦 outputへのbindを解除してから再びbindし直す必要があります。
単に新しくbindしてしまうとoutputに対して複数の入力元をもたせることになり、最初の問題にぶつかります。

let merged2 = Observable
                .of(merged, stream3)
                .merge()
mergeBind.dispose() //一旦outputへのbindを解除してから
merged2.bind(to: output) //3つのストリームを合体したものをbindし直し

本命! flatMapなら後から足せるよ

やっとflatMapの登場です。
flatMapは、
引数;「流れて来る本来のイベントの値を加工した上で、<任意のObservabl型> を返す関数」
戻り値 : <任意のObservabl型>

というう関数です。これがややこしい


//Observable<String>型をイベントとして流すObserverは、
let observableSubject = PublishSubject<Observable<String>>()
//onNextで流せるイベントは、Observable<String>型です。(当たり前ですね)
observableSubject.onNext(stream1)
observableSubject.onNext(stream2)

//一方で、flatMap関数は、「流れて来る本来のイベントの値を加工した上で、任意のObservabl型を返す関数」を引数に取ると
let flatMapped: Observable<String> = observableSubject
    .flatMap { (origin: Observable<String>) -> Observable<String> in
        //ここでreturnするObservableの型を返します
        return origin
}

とりあえず、何が出来るのかというと、

let output = PublishSubject<String>()

output
    .subscribe(
        onNext: { print($0) },
        onError: nil,
        onCompleted: { print("onCompleted") }
)
//入力用のObservable
let stream1 = PublishSubject<String>()
let stream2 = PublishSubject<String>()

//Observable<String>型をイベントとして流すObserverを
let observableSubject = PublishSubject<Observable<String>>()
//flatMapでStringを流すObservableにしてあげると、outputにbindできます
observableSubject.flatMap{$0}.bind(to: output)

//observableSubject自体がonNextで流せるイベントは、Observable<String>型
observableSubject.onNext(stream1)
observableSubject.onNext(stream2)

//ここでstream1、stream2で値を流すと、mergeと同じ挙動になる
stream1.onNext("flatMap-stream1-1")
stream2.onNext("flatMap-stream2-1")
stream2.onNext("flatMap-stream2-2")
stream2.onCompleted()
stream1.onNext("flatMap-stream1-2")
stream1.onNext("flatMap-stream1-3")
結果
flatMap-stream1-1
flatMap-stream2-1
flatMap-stream2-2
flatMap-stream1-2
flatMap-stream1-3

あとからObservableを追加できるんです!

let stream3 = PublishSubject<String>()
observableSubject.onNext(stream3)

stream1.onNext("after-stream1")
stream3.onNext("after-stream3")
結果
after-stream1
after-stream3

使い所は様々です

あとからObservable (イベントストリーム)を追加していけるので、いちいち再登録しないで良いんです。

//兎に角、結果は全部 outputに束ねてしまう
observableSubject.flatMap{$0}.bind(to: output)

observableSubject.onNext(stream1)
observableSubject.onNext(stream2)

//notification は Observable<String>
//次の一回だけ、notificationからの値をマージしたい
observableSubject.onNext(notification.take(1))

//takeResultは通信結果をObservable<String>で返す関数
observableSubject.onNext(takeResult())