リアクティブデータストリーム


(反応運動−マニフェスト)に触発されるhttps://www.reactivemanifesto.org/ ], RXJavaは、Netflixによって作成された反応性ストリーム仕様の実装です.リアクティブストリームは、データストリームの非同期処理の概念です.
Netflixによって効果的にネットワークの難しさを減らすことができるように構築されます.RXJavaとの目標は、クライアントがサーバー上で並列に実行される単一の“重い”クライアント要求を呼び出すことを許可することです.
そのコンセプトはObservable/Iterable 種類Subscribing これらのデータストリームを非同期に送信します.
以下のノートについては詳しく説明しない.これを書いている過程でReactor Java 8とRXJavaがJava 6を実行するのを必要とするスプリングブート2にアップグレードするので、ライブラリ.どちらも同様のコンセプトに基づいているが、構造は異なる.

観測可能/反復性

Observable データ型は「プッシュ」等価物と考えられるIterable は"プル"です.Iterable データ型は、それらの値が到着するまで、プロデューサとスレッドブロックから値をプルします.
値が利用可能なときは、生産者は値を消費者にプッシュします.
これは、値が同期または非同期に到達できるため、より柔軟なアプローチを作成します.

観測可能型


に存在する2つの欠落したsemantiqcsを追加するIterable 種類:
  • プロデューサーは消費者には、データが利用可能であることをシグナルすることができます.
  • プロデューサーは、エラーharが発生した消費者に信号を送ることができます.
  • これによりObservable and Iterable 統一.
    唯一の違いは、データが流れる方向です.
    常に戻るObservable , 常に要求Iterable .

    既存のデータ構造から観測可能な作成

  • 既存のデータ構造体から
  • 観測可能にするjust() and from() オブジェクト、リスト、またはオブジェクトの配列をオブジェクトを変換できるメソッドに変換するメソッドです.
  • 
    Observable<String> o = Observable.from("a","b","c");
    
    // Inserting a list into an observable
    def list = [5,6,7,8]
    Observable<Integer> o = Observable.from(list);
    
    //
    Observable<String> o = Observable.just("one object");
    
    

    create ()メソッドを使用して観測可能な作成

  • 経由でcreate() メソッドを使用すると、非同期I/O、計算操作、またはデータの'無限'ストリームを実装できます.

  • 同期可能な例:
  • 購読したときにブロックするカスタムオブザーバ
  • def customObservableBlocking() {
        return Observable.create(
            { aSub ->
                for (int i=0; i<50; i++) {
                    if (false == aSub.isUnsubscribed()) {
                        aSub.onNext("value_" + i);
                    };
            }
            // after sending all values we complete the sequence
            if (false == aSub.isUnsubscribed()) {
                aSub.onCompleted();
            }
        });
    }
    
    // Output:
    customObservableBlocking().sub({ it -> println(it); });
    

    非同期観測可能例:
  • 観測可能な文字列は75文字です.
  • それが別々の糸を産むとき、購読するとき、ブロックしません.
  • def customObservableNonBlocking() {
        return Observable.create(
            { sub ->
                final Thread t = new Thread(new Runnable() {
                    void run() {
                        for (int i = 0; i < 75; i++) {
                            if (true == sub.isUnsubscribed()) {
                                return;
                            }
                            sub.onNext("value_" + i);
                        }
                        if (false == sub.isUnsubscribed())
                    }
                });
                t.start();
            }
        );
    }
    // Output:
    customObservableNonBlocking().sub({ println(it) })
    
  • wiki記事のリストを取得します.
  • def fetchWikiArticleAsync(String... wikiArticleNames) {
        return Observable.create({ sub ->
            Thread.start( {
                for (articleName in wikiArticleNames) {
                    if (true == sub.isUnsubscribed()) {
                        return;
                    }
                    sub.onNext(new URL("http://en.wikipedia.org/wiki/" + articleName).getText());
                }
                if (false == sub.isUnsubscribed()) {
                    sub.onCompleted();
                }
            });
            return(sub);
        });
    }
    
    // Output:
    fetchWikiArticleAsync("Tiger", "Elephant")
        .sub({println "--- Article ---\n" + it.substring(0, 125); });
    

    オブザーバブルを操作者に変換する

  • チェーンoperators 一緒にオブザーバブルを変換し、構成する
  • async呼び出しcustomObservableNonBlock 連鎖して
  • def simpleComposition() {
        customObservableNonBlocking().skip(10).take(5)
            .map({ stringValue -> return stringValue + "_xform" })
            .subscribe({ println "onNext => " + it })
    }
    
  • skip(10) - 10番目の値にジャンプ
  • take(5) - グラブ5次の値
  • map(...) - 各値をマップし、連結するstringValue with _xform => $stringValue_xform
  • subscribe(...) - マップされた値を返すonNext => 連結する
  • これは初期のrxjava夏です.への切り替えreactor 代わりにライブラリは、コンセプトは異なるが、構造化された/より単純に、Java 8を受け入れる.
    RXJavaについての詳細はこちらをご覧くださいhttps://github.com/ReactiveX/RxJava ]