RxSwiftで一つ前の値と現在の値を両方Subscribeする方法


Observable.zipを用いて、初期値を与え1つ目の値が流れてきたときにSubscribeする方法初期値を与えずに2つ目の値が流れたときにSubscribeする方法があります。

初期値を与える場合

extension ObservableType {
    func preSubscribe(startWith e: Self.E,
                      onNext: (((Self.E, Self.E)) -> Void)? = nil,
                      onError: ((Error) -> Void)? = nil,
                      onCompleted: (() -> Void)? = nil,
                      onDisposed: (() -> Void)? = nil) -> Disposable {
        return Observable.zip(self.startWith(e), self)
            .subscribe(onNext: onNext, onError: onError, onCompleted: onCompleted, onDisposed: onDisposed)
    }
}

初期値を与えない場合

extension ObservableType {
    func preSubscribe(onNext: (((Self.E, Self.E)) -> Void)? = nil,
                      onError: ((Error) -> Void)? = nil,
                      onCompleted: (() -> Void)? = nil,
                      onDisposed: (() -> Void)? = nil) -> Disposable {
        return Observable.zip(self, self.skip(1))
            .subscribe(onNext: onNext, onError: onError, onCompleted: onCompleted, onDisposed: onDisposed)
    }
}

使い方

let sequence = Observable.of(1, 2, 3)

sequence
    .preSubscribe(startWith: 0, onNext: { n0, n1 in
        print(n0, n1)
    })
    .disposed(by: disposeBag)

// (0, 1), (1, 2), (2, 3)

sequence
    .preSubscribe(onNext: { n0, n1 in
        print(n0, n1)
    })
    .disposed(by: disposeBag)

// (1, 2), (2, 3)