リアクティブデータストリーム
13131 ワード
(反応運動−マニフェスト)に触発されるhttps://www.reactivemanifesto.org/ ], RXJavaは、Netflixによって作成された反応性ストリーム仕様の実装です.リアクティブストリームは、データストリームの非同期処理の概念です.
Netflixによって効果的にネットワークの難しさを減らすことができるように構築されます.RXJavaとの目標は、クライアントがサーバー上で並列に実行される単一の“重い”クライアント要求を呼び出すことを許可することです.
そのコンセプトは
以下のノートについては詳しく説明しない.これを書いている過程で
値が利用可能なときは、生産者は値を消費者にプッシュします.
これは、値が同期または非同期に到達できるため、より柔軟なアプローチを作成します.
に存在する2つの欠落したsemantiqcsを追加する プロデューサーは消費者には、データが利用可能であることをシグナルすることができます. プロデューサーは、エラーharが発生した消費者に信号を送ることができます. これにより
唯一の違いは、データが流れる方向です.
常に戻る
既存のデータ構造体から 観測可能にする
経由で
同期可能な例: 購読したときにブロックするカスタムオブザーバ
非同期観測可能例: 観測可能な文字列は75文字です. それが別々の糸を産むとき、購読するとき、ブロックしません.
wiki記事のリストを取得します.
チェーン async呼び出し
これは初期のrxjava夏です.への切り替え
RXJavaについての詳細はこちらをご覧くださいhttps://github.com/ReactiveX/RxJava ]
Netflixによって効果的にネットワークの難しさを減らすことができるように構築されます.RXJavaとの目標は、クライアントがサーバー上で並列に実行される単一の“重い”クライアント要求を呼び出すことを許可することです.
そのコンセプトは
Observable
/Iterable
種類Subscribing
これらのデータストリームを非同期に送信します.以下のノートについては詳しく説明しない.これを書いている過程で
Reactor
Java 8とRXJavaがJava 6を実行するのを必要とするスプリングブート2にアップグレードするので、ライブラリ.どちらも同様のコンセプトに基づいているが、構造は異なる.観測可能/反復性
Observable
データ型は「プッシュ」等価物と考えられるIterable
は"プル"です.Iterable
データ型は、それらの値が到着するまで、プロデューサとスレッドブロックから値をプルします.値が利用可能なときは、生産者は値を消費者にプッシュします.
これは、値が同期または非同期に到達できるため、より柔軟なアプローチを作成します.
観測可能型
に存在する2つの欠落したsemantiqcsを追加する
Iterable
種類:Observable
and Iterable
統一.唯一の違いは、データが流れる方向です.
常に戻る
Observable
, 常に要求Iterable
.既存のデータ構造から観測可能な作成
just()
and from()
オブジェクト、リスト、またはオブジェクトの配列をオブジェクトを変換できるメソッドに変換するメソッドです.
Observable<String> o = Observable.from("a","b","c");
// Inserting a list into an observable
def list = [5,6,7,8]
Observable<Integer> o = Observable.from(list);
//
Observable<String> o = Observable.just("one object");
create ()メソッドを使用して観測可能な作成
create()
メソッドを使用すると、非同期I/O、計算操作、またはデータの'無限'ストリームを実装できます.同期可能な例:
def customObservableBlocking() {
return Observable.create(
{ aSub ->
for (int i=0; i<50; i++) {
if (false == aSub.isUnsubscribed()) {
aSub.onNext("value_" + i);
};
}
// after sending all values we complete the sequence
if (false == aSub.isUnsubscribed()) {
aSub.onCompleted();
}
});
}
// Output:
customObservableBlocking().sub({ it -> println(it); });
非同期観測可能例:
def customObservableNonBlocking() {
return Observable.create(
{ sub ->
final Thread t = new Thread(new Runnable() {
void run() {
for (int i = 0; i < 75; i++) {
if (true == sub.isUnsubscribed()) {
return;
}
sub.onNext("value_" + i);
}
if (false == sub.isUnsubscribed())
}
});
t.start();
}
);
}
// Output:
customObservableNonBlocking().sub({ println(it) })
def fetchWikiArticleAsync(String... wikiArticleNames) {
return Observable.create({ sub ->
Thread.start( {
for (articleName in wikiArticleNames) {
if (true == sub.isUnsubscribed()) {
return;
}
sub.onNext(new URL("http://en.wikipedia.org/wiki/" + articleName).getText());
}
if (false == sub.isUnsubscribed()) {
sub.onCompleted();
}
});
return(sub);
});
}
// Output:
fetchWikiArticleAsync("Tiger", "Elephant")
.sub({println "--- Article ---\n" + it.substring(0, 125); });
オブザーバブルを操作者に変換する
operators
一緒にオブザーバブルを変換し、構成するcustomObservableNonBlock
連鎖してdef simpleComposition() {
customObservableNonBlocking().skip(10).take(5)
.map({ stringValue -> return stringValue + "_xform" })
.subscribe({ println "onNext => " + it })
}
skip(10)
- 10番目の値にジャンプtake(5)
- グラブ5次の値map(...)
- 各値をマップし、連結するstringValue
with _xform
=> $stringValue_xform
subscribe(...)
- マップされた値を返すonNext =>
連結するreactor
代わりにライブラリは、コンセプトは異なるが、構造化された/より単純に、Java 8を受け入れる.RXJavaについての詳細はこちらをご覧くださいhttps://github.com/ReactiveX/RxJava ]
Reference
この問題について(リアクティブデータストリーム), 我々は、より多くの情報をここで見つけました https://dev.to/andrehatlo/reactive-data-streams-quick-rxjava-summary-2pciテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol