命令式から応答式へ(五)
9868 ワード
前節のオペレータに続いて、テーマに直行しましょう.
クラスを組み合わせたオペレータは、異なるストリームデータを一定のルールでマージし、必要な完全なデータを得ることができます. combineLatest
インスタンスメソッド
複数の入力ストリームを1つの出力ストリームに結合し、出力ストリーム上のデータは、各入力ストリーム上の最後のデータ群から合成されたデータである.
いつでも入力ストリームからデータが発行されると、すべての入力ストリームの最新データを新しいデータに組み合わせて出力ストリームにプッシュします.キーは、任意の組み合わせられたストリームがデータを発行し、出力ストリームに新しいデータが生成されることです.
また、最後のパラメータとしてマッピング関数を受け入れることができ、すべての入力ストリームの値がマッピング関数のパラメータとして順次入力されます.
Observableは、すべての入力ストリームの最新値を含むストリーム、またはすべての最新値のマッピング値を含むストリームを返します.
例 withLatestFrom
インスタンスメソッド
複数の入力ストリームを1つの出力ストリームに結合し、出力ストリーム上のデータは各入力ストリーム上の最後のデータ群によって合成されるが、組合せ動作はソースObservableが値を発行したときにのみ発生し、ここでのソースObservableはwithLatestFromを呼び出すインスタンスを指す.つまり、ソースObservableが値を生成する場合にのみ、新しい値が生成されます.
戻り値Observableは、すべての入力ストリームの最新値を含むストリーム、またはすべての最新値のマッピング値を含むストリームです.
combineLatestと同様に、パラメータとしてマッピング関数を受け入れることもできます.
例 zip
インスタンスメソッド
すべての入力がストリームである場合、すべての入力ストリームの対応する位置の値を新しい値に結合し、この新しい値を出力ストリームの値とします.さらにzipオペレータは、配列、promise、文字列など、ストリームに変換可能なデータをパラメータとして直接使用することもできます.
戻り値Observableは、各入力ストリームに対応する値を新しい値に組み合わせて出力ストリームに送信するか、マッピング関数処理後に出力ストリームに送信する.
パラメータとしてマッピング関数を受け入れることもできます.
例 merge
インスタンスメソッド
すべての入力ストリームの値を同時にプッシュできるストリームを作成します.複数の入力ストリームを1つの入力ストリームに統合し、すべての入力ストリームの値をこの出力ストリームで得ることができます.
すべての入力ストリームのサブスクリプションをマージします.ソースObservableでもパラメータとして入力されたObservableでも、値を出力ストリームに順次プッシュするだけで、追加の変更はありません.出力ストリームは、すべての入力ストリームが完了してからのみ完了し、いずれの入力ストリームのエラーもすぐに出力ストリームにプッシュされます.
戻り値Observableは、すべての入力ストリーム上の値のObservableを送信することができる.
例 forkJoin
スタティツクメソッド
入力ストリームの最後の値をマージして出力ストリームに渡します.その効果はPromiseに等しい.all()です.したがって、複数の同時要求が結果を返す必要がある場合に使用できます.
forkJoinは、任意の数の入力ストリームをパラメータまたは配列で受信することができる.入力ストリームに転送されない場合、出力ストリームはすぐに終了通知を発行します.
すべての入力ストリームが終了通知を発行し、すべての入力ストリームが発行する最終値を含む配列が発行されるので、n個の入力ストリームが入力され、得られる配列にはn個の要素が含まれ、各要素は対応する順序の入力ストリームから最後の値となる.これは、出力ストリームが1つの値しか発行されず、終了通知が発行されることを意味します.
出力ストリームで取得された配列の長さを入力ストリームの数と一致させるために、ある入力ストリームが終了通知を発行する前に有効な値を発行しなかった場合、出力ストリームもすぐに終了し、他の入力ストリームで有効な値が発行される可能性があるにもかかわらず、出力ストリームはすぐに終了し、何も発行されません.逆に、1つの入力ストリームが終了通知を発行していない場合、出力ストリームは値を発行しません.他の入力が前述したように終了通知のみを送信しない限り.総じて、出力ストリームに値を発行させるためには、すべての入力ストリームが終了通知を発行し、その前に少なくとも1つの有効値を発行しなければならない.
入力ストリームのいずれかがエラー通知を発行した場合、出力ストリームは直ちにこのエラー通知を発行し、他のストリームのサブスクリプションを直ちにキャンセルします.
戻り値Observableが配列形式で取得した各入力ストリームの値、またはマッピング関数からの値を返します.
パラメータとしてマッピング関数を受信できます.
例
ここでは主に高次ストリームを処理できるオペレータについて説明します. map
インスタンスメソッド
出力ストリームの値は、マッピング関数を使用して入力ストリームの値をマッピングして新しい値を得ることです.
戻り値Observableは、マッピングされた値のストリームを発行します.
例 mergeMap
インスタンスメソッド
すべての入力ストリームの値を1つのストリームにマージします.
出力ストリームは、マッピング関数から返されるすべての内部ストリームを1つのストリームに平らにします.マッピング関数は、入力ストリームの値を使用して内部ストリームを生成できます.
戻り値Observableは、すべてのマッピング関数を結合して生成された内部ストリームのストリームであり、内部ストリームの値はこのストリームで発行されます.
例 switchMap
インスタンスメソッド
入力ストリームが値を発行するときは、それを内部ストリームにマッピングし、この内部ストリームを出力ストリームに平成化し、出力ストリームには最も近い内部ストリームの値しか発行されません.
出力ストリームの値は、マッピング関数によって入力ストリームの値の基本的に生成された内部ストリームによって発行され、出力ストリームは1つの内部ストリームのみを観察することができ、新しい内部ストリームが到着すると、出力ストリームは前の内部ストリームの購読をキャンセルし、この最新の内部ストリームを購読し、その上の値を発行します.
戻り値Observableは、最新の内部ストリームからのみ値を取得するストリームです.
例 concatMap
インスタンスメソッド
複数のストリームを1つのストリームにマージするには、現在のストリームが完了するまで次のマージを開始する必要があります.マージプロセスは、入力された順序で実行されます.
戻り値Observable入力した値をストリームにマッピングして接続するストリーム.
例 groupBy
インスタンスメソッド
入力ストリームの値を一定の規則に従って異なるストリームに分割し、出力ストリームに送信します.各ストリームには同じ条件を満たす値のセットがあります.
戻り値Observableは、パケット後のストリームの高次ストリームを発行し、パケットのストリームには一意のkeyがあり、そのストリームの値は入力ストリーム上である条件に合致する値である.
例
reduce
インスタンスメソッド
ソースObservableに累積関数を適用し、ソースObservableが終了すると累積値を出力ストリームにプッシュし、初期の累積値を受け入れることができます.
このオペレータの動作は配列のreduce法と同じである.1つの累積関数を使用してストリーム上の各値を処理し、前回の累積後の値を次の累積の初期値として、累積によって得られた最終値を出力ストリームにプッシュします.なお、reduceオペレータは、ソースObservableが終了通知を発行した後に1つの値しか発行しません.
ソースObservableで発行された値は、累積関数で処理されます.初期値が指定されている場合、この値は累積プロセス全体の初期値となり、指定されていない場合、ソースObservableから発行される最初の値は累積プロセス全体の初期値となります.
戻り値Observableは、ソースObservable上の値をすべて累積関数で計算した値を値として送信するストリームである.
例 scan
インスタンスメソッド
入力ストリームに累積関数を適用し、累積のたびに出力ストリームに送信し、累積時にオプションの初期値を受け入れることができます.reduceオペレータと同様に、蓄積されるたびに結果が送信されます.
戻り値Observableは、累積値のストリームを出力します.
例
オペレータはここまで紹介されていますが、非常によく使われているものがあります.また、紹介されていないものは役に立たないわけではありません.必要なものは自分で検索してください.オペレータ間の組み合わせによって、非常に効率的で便利なデータ処理を実現することができます.
コンポジットクラスオペレータ
クラスを組み合わせたオペレータは、異なるストリームデータを一定のルールでマージし、必要な完全なデータを得ることができます.
インスタンスメソッド
複数の入力ストリームを1つの出力ストリームに結合し、出力ストリーム上のデータは、各入力ストリーム上の最後のデータ群から合成されたデータである.
いつでも入力ストリームからデータが発行されると、すべての入力ストリームの最新データを新しいデータに組み合わせて出力ストリームにプッシュします.キーは、任意の組み合わせられたストリームがデータを発行し、出力ストリームに新しいデータが生成されることです.
-a------b----c-----d------e---|
--1--2-----3--4-----5---6---|
combineLatest
--a1-a2-b2--b3-c3-c4----d4-d5---e5-e6-|
また、最後のパラメータとしてマッピング関数を受け入れることができ、すべての入力ストリームの値がマッピング関数のパラメータとして順次入力されます.
Observableは、すべての入力ストリームの最新値を含むストリーム、またはすべての最新値のマッピング値を含むストリームを返します.
例
const weight = of(70,72,76,79,75);
const height = of(1.76,1.77,1.78);
const bmi = weight.combineLatest(height, (w, h) => w/h*h);
bmi.subscribe(x => console.log('BMI is ' + x);
インスタンスメソッド
複数の入力ストリームを1つの出力ストリームに結合し、出力ストリーム上のデータは各入力ストリーム上の最後のデータ群によって合成されるが、組合せ動作はソースObservableが値を発行したときにのみ発生し、ここでのソースObservableはwithLatestFromを呼び出すインスタンスを指す.つまり、ソースObservableが値を生成する場合にのみ、新しい値が生成されます.
-a-----b--------------c------------d--e---|-->
----1-----2-----3--4--------5----|---->
withLatestFrom
------b1------------c4-----------d5--e5---|-->
戻り値Observableは、すべての入力ストリームの最新値を含むストリーム、またはすべての最新値のマッピング値を含むストリームです.
combineLatestと同様に、パラメータとしてマッピング関数を受け入れることもできます.
例
const clicks = fromEvent(document, 'click');
const timer = interval(1000);
const result = clicks.withLatestFrom(timer)
result.subscribe(x => console.log(x));
インスタンスメソッド
すべての入力がストリームである場合、すべての入力ストリームの対応する位置の値を新しい値に結合し、この新しい値を出力ストリームの値とします.さらにzipオペレータは、配列、promise、文字列など、ストリームに変換可能なデータをパラメータとして直接使用することもできます.
--a------b------c------d---2--|->
-----e------f------g-----h--|->
zip
----ae------fb-----gc----hd-|->
戻り値Observableは、各入力ストリームに対応する値を新しい値に組み合わせて出力ストリームに送信するか、マッピング関数処理後に出力ストリームに送信する.
パラメータとしてマッピング関数を受け入れることもできます.
例
const obs1 = from([1,2,3,4]);
const obs2 = from(['a','b','c']);
obs1.zip(obs2)
.subscribe(v => console.log(v));
const obs = interval(1000);
const promise = new Promise(resolve => {
setTimeout(() => resolve('hello'), 2000);
});
obs.zip(promise, (obs, promise) => promise + obs)
.subscribe(v => console.log(v));
インスタンスメソッド
すべての入力ストリームの値を同時にプッシュできるストリームを作成します.複数の入力ストリームを1つの入力ストリームに統合し、すべての入力ストリームの値をこの出力ストリームで得ることができます.
-a----------b----------c---|----->
------d---------e----------e----|-->
merge
-a----d----b----e-----c----e----|-->
すべての入力ストリームのサブスクリプションをマージします.ソースObservableでもパラメータとして入力されたObservableでも、値を出力ストリームに順次プッシュするだけで、追加の変更はありません.出力ストリームは、すべての入力ストリームが完了してからのみ完了し、いずれの入力ストリームのエラーもすぐに出力ストリームにプッシュされます.
戻り値Observableは、すべての入力ストリーム上の値のObservableを送信することができる.
例
const clicks = fromEvent(document, 'click');
const timer = interval(1000);
const clicksOrTimer = clicks.merge(timer);
clicksOrTimer.subscribe(x => console.log(x));
スタティツクメソッド
入力ストリームの最後の値をマージして出力ストリームに渡します.その効果はPromiseに等しい.all()です.したがって、複数の同時要求が結果を返す必要がある場合に使用できます.
forkJoinは、任意の数の入力ストリームをパラメータまたは配列で受信することができる.入力ストリームに転送されない場合、出力ストリームはすぐに終了通知を発行します.
すべての入力ストリームが終了通知を発行し、すべての入力ストリームが発行する最終値を含む配列が発行されるので、n個の入力ストリームが入力され、得られる配列にはn個の要素が含まれ、各要素は対応する順序の入力ストリームから最後の値となる.これは、出力ストリームが1つの値しか発行されず、終了通知が発行されることを意味します.
出力ストリームで取得された配列の長さを入力ストリームの数と一致させるために、ある入力ストリームが終了通知を発行する前に有効な値を発行しなかった場合、出力ストリームもすぐに終了し、他の入力ストリームで有効な値が発行される可能性があるにもかかわらず、出力ストリームはすぐに終了し、何も発行されません.逆に、1つの入力ストリームが終了通知を発行していない場合、出力ストリームは値を発行しません.他の入力が前述したように終了通知のみを送信しない限り.総じて、出力ストリームに値を発行させるためには、すべての入力ストリームが終了通知を発行し、その前に少なくとも1つの有効値を発行しなければならない.
入力ストリームのいずれかがエラー通知を発行した場合、出力ストリームは直ちにこのエラー通知を発行し、他のストリームのサブスクリプションを直ちにキャンセルします.
----2---3------4----5--|-->
----a----b------------d---|-->
forkJoin
--------------------------5d|
戻り値Observableが配列形式で取得した各入力ストリームの値、またはマッピング関数からの値を返します.
パラメータとしてマッピング関数を受信できます.
例
const observable = forkJoin(
of(1, 2, 3, 4),
of(5, 6, 7, 8)
);
observable.subscribe(
value => console.log(value),
err => {},
() => console.log('This is how it ends!')
);
const observable = forkJoin(
interval(1000).take(3),
interval(500).take(4)
);
observable.subscribe(
value => console.log(value),
err => {},
() => console.log('This is how it ends!')
);
変換タイプオペレータ
ここでは主に高次ストリームを処理できるオペレータについて説明します.
インスタンスメソッド
出力ストリームの値は、マッピング関数を使用して入力ストリームの値をマッピングして新しい値を得ることです.
--------1-----------2----------3----------|--->
map(v => v * 10);
--------10----------20---------30---------|---->
戻り値Observableは、マッピングされた値のストリームを発行します.
例
fromEvent(document, 'click')
.map(event => event.clientX)
.subscribe(v => console.log(v));
インスタンスメソッド
すべての入力ストリームの値を1つのストリームにマージします.
-1-----2-----3---------|-->
---2----2----2--|-----> // , 2,3 ,
mergeMap(v => Observable.of(v + 1).repeat(3));
-2--2--2--3--3--3--4--4--4----|-->
出力ストリームは、マッピング関数から返されるすべての内部ストリームを1つのストリームに平らにします.マッピング関数は、入力ストリームの値を使用して内部ストリームを生成できます.
戻り値Observableは、すべてのマッピング関数を結合して生成された内部ストリームのストリームであり、内部ストリームの値はこのストリームで発行されます.
例
of('a','b','c')
.mergeMap(v => Rx.Observable.interval(1000).map(x => x + v ))
.subscribe(v => console.log(v));
const source = of('Hello');
const createPromise = v => new Promise(resolve => resolve(`I got ${v} from promise`));
source.mergeMap(
v => createPromise(v),
(outValue, innerValue) => `Source: ${outValue},${innerValue}`
)
.subscribe(v => console.log(v));
インスタンスメソッド
入力ストリームが値を発行するときは、それを内部ストリームにマッピングし、この内部ストリームを出力ストリームに平成化し、出力ストリームには最も近い内部ストリームの値しか発行されません.
-1---------3-----5----|->
-10---10---10-| //
switchMap(v => Observable.from([10,10,10]).map(x => x * v))
-10---10---10-30---30-50---50---50-|
出力ストリームの値は、マッピング関数によって入力ストリームの値の基本的に生成された内部ストリームによって発行され、出力ストリームは1つの内部ストリームのみを観察することができ、新しい内部ストリームが到着すると、出力ストリームは前の内部ストリームの購読をキャンセルし、この最新の内部ストリームを購読し、その上の値を発行します.
戻り値Observableは、最新の内部ストリームからのみ値を取得するストリームです.
例
fromEvent(document, 'click')
.switchMap(event => interval(1000))
.subscribe(v => console.log(v));
インスタンスメソッド
複数のストリームを1つのストリームにマージするには、現在のストリームが完了するまで次のマージを開始する必要があります.マージプロセスは、入力された順序で実行されます.
--1------------3---------5-----------|-->
--10---10---10--|-->
concatMap(i => 10*i----10*i---10*i)
--10---10---10-30---30---30-50---50---50--|->
戻り値Observable入力した値をストリームにマッピングして接続するストリーム.
例
fromEvent(document, 'click')
.concatMap(event => Rx.Observable.interval(1000).take(3))
.subscribe(v => console.log(v));
インスタンスメソッド
入力ストリームの値を一定の規則に従って異なるストリームに分割し、出力ストリームに送信します.各ストリームには同じ条件を満たす値のセットがあります.
----1----2----3----4----5----|->
groupBy(v => v%2);
-----------------------------|->
\ \
\ 2---------4---------|
1------3----------5----|
戻り値Observableは、パケット後のストリームの高次ストリームを発行し、パケットのストリームには一意のkeyがあり、そのストリームの値は入力ストリーム上である条件に合致する値である.
例
of(
{id: 1, name: 'aze1'},
{id: 2, name: 'sf2'},
{id: 2, name: 'dg2'},
{id: 1, name: 'erg1'},
{id: 1, name: 'df1'},
{id: 2, name: 'sf2'},
{id: 3, name: 'qfs3'},
{id: 2, name: 'qsg'}
)
.groupBy(v => v.id)
.mergeMap(group => group.reduce((acc,cur) => [...acc, cur],[]))
.subscribe(v => console.log(v))
集約クラスオペレータ
インスタンスメソッド
ソースObservableに累積関数を適用し、ソースObservableが終了すると累積値を出力ストリームにプッシュし、初期の累積値を受け入れることができます.
--------1-------2-------3---|-->
reduce((acc,cur) => acc + cur, 0);
----------------------------4|
このオペレータの動作は配列のreduce法と同じである.1つの累積関数を使用してストリーム上の各値を処理し、前回の累積後の値を次の累積の初期値として、累積によって得られた最終値を出力ストリームにプッシュします.なお、reduceオペレータは、ソースObservableが終了通知を発行した後に1つの値しか発行しません.
ソースObservableで発行された値は、累積関数で処理されます.初期値が指定されている場合、この値は累積プロセス全体の初期値となり、指定されていない場合、ソースObservableから発行される最初の値は累積プロセス全体の初期値となります.
戻り値Observableは、ソースObservable上の値をすべて累積関数で計算した値を値として送信するストリームである.
例
const clicksInFiveSeconds = fromEvent(document, 'click')
.takeUntil(interval(5000));
const ones = clicksInFiveSeconds.mapTo(1);
const seed = 0;
const count = ones.reduce((acc,cur) => acc + cur, seed);
count.subscribe(v => console.log(v));
インスタンスメソッド
入力ストリームに累積関数を適用し、累積のたびに出力ストリームに送信し、累積時にオプションの初期値を受け入れることができます.reduceオペレータと同様に、蓄積されるたびに結果が送信されます.
戻り値Observableは、累積値のストリームを出力します.
例
fromEvent(document, 'click')
.mapTo(1)
.scan((acc,cur) => acc + cur, 0)
.subscribe(v => console.log(v))
オペレータはここまで紹介されていますが、非常によく使われているものがあります.また、紹介されていないものは役に立たないわけではありません.必要なものは自分で検索してください.オペレータ間の組み合わせによって、非常に効率的で便利なデータ処理を実現することができます.