RxSwift - Mathematical & Aggregate & Connectable Observable Operators



concat



concatは、RxSwiftとCombineOperatorにあるOperatorです.
concatは2つの観測可能なイベントを結合し、
観測可能な機能にする.
        let firstObservable = Observable<Int>.of(1,2,3)
        let secondObservable = Observable<Int>.of(4,5,6)
        
        firstObservable
            .concat(secondObservable)
            .subscribe(onNext: {
            print($0)
        }).disposed(by: disposBag)
        
//print
1
2
3
4
5
6
2つの観測可能な値の組合せを解放した.
また、異なるOperatorを使用してイベントの値を変更することもできます.
        let firstObservable = Observable<Int>.of(1,2,3)
        let secondObservable = Observable<Int>.of(4,5,6)
        
        firstObservable
            .concat(secondObservable)
            .filter { $0 % 2 == 0 }
            .toArray()
            .subscribe { singleEvent in
                print(singleEvent)
            }.disposed(by: disposBag)
            
            
            //print
            success([2, 4, 6])
しかし、上記の例は、最も基本的な動作をテストする例である.
ここには見逃す可能性のある点がありますが、より多くの例を見てみましょう.
        let first = Observable<Int>.create { observer in
            observer.onNext(1)
            return Disposables.create()
        }
        
        let second = Observable<Int>.of(2,3,4)

        let concat = Observable.concat([first,second])
        concat
            .debug()
            .subscribe(onNext : {
            print($0)
        }).disposed(by: disposBag)
createとofを作成するには、2つのIntタイプの観測性があります.
この2つの観測性をconcatにより新しい観測性に統合した.
では、出力の値はいくらですか?
1 2 3 4を思いついた
1
以上の画像とdebugは原因を説明しています.
2022-03-17 11:28:11.132: SetupVC.swift:54 (testConcat()) -> subscribed
2022-03-17 11:28:11.135: SetupVC.swift:54 (testConcat()) -> Event next(1)
デバッグ出力文にDisposeの出力はありません.
これは、サブスクリプション行為自体が処理されていないことを意味し、理由はパブリケーションから知ることができる.
concat演算子は、最初の観測可能ライフサイクルが終了した後に、2番目の観測可能を解放し、マージします.
これにより、最初の観測可能なライフサイクルが終了せず、統合できません.
この問題はonCompleted Eventリリースで解決できます.
        let first = Observable<Int>.create { observer in
            observer.onNext(1)
            observer.onCompleted()
            return Disposables.create()
        }
onCompletedリリース
2022-03-17 11:30:01.756: SetupVC.swift:54 (testConcat()) -> subscribed
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(1)
1
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(2)
2
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(3)
3
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(4)
4
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event completed
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> isDisposed
正常に動作
        let subject = PublishSubject<String>()
        
        subject.onNext("FirstSubject 이벤트 요소 === 하나")
        
        let secondSubject = BehaviorSubject<String>(value: "SecondSubject 이벤트 요소 === 하나")
        
        
        secondSubject.onNext("SecondeSubject === 둘")
        
        let concatObservable = Observable.concat([subject,secondSubject])
        
        concatObservable
            .debug()
            .subscribe(onNext: {
            print($0)
        }).disposed(by: disposBag)
        subject.onNext("FirstSubject 이벤트 요소 === 셋")
        secondSubject.onNext("SecondSubject 이벤트 요소 === 셋")
subjectにも似たような脈絡があります.
Publish/Behaviorの2つのsubjectを別のプロパティに接続します.
2022-03-17 11:35:23.577: SetupVC.swift:75 (testConcat()) -> subscribed
2022-03-17 11:35:23.579: SetupVC.swift:75 (testConcat()) -> Event next(FirstSubject 이벤트 요소 ===)
FirstSubject 이벤트 요소 ===
これは出力文です.
ここで,Publish Subjectの特徴は正しい.
BehavorSubjectの特徴はあまり明らかではありません.サブスクリプション前のイベント要素を取得できません.
理解しやすいサンプルコードを作成します.
       let pSub = PublishSubject<String>()
        pSub.onNext("p")
        let bSub = BehaviorSubject<String>(value: "bA")
        pSub
            .concat(bSub)
            .toArray()
            .debug()
            .subscribe()
            .disposed(by: disposBag)
        pSub.onNext("pA")
        bSub.onNext("bB")
        pSub.onNext("pB")
        bSub.onNext("bC")
        pSub.onNext("pC")
        bSub.onNext("bD")
        pSub.onNext("pD")
        pSub.onCompleted()
        bSub.onNext("bF")
        bSub.onNext("bG")
        bSub.onNext("bH")
        bSub.onCompleted()
上記のコードでconcatが実行されると、
publishSubject
  • 購読前にイベント解放を無視
    -無視>p
  • 動作オブジェクトは次のとおりです.
  • 購読前の最新値+購読後の値を観察します.
    ->bA->bHから
  • 2022-03-17 12:17:46.180: SetupVC.swift:119 (testConcat()) -> subscribed
    2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> Event next(["pA", "pB", "pC", "pD", "bD", "bF", "bG", "bH"])
    2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> Event completed
    2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> isDisposed
    ただし、outputに表示される動作オブジェクトは、パブリッシュオブジェクトの解放が終了する前に解放されたイベントのみ観察できます.
    これは、サブスクリプション条件ではなくパブリッシュオブジェクトのライフサイクル終了ポイントに基づいてイベントを観察できることを意味します.
    また、2つのsubjectのいずれかがonCompleted Eventを実行していない場合、observerはonNext Eventを観察できません.

    reduce



    SWIFT高次関数のreduceと同じです.
    reduceは、すべてのイベントの合計を解放します.
    (スキャンはすべての中間プロシージャを解放します.)
            let observer = Observable.range(start: 1, count: 5)
            observer.scan(0, accumulator: +)
               .subscribe { print("scan === \($0)") }
               .disposed(by: disposBag)
    
            observer.reduce(0, accumulator: +)
                .subscribe{ print("reduce === \($0)") }
                .disposed(by: disposBag)
    また、モジュールを使用して複数の演算を実行することもできます.
       Observable.from([1, 2, 3])
              .reduce(2) { $0 * $1 }
              .debug()
              .subscribe()
              .disposed(by: disposBag)

    Connectable Observable Operators


    connectable operaotrを使用すると、共有可能(接続可能)な観察可能に変換できます.
    Observerableは、ユニキャストとして1つのobserverしか接続できません.
     func buttonLabelConfigure(){
            view.addSubview(bt)
            bt.snp.makeConstraints {
                $0.centerX.centerY.equalToSuperview()
                $0.width.equalTo(200)
                $0.height.equalTo(80)
            }
            view.addSubview(lb)
            lb.snp.makeConstraints {
                $0.top.equalTo(bt.snp.bottom).offset(20)
                $0.centerX.equalToSuperview()
                $0.width.equalTo(200)
                $0.height.equalTo(80)
                
            }
        }
        func testShare(){
        // API Requst 를 통해 Responde 받는 Observable 이라고 가정
            let requestAPI = Observable.just(100).debug("API - Request")
            
            let tapResult = bt.rx.tap
                .flatMap { requestAPI }
            
            tapResult
                .map { $0 > 3 }
                // bind(to:)는subscribe()의 별칭(Alias)으로 Subscribe()를 호출한 것과 동일
                .bind(to: bt.rx.isHidden )
                .disposed(by: disposBag)
            
            tapResult
                .map { "Count : \($0)" }
                .bind(to: lb.rx.text )
                .disposed(by: disposBag)
        }
    ボタンtapの観測可能(シーケンス)が2回にbind(サブスクリプション)があると仮定する.
    この場合、API呼び出しの観測可能性はbind(2回)によって2つのシーケンスを解放する.
    2022-03-17 13:42:20.669: API - Request -> subscribed
    2022-03-17 13:42:20.671: API - Request -> Event next(100)
    2022-03-17 13:42:20.671: API - Request -> Event completed
    2022-03-17 13:42:20.671: API - Request -> isDisposed
    2022-03-17 13:42:20.672: API - Request -> subscribed
    2022-03-17 13:42:20.672: API - Request -> Event next(100)
    2022-03-17 13:42:20.673: API - Request -> Event completed
    2022-03-17 13:42:20.673: API - Request -> isDisposed
    出力文に示すように、応答として2つのonNextイベントが受信されました.
    これらの問題の解決策は次のとおりです.
    share()オペレータ.

    share


    演算子を使用すると、Subscribe()のたびに新しい観測可能なシーケンスが作成されず、1つのシーケンスから解放されるイベント要素を共有できます.
    unicast -> multicast ??
        let tapResult = bt.rx.tap
                .flatMap { requestAPI }
                .share()
    2022-03-17 13:59:34.499: API - Request -> subscribed
    2022-03-17 13:59:34.501: API - Request -> Event next(100)
    2022-03-17 13:59:34.502: API - Request -> Event completed
    2022-03-17 13:59:34.502: API - Request -> isDisposed
    しかし、これはSubjectの機能を完全に実行できるという意味ではありません.
    まず、コードを表示できます.
            let observable = Observable<Int>.create { observer in
                observer.onNext(Int.random(in: 0..<10))
                return Disposables.create()
            }
            
            let observer1 = observable
                .subscribe { print("observer 1 === \($0)") }
            observer1.disposed(by: disposBag)
            
            let observer2 = observable
                .subscribe {print("observer 2 === \($0)") }
            observer2.disposed(by: disposBag)
            //print
            observer 1 === next(5)
            observer 2 === next(2)
    shareを無効にしたときの観測性.
    前述したように、サブスクリプションを開始すると、Observableが作成されて解放されます.
    サブスクリプションポイントによって、異なるシーケンスが作成されます.
    ただし、shareを使うと
     let observable = Observable<Int>.create { observer in
                observer.onNext(Int.random(in: 0..<10))
                return Disposables.create()
            }.share()
    observer 1 === next(6)
    これらの出力文が表示されます.
    ここでSubjectとの違いを見つけることができます.
    Subjectはobserver間でイベントを共有します.
    ただし、shareを使用するObserverableについては、
    Subscribeは、最初のSubscribeが完了したとき(exサブスクリプションカウントが0から1になったとき)にのみ作成されます.その後生成されるサブスクリプション動作では、最初に呼び出されたサブスクリプションは無視され、共有されます.

    replay(default : 0 ) , scope(default : .whileConnected )


    パラメータをshareに入れることができます.share(replay: , scope :)
    ここでは、新しいサブスクリプションのobserverが解放するイベント要素の再生を設定できます.
            let observable = Observable<Int>.create { observer in
                observer.onNext(Int.random(in: 0..<10))
                observer.onNext(Int.random(in: 11..<20))
                observer.onNext(Int.random(in: 21..<30))
                observer.onNext(Int.random(in: 31..<40))
                return Disposables.create()
            }.share(replay: 2 , scope:  .whileConnected )
            
            let observer1 = observable
                .subscribe { print("observer 1 === \($0)") }
            observer1.disposed(by: disposBag)
            
            let observer2 = observable
                .subscribe {print("observer 2 === \($0)") }
            observer2.disposed(by: disposBag)
            
          let observer3 = observable
                .subscribe { print("observer 3 === \($0)") }
            observer3.disposed(by: disposBag)
    
           let observer4 = observable
               .subscribe { print("observer 4 === \($0)")}
                observer4.disposed(by: disposBag)
    observer 1 === next(9)
    observer 1 === next(14)
    observer 1 === next(22)
    observer 1 === next(34)
    observer 2 === next(22)
    observer 2 === next(34)
    observer 3 === next(22)
    observer 3 === next(34)
    observer 4 === next(22)
    observer 4 === next(34)
    再生中に設定した値は、バッファサイズと考えられます.
    observer 1からsubscribe 0->1に変更
    observer 2は、新しく購読されたobserverから始まります.
    したがってobserver 2からshareのobserver 1を介して共有を購読する.
    リリース値は、再生中に設定した値と一致します.
    さらに、Observableが完了したイベントを解放すると、
    すべての共有が無効になります.
     let observable = Observable<Int>.create { observer in
                observer.onNext(Int.random(in: 0..<10))
                observer.onNext(Int.random(in: 11..<20))
                observer.onNext(Int.random(in: 21..<30))
                observer.onNext(Int.random(in: 31..<40))
                observer.onCompleted()
                return Disposables.create()
            }.share(replay: 2 , scope:  .whileConnected )
    2022-03-17 15:11:09.723: SetupVC.swift:95 (testShareTwo()) -> subscribed
    2022-03-17 15:11:09.725: SetupVC.swift:95 (testShareTwo()) -> Event next(3)
    observer 1 === next(3)
    2022-03-17 15:11:09.725: SetupVC.swift:95 (testShareTwo()) -> Event next(13)
    observer 1 === next(13)
    2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event next(21)
    observer 1 === next(21)
    2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event next(37)
    observer 1 === next(37)
    2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event completed
    observer 1 === completed
    2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> isDisposed
    2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> subscribed
    2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> Event next(8)
    observer 2 === next(8)
    2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> Event next(18)
    observer 2 === next(18)
    2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event next(28)
    observer 2 === next(28)
    2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event next(38)
    observer 2 === next(38)
    2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event completed
    observer 2 === completed
    2022-03-17 15:11:09.728: SetupVC.swift:100 (testShareTwo()) -> isDisposed
    2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> subscribed
    2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> Event next(6)
    observer 3 === next(6)
    2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> Event next(15)
    observer 3 === next(15)
    2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event next(22)
    observer 3 === next(22)
    2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event next(35)
    observer 3 === next(35)
    2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event completed
    observer 3 === completed
    2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> isDisposed
    2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> subscribed
    2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(0)
    observer 4 === next(0)
    2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(13)
    observer 4 === next(13)
    2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(21)
    observer 4 === next(21)
    2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(33)
    observer 4 === next(33)
    2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event completed
    observer 4 === completed
    2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> isDisposed
    
    scopeには2つの設定値があります

  • .whileConnected(1人以上のオブザーバーがいる場合は再放送が保持され、処理が0の場合は再放送バッファが初期化されます)

  • .ever(観察者がいなくてもバッファを保つ)
  • whileConnectedは、disposeおよびdisposeBagによって最初の観察者を選択することができる.
    新しい観察者にも影響します.
            let observer1 = observable
             
                .subscribe { print("observer 1 === \($0)") }
            observer1.dispose()//첫번째 observer 지만 dispose 됨
            
            let observer2 = observable
                .subscribe {print("observer 2 === \($0)") }
            observer2.disposed(by: disposBag) // disposeBage 
            
            let observer3 = observable
               
                .subscribe { print("observer 3 === \($0)") }
            observer3.dispose()
    
            let observer4 = observable
               
                .subscribe { print("observer 4 === \($0)")}
                observer4.dispose()
    observer 1 === next(5)
    observer 1 === next(11)
    observer 1 === next(23)
    observer 1 === next(39)
    
    // observer 2 에 영향을 받음
    observer 2 === next(2)
    observer 2 === next(18)
    observer 2 === next(28)
    observer 2 === next(38)
    
    observer 3 === next(28)
    observer 3 === next(38)
    
    observer 4 === next(28)
    observer 4 === next(38)
    でもここでforeverを使えば
    share(replay: 2 , scope:  .forever )
    observer 1 === next(5)
    observer 1 === next(19)
    observer 1 === next(22) // a
    observer 1 === next(31) // b
    
    observer 2 === next(22) // a
    observer 2 === next(31) // b
    observer 2 === next(2)
    observer 2 === next(14)
    observer 2 === next(22)
    observer 2 === next(38)
    
    observer 3 === next(22)
    observer 3 === next(38)
    
    observer 4 === next(22)
    observer 4 === next(38)
    observer 1にobserver 2の1~2イベントとして3~4個のイベントを追加します.
    observer 2は、6番目の放出イベントではなく、観測可能な4番目の放出イベントを観察している.
    なぜなら、永続ストリームの内部キャッシュがクリアされていないからです.
    これがobserver 1から3~4番目のイベントを受信した理由です(bufferSize:2)
  • publish():この演算子は、通常の観測可能を接続可能な観測可能に変換します.
  •         let observable = Observable<Int>.create { observer  in
                observer.onNext(Int.random(in: 10..<30))
                return Disposables.create()
            }.publish()
            
            let observer = observable
                .debug()
                .subscribe(onNext : { print($0)})
            observer.disposed(by: disposBag)
            
            let observer2 = observable
                .debug()
                .subscribe(onNext : { print($0)})
            observer2.disposed(by: disposBag)
            
            observable.connect().disposed(by: disposBag)
            
            //print
    2022-03-17 15:57:43.384: SetupVC.swift:54 (testPublish()) -> subscribed
    2022-03-17 15:57:43.385: SetupVC.swift:59 (testPublish()) -> subscribed
    2022-03-17 15:57:43.387: SetupVC.swift:54 (testPublish()) -> Event next(21)
    21
    2022-03-17 15:57:43.387: SetupVC.swift:59 (testPublish()) -> Event next(21)
    21
  • 接続可視性:Subscriberがあってもconnect()を呼び出すまでアイテムは解放されません.connect()を呼び出してからアイテムの解放を開始します.
  • refcount():refcount()は、接続の観測不可能性を通常の観測可能性のように使用できるように、接続と切断を自動的に担当します.すなわち、速博アプリカウントまで数え続け、速博アプリカウントが0から1のときに接続()を行い、速博アプリカウントが0の場合は接続()を切断する.