連結フローの演算子


🔮  テンプレートグラフのソース:  https://reactivex.io/documentation/operators.html

連結フローの演算子


2つ以上のストリームをマージして新しいストリームの演算子を作成します.

startWith


特定のストリームの先頭にデータの演算子を挿入します.startWithパラメータとして渡される値は、購読後すぐに有効になります.
import RxSwift

let subject = PublishSubject<String>()
let disposeBag = DisposeBag()

subject
    .startWith("발행 0")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

print("아직 발행 안함")
subject.onNext("발행 1")
subject.onNext("발행 2")
subject.onNext("발행 3")
//🖨 출력결과
발행 0 //👉 구독하는 순간에 바로 발행됨
아직 발행 안함
발행 1
발행 2
발행 3

Concat


複数のストリームを接続するフォーマット.アレイに渡す順番で簡単に接続します.次のストリームがボリューム期間の場合、concatは前のストリームが完了するまでパブリッシュされません.
import RxSwift

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
let subject3 = PublishSubject<String>()

let observable = Observable.concat([subject1, subject2, subject3])

observable
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("subject1: 발행 1")
subject1.onNext("subject1: 발행 2")
subject1.onNext("subject1: 발행 3")

subject2.onNext("subject2: 발행 1") //🚫 1이 complete 되기 전에 2 발행 (출력 안됨)
subject1.onCompleted()

subject2.onNext("subject2: 발행 2")
subject3.onNext("subject3: 발행 1") //🚫 2가 complete 되기 전에 3 발행 (출력 안됨)
subject2.onCompleted()

subject3.onNext("subject3: 발행 2")
//🖨 출력 결과
subject1: 발행 1
subject1: 발행 2
subject1: 발행 3
subject2: 발행 2
subject3: 발행 2

merge


複数のストリームをconcatと同様に1つにマージします.ただし、順次マージするconcatとは異なり、mergeは並列マージされる.したがって、mergeもマージフローの発行に伴って発行されます.
import RxSwift

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
let subject3 = PublishSubject<String>()

let observable = Observable.merge([subject1, subject2, subject3])

observable
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject3.onNext("subject3: 발행 1")
subject1.onNext("subject1: 발행 1")
subject1.onNext("subject1: 발행 2")
subject2.onNext("subject2: 발행 1")
subject1.onNext("subject1: 발행 3")
subject3.onNext("subject3: 발행 2")
subject2.onNext("subject2: 발행 2")
//🖨 출력 결과 👉 merge된 순서와 관계 없이 바로바로 발행됨
subject3: 발행 1
subject1: 발행 1
subject1: 발행 2
subject2: 발행 1
subject1: 발행 3
subject3: 발행 2
subject2: 발행 2

combineLatest


複数のストリームをペアリングしてリリースします.ペアリングができない場合は発行しません.
あるストリームでデータがパブリッシュされると、別のストリームで最近パブリッシュされた値とペアになります.これは、すべてのストリームが1回以上発行されている場合、任意のストリームが発行されると、組合せLatestが発行されることを意味します.
ペアリング表現を使用しますが、3つ以上のストリームをマージすることもできます.
ペアデータのフォーマットを定義するresultSelectorというモジュールを持つことができます.(省略すると配列形式で配布されます.)

2つのストリームをマージ

import RxSwift

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
let subject3 = PublishSubject<String>()

let observable = Observable.combineLatest(subject1, subject2) { s1, s2 in
    "\(s1) 그리고 \(s2)"
}
//let observable = Observable.combineLatest([subject1, subject2, subject3])

observable
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("subject1: 발행 1")
subject1.onNext("subject1: 발행 2")
subject2.onNext("subject2: 발행 1") //👉 짝 지어져서 발행 시작!
subject1.onNext("subject1: 발행 3")
subject2.onNext("subject2: 발행 2")
//🖨 출력 결과
subject1: 발행 2 그리고 subject2: 발행 1 //👉 resultSelector에서 정의한 String의 모습
subject1: 발행 3 그리고 subject2: 발행 1
subject1: 발행 3 그리고 subject2: 발행 2

3つ以上のストリームをマージ

import RxSwift

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
let subject3 = PublishSubject<String>()

let observable = Observable.combineLatest([subject1, subject2, subject3])

observable
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject3.onNext("subject3: 발행 1")
subject1.onNext("subject1: 발행 1")
subject1.onNext("subject1: 발행 2")
subject2.onNext("subject2: 발행 1") //👉 이 때 모든 스트림이 1번 이상 발행되면서 짝이 지어짐
subject1.onNext("subject1: 발행 3")
subject3.onNext("subject3: 발행 2")
subject2.onNext("subject2: 발행 2")
//🖨 출력 결과
["subject1: 발행 2", "subject2: 발행 1", "subject3: 발행 1"]
["subject1: 발행 3", "subject2: 발행 1", "subject3: 발행 1"]
["subject1: 발행 3", "subject2: 발행 1", "subject3: 발행 2"]
["subject1: 발행 3", "subject2: 발행 2", "subject3: 발행 2"]

withLatestFrom


1番目のストリームは、実行時に2番目のストリームで最も近いデータとペアリングされます.同様に、ペアリングができない場合は発行されません.
combineLatestとほぼ同じですが、3番目のストリームを作成するcombineとは異なり、演算子を使用して他のストリームを既存のストリームに直接マージできます.

resultSelectorが定義されていません。


最初のストリームはトリガのロールのみを実行し、2番目のストリームのデータのみを実行します.
import RxSwift

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

let observable = subject1.withLatestFrom(subject2)

subject1
    .withLatestFrom(subject2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject2.onNext("1")
subject2.onNext("2")
subject1.onNext("A")
subject2.onNext("3")
subject2.onNext("4")
subject1.onNext("B")
//🖨 출력 결과
2
4

ResultSelectorが定義されています。


定義されたエンクロージャの戻り値を発行します.
import RxSwift

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

let observable = subject1.withLatestFrom(subject2)

subject1
    .withLatestFrom(subject2) { "\($0)\($1)" }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject2.onNext("1")
subject2.onNext("2")
subject1.onNext("A")
subject2.onNext("3")
subject2.onNext("4")
subject1.onNext("B")
//🖨 출력 결과
A2
B4