[RxSwift] - Time Based Operators (Buffer,DelaySubscription,Replay, delay, interval, timer, window)


1. Replay


パブリッシュ後にアイテムをサブスクリプションしても、すべてのサブスクリプションが同じパブリッシュ済みアイテムシーケンスを表示できることが観察されます.
let hi = PublishSubject<String>()
let parrot = hi.replay(1)
parrot.connect()

hi.onNext("hi")
hi.onNext("hello")

parrot
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)
hi.onNext("good")
  • を使用して関連演算子を再生する場合はconnect()に接続する必要があります.
  • 再生(bufferSize)で1を作成したので、最新のポップアップアイテムを受け取ります.
  • //運転時出力:
    hello
    good
    bufferSizeが2の場合、helloの前にhiが出力されます.

    + replayAll


    :サブスクリプションの前に解放されたすべてのアイテムを、再放送と同じですがbuffersizeとは関係ありません.
    let a = PublishSubject<String>()
    let statements = a.replayAll()
    statements.connect()
    
    a.onNext("a")
    a.onNext("b")
    
    statements
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: disposeBag)
    
    //運転時出力:
    a
    b

    2. Buffer


    :観察可能なアイテムを定期的に収集し、アイテムではなくbundleに並べます.
    
     public func buffer(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
            -> Observable<[Element]> {
            BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
        }
  • timeSpan:bufferの最大時間長
  • count:bufferが最も多く持つことができる項目数
  • スケジューラ:バッファタイマを実行するスケジューラ
  • let source = PublishSubject<String>()
    
    var count = 0
    let timer = DispatchSource.makeTimerSource()
    
    timer.schedule(deadline: .now() + 2, repeating: .seconds(1))
    timer.setEventHandler {
        count += 1
        source.onNext("\(count)")
    }
    timer.resume()
    
    source
        .buffer(timeSpan: .seconds(2), count: 2, scheduler: MainScheduler.instance)
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: disposeBag)
  • 初回購読、または前回のイベント発生から最大2秒、または
  • bufferには2つの例外があります
    イベントを解放します.
  • //運転時出力:
    ["1"]//2秒以内に1しか受信しないため、1のみ出力する
    ["2"、"3")/item数が最大値(2)に達したため、出力
    ["4", "5"]
    ["6", "7"]
    ["8", "9"]
    ["10", "11"]
    ["12"]
    ["13", "14"]
    ["15", "16"]
    ["17", "18"]
    .
    .
    .//タイマーなので出力を続けます.

    3. Window


    :観察可能項目を周期的に観察可能ウィンドウに分割し、これらの観察可能ウィンドウを解放する.
     public func window(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
            -> Observable<Observable<Element>> {
                return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
        }
    パラメータは
  • bufferと同じですが、戻り値は異なります.
  • bufferは観測可能な<[要素]を放出し、windowは観測可能な<観測可能な>を放出する.
  • 観測可能な測定を統合して観測可能な測定シーケンスで放出したFlatMapを窓として操作した結果を見てみましょう.
    let window = PublishSubject<String>()
    var windowCount = 0
    let windowTimeSource = DispatchSource.makeTimerSource()
    
    windowTimeSource.schedule(deadline: .now()+2, repeating: .seconds(1))
    windowTimeSource.setEventHandler {
        windowCount += 1
        window.onNext("\(windowCount)")
    }
    windowTimeSource.resume()
    
    window
        .window(timeSpan: .seconds(3), count: 3, scheduler: MainScheduler.instance)
        .flatMap { windowObservable -> Observable<(index: Int, element: String)> in
            return windowObservable.enumerated()
        }
        .subscribe(onNext: {
            print("\($0.index) 번째 observable의 요소 \($0.element)")
        })
        .disposed(by: disposeBag)
       
  • 初回購読、または前回のイベント発生から最大3秒、または
  • windowには3つの異常項目があります
    イベントを解放します.
  • //運転時出力:
    0番目の観察可能元素1
    最初の観察可能元素2
    0番目の観察可能元素3
    最初の観察可能元素4
    2番目の観察可能元素5
    0番目の観察可能元素6
    最初の観察可能元素7
    2番目の観察可能元素8
    0番目の観察可能元素9
    1番目の観察可能元素10
    2番目の観察可能元素11
    .
    .
    .//タイマーの使用を続行します.
  • timespanは3、countは3で、出力は以下の通りです.
  • countが3でtimespanが2の場合、0と1つの可視要素のみが出力されます.
  • 4. DelaySubscription


    :遅延可能なサブスクリプションソースの観測可能な演算子
    public func delaySubscription(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
            -> Observable<Element> {
            DelaySubscription(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
        }
  • dueTimeパラメータの遅延時間を作成します.
  • let delaysource = PublishSubject<String>()
    
    var delayCount = 0
    let delayTimeSource = DispatchSource.makeTimerSource()
    delayTimeSource.schedule(deadline: .now(), repeating: .seconds(1))
    delayTimeSource.setEventHandler {
        delayCount += 1
        delaysource.onNext("\(delayCount)")
    }
    delayTimeSource.resume()
    
    delaysource
        .delaySubscription(.seconds(5), scheduler: MainScheduler.instance)
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: disposeBag)
  • 5秒後に購読を開始します.
  • //運転時出力:
    6
    7
    8
    9
    10
    11
    12
    13
    .
    .
    .//タイマーなので出力を続けます.

    + Delay


    :特定の時間内に観察可能なコンテンツを移動することによって、放出を遅延させる.
    public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
            -> Observable<Element> {
                return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
        }
        
  • dueTimeパラメータに遅延解放時間を作成します.
  • let delaySubject = PublishSubject<String>()
    
    var delayCount = 0
    let delayTimerSource = DispatchSource.makeTimerSource()
    delayTimerSource.schedule(deadline: .now(), repeating: .seconds(1))
    delayTimerSource.setEventHandler {
        delayCount += 1
        delaySubject.onNext("\(delayCount)")
    }
    delayTimerSource.resume()
    
    delaySubject
        .delay(.seconds(3), scheduler: MainScheduler.instance)
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: disposeBag)
  • 3秒後に解放された.
  • //運転時出力:
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    delay(.seconds(5)の場合、schedule:MainScheduler.instance)でも出力値は同じです.
    釈放時間を遅らせただけだからだ.
    では、これらの違いをどのように知るのでしょうか.


    遅延時間が3秒の場合、timerとtimerの回数は4回異なる.
    5秒遅れると2秒遅れる.
    timerは回数と6回差があり、2回増加していることがわかります.

    5. Interval


    :指定した時間間隔で整数を解放するObserverableを作成します.
    
    Observable<Int>
    	.interval(.second(3), schedule: MainSchedule.instance)
        .subscribe(onNext: {
        	print($0)
        }
        .dispose(by: disposeBag)
        
    //運転時出力:
    0
    1
    2
    3
    .
    .
    出力を続行します.

    6. Timer


    :所与の遅延後にアイテムを解放する観察可能なアイテムを作成します.
    public static func timer(_ dueTime: RxSwift.RxTimeInterval, period: RxSwift.RxTimeInterval? = nil, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Self.Element>
  • dueTime:最初の値
  • が生成されるタイミング
  • サイクル:次の値を生成する間隔
  • Observable<Int>
        .timer(.seconds(5), period: .seconds(2), scheduler: MainScheduler.instance)
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: disposeBag)
    //運転時出力:
    0//サブスクリプション開始後5秒で出力します.
    1/0出力、2秒後出力.
    2/1が出力され、2秒後に推進されます.
    3
    4
    5
    .
    .
    出力を続行します.
    ソース-Reactive X