RxJava学習ノート(1)-RxJava概要

6882 ワード

1年前にJavaからnodeに切り替えたのを覚えていますが、当初はJava開発の固有の思考に限られてnodeの理念に難解を感じ、無数の穴を踏んだ後、new個のthreadだけでなく、非ブロックI/O、非同期イベント、関数式プログラミングを利用して並列問題を優雅に処理できることがだんだん発見されました.合格した開発者として、新しい技術を絶えず学び、背後の原理を深く理解し、実際の仕事のニーズに応じて適切な技術を選ぶべきだ.最近githubでRxJavaもJava開発者が簡単に応答式のプログラミングを行うことができることを発見し、時間をかけて勉強し、いくつかの知識点を集めて整理し、みんなと共有しました.
    RxはReactive Extensionであり、観察者モード、反復器モード、関数式プログラミングの応答式プログラミングモデルであり、開発者が非同期データストリームを処理するために一貫したプログラミングインターフェースを提供する.Rxライブラリは当初マイクロソフトによって開発され、現在はReactiveXコミュニティによって維持されている(http://reactivex.io/)、ほとんどのポピュラーなプログラミング言語がサポートされています.
    RxJavaはRxライブラリJavaプラットフォームの拡張であり,観察可能なオブジェクト(Observable)を抽象化し,関連する高次関数を提供している.
    Rxライブラリを使用する前に、応答プログラミングの基本的な概念と理念を理解する必要があります.
    多くのJava開発者にとって、非同期プログラミングは難点であり、エラーも発生しやすい.まず、マルチスレッド、Futureを思い浮かべ、単層非同期操作Futureに対しては簡単で効果的(Future.get()もブロックされている)に見えるが、複数の非同期操作の組合せフロー、ネストに関わると、異常に煩雑になり、少し注意しないとコールバック「地獄」(callbackhell)につながる.RxのObservableは、非同期実行プロセスを組み合わせるのに便利です.
Rxは、観察者モードを拡張してデータとイベントシーケンスをサポートし、スレッド、同期、スレッドセキュリティ、同時データ構造、および非ブロックIOなどの最下位の実装に注目することなく、宣言的にこれらのシーケンスを組み合わせるオペレータを追加します.
    構造上、Rxの主要な構成部分は、Observable、Operator、Subscriber(Observer)、Subscription、Schedulerである.
  • Observableはオブジェクトを観察し、非同期データストリームやイベントシーケンスを生成することができる.Observableは抽象的な概念であり、単独のスカラー値(Futureでできるように)の処理をサポートするだけでなく、データシーケンス、さらには無限のデータストリームもサポートする.例えば、マイクロブログ購読イベントによって生成されたデータストリームは、開発者は、このデータストリームを傍受し、それに基づいて、ファンの増加、購読数の統計、購読内容の取得などの一連のデータ処理を実現することができる.データ・ストリームは、通常のイベント(値イベント)、エラー・イベント、完了イベントの3つのイベント・タイプを含む時間順にソートされた時間シーケンスと見なすことができます.Observableの操作はIterableと似ており、Iterableのすべての柔軟性を持っています.ObservableはIterableのプッシュ方式の等価物として、Iterableを使用して、消費者は生産者からデータを引き出し、スレッドがデータの準備ができるまでブロックすることができます.Observableを使用して、データの準備ができたら、生産者はデータプッシュ(push)を消費者に送ります.データは同期または非同期で到着することができ、この方法はより柔軟です.Observableは非同期の双方向pushであり、Iterableは同期の一方向pullであり、比較は以下の通りである:
  • ≪イベント|Events|ldap≫
    Iterable(pull)
    Observable(push)
    データの取得
    T next()
    onNext(T)
    例外処理
    throwsException
    onError(Exception)
    タスク完了
    !hasNext
    onCompleted()
    ObservableタイプはGOFのオブザーバーモードに2つの欠落した意味を追加し、Iterableタイプで使用可能な操作と一致し、node開発者はPromiseを連想することができます.
    1.   生産者は消費者に信号を送ることができ、より多くのデータが利用できないことを通知することができる(Iterableの場合、forサイクルが正常に完了したことはデータがないことを示している;Observableの場合、観察者を呼び出すonCompletedメソッドである).
    2.   生産者は消費者に信号を送り、エラーに遭遇したことを通知することができる(Iterableの場合、反復中にエラーが発生すると異常が放出される;Observableの場合、オブザーバー(Observer)を呼び出すonErrorメソッド)
  • Operatorはオペレータであり、フィルタ、選択(select)、変換(transform)、結合(combine)、および組合せ(compose)の複数のObservableデータストリームをフィルタリングするために使用することができ、これらのオペレータは実行と複合を非常に効率的にする
  • .
  • Subscriptionはサブスクリプションであり、SubscriberとObservableはサブスクリプションによって
  • に関連付けられる.
  • Subscriberは、非同期データストリームまたはイベントシーケンスを消費する消費者であり、Rxは非同期であり、イベント駆動であるとも言える.観察者はこのデータストリームを購読した後、通常のイベントが発生したときに、フォーム提出データがデータベースに書き込まれるなど、観察者が通常のイベントを処理する関連関数を呼び出す.データストリームにエラーが発生すると、try/catchメソッドと同様に、観察者によってキャプチャされ、エラー処理関数で処理される異常が放出されます.データ・ストリーム全体が完了すると、ウィンドウを閉じる、データベース接続を切断するなどの機能を提供するなど、観察者はデータ・ストリーム完了関数を実行します.もちろん、開発者は、値イベントを発行するときに実行する関数をどのように定義し、設計するかに焦点を当てるだけで、エラーイベントと完了イベントを無視することができます.
  • Scheduler制御非同期データストリームの同時処理
  • 上記の概念を初歩的に理解した後、具体的な例を結びつけて徐々に深く入り込みましょう.
    例1:最も簡単なRxデータストリームモード:
                 Observable -> Subscriber
    //java 7
    Observable observable = Observable.create(new Observable.OnSubscribe() {
    	@Override
    	public void call(Subscriber super String> subscriber) {
    			System.out.println("Current Thread: " + Thread.currentThread().getId());
    			subscriber.onNext("Hello RxJava");
    			subscriber.onCompleted();
    		}	
    	});
    		
    	Subscriber subscriber = new Subscriber() {
    		@Override
    		public void onCompleted() { 
    		}
    		@Override
    		public void onError(Throwable e) { 
    		}
    		@Override
    		public void onNext(String s) {
    		    System.out.println("Current Thread: " + Thread.currentThread().getId());
    		    System.out.println(s);
    	      }
    	};
    		 
    	observable.subscribe(subscriber);

    この例では、ObservableからSubscriberに直接データストリームが移行する、Operatorによるデータストリームの修正は行われない.
    1. Subscriberオブジェクトobserableオブジェクトを呼び出すsubcriberメソッドとobservableオブジェクトを関連付ける
             2.被観察者はonNext()を呼び出してデータストリーム"Hello World"を生成し、complete.
             3.ObservableオブジェクトsubscriberオブジェクトのonNext()出力データを呼び出し、そのonComplete()メソッドを呼び出して終了し、異常が発生した場合subscriberのonError(Throwablee)を呼び出す
    古典的な観察者モードと比較して,彼らの最も重要な違いの一つはsubscriberがない前にobservableがイベントを生じないことである.コードを実行し、出力可視ObservableとSubscriberから同じスレッドで実行します.
    Observableとオペレータはエラー状態を処理する必要はありません.エラーが発生すると、現在のオペレータと後続のオペレータはスキップされます.すべてのエラー処理は購読者に任せます.
    Java 8ではlambdaでコードを挿入するように簡略化できます.
    // Use Java 8 lambda instead
    Observable.just("Hello RxJava")
    	.subscribe(s -> System.out.println(s));

    例2:Operatorを加えると、Rxデータストリームは以下の通りである.
    Observable-> Operator -> … -> Operator-> Subscriber
    // run in java 8 
    Observable.just(1, 2, 3, 4, 5)
    		.filter(new Func1() {
    			@Override
    			public Boolean call(Integer item) {
    				return (item < 4);
    			}
    		})
    		.map(s -> s * s)
    		.subscribe(new Subscriber() {
    			@Override
    			public void onNext(Integer item) {
    				System.out.println("Next: " + item);
    			}
    
    			@Override
    			public void onError(Throwable error) {
    				System.err.println("Error: " + error.getMessage());
    			}
    
    			@Override
    			public void onCompleted() {
    				System.out.println("Sequence complete.");
    			}
    		});

        上記の例では、filterオペレータを呼び出して4未満の整数をフィルタし、mapオペレータを呼び出して新しいシーケンスを得、各エントリは前のシーケンスの対応する要素の二乗であり、最後にsubscriberによって出力される.Rxオペレータは、データストリームの処理に非常に便利であることがわかります.
    例3:Schedulerマルチスレッド
    Observable.just("www.sina.com","www.baidu.com")
    	.flatMap(url -> getTitle(url))
    	.subscribeOn(Schedulers.newThread()).subscribe(title -> {
    	        System.out.println("Subscriber on thread: " + Thread.currentThread().getName());
    		System.out.println(title != null ? title : "title not found");
    	});
    
    	try {
    		Thread.sleep(2000); //sleep is required or subscriber may not receive the data
    	} catch (InterruptedException e) {
    		Thread.currentThread().interrupt();
    		e.printStackTrace();
    	}

    この例はsubscribeOn()によってネットワークIO動作を独立したスレッドに分離して実行する
    observeOnとsubscribeOnオペレータを使用すると、Observable、オペレータ、およびSubscriberを特定のスケジューラ上で実行できます.observeOnは、Observableが特定のスケジューラ上で観察者を呼び出すonNext、onErrorおよびonCompletedメソッドを示すObservableを示し、subscribeOnは、Observableがデータの送信および通知を含むすべての処理プロセスを指示する一歩前進します.特定のスケジューラに配置して実行します.
            これで、RxプログラミングモデルとRxJavaライブラリについて初歩的な認識を得ました.後続のシリーズでは、Rxの各コンポーネントと原理をより多くの例と結びつけて深く理解します).Rxを効率的に使用するには、公式ドキュメントや元のコードをよく研究し、多くの実践の中で体得する必要があります.
     以下はRxに関するいくつかの学習リソースです.
    ReactiveX公式サイト
    RxJava Wiki
    ReactiveX Tutorials
    RxJava中国語ドキュメント(繁体字)
    Blog: Grokking RxJava
    Awsome RxJava
      サンプルコード: [ダウンロード]