Android-RxJavaの一般的なAPIの使用とスレッド変換のポイント


RxJavaとは?RxJavaのGitHubでの自己紹介は「a library for composing asynchronous and event-based programs using observable sequences for the Java VM」(JavaVM上で観測可能なシーケンスを使用して非同期でイベントベースのプログラムを構成するライブラリ)である.これがRxJavaです.
これからは何も言わないで、直接今日のテーマに入りましょう.
一.API使用
RxJavaのAPIは、Observableの作成、変換、データのフィルタリングなどを含む
1.Observable作成
Observableの作成にはcreate,just,from,interval,timerなどいくつかの方法があります
1.1 createメソッドは、RxjavaでObservableを作成する最も基本的なメソッドです.例は次のとおりです.Observableを作成すると、onSubstribeにパラメータとして渡され、Observableオブジェクトに格納され、Observableが購読されると、そのcallメソッドがトリガーされます.
private void testRxMethod_create() {
        Observable observable = Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber super RxBeanInfo> subscriber) {
                RxBeanInfo info = new RxBeanInfo();
                info.setName("lqm.test");
                subscriber.onNext(info);
                subscriber.onCompleted();
            }});
        Subscriber subscriber = new Subscriber()
        {
            @Override
            public void onCompleted() {
                Log.i("rxjava", "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.i("rxjava", "error");
            }
            @Override
            public void onNext(RxBeanInfo o) {
                Log.i("rxjava", o.getName());
            }};
        //    subscribe              ,    ,Observable    call().
        observable.subscribe(subscriber);
    }
11-17 14:27:14.513 14944-14944/pa.test.com.testapp I/rxjava: lqm.test
11-17 14:27:14.513 14944-14944/pa.test.com.testapp I/rxjava: onCompleted

1.2.just-justメソッドを呼び出すことで、送信したいデータソースに転送され、サブスクライバがサブスクリプションを行うとデータの印刷が開始されます.just()メソッドでは、最大10個のパラメータが送信され、転送されたパラメータの順序で送信されます.
private void testRxMethod_just() {
        Observable observable = Observable.just("1", "2", "3","4", "5","6","7","8","9","10");
        Subscriber subscriber = new Subscriber()
        {
            @Override
            public void onCompleted() {
                Log.i("rxjava", "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.i("rxjava", "error");
            }
            @Override
            public void onNext(String o) {
                Log.i("rxjava", o);
            }};
        observable.subscribe(subscriber);
   }

1.3.from-メソッドパラメータは配列であり、fromはオブジェクトに分割され、放出されます.
private void testRxMethod_from() {
        List items = new ArrayList<>();
        items.add("1");
        items.add("2");
        items.add("3");
        Observable observable = Observable.from(items);
        Subscriber subscriber = new Subscriber()
        {
            @Override
            public void onCompleted() {
                Log.i("rxjava", "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.i("rxjava", "error");
            }
            @Override
            public void onNext(String o) {
                Log.i("rxjava", o);
            }};
        observable.subscribe(subscriber);
    }
11-17 14:44:13.043 30921-30921/pa.test.com.testapp I/rxjava: 1
11-17 14:44:13.043 30921-30921/pa.test.com.testapp I/rxjava: 2
11-17 14:44:13.043 30921-30921/pa.test.com.testapp I/rxjava: 3
11-17 14:44:13.043 30921-30921/pa.test.com.testapp I/rxjava: onCompleted

1.4.interval-タイマ、この方法はメッセージを一定時間ごとに送信し、interval()関数の2つのパラメータ:1つは2回の送信の時間間隔を指定し、もう1つは使用する時間単位である.
private void testRxMethod_interval() {
        Observable.interval(2, TimeUnit.SECONDS)
                .subscribe(new Observer() {

                    @Override
                    public void onCompleted() {
                        Log.i("rxjava", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i("rxjava", "onError");
                    }

                    @Override
                    public void onNext(Long number) {
                        Log.i("rxjava", "onNext"+number);
                    }
                });
    }

1.5.timer-遅延器の効果を達成することができて、購読した後に、Observableは1つの0を発射して、終わりました、onCompleted()を呼び出します.
private void testRxMethod_timer() {
         Observable.timer(3,TimeUnit.SECONDS)
                 .subscribe(new Observer() {

                     @Override
                     public void onCompleted() {
                         Log.i("rxjava", "onCompleted");
                     }

                     @Override
                     public void onError(Throwable e) {
                         Log.i("rxjava", "onError");
                     }

                     @Override
                     public void onNext(Long number) {
                         Log.i("rxjava", "onNext"+number);
                     }
                 });
     }

2.Observable変換およびフィルタリング
2.1.map-オブジェクトへの直接変換関数です.Fun関数をパラメータとして受け入れ、Observableによって送信される各値に適用します.入力されたデータを処理した後、新しい形式のデータを返します.いずれかのタイプです.適用シーンは、ピクチャパス(String)処理後にBitmapピクチャに戻るか、元のデータを計算し、処理後に必要なデータにすることができる.
private void testRxMethod_map() {
        Observable.just(6)
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return integer/3+"";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d("lqm","call:"+s);
                    }
                });
    }

2.2.flatMap—Observable.flatMap()は、一方のObservableの出力を入力として受信し、他方のObservableを出力する.
private void testRxMethod_flatMap() {
        Observable.just("girl;boy;man")
                .flatMap(new Func1>() {
                    @Override
                    public Observable call(String s) {
                        return Observable.from(s.split(";"));
                    }
                })
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.d("lqm", "onNext:" + s);
                    }
                }, new Action1() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("lqm", "onerror");
                    }
                }, new Action0() {
                    @Override
                    public void call() {
                        Log.d("lqm", "onComplete");
                    }
                });
    }
11-17 15:41:04.743 18653-18653/pa.test.com.testapp D/lqm: onNext:girl
11-17 15:41:04.743 18653-18653/pa.test.com.testapp D/lqm: onNext:boy
11-17 15:41:04.743 18653-18653/pa.test.com.testapp D/lqm: onNext:man
11-17 15:41:04.743 18653-18653/pa.test.com.testapp D/lqm: onComplete

flatMapとmapを比較すると、flatMapが入力するパラメータはObservableであり、戻ってくるのもObservableである.どちらも1つのオブジェクトを入力し、新しいオブジェクトに変換するが、この点はまだ異なる.flatMapの原理については、ここでは下の投げ物線の一節を引用して、flatMap()の原理はこうである.入力されたイベントオブジェクトを使用してObservableオブジェクトを作成します.2.このObservableを送信するのではなく、それをアクティブ化し、イベントの送信を開始する.3.作成された各Observableが送信したイベントは、同じObservableに集約され、このObservableは、これらのイベントをSubscriberに統一的に渡すコールバックメソッドを担当する.
2.3 filter-フィルタリング方法、この方法を使って私たちが必要としないデータをフィルタリングして、falseに戻って、このオブジェクトが要求に合わないことを説明して、フィルタリングされます;trueを返します.これは、オブジェクトが標準に合致するように正常に下に移動できることを示します.
private void testRxMethod_filter() {
        String[] strArr = {"a123","b123","c123","d123"};
        Observable.from(strArr)
                .filter(new Func1() {
                    @Override
                    public Boolean call(String s) {
                        return s.contains("a") || s.contains("b") || s.contains("c");
                    }
                })
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.d("lqm","call:"+s);
                    }
                });
    }
11-17 20:39:54.761 32378-32378/pa.test.com.testapp D/lqm: call:a123
11-17 20:39:54.761 32378-32378/pa.test.com.testapp D/lqm: call:b123
11-17 20:39:54.761 32378-32378/pa.test.com.testapp D/lqm: call:c123

2.4 distinct-1つのシーケンスから重複データを削除
private void testRxMethod_distinct() {
        Observable.just("Jack", "Honey","Lucy","Lucy","Jane")
                .distinct().subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.i("lqm","onCompleted");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.i("lqm","onNext" + s);
            }
        });
    }
11-17 20:47:00.141 19536-19536/pa.test.com.testapp I/lqm: onNextJack
11-17 20:47:00.141 19536-19536/pa.test.com.testapp I/lqm: onNextHoney
11-17 20:47:00.141 19536-19536/pa.test.com.testapp I/lqm: onNextLucy
11-17 20:47:00.141 19536-19536/pa.test.com.testapp I/lqm: onNextJane
11-17 20:47:00.141 19536-19536/pa.test.com.testapp I/lqm: onCompleted

ここで注意したいのは、distinctは基本タイプの重複と同じオブジェクトかどうかを区別するしかなく、異なるオブジェクトの内容が同じであれば、重くすることはできません.
 private void testRxMethod_distinct() {
        RxBeanInfo info1 = new RxBeanInfo();
        info1.setName("jack");
        RxBeanInfo info2 = new RxBeanInfo();
        info2.setName("jack");
        RxBeanInfo info3 = new RxBeanInfo();
        info3.setName("jack");

        Observable.just(info1,info2,info3)
                .distinct().subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.i("lqm","onCompleted");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(RxBeanInfo s) {
                Log.i("lqm","onNext" + s.getName());
            }
        });
    }
11-17 20:50:01.101 25465-25465/pa.test.com.testapp I/lqm: onNextjack
11-17 20:50:01.101 25465-25465/pa.test.com.testapp I/lqm: onNextjack
11-17 20:50:01.101 25465-25465/pa.test.com.testapp I/lqm: onNextjack
11-17 20:50:01.101 25465-25465/pa.test.com.testapp I/lqm: onCompleted

これらのAPIは比較的よく使われていますが、現在はこれらについて話しています.興味があれば、Android開発者のRxJavaに詳しく説明して詳しく見ることができます.
二.スレッド変換のポイント
ここで主に紹介するのがRxJavaのSchedulers(スケジューラ)機能です
任意のObservableについては、2つの異なるスレッドで定義できます.1.Observableを使用する.observeOn()は、Observableから最新に発行されたitems(SubscriberのonNext、onCompletedおよびonErrorメソッドがobserveOnで指定されたスレッド上で実行される)をリスニングおよびチェックするためのスレッド上で定義できます.2.Observableを使用する.subscribeOn()は、subscribeOn()がsubscribe()で発生するスレッド、すなわちObservableを指定するスレッドを定義する.OnSubscribeがアクティブ化されたときのスレッド.
RxJavaのデフォルトは単一スレッドであり、observeOn()とsubscribeOn()メソッドを使用してアプリケーションにマルチスレッド操作をもたらす必要があります.RxJavaはObservablesに既存のSchedulersをいくつか添付して使用します.例えば、Schedulers.io()(I/O操作用)、Schedulers.計算(計算)とSchedulers.新Thread()は、タスクのために作成された新しいスレッドです.また、Androidには専用のAndroid Schedulersがあります.mainThread()は、Androidメインスレッドで指定された操作を実行します.
これらを知って、実際のRxjavaがオンラインスレッド間でどのように実行されているかを見てみましょう.ここではcaseに分けて詳しく説明します.ObserveOnしか配置されていませんが、異なる位置では、どのような影響がありますか?
    private void observableEvent() {
        Log.d("lqm","mainthread :"+android.os.Process.myTid());
        Observable.just(1)
                .map(new Func1() {
                    @Override
                    public String call(Integer integer) {
                        Log.d("lqm","map:"+android.os.Process.myTid());
                        return integer+"a";
                    }
                })
                .filter(new Func1() {
                    @Override
                    public Boolean call(String s) {
                        Log.d("lqm","filter:"+android.os.Process.myTid());
                        return s.contains("1");
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.d("lqm","call:"+android.os.Process.myTid());
                    }
                });
    }
11-17 21:08:58.551 7624-7624/pa.test.com.testapp D/lqm: mainthread :7624
11-17 21:08:58.591 7624-7624/pa.test.com.testapp D/lqm: map:7624
11-17 21:08:58.591 7624-7624/pa.test.com.testapp D/lqm: filter:7624
11-17 21:08:58.591 7624-8857/pa.test.com.testapp D/lqm: call:8857

以上の実行結果から,メインスレッドidは7624,map()とfilter()はメインスレッドで実行され,subscribe()を呼び出すとobserveOn(Schedulers.io()が呼び出されるためcall()メソッドはioスレッドで実行される.
では、observeOn()を早めると、スレッドに影響を与えるのではないでしょうか.
private void observableEvent() {
        Log.d("lqm","mainthread :"+android.os.Process.myTid());
        Observable.just(1)
                .observeOn(Schedulers.io())
                .map(new Func1() {
                    @Override
                    public String call(Integer integer) {                                  Log.d("lqm","map:"+android.os.Process.myTid());
                        return integer+"a";
                    }
                })
                .filter(new Func1() {
                    @Override
                    public Boolean call(String s) {                    Log.d("lqm","filter:"+android.os.Process.myTid());
                        return s.contains("1");
                    }
                })
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {                      Log.d("lqm","onNext:"+android.os.Process.myTid());
                    }
                });
    }
11-18 10:02:35.691 21563-21563/pa.test.com.testapp D/lqm: mainthread :21563
11-18 10:02:35.711 21563-22093/pa.test.com.testapp D/lqm: map:22093
11-18 10:02:35.711 21563-22093/pa.test.com.testapp D/lqm: filter:22093
11-18 10:02:35.711 21563-22093/pa.test.com.testapp D/lqm: onNext:22093

見たか、現在map()、filter()、onNext()はIOスレッド上で実行されているので、observeOn()はSubscriber操作の他にmap、filterなどにも影響を与えるが、ObserveOn()操作後の動作にのみ影響を与える
次にテストを続行します.