命令式から応答式へ(五)


前節のオペレータに続いて、テーマに直行しましょう.

コンポジットクラスオペレータ


クラスを組み合わせたオペレータは、異なるストリームデータを一定のルールでマージし、必要な完全なデータを得ることができます.
  • combineLatest

  • インスタンスメソッド
    複数の入力ストリームを1つの出力ストリームに結合し、出力ストリーム上のデータは、各入力ストリーム上の最後のデータ群から合成されたデータである.
    いつでも入力ストリームからデータが発行されると、すべての入力ストリームの最新データを新しいデータに組み合わせて出力ストリームにプッシュします.キーは、任意の組み合わせられたストリームがデータを発行し、出力ストリームに新しいデータが生成されることです.
        -a------b----c-----d------e---|
    
        --1--2-----3--4-----5---6---|
    
                combineLatest
    
        --a1-a2-b2--b3-c3-c4----d4-d5---e5-e6-|
    

    また、最後のパラメータとしてマッピング関数を受け入れることができ、すべての入力ストリームの値がマッピング関数のパラメータとして順次入力されます.
    Observableは、すべての入力ストリームの最新値を含むストリーム、またはすべての最新値のマッピング値を含むストリームを返します.
    const weight = of(70,72,76,79,75);
    
    const height = of(1.76,1.77,1.78);
    
    const bmi = weight.combineLatest(height, (w, h) => w/h*h);
    
    bmi.subscribe(x => console.log('BMI is ' + x);
    
  • withLatestFrom

  • インスタンスメソッド
    複数の入力ストリームを1つの出力ストリームに結合し、出力ストリーム上のデータは各入力ストリーム上の最後のデータ群によって合成されるが、組合せ動作はソースObservableが値を発行したときにのみ発生し、ここでのソースObservableはwithLatestFromを呼び出すインスタンスを指す.つまり、ソースObservableが値を生成する場合にのみ、新しい値が生成されます.
        -a-----b--------------c------------d--e---|-->
    
        ----1-----2-----3--4--------5----|---->
    
                withLatestFrom
    
        ------b1------------c4-----------d5--e5---|-->
    

    戻り値Observableは、すべての入力ストリームの最新値を含むストリーム、またはすべての最新値のマッピング値を含むストリームです.
    combineLatestと同様に、パラメータとしてマッピング関数を受け入れることもできます.
    const clicks = fromEvent(document, 'click');
    const timer = interval(1000);
    const result = clicks.withLatestFrom(timer)
    
    result.subscribe(x => console.log(x));
    
  • zip

  • インスタンスメソッド
    すべての入力がストリームである場合、すべての入力ストリームの対応する位置の値を新しい値に結合し、この新しい値を出力ストリームの値とします.さらにzipオペレータは、配列、promise、文字列など、ストリームに変換可能なデータをパラメータとして直接使用することもできます.
    --a------b------c------d---2--|->
    
    -----e------f------g-----h--|->
    
                zip
    
    ----ae------fb-----gc----hd-|->
    

    戻り値Observableは、各入力ストリームに対応する値を新しい値に組み合わせて出力ストリームに送信するか、マッピング関数処理後に出力ストリームに送信する.
    パラメータとしてマッピング関数を受け入れることもできます.
    const obs1 = from([1,2,3,4]);
    const obs2 = from(['a','b','c']);
    
    obs1.zip(obs2)
        .subscribe(v => console.log(v));
    
    const obs = interval(1000);
    const promise = new Promise(resolve => {
        setTimeout(() => resolve('hello'), 2000);
    });
    
    obs.zip(promise, (obs, promise) => promise + obs)
        .subscribe(v => console.log(v));
    
  • merge

  • インスタンスメソッド
    すべての入力ストリームの値を同時にプッシュできるストリームを作成します.複数の入力ストリームを1つの入力ストリームに統合し、すべての入力ストリームの値をこの出力ストリームで得ることができます.
        -a----------b----------c---|----->
    
        ------d---------e----------e----|-->
    
                    merge
    
        -a----d----b----e-----c----e----|-->
    

    すべての入力ストリームのサブスクリプションをマージします.ソースObservableでもパラメータとして入力されたObservableでも、値を出力ストリームに順次プッシュするだけで、追加の変更はありません.出力ストリームは、すべての入力ストリームが完了してからのみ完了し、いずれの入力ストリームのエラーもすぐに出力ストリームにプッシュされます.
    戻り値Observableは、すべての入力ストリーム上の値のObservableを送信することができる.
    const clicks = fromEvent(document, 'click');
    const timer = interval(1000);
    const clicksOrTimer = clicks.merge(timer);
    
    clicksOrTimer.subscribe(x => console.log(x));
    
  • forkJoin

  • スタティツクメソッド
    入力ストリームの最後の値をマージして出力ストリームに渡します.その効果はPromiseに等しい.all()です.したがって、複数の同時要求が結果を返す必要がある場合に使用できます.
    forkJoinは、任意の数の入力ストリームをパラメータまたは配列で受信することができる.入力ストリームに転送されない場合、出力ストリームはすぐに終了通知を発行します.
    すべての入力ストリームが終了通知を発行し、すべての入力ストリームが発行する最終値を含む配列が発行されるので、n個の入力ストリームが入力され、得られる配列にはn個の要素が含まれ、各要素は対応する順序の入力ストリームから最後の値となる.これは、出力ストリームが1つの値しか発行されず、終了通知が発行されることを意味します.
    出力ストリームで取得された配列の長さを入力ストリームの数と一致させるために、ある入力ストリームが終了通知を発行する前に有効な値を発行しなかった場合、出力ストリームもすぐに終了し、他の入力ストリームで有効な値が発行される可能性があるにもかかわらず、出力ストリームはすぐに終了し、何も発行されません.逆に、1つの入力ストリームが終了通知を発行していない場合、出力ストリームは値を発行しません.他の入力が前述したように終了通知のみを送信しない限り.総じて、出力ストリームに値を発行させるためには、すべての入力ストリームが終了通知を発行し、その前に少なくとも1つの有効値を発行しなければならない.
    入力ストリームのいずれかがエラー通知を発行した場合、出力ストリームは直ちにこのエラー通知を発行し、他のストリームのサブスクリプションを直ちにキャンセルします.
        ----2---3------4----5--|-->
    
        ----a----b------------d---|-->
    
                    forkJoin
    
        --------------------------5d|
    

    戻り値Observableが配列形式で取得した各入力ストリームの値、またはマッピング関数からの値を返します.
    パラメータとしてマッピング関数を受信できます.
    const observable = forkJoin(
        of(1, 2, 3, 4),
        of(5, 6, 7, 8)
    );
    
    observable.subscribe(
        value => console.log(value),
        err => {},
        () => console.log('This is how it ends!')
    );
    
    const observable = forkJoin(
        interval(1000).take(3),
        interval(500).take(4)
    );
    
    observable.subscribe(
        value => console.log(value),
        err => {},
        () => console.log('This is how it ends!')
    );
    

    変換タイプオペレータ


    ここでは主に高次ストリームを処理できるオペレータについて説明します.
  • map

  • インスタンスメソッド
    出力ストリームの値は、マッピング関数を使用して入力ストリームの値をマッピングして新しい値を得ることです.
        --------1-----------2----------3----------|--->
    
                map(v => v * 10);
    
        --------10----------20---------30---------|---->
    

    戻り値Observableは、マッピングされた値のストリームを発行します.
    fromEvent(document, 'click')
        .map(event => event.clientX)
        .subscribe(v => console.log(v));
    
  • mergeMap

  • インスタンスメソッド
    すべての入力ストリームの値を1つのストリームにマージします.
        -1-----2-----3---------|-->
    
        ---2----2----2--|-----> //        , 2,3    ,          
    
                mergeMap(v => Observable.of(v + 1).repeat(3));
    
        -2--2--2--3--3--3--4--4--4----|-->
    

    出力ストリームは、マッピング関数から返されるすべての内部ストリームを1つのストリームに平らにします.マッピング関数は、入力ストリームの値を使用して内部ストリームを生成できます.
    戻り値Observableは、すべてのマッピング関数を結合して生成された内部ストリームのストリームであり、内部ストリームの値はこのストリームで発行されます.
    of('a','b','c')
        .mergeMap(v => Rx.Observable.interval(1000).map(x => x + v ))
        .subscribe(v => console.log(v));
    
    const source = of('Hello');
    
    const createPromise = v => new Promise(resolve => resolve(`I got ${v} from promise`));
    
    source.mergeMap(
        v => createPromise(v),
        (outValue, innerValue) => `Source: ${outValue},${innerValue}`
    )
    .subscribe(v => console.log(v));
    
  • switchMap

  • インスタンスメソッド
    入力ストリームが値を発行するときは、それを内部ストリームにマッピングし、この内部ストリームを出力ストリームに平成化し、出力ストリームには最も近い内部ストリームの値しか発行されません.
        -1---------3-----5----|->
    
        -10---10---10-| //    
    
                switchMap(v => Observable.from([10,10,10]).map(x => x * v))
    
        -10---10---10-30---30-50---50---50-|
    

    出力ストリームの値は、マッピング関数によって入力ストリームの値の基本的に生成された内部ストリームによって発行され、出力ストリームは1つの内部ストリームのみを観察することができ、新しい内部ストリームが到着すると、出力ストリームは前の内部ストリームの購読をキャンセルし、この最新の内部ストリームを購読し、その上の値を発行します.
    戻り値Observableは、最新の内部ストリームからのみ値を取得するストリームです.
    fromEvent(document, 'click')
        .switchMap(event => interval(1000))
        .subscribe(v => console.log(v));
    
  • concatMap

  • インスタンスメソッド
    複数のストリームを1つのストリームにマージするには、現在のストリームが完了するまで次のマージを開始する必要があります.マージプロセスは、入力された順序で実行されます.
        --1------------3---------5-----------|-->
    
        --10---10---10--|-->
    
                    concatMap(i => 10*i----10*i---10*i)
    
        --10---10---10-30---30---30-50---50---50--|->
    

    戻り値Observable入力した値をストリームにマッピングして接続するストリーム.
    fromEvent(document, 'click')
        .concatMap(event => Rx.Observable.interval(1000).take(3))
        .subscribe(v => console.log(v));
    
  • groupBy

  • インスタンスメソッド
    入力ストリームの値を一定の規則に従って異なるストリームに分割し、出力ストリームに送信します.各ストリームには同じ条件を満たす値のセットがあります.
    ----1----2----3----4----5----|->
    
                groupBy(v => v%2);
    
    -----------------------------|->
        \   \
         \   2---------4---------|
          1------3----------5----|
    

    戻り値Observableは、パケット後のストリームの高次ストリームを発行し、パケットのストリームには一意のkeyがあり、そのストリームの値は入力ストリーム上である条件に合致する値である.
    of(
        {id: 1, name: 'aze1'},
        {id: 2, name: 'sf2'},
        {id: 2, name: 'dg2'},
        {id: 1, name: 'erg1'},
        {id: 1, name: 'df1'},
        {id: 2, name: 'sf2'},
        {id: 3, name: 'qfs3'},
        {id: 2, name: 'qsg'}
    )
    .groupBy(v => v.id)
    .mergeMap(group => group.reduce((acc,cur) => [...acc, cur],[]))
    .subscribe(v => console.log(v))
    

    集約クラスオペレータ

  • reduce

  • インスタンスメソッド
    ソースObservableに累積関数を適用し、ソースObservableが終了すると累積値を出力ストリームにプッシュし、初期の累積値を受け入れることができます.
        --------1-------2-------3---|-->
    
                reduce((acc,cur) => acc + cur, 0);
    
        ----------------------------4|
    

    このオペレータの動作は配列のreduce法と同じである.1つの累積関数を使用してストリーム上の各値を処理し、前回の累積後の値を次の累積の初期値として、累積によって得られた最終値を出力ストリームにプッシュします.なお、reduceオペレータは、ソースObservableが終了通知を発行した後に1つの値しか発行しません.
    ソースObservableで発行された値は、累積関数で処理されます.初期値が指定されている場合、この値は累積プロセス全体の初期値となり、指定されていない場合、ソースObservableから発行される最初の値は累積プロセス全体の初期値となります.
    戻り値Observableは、ソースObservable上の値をすべて累積関数で計算した値を値として送信するストリームである.
    const clicksInFiveSeconds = fromEvent(document, 'click')
                                .takeUntil(interval(5000));
    const ones = clicksInFiveSeconds.mapTo(1);
    const seed = 0;
    const count = ones.reduce((acc,cur) => acc + cur, seed);
    
    count.subscribe(v => console.log(v));
    
  • scan

  • インスタンスメソッド
    入力ストリームに累積関数を適用し、累積のたびに出力ストリームに送信し、累積時にオプションの初期値を受け入れることができます.reduceオペレータと同様に、蓄積されるたびに結果が送信されます.
    戻り値Observableは、累積値のストリームを出力します.
    fromEvent(document, 'click')
        .mapTo(1)
        .scan((acc,cur) => acc + cur, 0)
        .subscribe(v => console.log(v))
    

    オペレータはここまで紹介されていますが、非常によく使われているものがあります.また、紹介されていないものは役に立たないわけではありません.必要なものは自分で検索してください.オペレータ間の組み合わせによって、非常に効率的で便利なデータ処理を実現することができます.