CombineのflatMapで実行数を制御する


この文章の結論

例えばWeb APIによる通信処理でPOST/DELETEやファイルアクセスによる作成/削除などでflatMapを使う際に、引数として.flatMap(maxPublishers: .max(1))を利用し、flatMapで作成するPublisher数を1つとして利用するほうが良さそうなんじゃないか?という文章です。

もし、「そりゃ違うよ」などのご意見があればコメントで頂けると幸いです。

はじめに

CombineのflatMapはパラメータにmaxPublishers: Subscribers.Demandがあり、リファレンスではThe maximum number of publishers produced by this methodとあります。これを試してみると、flatMapのmaxの数より多くの新しいストリームは同時には作られません。

検証したいこと

言葉で説明しても分からないと思うので、そのflatMapする際の検証コードを載せておきます。

  • 検証1
    • やること
      • 1...3でイベントを流してflatMapで非同期のイベントを流してみる
    • 検証コード1
      • flatMapを.unlimitedにして実行を観測
    • 検証コード2
      • .max(1)にして実行を観測
  • 検証2
    • やること
      • .unlimitedでは上流自体が止められていないことを確かめたいのでエラー流す
      • 非同期ではないほうがわかりやすいはず
    • 検証コード1
      • flatMapを.unlimitedにして実行を観測

検証1:

検証コードの仕様

  • Coldなストリームとして1, 2, 3が流れる
  • flatMapを挟む
    • 1つ目のイベントでは1秒後に遅らせてイベントを文字列にして流す
    • 1つ目でなければ非同期でイベントを文字列にして流す
  • sinkで観測

何がやりたいかと言うと、flatMapで一発目のストリームの終了時間を遅くして次のストリームまでの実行が遅れるかどうかが知りたい。

先に結果

  • maxPublishersが.unlimitedならflatMapで作成したストリームは非同期に実行される
  • maxPublishersが.max(1)ならflatMapで作成したストリームは同期的に実行される

検証コード

デフォルトの.unlimitedでの例

引数maxPublishersを省略すると、.flatMap(maxPublishers: .unlimited)になります。

import Foundation
import Combine
import PlaygroundSupport

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

let cancelable = (1...3).publisher
    .flatMap { value -> Future<String, Never> in
        print("🍏flatMap: \(value)")
        if value == 1 {
            return Future<String, Never> { promise in
                let v = value
                // 1発目だけ遅らせたい
                DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) {
                    print("🐢after: \(v)")
                    promise(.success("\(v)"))
                }
            }
        } else {
            return Future<String, Never> { promise in
                let v = value
                DispatchQueue.main.async {
                    print("🐰after : \(v)")
                    promise(.success("\(v)"))
                }
            }
        }
    }
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("🍎sink finished:", completion)
        case .failure(let error):
            print("🍎sink failure:", error)
        }
    }) { value in
        print("🍎sink received: \(String(describing: value))")
    }

DispatchQueue.main.asyncAfter(deadline: .now() + 5.0) {
    print("終わったやろ")
    page.finishExecution()
}

出力は次の通り。解説を//で加えます

🍏flatMap: 1
🍏flatMap: 2        // 1の購読を待たず2のflatMap開始
🍏flatMap: 3        // 2の購読も待たず
🐰after : 2
🍎sink received: 2
🐰after : 3
🍎sink received: 3
🐢after: 1
🍎sink received: 1   // 1のflatMapのストリームが購読された
🍎sink finished: finished
終わったやろ

flatMapは呼び出されそのままflatMap内でストリームが実行されました。

  • デフォルトのflatMapはすでに実行されたストリームの購読の終了を待たない

.max(1)での例

.flatMap(maxPublishers: .max(1))だけ変更します。

先程のコードから次のように変更してください

.flatMap { value -> Future<String, Error> in

.flatMap(maxPublishers: .max(1)) { value -> Future<String, Error> in

出力を見ると、1が購読されるまで遅延させられていたのが分かるはずです。//で解説を加えています

🍏flatMap: 1
🐢after: 1
🍎sink received: 1 // 1の購読が終わり
🍏flatMap: 2       // flatMapの2つ目が遅延させられていた
🐰after: 2
🍎sink received: 2
🍏flatMap: 3
🐰after: 3
🍎sink received: 3
🍎sink finished: finished
終わったやろ

これでflatMapはストリームのイベント発生とそのsinkが終わるまで次のは実行されていないのがわかります

検証2: flatMapでエラーを流す場合上流は止まってる?

次にflatMapでエラーを流してみるとどうなるか、というのをやってみたいと思います。
なぜかというと、flatMapでエラーを流してその下流は止められるんですが、上流は止まらないのです。

デフォルトの.unlimitedでの例

import Foundation
import Combine
import PlaygroundSupport

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

let cancelable = (1...3).publisher
    .handleEvents(receiveOutput: {
        print("❗handle \($0)")
    }, receiveCompletion: { _ in
        print("❗handle completion")
    }, receiveCancel: {
        print("❗handle cancel")
    })
    .setFailureType(to: Error.self)
    .flatMap { value -> Future<String, Error> in
        print("🍏flatMap: \(value)")
        if value == 1 {
            return Future<String, Error> { promise in
                let v = value
                // 1発目からエラー
                print("error : \(v)")
                promise(.failure(NSError(domain: "test", code: 1)))
            }
        } else {
            return Future<String, Error> { promise in
                let v = value
                print("success : \(v)")
                promise(.success("\(v)"))
            }
        }
    }
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("🍎sink finished:", completion)
        case .failure(let error):
            print("🍎sink failure:", error)
        }
    }) { value in
        print("🍎sink received: \(String(describing: value))")
    }

DispatchQueue.main.asyncAfter(deadline: .now() + 5.0) {
    print("終わったやろ")
    page.finishExecution()
}

出力は次の通り、handleが出力されているので止まってないと思えます。

❗handle 1
🍏flatMap: 1
error : 1
🍎sink failure: Error Domain=test Code=1 "(null)"
❗handle 2            // flatMapより上流は動作している
❗handle 3            // これも
❗handle completion   // エラーが起きてるのにそれは関係なく
終わったやろ

これはすごく不思議な気がします。エラーになってるのに上流は止まっていない。

.max(1)での例

.max(1)にするも念の為やってみましょう

先程のコードから次のように変更してください

.flatMap { value -> Future<String, Error> in

.flatMap(maxPublishers: .max(1)) { value -> Future<String, Error> in

結果は次の通り、1発目のエラーで終了していますが、handleも出力していないため上流も止めています。

❗handle 1
🍏flatMap: 1
error : 1
🍎sink failure: Error Domain=test Code=1 "(null)"
終わったやろ

上流を止めていると言うか、coldなストリームを下流のflatMapで購読していないと思えるのでこれが普通な気がします。

つまり、flatMapはエラーにしてるのにストリームをdisposeしてない。RxSwiftだとdisposeされて数珠つなぎになった購読が止まると思うんですが、Combineはそうなっておらず、そもそもdisposeという概念がないため、ストリームの上流を止めるにも使っていけということですかね?

そしてこれがバックプレッシャーを使うということでしょうかね?

.flatMap(maxPublishers: .max(1))が向いてない処理は

インクリメンタルサーチには向いてない

インクリメンタルサーチは次々に入力が来て、古い検索は気にせず新しい検索を投げたい処理です。
そのため、.flatMap(maxPublishers: .max(1))は処理を待ってしまうので向いてなさそうです。

まとめ

言いたいことが2つになっちゃってますが、次のようなことを確認しました。

  • flatMapのmaxPublishers: Subscribers.Demand引数
    • maxの数より多くflatMapで新しいストリームはつくられない
      • maxの数はflatMapで作成するPublisherの数を指定する
        • Publisherの数を指定する際に同期的になっている?
  • Combineのストリームはエラーになっても上流は止まらない
    • エラーになった場合にmaxPublishersに指定数超えて新しくリクエストしないのでflatMapの上流も処理を止められるけども

感想

感想としては.max(1)でWeb API利用時やファイルアクセス時には積極的にバックプレッシャー使えば良さそうなんじゃない?と思います。デフォルトでバックプレッシャーを使わず無制限なのはRxJavaでもデフォルトだから仕様としてそうなってるんだとは思いますが...。

(余談)flatMapLatestについて

ちなみに、RxSwiftにあるflatMapLatestオペレータがCombineにはありません。つまり連続するストリームのイベントに対してなるべく無駄なく新しいストリームを作りたい場合に、オペレータで制御する方法がないのかなと思っていました。

しかし、コメント欄含め、やり方はありますよということを指摘してもらっていたのでそれについて補足します。