RxJS:forkJoin、zip、combineLatestの違いを詳しく説明します.

3936 ワード

前言forkJoinzipcombineLatestは、複数のストリームをマージするためのrxjsの中のマージオペレータである.多くの人が初めてrxjsに接触すると、その違いがよく分かりません.実はこれは正常です.一度のデータを送信するだけで閉じられるストリームであるとき、結果としてはこの3つのオペレータは何の違いもありません.
const ob1 = Rx.Observable.of(1).delay(1000);
const ob2 = Rx.Observable.of(2).delay(2000);
const ob3 = Rx.Observable.of(3).delay(3000);

Rx.Observable.forkJoin(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.zip(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.combineLatest(ob1, ob2, ob3).subscribe((data) => console.log(data));

// [1, 2, 3]
// [1, 2, 3]
// [1, 2, 3]
//    3      
httpには多くのオペレータの機能が似ていますが、その動作の流れがデータを何度も送信する場合にのみ、それらの間の違いが示されます.次に、rxjsforkJoinzip、およびcombineLatestを詳しく説明します.
一つの基本概念
まず、1つのストリーム(またはObservableシーケンス)のライフサイクルにおいて、送信データがnext信号(Notification)を発行し、送信が終了するとcomplete信号を送信し、エラーが発生するとerror信号を発行し、3つの信号はそれぞれobserverの3つの方法に対応することを知るべきである.next信号は、送信元のために0から数回送信される.一方、completeおよびerrorは、1つだけ送信され、1回だけ送信され、ストリームの終了を示している.subscribeは、上記3つの信号を処理するためのobserverオブジェクトを受信し、1つの関数だけがnext方法とみなされるので、subscribeに着信したnextの方法は、0回からN回、Nはシーケンスの正常な送信信号の回数を実行する.forkJoinforkJoinで結合されたストリームは、各結合されたストリームが終了信号を送るときに送信されるのも唯一のデータである.私たちが二つの流れを持っていると仮定します.
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data));
// ["ob1:2", "ob2:1"]
ob1は、第3のデータを送信し終わったときに送信を停止し、ob2は第2のデータを送信し終わったときに停止し、forkJoinが結合した後のフローは、ob1およびob2が全部終了するまで1回のデータを送信し、つまりsubscribe内のコールバックをトリガし、受信したデータはob1およびob2の最後のデータの配列である.zipzipの動作原理は、各着信zipのストリームが第1のデータを送信し終えたとき、zipはこれらのデータを配列に結合して送信することである.これらのストリームが第2のデータを送信し終えたとき、zipは再びそれらを配列に結合して送信する.これに類推して、あるストリームが終了信号を送信し、統合されたストリーム全体が終了し、データが送信されなくなる.
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.zip(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:0", "ob2:0"] ob1  ob2    ,    
// ["ob1:1", "ob2:1"]   ob2  ,         
// "complete"
zipforkJoinとの違いは、forkJoinは、各サブストリームの最後に送信された1回のデータのみをマージし、1回のフィードバックをトリガすることである.zipは、各サブストリームが1回のデータを送信し終わってから結合送信するのを待ち、その後、あるストリームが終了するまで待ち続ける(このとき、結合されたデータは各サブストリームのデータを含むことができないので).combineLatestcombineLatestzipとよく似ています.combineLatestは最初に各サブストリームが一度のデータを送信し終わるのを待っていますが、マージ時に、サブストリーム1が他のストリームの送信データを待つ間に新しいデータを再送信した場合、サブストリームの最新送信データを用いてマージし、その後、あるストリームが新しいデータを送信するたびに、他のストリームの同期の送信データを待たなくなります.他のストリームを使用する前の最近のデータを統合します.
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.combineLatest(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:1", "ob2:0"] ob1  ob2  , ob2   ob1          ,  ob1      
// ["ob1:2", "ob2:0"] ob1               ,ob2      ,             
// ["ob1:2", "ob2:1"] ob2             , ob1      。
// "complete"
本期の内容は終了しました.次の号は引き続きrxjsの他のオペレータまたは概念の詳細を持ってきます.