RxJava学習のまとめ

26764 ワード

RxJavaとは
1.定義
RxJava is a Java VM implementation of Reactive Extensions:a library for composing asynchronous and event-based programs by using observable sequences.RxJavaは、観察可能なシーケンスを使用して非同期とイベントベースのプログラムを組み合わせたライブラリです.
2.特徴
(1)観察者モードRxJavaは,設計モードにおける観察者モードを用いた.データまたはイベント・シーケンスをサポートし、シーケンスの組合せを可能にし、スレッド、同期、および同時データ構造を抽象化します.
(2)軽量無依存ライブラリ、Jarパッケージが1 M未満
(3)Java 6+とAndroid 2.3+を多言語でサポート.RxJavaの設計の初心はすべてのJVM言語を互換することであり、現在サポートされているJVM言語にはGroovy、Clojure、JRuby、Kotlin、Scalaがある.
(4)マルチスレッドはthreads,pools,event loops,fibers,actorsなどの様々な同時実装をカプセル化することをサポートする.
3. RxJava vs. Java
この新しいパートナーをよりよく理解するために、先輩はRxJavaとJavaの異同をまとめました.以下,非同期シーケンス,データ取得方式,データ伝達方式,拡張機能について4つの側面から述べる.
(1)非同期シーケンスについては、通常、T getData()と書くことができる同期オブジェクトを取得する.Future getData()と書くことができる非同期オブジェクトを取得します.同期シーケンスを取得すると、Iterable getData()と書くことができます.では、非同期シーケンスを取得します.Javaは対応する方法を提供していません.RxJavaはこの空白を埋めています.Observable getData()と書くことができます.Observableに関する紹介は後であります.
(2)データ取得方式Javaではオブザーバーモードを使用しない場合,データはすべてアクティブ取得,すなわちPull方式であり,リストデータに対してもIteratorポーリングを用いて取得する.RxJavaは,観察者モードを用いたため,データは受動的に取得され,観察者から観察者への通知,すなわちPush方式である.
(3)データ転送方式同期データ操作の場合,Javaでは結果,すなわちoperation 1->operation 2->operation 3を順次転送できる.非同期オペレーションでは、通常、Callbackコールバックを使用してコールバックを続行する必要があります.すなわち、Callback 1->Callback 2->Callback 3では、多くのレイヤがネストされている可能性があります.RxJava同期と非同期はチェーン呼び出しであり、operation 1->operation 2->operation 3であり、この方法の利点は、即時で複雑な論理が簡単明瞭で、エラーが発生しにくいことです.
(4)拡張機能は観察者モード機能よりも強く,onNext()コールバックメソッドに基づいてonCompleted()とOnError()を追加し,イベントの実行が完了したりエラーが発生したりした場合にコールバックする.また、イベントの生産と消費のスレッドを容易に切り替えることができます.イベントは組み合わせて処理することもできます.
こんなにたくさん話したのに、何の役にも立たないようだから、例をあげましょう.
ローカルurlリスト内のピクチャのローカルディレクトリを特定し、対応するピクチャをロードしてUIに表示する必要があると仮定します.
new Thread() {
@Override
public void run() {
    super.run();
    for (String url : urls) {
        if (url.endsWith(".png")) {
            final Bitmap bitmap = getBitmap(url);
            getActivity().runOnUiThread(new Runnable() {
                @Override
                public void run() {
                    imageView.setImageBitmap(bitmap);
                }
            });
        }
    }
}
}.start();

RxJavaを使用すると、コードは次のようになります.
Observable.from(urls)
.filter(new Func1<String, Boolean>() {
    @Override
    public Boolean call(String url) {
        return url.endsWith(".png");
    }
})
.map(new Func1<String, Bitmap>() {
    @Override
    public Bitmap call(String url) {
        return getBitmap(url);
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
    @Override
    public void call(Bitmap bitmap) {
        imageView.addImage(bitmap);
    }
});

APIの紹介と使用
RxJavaは内容が多いので、先輩は基礎から高級まで段階的に説明しようとしています.
RxJavaの重要な考え方は観察者モードであり,APIの紹介もこのモードによって区分されている.
1.基礎
(1)オブザーバー(Observer,Subscriber)Observerはインタフェースであり,onNext(T t),onError(Throwable),onCompleted()の3つの方法を提供している.
SubscriberはObserverのサブクラス、class Subscriber implements Observer、Subscriptionです.
SubscriberはObserverに基づいて次のように拡張されています.
  • はonStart()を追加しました.この方法は、観察者と被観察者がサブスクリプション関係を確立した後、被観察者が観察者にメッセージを送信する前に呼び出され、主にデータのクリアやリセットなどの初期化作業に用いられる.
  • はunsubscribe()を追加した.この方法は、isUnsubscribed()がtrueの場合、観察者は観察者のメッセージを受信できないため、購読をキャンセルするために使用される.

  • Observerを作成するには、次の手順に従います.
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError" + e);
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "onNext -> " + s);
        }
    };

    Subscriberを作成します.
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError" + e);
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "onNext -> " + s);
        }
    };

    (2)被観察者(Observable)Observableは、イベントがいつトリガーされ、どのようなイベントがトリガーされるかを決定する.一般的なObservableを作成する方法は3つあります.
    1 Observable.create(Observable.OnSubscribe)
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("message 1");
                subscriber.onNext("message 2");
                subscriber.onCompleted();
            }
        });

    ObservableがObserver/subscriberとサブスクリプション関係を確立すると、call()が呼び出されます.
    2 Observable.just(T…)
    Observable<String> observable1 = Observable.just("message 1", "message 2");

    この方法は1−N個のタイプの同じパラメータを伝達することができ,上記の例と等価である.最終的にはObserveber/subscriberのonNext(「message 1」)、onNext(「message 2」)、onCompleted()も呼び出されます.
    3 Observable.from(T[]), Observable.from(Iterable
    String[] array = {"message 1", "message 2"};
        Observable<String> observable2 = Observable.from(array);

    この方法は配列やIterableを伝達することができ,上記の例と等価である.
    (3)サブスクリプション(Subscription)サブスクリプション関係の確立には2つの方式がある:1.Observable.subscribe(Subscriber);2.Observable.subscribe(Action)
    ObservableとObserver/subscriberは、Observable.subscribe(Subscriber)によってサブスクリプション関係を確立し、その内部実装は以下のように抽出される.
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }

    ソースコードから分かるように、サブスクリプション関係が確立されると、subscriberのonStart()メソッドが最初に呼び出され、ここではデータのクリアやリセットなどの初期化操作を行うことができる.次にonSubscribe.call(subscriber)が呼び出され、ここでonSubscribeはObservableが作成されたときObservable.create(OnSubscribe)である入力されたOnSubscribeパラメータは、Observableの作成時に入力されたOnSubscribeのcall()コールバックがサブスクリプション関係の確立後に呼び出されることを示します.
    Actionという方式は、実装もやはりSubscriberで包装されており、本質的には上のSubscriberのような方式である.ただ、入力されたパラメータによってコールバックの方法が異なるだけで、下のコードはそれぞれSubscriberのonNext,onNext&onError,onNext&onError&onCompletedを呼び出す.
     Subscription subscribe(final Action1<? super T> onNext)
     Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
     Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)

    ここの先輩はActionについて話したいと思っています.RxJavaはAction 0-Action 9とActionNを提供しています.ここでの数字はパラメータの個数をそれぞれ0-9とN個表しています.
    Subscriptionというインタフェースについて、このクラスは2つのメソッドunsubscribe()とisUnsuubscribed()を提供し、サブスクリプション関係を解除し、サブスクリプション関係を判断することができる.subscribe()サブスクリプションメソッドの戻り値もSubscriptionである.
    (4)シーン例
    demoリファレンスgithubhttps://github.com/wangxinghe/tech_explore
    2.中級
    (1)スレッド(Scheduler)Schedulerは、コード実行のスレッドを指定できるRxJavaのスレッドスケジューラです.RxJavaには、いくつかのスレッドが内蔵されています.
    Android Schedulers.mainThread()メインスレッド
    Schedulers.immediate()現在のスレッド、すなわちデフォルトのScheduler
    Schedulers.newThread()新規スレッドの有効化
    Schedulers.io()IOスレッドは、ファイル、データベース、ネットワーク操作が可能な上限のないスレッドプールです.
    Schedulers.computation()CPU計算用のスレッドは、内部にCPUコア数に固定されたスレッドプールがあり、CPU密集型計算に適しており、ファイル、データベース、ネットワークを操作できない.
    subscribeOn()とobserveOn()はコードの実行スレッドを制御するのに使えます.先輩がここを勉強するとき、この2つの方法を混ぜてどのコードを制御するか、実はdemoを直接走ると分かります.
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            Log.d(TAG, "OnSubscribe.call Thread -> " + Thread.currentThread().getName());
            subscriber.onNext("message");
        }
    }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Subscriber<String>() {
          @Override
          public void onCompleted() {
    
          }
    
          @Override
          public void onError(Throwable e) {
    
          }
    
          @Override
          public void onNext(String s) {
              Log.d(TAG, "Subscriber.onNext Thread -> " + Thread.currentThread().getName());
          }
      });

    印刷されたログから結論を出すことができます.
    subscribeOn()は、OnSubscribe.call()の実行スレッド、すなわちObservableがSubscriberに通知するスレッドを指定する.
    observeOn()は、Subscriberコールバックの実行スレッド、すなわちイベント消費スレッドを指定します.
    (2)変換(map,flatMap)RxJavaは、イベントまたはイベントシーケンスを異なるイベントまたはイベントシーケンスに変換できる強力な機能を提供する.
    変換には、map()とflatMap()の2つの一般的な方法があります.
    mapは1対1の変換です.1つのオブジェクトを別のオブジェクトに変換するか、オブジェクト配列の各オブジェクトを新しいオブジェクト配列の各オブジェクトに変換できます.
    flatMap()は、1対のマルチトランスフォームです.1つのオブジェクトを1組のオブジェクトに変換するか、オブジェクト配列の各オブジェクトを新しいオブジェクト配列の各オブジェクトグループに変換できます.
    Personを例にとると、1つのPersonは1つの身分証明書idに対応し、1つのPersonは複数のEmailを持つことができる.map()によってPersonをidに変換することができ、1つのPersonの身分証明書番号を得ることができる.flatMap()によってPersonを1組のEmailに変換することができ、1つのPersonのすべてのEmailを得ることができる.
    例は次のとおりです.
    /** * map: Person -> id(String) *      id */
    private void testMap0() {
        Observable.just(getPersonArray()[0])
                .map(new Func1<Person, String>() {
                    @Override
                    public String call(Person person) {
                        return person.id;
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
    
                    }
    
                    @Override
                    public void onError(Throwable e) {
    
                    }
    
                    @Override
                    public void onNext(String id) {
                        Log.d(TAG, "id -> " + id);
                    }
                });
    }
    
    /** * map: array Person -> id(String) *       id */
    private void testMap() {
        Observable.from(getPersonArray())
                .map(new Func1<Person, String>() {
                    @Override
                    public String call(Person person) {
                        return person.id;
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
    
                    }
    
                    @Override
                    public void onError(Throwable e) {
    
                    }
    
                    @Override
                    public void onNext(String id) {
                        Log.d(TAG, "id -> " + id);
                    }
                });
    }
    
    /** * flatMap: array Person -> email  (String[]) *         email */
    private void testFlatMap() {
        Observable.from(getPersonArray())
                .flatMap(new Func1<Person, Observable<Person.Email>>() {
                    @Override
                    public Observable<Person.Email> call(Person person) {
                        Log.d(TAG, "flatMap " + person.id);
                        return Observable.from(person.emails);
                    }
                })
                .subscribe(new Subscriber<Person.Email>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted");
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError " + e.getMessage());
                    }
    
                    @Override
                    public void onNext(Person.Email email) {
                        Log.d(TAG, "onNext " + email.name);
                    }
                });
    }

    3.高級
    先輩も今週RxJavaを習い始めたばかりで、新しい知識点については、やはりどのように使うかといくつかの基礎APIの基本原理を知っていることを主としていると思います.もっと高級なのはAPIが上手になってから勉強するべきだと思います.そのため、ここではもう述べません.デブになることはできません.
    RxJavaとRetrofitの組み合わせ
    RetrofitはSquare社が提供する安全なタイプのHttp Clientで、Retrofit自体がRxJavaをサポートしているため、当然のように組み合わせて使用されています.
    シーン1
    まず、Retrofitを単独で使用してネットワーク操作を行う例を見てみましょう.
    public interface GithubAPI {
        @GET("/users/{user}")
        public void getUserInfo(@Path("user") String user, Callback<UserInfo> callback);
    }
    
    private void fetchUserInfo() {
        String username = mEditText.getText().toString();
        getGithubAPI()
                .getUserInfo(username, new Callback<UserInfo>() {
                    @Override
                    public void success(UserInfo userInfo, Response response) {
                        mTextView.setText(userInfo.email);
                    }
    
                    @Override
                    public void failure(RetrofitError error) {
                        mTextView.setText(error.getMessage());
                    }
                });
    
    }

    Retrofitを単独で使用するにはコールバックが必要ですが、論理が少し複雑な点であれば、Callbackで多くのことをしなければならないかもしれません.コードのメンテナンスは大変です.
    以下、RetrofitとRxJavaを組み合わせた例を見てみましょう.
    public interface GithubAPI {
        @GET("/users/{user}")
        public Observable<UserInfo> getUserInfo(@Path("user") String user);
    }
    
    private void fetchUserInfoRx() {
        String username = mEditText.getText().toString();
        getGithubAPI()
                .getUserInfo(username);
                .subscribe(new Observer<UserInfo>() {
                    @Override
                    public void onCompleted() {
    
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        mTextView.setText(e.getMessage());
                    }
    
                    @Override
                    public void onNext(UserInfo userInfo) {
                        mTextView.setText(userInfo.email);
                    }
                });
    }

    シーン2
    次に、usernameがユーザ情報を取得するためにネットワーク操作に従って取得する必要があるなど、複数の操作の場合を見る.
    Retrofitのコードは次のとおりです.
    public interface GithubAPI {
        @GET("/username")
        public void getUserName(Callback<String> callback);
    
        @GET("/users/{user}")
        public void getUserInfo(@Path("user") String user, Callback<UserInfo> callback);
    }
    
    private void fetchUserInfo() {
        String username = mEditText.getText().toString();
        getGithubAPI()
                .getUserName(new Callback<String>() {//  username
                    @Override
                    public void success(String username, Response response) {
                        /  UserInfo
                        getUserInfo(username, new Callback<UserInfo>() {
                            @Override
                            public void success(UserInfo userInfo, Response response) {
                                mTextView.setText(userInfo.email);
                            }
    
                            @Override
                            public void failure(RetrofitError error) {
                                mTextView.setText(error.getMessage());
                            }
                        })
                    }               
                });
    }

    RetrofitがRxJavaと組み合わせて使用するコードは次のとおりです.
    public interface GithubAPI {
        @GET("/username")
        public Observable<String> getUserName();
    
        @GET("/users/{user}")
        public Observable<UserInfo> getUserInfo(@Path("user") String user);
    }
    
    private void fetchUserInfoRx() {
        String username = mEditText.getText().toString();
        getGithubAPI()
                .getUserName()    //  username
                .flatMap(new Func1<String, Observable<UserInfo>>() {
                    @Override
                    public Observable<UserInfo> onNext(String username) {
                        //  UserInfo
                        return getGithubAPI().getUserInfo(username);
                    })
                }
                .subscribe(new Observer<UserInfo>() {
                    @Override
                    public void onCompleted() {
    
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        mTextView.setText(e.getMessage());
                    }
    
                    @Override
                    public void onNext(UserInfo userInfo) {
                        mTextView.setText(userInfo.email);
                    }
                });
    }

    発見されたかどうかは、RxJava構造を使用するとより明確になります.
    補足すると、RetrofitとRxJavaを使用するには、依存を追加する必要があります.
    compile 'io.reactivex:rxjava:1.1.0'
    compile 'io.reactivex:rxandroid:1.1.0'
    compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
    compile 'com.squareup.retrofit:retrofit:1.9.0'

    例demoはgithubに置かれ、https://github.com/wangxinghe/tech_explore
    まとめ
    1.RxJavaの最大の特徴はチェーン呼び出しであり、非同期論理構造をより明確にする
    2.オブザーバーモード:Obverable(オブザーバー)、Observer/subscription(サブスクリプション)
    by:wangxinghe